import os, sys, json, subprocess import boto3 from urllib.parse import urlparse from flask import Flask, request, jsonify from flask_cors import CORS app = Flask(__name__, static_folder="../dist", static_url_path="") CORS(app) def parse_connection_string(conn_str, default_user='', default_pass=''): if '://' in conn_str: parsed = urlparse(conn_str) return { 'host': parsed.hostname, 'user': parsed.username or default_user, 'pass': parsed.password or default_pass, 'db': parsed.path.lstrip('/') if parsed.path else '' } else: return { 'host': conn_str, 'user': default_user, 'pass': default_pass, 'db': '' } @app.after_request def add_no_cache_headers(response): return response # ========================= # React # ========================= @app.route("/") def frontend(): return app.send_static_file("index.html") @app.route("/") def assets(path): return app.send_static_file(path) # ========================= # CONFIG # ========================= MIGRATORS = { "mysql_mysql": { "script": "mysql_migrator.py", "progress": "mysql_progress.json" }, "psql_psql": { "script": "psql_psql_migrator.py", "progress": "pg_progress.json" }, "psql_s3": { "script": "pg_s3_migrator.py", "progress": "psql_progress.json" }, "s3_s3": { "script": "s3_s3_migrator.py", "progress": "migration_progress.json" } } # ========================= # GET SCHEMAS # ========================= @app.route("/api/get_schemas", methods=["POST"]) def get_schemas(): data = request.json mtype = data.get("type") if mtype == "mysql_mysql": conn = parse_connection_string(data['host'], data['user'], data['pass']) cmd = f"mysql -h {conn['host']} -u {conn['user']} -p{conn['pass']} -e \"SHOW DATABASES;\"" try: res = subprocess.check_output(cmd, shell=True).decode().splitlines() ignore = ("Database","information_schema","mysql","performance_schema","sys") return jsonify({"schemas": [r for r in res if r and r not in ignore], "error": None}) except Exception as e: return jsonify({"schemas": [], "error": str(e)}) if mtype == "psql_psql": conn = parse_connection_string(data['host'], data['user'], data['pass']) cmd = ( f"PGPASSWORD='{conn['pass']}' psql -h {conn['host']} -U {conn['user']} -t -c " "\"SELECT schema_name FROM information_schema.schemata " "WHERE schema_name NOT IN ('information_schema','pg_catalog');\"" ) try: res = subprocess.check_output(cmd, shell=True).decode().splitlines() return jsonify({"schemas": [r.strip() for r in res if r.strip()], "error": None}) except Exception as e: return jsonify({"schemas": [], "error": str(e)}) return jsonify([]) # ========================= # GET TABLES # ========================= @app.route("/api/get_tables", methods=["POST"]) def get_tables(): data = request.json mtype = data.get("type") tables = [] if mtype == "mysql_mysql": conn = parse_connection_string(data['host'], data['user'], data['pass']) for db in data["schemas"]: cmd = f"mysql -h {conn['host']} -u {conn['user']} -p{conn['pass']} -D {db} -e \"SHOW TABLES;\"" try: res = subprocess.check_output(cmd, shell=True).decode().splitlines() tables += [f"{db}.{t}" for t in res if not t.startswith("Tables")] except: pass if mtype == "psql_psql": conn = parse_connection_string(data['host'], data['user'], data['pass']) for schema in data["schemas"]: cmd = ( f"PGPASSWORD='{conn['pass']}' psql -h {conn['host']} -U {conn['user']} -t -c " f"\"SELECT table_name FROM information_schema.tables WHERE table_schema='{schema}';\"" ) try: res = subprocess.check_output(cmd, shell=True).decode().splitlines() tables += [f"{schema}.{t.strip()}" for t in res if t.strip()] except: pass return jsonify({"tables": tables, "error": None}) # ========================= # LIST S3 BUCKETS # ========================= @app.route("/api/list_buckets", methods=["POST"]) def list_buckets(): data = request.json if not data: return jsonify({"success": False, "error": "No JSON data provided"}), 400 required_keys = ["AWS_SRC_ACCESS_KEY", "AWS_SRC_SECRET_KEY"] for key in required_keys: if key not in data or not data[key]: return jsonify({"success": False, "error": f"Missing or empty required field: {key}"}), 400 try: s3 = boto3.client( "s3", aws_access_key_id=data["AWS_SRC_ACCESS_KEY"], aws_secret_access_key=data["AWS_SRC_SECRET_KEY"], region_name=data.get("AWS_SRC_REGION", "us-east-1") ) return jsonify({"success": True, "buckets": [b["Name"] for b in s3.list_buckets()["Buckets"]]}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 400 # ========================= # MIGRATE # ========================= @app.route("/api/migrate", methods=["POST"]) def migrate(): data = request.json mtype = data.get("type") if mtype not in MIGRATORS: return jsonify({"error": "Invalid migrator type"}), 400 # Parse connection string for mysql_mysql if mtype == "mysql_mysql": if 'SRC_HOST' in data: conn = parse_connection_string(data['SRC_HOST'], data.get('SRC_USER', ''), data.get('SRC_PASS', '')) data['SRC_HOST'] = conn['host'] if conn['user']: data['SRC_USER'] = conn['user'] if conn['pass']: data['SRC_PASS'] = conn['pass'] if 'DEST_HOST' in data: conn = parse_connection_string(data['DEST_HOST'], data.get('DEST_USER', ''), data.get('DEST_PASS', '')) data['DEST_HOST'] = conn['host'] if conn['user']: data['DEST_USER'] = conn['user'] if conn['pass']: data['DEST_PASS'] = conn['pass'] # Parse connection string for psql_psql if mtype == "psql_psql": if 'DB_HOST' in data: conn = parse_connection_string(data['DB_HOST'], data.get('DB_USER', ''), data.get('DB_PASS', '')) data['DB_HOST'] = conn['host'] if conn['user']: data['DB_USER'] = conn['user'] if conn['pass']: data['DB_PASS'] = conn['pass'] if conn['db']: data['DB_NAME'] = conn['db'] if 'DEST_HOST' in data: conn = parse_connection_string(data['DEST_HOST'], data.get('DEST_USER', ''), data.get('DEST_PASS', '')) data['DEST_HOST'] = conn['host'] if conn['user']: data['DEST_USER'] = conn['user'] if conn['pass']: data['DEST_PASS'] = conn['pass'] if conn['db']: data['DEST_NAME'] = conn['db'] env = os.environ.copy() for k, v in data.items(): env[k] = ",".join(v) if isinstance(v, list) else str(v) progress_file = MIGRATORS[mtype]["progress"] with open(progress_file, "w") as f: json.dump({"percent": 0, "message": "بانتظار البدء...", "status": "idle"}, f) subprocess.Popen([sys.executable, MIGRATORS[mtype]["script"]], env=env) return jsonify({"status": "started", "type": mtype}) # ========================= # PROGRESS # ========================= @app.route("/api/progress/") def progress(mtype): if mtype not in MIGRATORS: return jsonify({"error": "Invalid type"}), 400 try: with open(MIGRATORS[mtype]["progress"]) as f: return jsonify(json.load(f)) except: return jsonify({"percent": 0, "message": "بانتظار البدء...", "status": "waiting"}) # ========================= # MAIN # ========================= if __name__ == "__main__": app.run(host="0.0.0.0", port=int(os.getenv("PORT", 8001)))