#!/usr/bin/env python3
"""
Unified Migration API - Flask API wrapper
Supports PostgreSQL, S3 to S3, and PostgreSQL to S3 migrations
"""
from flask import Flask, request, jsonify, send_from_directory, session
from flask_cors import CORS
import threading
import uuid
import os
import json
import socket
import stat
import time
import atexit
import signal
import sys
import secrets
import hashlib
import base64
from datetime import datetime, timedelta
from pathlib import Path
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# Corrected import path
from backend.migrator import (
    postgres_migrator,
    s3_to_s3_migrator,
    postgres_to_s3_migrator
)

app = Flask(__name__, static_folder='../dist')
# Random session-signing key generated at import time; session cookies
# therefore do not survive a restart of this process.
app.secret_key = os.urandom(32)
CORS(app, supports_credentials=True)  # Enable CORS with credentials

# ============================================================================
# Enhanced security settings
# ============================================================================

# In-memory store of temporary variables, values encrypted:
# {session_id: {key: {'value', 'created_at', 'expires_at', ...}}}
secure_vault = {}

# Dictionary for tracking user sessions
user_sessions = {}

# Lifetime of a stored value, in minutes.
# NOTE(review): the original comment described this as "10 minutes only",
# but the value is 3600 minutes (60 hours) - confirm which one is intended.
ENV_EXPIRY_MINUTES = 3600

# Master encryption key (generated once, when the module is imported)
MASTER_KEY = Fernet.generate_key()
cipher = Fernet(MASTER_KEY)

# Maximum number of keys per session
MAX_KEYS_PER_SESSION = 100

# Set once cleanup_on_exit() has run so a signal followed by the atexit hook
# does not wipe (and print) twice.
_cleanup_done = False


def get_session_id():
    """Return the current Flask session's id, creating one if needed.

    Also refreshes the session's ``last_activity`` timestamp on every call.
    """
    now = datetime.now().isoformat()
    if 'session_id' not in session:
        session['session_id'] = secrets.token_urlsafe(32)
        session['created_at'] = now
    session['last_activity'] = now
    return session['session_id']


def encrypt_value(value):
    """Encrypt a sensitive value and return the token as text.

    ``str`` values are UTF-8 encoded, bytes are used as-is, and any other
    type (e.g. a number from a JSON body) is converted with ``str()`` first
    instead of raising.
    """
    if isinstance(value, str):
        value = value.encode('utf-8')
    elif not isinstance(value, (bytes, bytearray)):
        value = str(value).encode('utf-8')
    return cipher.encrypt(bytes(value)).decode('utf-8')


def decrypt_value(encrypted_value):
    """Decrypt a token produced by encrypt_value(); return None on failure."""
    try:
        return cipher.decrypt(encrypted_value.encode('utf-8')).decode('utf-8')
    except Exception as e:
        print(f"🔐 Decryption error: {e}")
        return None


def secure_store(key, value, session_id=None, migration_id=None):
    """Store an encrypted value in memory, bounded per session.

    When the session already holds MAX_KEYS_PER_SESSION entries and ``key``
    is new, the entry with the oldest ``created_at`` is evicted. Overwriting
    an existing key never evicts another entry.

    Returns True.
    """
    if session_id is None:
        session_id = get_session_id()

    encrypted_value = encrypt_value(value)
    vault = secure_vault.setdefault(session_id, {})

    if key not in vault and len(vault) >= MAX_KEYS_PER_SESSION:
        oldest_key = min(vault, key=lambda k: vault[k].get('created_at', ''))
        del vault[oldest_key]

    now = datetime.now()
    vault[key] = {
        'value': encrypted_value,
        'created_at': now.isoformat(),
        'expires_at': (now + timedelta(minutes=ENV_EXPIRY_MINUTES)).isoformat(),
        'migration_id': migration_id,
        'access_count': 0
    }
    return True


def secure_retrieve(key, session_id=None):
    """Return the decrypted value for ``key``, or None.

    None is returned when the session or key is unknown, when the entry has
    expired (the entry is deleted), or when its expiry timestamp is
    malformed (the entry is deleted as a precaution).
    """
    if session_id is None:
        session_id = get_session_id()

    vault = secure_vault.get(session_id)
    if vault is None or key not in vault:
        return None

    data = vault[key]

    # Expiry check
    try:
        expired = datetime.now() > datetime.fromisoformat(data['expires_at'])
    except (KeyError, TypeError, ValueError):
        # Unusable expiry timestamp: drop the entry to be safe
        vault.pop(key, None)
        return None
    if expired:
        del vault[key]
        return None

    # Update access bookkeeping
    data['access_count'] += 1
    data['last_accessed'] = datetime.now().isoformat()

    return decrypt_value(data['value'])


def secure_delete(session_id=None, key=None, migration_id=None):
    """Delete stored values and return how many entries were removed.

    * ``key`` given: delete that single key.
    * else ``migration_id`` given: delete every key tagged with it.
    * else: delete the whole session.

    Each value is overwritten with a placeholder before it is dropped.
    """
    if session_id is None:
        session_id = get_session_id()

    vault = secure_vault.get(session_id)
    if vault is None:
        return 0

    if key:
        if key not in vault:
            return 0
        vault[key]['value'] = encrypt_value('DELETED')
        del vault[key]
        return 1

    if migration_id:
        doomed = [k for k, entry in vault.items()
                  if entry.get('migration_id') == migration_id]
        for k in doomed:
            vault[k]['value'] = encrypt_value('DELETED')
            del vault[k]
        return len(doomed)

    # Delete everything for the session
    count = len(vault)
    for entry in vault.values():
        entry['value'] = encrypt_value('DELETED')
    del secure_vault[session_id]
    return count


def cleanup_expired_sessions():
    """Remove expired vault entries, then drop sessions left empty.

    Entries whose expiry timestamp is missing or unparsable are treated as
    expired. Errors are printed, never raised.
    """
    try:
        now = datetime.now()
        for session_id, entries in list(secure_vault.items()):
            for key, entry in list(entries.items()):
                try:
                    expired = now > datetime.fromisoformat(entry['expires_at'])
                except (KeyError, TypeError, ValueError):
                    expired = True
                if expired:
                    # Overwrite the value before deleting it
                    entry['value'] = encrypt_value('EXPIRED')
                    del entries[key]

            if not entries:
                secure_vault.pop(session_id, None)
    except Exception as e:
        print(f"⚠️ Cleanup error: {e}")


def get_env_file_path():
    """Return the path of today's environment file, creating its directory.

    The directory is ``/app/.secure_env`` when ``/app`` exists, otherwise
    ``.secure_env`` two levels above this file; ``/tmp/secure_env`` is the
    fallback when that directory cannot be created. The file name embeds the
    current date, so it changes daily.
    """
    today = datetime.now().strftime('%Y%m%d')

    # Inside Docker
    if os.path.exists('/app'):
        base_path = '/app'
    else:
        base_path = str(Path(__file__).parent.parent)

    secure_dir = os.path.join(base_path, '.secure_env')
    try:
        os.makedirs(secure_dir, mode=0o700, exist_ok=True)
    except OSError:
        # Could not create the preferred directory: fall back to tmp
        secure_dir = '/tmp/secure_env'
        os.makedirs(secure_dir, mode=0o700, exist_ok=True)

    return os.path.join(secure_dir, f'env_{today}.enc')


def secure_file_write(data):
    """Encrypt ``data`` as JSON and write it to the environment file.

    The file is created owner-read/write only from the start (rather than
    being created with default permissions and chmod-ed afterwards).

    Returns True on success, False on any error (which is printed).
    """
    env_file = get_env_file_path()
    owner_only = stat.S_IRUSR | stat.S_IWUSR

    try:
        encrypted_data = cipher.encrypt(json.dumps(data).encode('utf-8'))

        fd = os.open(env_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
        with os.fdopen(fd, 'wb') as f:
            f.write(encrypted_data)

        # A pre-existing file keeps its old mode, so tighten it explicitly
        try:
            os.chmod(env_file, owner_only)
        except OSError:
            pass

        return True
    except Exception as e:
        print(f"⚠️ Secure file write error: {e}")
        return False


def secure_file_read():
    """Read and decrypt the environment file.

    Returns the decoded JSON data, or ``{}`` when the file does not exist or
    cannot be read/decrypted (the error is printed).
    """
    env_file = get_env_file_path()

    if not os.path.exists(env_file):
        return {}

    try:
        with open(env_file, 'rb') as f:
            encrypted_data = f.read()
        decrypted_data = cipher.decrypt(encrypted_data)
        return json.loads(decrypted_data.decode('utf-8'))
    except Exception as e:
        print(f"⚠️ Secure file read error: {e}")
        return {}


def rotate_env_file():
    """Re-create the environment file from its current contents.

    Returns True only if the file existed and was rewritten successfully;
    False when it does not exist or any step fails.
    """
    env_file = get_env_file_path()

    if not os.path.exists(env_file):
        return False

    try:
        data = secure_file_read()
        os.remove(env_file)
        # Report the real outcome of the rewrite instead of always True
        return secure_file_write(data)
    except Exception as e:
        print(f"⚠️ Rotate env file error: {e}")
        return False


def wipe_all_secure_data():
    """Erase every vault entry and today's environment file.

    Returns the number of sessions that were held in memory.
    """
    count = len(secure_vault)

    # Overwrite the values before clearing
    for entries in list(secure_vault.values()):
        for entry in list(entries.values()):
            entry['value'] = encrypt_value('WIPED')
    secure_vault.clear()

    try:
        env_file = get_env_file_path()
        if os.path.exists(env_file):
            # Overwrite in place ('r+b' does not truncate first) so the
            # random bytes actually cover the old contents before removal.
            size = max(os.path.getsize(env_file), 1024)
            with open(env_file, 'r+b') as f:
                f.write(os.urandom(size))
                f.flush()
                os.fsync(f.fileno())
            os.remove(env_file)
    except OSError:
        pass

    return count


# ============================================================================
# Cleanup on shutdown
# ============================================================================

def cleanup_on_exit():
    """Wipe all secure data when the application shuts down (runs once)."""
    global _cleanup_done
    if _cleanup_done:
        return
    _cleanup_done = True

    print("\n🛑 Shutting down API server...")
    count = wipe_all_secure_data()
    print(f"🧹 Securely wiped {count} sessions")


# Register the cleanup function
atexit.register(cleanup_on_exit)


# System signal handling
def signal_handler(sig, frame):
    """Clean up and exit on SIGINT / SIGTERM."""
    print(f"\n⚠️ Received signal {sig}")
    cleanup_on_exit()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


# ============================================================================
# Security middleware
# ============================================================================

@app.before_request
def before_request():
    """Run before every request: purge expired data and stale sessions."""
    cleanup_expired_sessions()

    # Clear Flask sessions idle for more than one hour
    if 'session_id' in session:
        last_activity = session.get('last_activity')
        if last_activity:
            try:
                idle = datetime.now() - datetime.fromisoformat(last_activity)
            except (TypeError, ValueError):
                # Unparsable timestamp: leave the session untouched
                return
            if idle > timedelta(hours=1):
                session.clear()


@app.after_request
def add_security_headers(response):
    """Attach security and no-cache headers to every response."""
    for header, value in (
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
        ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
        ('Pragma', 'no-cache'),
    ):
        response.headers[header] = value
    return response
# ============================================================================
# Helper Functions for Connection Diagnostics
# ============================================================================

def diagnose_connection(host, port):
    """Check DNS resolution and TCP reachability of ``host:port``.

    Returns a dict ``{'success': bool, 'reason': str | None, 'details': {}}``
    where ``reason`` is one of MISSING_HOST, INVALID_HOST, PORT_CLOSED or
    PORT_CHECK_FAILED on failure and None on success.
    """
    diagnostic = {
        'success': False,
        'reason': None,
        'details': {}
    }
    details = diagnostic['details']

    if not host:
        diagnostic['reason'] = 'MISSING_HOST'
        details['message'] = 'Host address is missing'
        return diagnostic

    try:
        ip = socket.gethostbyname(host)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: the name cannot be IDNA-encoded (e.g. label too long)
        diagnostic['reason'] = 'INVALID_HOST'
        details['message'] = f'Cannot resolve hostname: {host}'
        details['dns_resolved'] = False
        return diagnostic
    details['ip'] = ip
    details['dns_resolved'] = True

    try:
        # The context manager closes the socket even if connect_ex raises
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(3)
            result = sock.connect_ex((ip, port))
    except Exception as e:
        diagnostic['reason'] = 'PORT_CHECK_FAILED'
        details['message'] = f'Error checking port: {str(e)}'
        return diagnostic

    if result != 0:
        diagnostic['reason'] = 'PORT_CLOSED'
        details['message'] = f'Port {port} is closed or not accessible'
        details['port'] = port
        details['error_code'] = result
        return diagnostic

    details['port_open'] = True
    diagnostic['success'] = True
    details['message'] = 'Host and port are accessible'
    return diagnostic


def analyze_postgres_error(e, host, user, database):
    """Classify a PostgreSQL error (exception or message text).

    Returns ``{'reason', 'message', 'details'}`` based on substrings of the
    lower-cased error text.
    """
    error_str = str(e).lower()

    if "authentication failed" in error_str or "password" in error_str:
        return {
            'reason': 'WRONG_PASSWORD',
            'message': f'Wrong password for user "{user}"',
            'details': {'user': user}
        }
    if "does not exist" in error_str and "database" in error_str:
        return {
            'reason': 'UNKNOWN_DATABASE',
            'message': f'Database "{database}" does not exist',
            'details': {'database': database}
        }
    if "timeout" in error_str:
        return {
            'reason': 'CONNECTION_TIMEOUT',
            'message': f'Connection timeout to {host}',
            'details': {'host': host}
        }
    if "refused" in error_str:
        return {
            'reason': 'CONNECTION_REFUSED',
            'message': f'Connection refused by {host}',
            'details': {'host': host}
        }
    return {
        'reason': 'UNKNOWN_ERROR',
        'message': str(e),
        'details': {}
    }


def analyze_mysql_error(e, host, user, database):
    """Classify a MySQL error (exception or message text).

    Returns ``{'reason', 'message', 'details'}`` based on substrings of the
    lower-cased error text.
    """
    error_str = str(e).lower()

    if "access denied" in error_str:
        if "using password: yes" in error_str:
            return {
                'reason': 'WRONG_PASSWORD',
                'message': f'Wrong password for user "{user}"',
                'details': {'user': user}
            }
        if "using password: no" in error_str:
            return {
                'reason': 'MISSING_PASSWORD',
                'message': f'Password required for user "{user}" but not provided',
                'details': {'user': user}
            }
        return {
            'reason': 'ACCESS_DENIED',
            'message': f'Access denied for user "{user}"',
            'details': {'user': user}
        }
    if "unknown database" in error_str:
        return {
            'reason': 'UNKNOWN_DATABASE',
            'message': f'Database "{database}" does not exist',
            'details': {'database': database}
        }
    if "can't connect" in error_str:
        if "timeout" in error_str:
            return {
                'reason': 'CONNECTION_TIMEOUT',
                'message': f'Connection timeout to {host}',
                'details': {'host': host}
            }
        return {
            'reason': 'CONNECTION_REFUSED',
            'message': f'Connection refused by {host}',
            'details': {'host': host}
        }
    if "host" in error_str and "not allowed" in error_str:
        return {
            'reason': 'HOST_NOT_ALLOWED',
            'message': 'Host is not allowed to connect to MySQL server',
            'details': {'host': host}
        }
    return {
        'reason': 'UNKNOWN_ERROR',
        'message': str(e),
        'details': {}
    }


def analyze_s3_error(e, endpoint_url):
    """Classify an S3 error (exception or message text).

    The specific credential error codes are tested before the generic
    "403"/"404"/"timeout" substrings, so an InvalidAccessKeyId or
    SignatureDoesNotMatch error that also mentions 403 is not reported as a
    plain ACCESS_DENIED.
    """
    error_str = str(e).lower()

    if "invalidaccesskeyid" in error_str:
        return {
            'reason': 'INVALID_ACCESS_KEY',
            'message': 'Invalid AWS Access Key ID',
            'details': {}
        }
    if "signaturedoesnotmatch" in error_str:
        return {
            'reason': 'INVALID_SECRET_KEY',
            'message': 'Invalid AWS Secret Access Key',
            'details': {}
        }
    if "403" in error_str or "access denied" in error_str:
        return {
            'reason': 'ACCESS_DENIED',
            'message': 'Access denied - check your credentials',
            'details': {'endpoint': endpoint_url}
        }
    if "404" in error_str or "not found" in error_str:
        return {
            'reason': 'NOT_FOUND',
            'message': 'Resource not found - check bucket name',
            'details': {'endpoint': endpoint_url}
        }
    if "timeout" in error_str:
        return {
            'reason': 'CONNECTION_TIMEOUT',
            'message': 'Connection timeout to endpoint',
            'details': {'endpoint': endpoint_url}
        }
    return {
        'reason': 'UNKNOWN_ERROR',
        'message': str(e),
        'details': {}
    }


# ============================================================================
# Private request helpers
# ============================================================================

def _json_body():
    """Return the request's JSON object, or {} if it is missing or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _s3_config(data, prefix=''):
    """Collect S3 connection settings from ``data``.

    ``prefix`` selects the field family ('' / 'source_' / 'dest_'); the
    returned keys are always unprefixed.
    """
    return {
        'access_key_id': data.get(f'{prefix}access_key_id'),
        'secret_access_key': data.get(f'{prefix}secret_access_key'),
        'region': data.get(f'{prefix}region', 'us-east-1'),
        'endpoint_url': data.get(f'{prefix}endpoint_url'),
        'session_token': data.get(f'{prefix}session_token')
    }


def _endpoint_host_port(endpoint_url):
    """Split an endpoint URL into (hostname, port).

    The port defaults to 80 for ``http`` and 443 otherwise, so an endpoint
    such as ``http://minio:9000`` is probed on 9000 instead of 443.
    """
    from urllib.parse import urlparse  # local import keeps this block self-contained

    text = endpoint_url if '://' in endpoint_url else f'//{endpoint_url}'
    parsed = urlparse(text)
    default_port = 80 if parsed.scheme == 'http' else 443
    try:
        port = parsed.port or default_port
    except ValueError:
        # Non-numeric or out-of-range port in the URL
        port = default_port
    return parsed.hostname, port


def _run_s3_connection_test(method_name):
    """Shared body of the source/destination S3 test-connection endpoints.

    ``method_name`` is the s3_to_s3_migrator method to call with the
    credentials taken from the request body.
    """
    creds = _s3_config(_json_body())

    if not creds['access_key_id'] or not creds['secret_access_key']:
        return jsonify({
            'success': False,
            'error': 'AWS credentials are required',
            'diagnostic': {'reason': 'MISSING_CREDENTIALS'}
        })

    try:
        result = getattr(s3_to_s3_migrator, method_name)(**creds)

        if not result.get('success', False):
            endpoint_url = creds['endpoint_url']
            error = result.get('error', '')
            if 'endpoint' in str(error).lower() and endpoint_url:
                host, port = _endpoint_host_port(endpoint_url)
                result['diagnostic'] = diagnose_connection(host, port)
            else:
                result['diagnostic'] = analyze_s3_error(error, endpoint_url)

        return jsonify(result)
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e),
            'diagnostic': {'reason': 'UNKNOWN_ERROR', 'message': str(e)}
        })


def _migration_status_response(migrator, migration_id):
    """Build the JSON status response for ``migration_id`` from ``migrator``."""
    status = migrator.get_migration_status(migration_id)
    if not status:
        return jsonify({'success': False, 'error': f'Migration {migration_id} not found'})
    return jsonify({'success': True, 'migration_id': migration_id, 'status': status})


def _list_migrations_response(migrator):
    """Build the JSON response listing every migration known to ``migrator``."""
    migrations = migrator.list_migrations()
    return jsonify({'success': True, 'migrations': migrations, 'count': len(migrations)})


# ============================================================================
# PostgreSQL Endpoints (simplified and improved)
# ============================================================================

@app.route('/api/postgres/test-connection', methods=['POST'])
def api_test_postgres_connection():
    """Test PostgreSQL connection with detailed error diagnosis"""
    data = _json_body()

    if not data:
        return jsonify({'success': False, 'error': 'No data provided'})

    uri = data.get('uri', '')
    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    try:
        result = postgres_migrator.test_connection(uri)

        if not result.get('success', False):
            parsed = postgres_migrator.parse_postgres_uri(uri)
            if parsed:
                # First rule out network problems, then classify the error text
                diagnostic = diagnose_connection(parsed['host'], parsed['port'])
                if not diagnostic['success']:
                    result['diagnostic'] = diagnostic
                else:
                    result['diagnostic'] = analyze_postgres_error(
                        result.get('error', ''),
                        parsed['host'],
                        parsed['user'],
                        parsed['database']
                    )

        return jsonify(result)
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e),
            'diagnostic': {'reason': 'UNKNOWN_ERROR', 'message': str(e)}
        })


@app.route('/api/postgres/get-schemas', methods=['POST'])
def api_postgres_get_schemas():
    """Get PostgreSQL schemas"""
    uri = _json_body().get('uri', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    return jsonify(postgres_migrator.get_schemas(uri))


@app.route('/api/postgres/get-tables', methods=['POST'])
def api_postgres_get_tables():
    """Get PostgreSQL tables"""
    data = _json_body()
    uri = data.get('uri', '')
    schema = data.get('schema', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    return jsonify(postgres_migrator.get_tables(uri, schema))


@app.route('/api/postgres/get-table-counts', methods=['POST'])
def api_postgres_get_table_counts():
    """Get PostgreSQL table counts"""
    data = _json_body()
    uri = data.get('uri', '')
    schema = data.get('schema', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    counts = postgres_migrator.get_table_counts(uri, schema)
    return jsonify({'success': True, 'counts': counts})


@app.route('/api/postgres/parse-uri', methods=['POST'])
def api_postgres_parse_uri():
    """Parse PostgreSQL URI"""
    uri = _json_body().get('uri', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    parsed = postgres_migrator.parse_postgres_uri(uri)
    env_vars = postgres_migrator.parse_uri_to_env(uri)

    if not parsed:
        return jsonify({'success': False, 'error': 'Failed to parse PostgreSQL URI'})

    return jsonify({
        'success': True,
        'parsed': parsed,
        'environment_variables': env_vars
    })


@app.route('/api/postgres/start-migration', methods=['POST'])
def api_postgres_start_migration():
    """Start PostgreSQL to PostgreSQL migration"""
    data = _json_body()
    source_uri = data.get('source_uri', '')
    dest_uri = data.get('dest_uri', '')

    if not source_uri or not dest_uri:
        return jsonify({'success': False, 'error': 'Source and destination URIs are required'})

    migration_id = str(uuid.uuid4())[:8]

    postgres_migrator.start_migration_async(
        migration_id=migration_id,
        source_uri=source_uri,
        dest_uri=dest_uri,
        schemas=data.get('schemas'),
        tables=data.get('tables')
    )

    return jsonify({
        'success': True,
        'migration_id': migration_id,
        'message': f'PostgreSQL migration {migration_id} started'
    })


# ============================================================================
# S3 to S3 Endpoints (simplified)
# ============================================================================

@app.route('/api/s3-source/test-connection', methods=['POST'])
def api_s3_source_test_connection():
    """Test source S3 connection"""
    return _run_s3_connection_test('test_source_connection')


@app.route('/api/s3-destination/test-connection', methods=['POST'])
def api_s3_destination_test_connection():
    """Test destination S3 connection"""
    return _run_s3_connection_test('test_destination_connection')


@app.route('/api/s3-source/list-buckets', methods=['POST'])
def api_s3_source_list_buckets():
    """List source S3 buckets"""
    creds = _s3_config(_json_body())

    if not creds['access_key_id'] or not creds['secret_access_key']:
        return jsonify({'success': False, 'error': 'AWS credentials are required'})

    return jsonify(s3_to_s3_migrator.list_buckets(is_source=True, **creds))


@app.route('/api/s3-destination/list-buckets', methods=['POST'])
def api_s3_destination_list_buckets():
    """List destination S3 buckets"""
    creds = _s3_config(_json_body())

    if not creds['access_key_id'] or not creds['secret_access_key']:
        return jsonify({'success': False, 'error': 'AWS credentials are required'})

    return jsonify(s3_to_s3_migrator.list_buckets(is_source=False, **creds))


@app.route('/api/s3/list-objects', methods=['POST'])
def api_s3_list_objects():
    """List objects in S3 bucket"""
    data = _json_body()
    bucket = data.get('bucket', '')

    if not bucket:
        return jsonify({'success': False, 'error': 'Bucket name is required'})

    result = s3_to_s3_migrator.list_objects(
        bucket_name=bucket,
        prefix=data.get('prefix', ''),
        max_keys=data.get('max_keys', 1000),
        is_source=data.get('is_source', True),
        **_s3_config(data)
    )
    return jsonify(result)


@app.route('/api/s3-destination/create-bucket', methods=['POST'])
def api_s3_create_bucket():
    """Create S3 bucket"""
    data = _json_body()
    bucket = data.get('bucket', '')

    if not bucket:
        return jsonify({'success': False, 'error': 'Bucket name is required'})

    result = s3_to_s3_migrator.create_bucket(bucket_name=bucket, **_s3_config(data))
    return jsonify(result)


@app.route('/api/s3/migrate-object', methods=['POST'])
def api_s3_migrate_object():
    """Migrate single S3 object"""
    data = _json_body()
    source_key = data.get('source_key', '')

    result = s3_to_s3_migrator.migrate_object(
        source_bucket=data.get('source_bucket', ''),
        source_key=source_key,
        dest_bucket=data.get('dest_bucket', ''),
        # The destination key defaults to the source key
        dest_key=data.get('dest_key', source_key),
        source_credentials=_s3_config(data, 'source_'),
        dest_credentials=_s3_config(data, 'dest_'),
        preserve_metadata=data.get('preserve_metadata', True),
        storage_class=data.get('storage_class', 'STANDARD')
    )
    return jsonify(result)


@app.route('/api/s3/migrate-batch', methods=['POST'])
def api_s3_migrate_batch():
    """Migrate multiple S3 objects in batch"""
    data = _json_body()

    result = s3_to_s3_migrator.migrate_objects_batch(
        objects=data.get('objects', []),
        source_bucket=data.get('source_bucket', ''),
        dest_bucket=data.get('dest_bucket', ''),
        source_credentials=_s3_config(data, 'source_'),
        dest_credentials=_s3_config(data, 'dest_'),
        preserve_metadata=data.get('preserve_metadata', True),
        storage_class=data.get('storage_class', 'STANDARD'),
        max_concurrent=data.get('max_concurrent', 5)
    )
    return jsonify(result)


@app.route('/api/s3/start-migration', methods=['POST'])
def api_s3_start_migration():
    """Start full S3 to S3 migration"""
    data = _json_body()
    source_config = _s3_config(data, 'source_')
    dest_config = _s3_config(data, 'dest_')

    migration_id = str(uuid.uuid4())[:8]

    # Secure storage of the sensitive values (optional, best effort)
    try:
        session_id = get_session_id()
        for label, config in (('SOURCE', source_config), ('DEST', dest_config)):
            for key, value in config.items():
                if value:
                    secure_store(f'{label}_{key}', value, session_id, migration_id)
    except Exception as e:
        # Storage is optional; report the failure but still start the migration
        print(f"⚠️ Secure store skipped for migration {migration_id}: {e}")

    s3_to_s3_migrator.start_migration(
        migration_id=migration_id,
        source_config=source_config,
        dest_config=dest_config,
        source_bucket=data.get('source_bucket', ''),
        dest_bucket=data.get('dest_bucket', ''),
        prefix=data.get('prefix', ''),
        include_patterns=data.get('include_patterns'),
        exclude_patterns=data.get('exclude_patterns'),
        preserve_metadata=data.get('preserve_metadata', True),
        storage_class=data.get('storage_class', 'STANDARD'),
        create_dest_bucket=data.get('create_dest_bucket', True),
        max_concurrent=data.get('max_concurrent', 5)
    )

    return jsonify({
        'success': True,
        'migration_id': migration_id,
        'message': f'S3 to S3 migration {migration_id} started'
    })


# ============================================================================
# PostgreSQL to S3 Endpoints (simplified)
# ============================================================================

@app.route('/api/postgres-s3/test-postgres-connection', methods=['POST'])
def api_postgres_s3_test_postgres():
    """Test PostgreSQL connection for PostgreSQL to S3 migrator"""
    uri = _json_body().get('uri', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    return jsonify(postgres_to_s3_migrator.test_postgres_connection(uri))


@app.route('/api/postgres-s3/test-s3-connection', methods=['POST'])
def api_postgres_s3_test_s3():
    """Test S3 connection for PostgreSQL to S3 migrator"""
    data = _json_body()
    access_key_id = data.get('access_key_id')
    secret_access_key = data.get('secret_access_key')

    if not access_key_id or not secret_access_key:
        return jsonify({'success': False, 'error': 'AWS credentials are required'})

    result = postgres_to_s3_migrator.test_s3_connection(
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        region=data.get('region', 'us-east-1'),
        endpoint_url=data.get('endpoint_url')
    )
    return jsonify(result)


@app.route('/api/postgres-s3/get-schemas', methods=['POST'])
def api_postgres_s3_get_schemas():
    """Get PostgreSQL schemas for PostgreSQL to S3 migrator"""
    uri = _json_body().get('uri', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    return jsonify(postgres_to_s3_migrator.get_schemas(uri))


@app.route('/api/postgres-s3/get-tables', methods=['POST'])
def api_postgres_s3_get_tables():
    """Get PostgreSQL tables for PostgreSQL to S3 migrator"""
    data = _json_body()
    uri = data.get('uri', '')
    schema = data.get('schema', '')

    if not uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    return jsonify(postgres_to_s3_migrator.get_tables(uri, schema))


@app.route('/api/postgres-s3/export-table', methods=['POST'])
def api_postgres_s3_export_table():
    """Export PostgreSQL table to S3"""
    data = _json_body()
    postgres_uri = data.get('postgres_uri', '')
    schema = data.get('schema', '')
    table = data.get('table', '')
    s3_bucket = data.get('s3_bucket', '')
    s3_key = data.get('s3_key', '')

    if not postgres_uri or not schema or not table:
        return jsonify({'success': False, 'error': 'PostgreSQL URI, schema and table are required'})

    if not s3_bucket or not s3_key:
        return jsonify({'success': False, 'error': 'S3 bucket and key are required'})

    # Named export_format so the builtin format() is not shadowed
    export_format = data.get('format', 'csv')

    result = postgres_to_s3_migrator.export_table_to_s3(
        postgres_uri=postgres_uri,
        schema=schema,
        table=table,
        s3_bucket=s3_bucket,
        s3_key=s3_key,
        compress=data.get('compress', True),
        format=export_format,
        access_key_id=data.get('access_key_id'),
        secret_access_key=data.get('secret_access_key'),
        region=data.get('region', 'us-east-1'),
        endpoint_url=data.get('endpoint_url')
    )
    return jsonify(result)


@app.route('/api/postgres-s3/start-migration', methods=['POST'])
def api_postgres_s3_start_migration():
    """Start PostgreSQL to S3 migration"""
    data = _json_body()
    postgres_uri = data.get('postgres_uri', '')
    s3_bucket = data.get('s3_bucket', '')
    access_key_id = data.get('access_key_id')
    secret_access_key = data.get('secret_access_key')
    region = data.get('region', 'us-east-1')
    endpoint_url = data.get('endpoint_url')

    if not postgres_uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    if not s3_bucket:
        return jsonify({'success': False, 'error': 'S3 bucket is required'})

    if not access_key_id or not secret_access_key:
        return jsonify({'success': False, 'error': 'AWS credentials are required'})

    migration_id = str(uuid.uuid4())[:8]

    # Secure storage (optional, best effort)
    try:
        session_id = get_session_id()
        s3_vars = {
            'AWS_ACCESS_KEY_ID': access_key_id,
            'AWS_SECRET_ACCESS_KEY': secret_access_key,
            'AWS_REGION': region,
            'AWS_ENDPOINT_URL': endpoint_url
        }
        for key, value in s3_vars.items():
            if value:
                secure_store(key, value, session_id, migration_id)
    except Exception as e:
        # Storage is optional; report the failure but still start the migration
        print(f"⚠️ Secure store skipped for migration {migration_id}: {e}")

    # Named export_format so the builtin format() is not shadowed
    export_format = data.get('format', 'csv')

    postgres_to_s3_migrator.start_migration_async(
        migration_id=migration_id,
        postgres_uri=postgres_uri,
        s3_bucket=s3_bucket,
        s3_prefix=data.get('s3_prefix', ''),
        schemas=data.get('schemas'),
        tables=data.get('tables'),
        compress=data.get('compress', True),
        format=export_format,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        region=region,
        endpoint_url=endpoint_url
    )

    return jsonify({
        'success': True,
        'migration_id': migration_id,
        'message': f'PostgreSQL to S3 migration {migration_id} started'
    })


# ============================================================================
# Common Endpoints (Migration Status)
# ============================================================================

# The status/cancel routes declare <migration_id> so Flask passes the URL
# segment to the view; without it the views were called with no argument.

@app.route('/api/postgres/migration-status/<migration_id>', methods=['GET'])
def api_postgres_migration_status(migration_id):
    """Get PostgreSQL migration status"""
    return _migration_status_response(postgres_migrator, migration_id)


@app.route('/api/postgres/list-migrations', methods=['GET'])
def api_postgres_list_migrations():
    """List PostgreSQL migrations"""
    return _list_migrations_response(postgres_migrator)


@app.route('/api/s3/migration-status/<migration_id>', methods=['GET'])
def api_s3_migration_status(migration_id):
    """Get S3 migration status"""
    return _migration_status_response(s3_to_s3_migrator, migration_id)


@app.route('/api/s3/list-migrations', methods=['GET'])
def api_s3_list_migrations():
    """List S3 migrations"""
    return _list_migrations_response(s3_to_s3_migrator)


@app.route('/api/s3/cancel-migration/<migration_id>', methods=['POST'])
def api_s3_cancel_migration(migration_id):
    """Cancel S3 migration"""
    result = s3_to_s3_migrator.cancel_migration(migration_id)

    # Clean up the secure variables tied to this migration (best effort)
    try:
        session_id = get_session_id()
        deleted = secure_delete(session_id, migration_id=migration_id)
        if deleted > 0:
            result['security'] = f'Cleaned up {deleted} secure variables'
    except Exception as e:
        print(f"⚠️ Secure cleanup skipped for migration {migration_id}: {e}")

    return jsonify(result)


@app.route('/api/postgres-s3/migration-status/<migration_id>', methods=['GET'])
def api_postgres_s3_migration_status(migration_id):
    """Get PostgreSQL to S3 migration status"""
    return _migration_status_response(postgres_to_s3_migrator, migration_id)


@app.route('/api/postgres-s3/list-migrations', methods=['GET'])
def api_postgres_s3_list_migrations():
    """List PostgreSQL to S3 migrations"""
    return _list_migrations_response(postgres_to_s3_migrator)


# ============================================================================
# Utility 
# Endpoints
# ============================================================================

def _json_body():
    """Return the JSON request body as a dict.

    Falls back to an empty dict when the body is missing, is not valid JSON,
    or is not a JSON object, so handlers can call ``.get`` unconditionally and
    answer with their own validation error instead of crashing.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


@app.route('/api/s3/parse-uri', methods=['POST'])
def api_s3_parse_uri():
    """Parse the ``s3_uri`` field of the JSON body with the S3 migrator.

    Returns:
        JSON with ``success`` True and the parsed value, or ``success`` False
        with an error when the URI is missing or the migrator returns nothing.
    """
    data = _json_body()
    s3_uri = data.get('s3_uri', '')

    if not s3_uri:
        return jsonify({'success': False, 'error': 'S3 URI is required'})

    result = s3_to_s3_migrator.parse_s3_uri(s3_uri)
    if result:
        return jsonify({'success': True, 'parsed': result})
    return jsonify({'success': False, 'error': 'Failed to parse S3 URI'})


@app.route('/api/s3/generate-presigned-url', methods=['POST'])
def api_s3_generate_presigned_url():
    """Generate a presigned URL for an S3 object.

    Reads bucket, key, credentials and options from the JSON body and returns
    the migrator's result as JSON. Bucket, key and both credential fields are
    required.
    """
    data = _json_body()
    bucket = data.get('bucket', '')
    key = data.get('key', '')
    is_source = data.get('is_source', True)
    expiration = data.get('expiration', 3600)
    access_key_id = data.get('access_key_id')
    secret_access_key = data.get('secret_access_key')
    region = data.get('region', 'us-east-1')
    endpoint_url = data.get('endpoint_url')
    session_token = data.get('session_token')

    if not bucket or not key:
        return jsonify({'success': False, 'error': 'Bucket and key are required'})

    if not access_key_id or not secret_access_key:
        return jsonify({'success': False, 'error': 'AWS credentials are required'})

    result = s3_to_s3_migrator.generate_presigned_url(
        bucket=bucket,
        key=key,
        expiration=expiration,
        is_source=is_source,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        region=region,
        endpoint_url=endpoint_url,
        session_token=session_token
    )
    return jsonify(result)


# ============================================================================
# Environment Variables Security Endpoints (simplified)
# ============================================================================

# ============================================================================
# PostgreSQL Live Progress Tracking Endpoints
# ============================================================================

@app.route('/api/postgres/live-progress/<migration_id>', methods=['GET'])
def api_postgres_live_progress(migration_id):
    """Return the live progress reported by the PostgreSQL migrator.

    Any exception raised by the migrator is turned into a JSON error.
    """
    try:
        # Uses the migrator's own live-tracking function.
        progress = postgres_migrator.get_live_migration_progress(migration_id)
        return jsonify(progress)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/postgres/start-migration-with-progress', methods=['POST'])
def api_postgres_start_migration_with_progress():
    """Start a PostgreSQL migration with live progress tracking.

    Requires ``source_uri`` and ``dest_uri`` in the JSON body; ``schemas`` and
    ``tables`` are optional. The response carries the new migration id and,
    when the size estimate succeeded, the estimate.
    """
    data = _json_body()
    source_uri = data.get('source_uri', '')
    dest_uri = data.get('dest_uri', '')
    schemas = data.get('schemas')
    tables = data.get('tables')

    if not source_uri or not dest_uri:
        return jsonify({'success': False, 'error': 'Source and destination URIs are required'})

    migration_id = str(uuid.uuid4())[:8]

    # Estimate the migration size before starting.
    size_estimate = postgres_migrator.estimate_migration_size(source_uri, schemas, tables)

    # Start the migration with live tracking.
    postgres_migrator.migrate_tables_individually(
        migration_id=migration_id,
        source_uri=source_uri,
        dest_uri=dest_uri,
        schemas=schemas,
        tables=tables
    )

    # The migration is already running here, so a missing or malformed
    # estimate must not fail the response.
    estimate_ok = isinstance(size_estimate, dict) and size_estimate.get('success')

    return jsonify({
        'success': True,
        'migration_id': migration_id,
        'message': f'PostgreSQL migration {migration_id} started with live tracking',
        'estimate': size_estimate if estimate_ok else None
    })


@app.route('/api/postgres/format-status/<migration_id>', methods=['GET'])
def api_postgres_format_status(migration_id):
    """Return the migrator's formatted live status for a PostgreSQL migration."""
    try:
        formatted_status = postgres_migrator.format_live_status(migration_id)
        return jsonify({
            'success': True,
            'migration_id': migration_id,
            'formatted_status': formatted_status
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


# ============================================================================
# S3 to S3 Live Progress Tracking Endpoints
# ============================================================================

@app.route('/api/s3/live-progress/<migration_id>', methods=['GET'])
def api_s3_live_progress(migration_id):
    """Return the progress computed by the S3 migrator for one migration."""
    try:
        progress = s3_to_s3_migrator.calculate_migration_progress(migration_id)
        return jsonify(progress)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/s3/start-migration-with-progress', methods=['POST'])
def api_s3_start_migration_with_progress():
    """Start an S3 to S3 migration with live progress tracking.

    Reads bucket names, per-side credentials and transfer options from the
    JSON body. Before starting, it lists up to 1000 source objects to seed
    the migrator with a total object count and size; that step is
    best-effort.
    """
    data = _json_body()
    source_bucket = data.get('source_bucket', '')
    dest_bucket = data.get('dest_bucket', '')
    prefix = data.get('prefix', '')

    # Same guard style as the other start endpoints: refuse to start a
    # migration without bucket names.
    if not source_bucket or not dest_bucket:
        return jsonify({'success': False, 'error': 'Source and destination buckets are required'})

    source_config = {
        'access_key_id': data.get('source_access_key_id'),
        'secret_access_key': data.get('source_secret_access_key'),
        'region': data.get('source_region', 'us-east-1'),
        'endpoint_url': data.get('source_endpoint_url'),
        'session_token': data.get('source_session_token')
    }

    dest_config = {
        'access_key_id': data.get('dest_access_key_id'),
        'secret_access_key': data.get('dest_secret_access_key'),
        'region': data.get('dest_region', 'us-east-1'),
        'endpoint_url': data.get('dest_endpoint_url'),
        'session_token': data.get('dest_session_token')
    }

    include_patterns = data.get('include_patterns')
    exclude_patterns = data.get('exclude_patterns')
    preserve_metadata = data.get('preserve_metadata', True)
    storage_class = data.get('storage_class', 'STANDARD')
    create_dest_bucket = data.get('create_dest_bucket', True)
    max_concurrent = data.get('max_concurrent', 5)

    migration_id = str(uuid.uuid4())[:8]

    # Get the source size up front so progress can be estimated.
    try:
        list_result = s3_to_s3_migrator.list_objects(
            bucket_name=source_bucket,
            prefix=prefix,
            max_keys=1000,
            is_source=True,
            **source_config
        )

        if list_result.get('success'):
            total_objects = list_result.get('count', 0)
            total_size = list_result.get('total_size', 0)

            # Store the estimates in the migrator.
            # NOTE(review): this reaches into the migrator's private lock and
            # registry; a public setter on the migrator would be safer.
            with s3_to_s3_migrator._lock:
                entry = s3_to_s3_migrator.migrations.setdefault(migration_id, {})
                entry['total_objects'] = total_objects
                entry['total_size'] = total_size
    except Exception as exc:
        # Estimation is optional; report the failure and start anyway.
        print(f"⚠️ Could not estimate size for S3 migration {migration_id}: {exc}")

    # Start the migration.
    s3_to_s3_migrator.start_migration(
        migration_id=migration_id,
        source_config=source_config,
        dest_config=dest_config,
        source_bucket=source_bucket,
        dest_bucket=dest_bucket,
        prefix=prefix,
        include_patterns=include_patterns,
        exclude_patterns=exclude_patterns,
        preserve_metadata=preserve_metadata,
        storage_class=storage_class,
        create_dest_bucket=create_dest_bucket,
        max_concurrent=max_concurrent
    )

    return jsonify({
        'success': True,
        'migration_id': migration_id,
        'message': f'S3 to S3 migration {migration_id} started with live tracking'
    })


def _format_s3_status_text(migration_id, progress):
    """Build the human-readable status text for an S3 progress dict."""
    if progress.get('status') != 'running':
        return f"{'✅' if progress.get('success') else '❌'} Migration completed"

    if progress.get('type') != 'live':
        return f"⏳ Migration in progress (estimated {progress['progress_percentage']}%)"

    status_lines = [
        f"🚀 S3 Migration {migration_id} - {progress['status'].upper()}",
        f"📊 Progress: {progress['progress_bars']['size']}",
        f"📦 Objects: {progress['processed_objects']}/{progress['total_objects']} ({progress['objects_percentage']}%)",
        f"💾 Size: {progress['processed_size_formatted']}/{progress['total_size_formatted']} ({progress['size_percentage']}%)",
        f"⚡ Speed: {progress['current_speed_formatted']}",
        f"⏱️ Elapsed: {progress['elapsed_time_formatted']} | ETA: {progress['eta_formatted']}"
    ]
    if progress.get('current_object'):
        status_lines.append(f"📄 Current: {progress['current_object']}")
    return '\n'.join(status_lines)


@app.route('/api/s3/format-status/<migration_id>', methods=['GET'])
def api_s3_format_status(migration_id):
    """Return a readable status text plus the raw progress for an S3 migration.

    When the migrator's progress carries ``success`` False it is returned
    unchanged; any exception becomes a JSON error.
    """
    try:
        progress = s3_to_s3_migrator.calculate_migration_progress(migration_id)

        if not progress.get('success', True):
            return jsonify(progress)

        # Format the status in a readable form.
        return jsonify({
            'success': True,
            'migration_id': migration_id,
            'formatted_status': _format_s3_status_text(migration_id, progress),
            'raw_progress': progress
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/s3/update-progress/<migration_id>', methods=['POST'])
def api_s3_update_progress(migration_id):
    """Push a progress update for an S3 migration (for custom tracking).

    Returns JSON whose ``success`` is whatever the migrator's update returned.
    """
    data = _json_body()

    result = s3_to_s3_migrator.update_progress(
        migration_id=migration_id,
        processed_objects=data.get('processed_objects', 1),
        processed_size=data.get('processed_size', 0),
        failed_objects=data.get('failed_objects', 0),
        failed_size=data.get('failed_size', 0),
        current_object=data.get('current_object'),
        object_details=data.get('object_details')
    )

    return jsonify({'success': result})


@app.route('/api/s3/speed-history/<migration_id>', methods=['GET'])
def api_s3_speed_history(migration_id):
    """Return the migrator's speed history for an S3 migration."""
    history = s3_to_s3_migrator.get_migration_speed_history(migration_id)
    return jsonify(history)


@app.route('/api/s3/estimate-time', methods=['POST'])
def api_s3_estimate_time():
    """Estimate transfer time from ``total_size`` and optional ``current_speed``."""
    data = _json_body()
    total_size = data.get('total_size', 0)
    current_speed = data.get('current_speed')

    estimates = s3_to_s3_migrator.estimate_transfer_time(total_size, current_speed)

    return jsonify({
        'success': True,
        'estimates': estimates
    })


# ============================================================================
# PostgreSQL to S3 Live Progress Tracking Endpoints
# ============================================================================

@app.route('/api/postgres-s3/live-progress/<migration_id>', methods=['GET'])
def api_postgres_s3_live_progress(migration_id):
    """Return the live progress reported by the PostgreSQL to S3 migrator."""
    try:
        progress = postgres_to_s3_migrator.get_live_migration_progress(migration_id)
        return jsonify(progress)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/postgres-s3/start-migration-with-progress', methods=['POST'])
def api_postgres_s3_start_migration_with_progress():
    """Start a PostgreSQL to S3 migration with live progress tracking.

    Requires ``postgres_uri``, ``s3_bucket`` and both AWS credential fields in
    the JSON body; the remaining fields are optional.
    """
    data = _json_body()
    postgres_uri = data.get('postgres_uri', '')
    s3_bucket = data.get('s3_bucket', '')
    s3_prefix = data.get('s3_prefix', '')
    schemas = data.get('schemas')
    tables = data.get('tables')
    compress = data.get('compress', True)
    # Local name avoids shadowing the builtin ``format``; the keyword passed
    # to the migrator is unchanged.
    export_format = data.get('format', 'csv')
    access_key_id = data.get('access_key_id')
    secret_access_key = data.get('secret_access_key')
    region = data.get('region', 'us-east-1')
    endpoint_url = data.get('endpoint_url')

    if not postgres_uri:
        return jsonify({'success': False, 'error': 'PostgreSQL URI is required'})

    if not s3_bucket:
        return jsonify({'success': False, 'error': 'S3 bucket is required'})

    if not access_key_id or not secret_access_key:
        return jsonify({'success': False, 'error': 'AWS credentials are required'})

    migration_id = str(uuid.uuid4())[:8]

    # Start the migration with live tracking.
    postgres_to_s3_migrator.start_migration_with_progress(
        migration_id=migration_id,
        postgres_uri=postgres_uri,
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        schemas=schemas,
        tables=tables,
        compress=compress,
        format=export_format,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        region=region,
        endpoint_url=endpoint_url
    )

    return jsonify({
        'success': True,
        'migration_id': migration_id,
        'message': f'PostgreSQL to S3 migration {migration_id} started with live tracking'
    })


@app.route('/api/postgres-s3/format-status/<migration_id>', methods=['GET'])
def api_postgres_s3_format_status(migration_id):
    """Return the migrator's formatted live status for a PostgreSQL to S3 migration."""
    try:
        formatted_status = postgres_to_s3_migrator.format_live_status(migration_id)
        return jsonify({
            'success': True,
            'migration_id': migration_id,
            'formatted_status': formatted_status
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/postgres-s3/table-row-count', methods=['POST'])
def api_postgres_s3_table_row_count():
    """Return the row count of one table, raw and with thousands separators.

    Requires ``uri``, ``schema`` and ``table`` in the JSON body.
    """
    data = _json_body()
    uri = data.get('uri', '')
    schema = data.get('schema', '')
    table = data.get('table', '')

    if not uri or not schema or not table:
        return jsonify({'success': False, 'error': 'URI, schema and table are required'})

    count = postgres_to_s3_migrator._get_table_row_count(uri, schema, table)

    # NOTE(review): a count of 0 is reported as 'Unknown' because the check is
    # a truthiness test - confirm what the helper returns on failure.
    return jsonify({
        'success': True,
        'count': count,
        'formatted': f"{count:,}" if count else 'Unknown'
    })


# ============================================================================
# Unified Progress Dashboard Endpoints
# ============================================================================

@app.route('/api/dashboard/active-migrations', methods=['GET'])
def api_active_migrations():
    """Return every migration whose status is 'running', across all migrators.

    Each entry carries the migration id, a ``type`` tag, the endpoints read
    from the migration's ``data`` dict, and the current progress.
    """
    active_migrations = []

    # PostgreSQL migrations
    pg_has_live = hasattr(postgres_migrator, 'get_live_migration_progress')
    for mig in postgres_migrator.list_migrations():
        if mig.get('status') != 'running':
            continue
        mig_data = mig.get('data', {})
        progress = postgres_migrator.get_live_migration_progress(mig['id']) if pg_has_live else None
        active_migrations.append({
            'id': mig['id'],
            'type': 'postgres',
            'source': mig_data.get('source'),
            'destination': mig_data.get('destination'),
            'started_at': mig_data.get('started_at'),
            'progress': progress
        })

    # S3 migrations
    for mig in s3_to_s3_migrator.list_migrations():
        if mig.get('status') != 'running':
            continue
        mig_data = mig.get('data', {})
        progress = s3_to_s3_migrator.calculate_migration_progress(mig['id'])
        active_migrations.append({
            'id': mig['id'],
            'type': 's3',
            'source_bucket': mig_data.get('source_bucket'),
            'dest_bucket': mig_data.get('dest_bucket'),
            'started_at': mig_data.get('started_at'),
            'progress': progress
        })

    # PostgreSQL to S3 migrations
    pg_s3_has_live = hasattr(postgres_to_s3_migrator, 'get_live_migration_progress')
    for mig in postgres_to_s3_migrator.list_migrations():
        if mig.get('status') != 'running':
            continue
        mig_data = mig.get('data', {})
        progress = postgres_to_s3_migrator.get_live_migration_progress(mig['id']) if pg_s3_has_live else None
        active_migrations.append({
            'id': mig['id'],
            'type': 'postgres-s3',
            'postgres_uri': mig_data.get('postgres_uri'),
            's3_bucket': mig_data.get('s3_bucket'),
            'started_at': mig_data.get('started_at'),
            'progress': progress
        })

    return jsonify({
        'success': True,
        'active_count': len(active_migrations),
        'migrations': active_migrations
    })


def _count_with_status(migration_lists, status):
    """Count the migrations across all given lists whose status equals ``status``."""
    return sum(
        1
        for migrations in migration_lists
        for mig in migrations
        if mig.get('status') == status
    )


@app.route('/api/dashboard/summary', methods=['GET'])
def api_dashboard_summary():
    """Return totals per status and per migrator for all known migrations."""
    pg_migrations = postgres_migrator.list_migrations()
    s3_migrations = s3_to_s3_migrator.list_migrations()
    pg_s3_migrations = postgres_to_s3_migrator.list_migrations()
    all_lists = (pg_migrations, s3_migrations, pg_s3_migrations)

    return jsonify({
        'success': True,
        'summary': {
            'total': sum(len(migrations) for migrations in all_lists),
            'running': _count_with_status(all_lists, 'running'),
            'completed': _count_with_status(all_lists, 'completed'),
            'failed': _count_with_status(all_lists, 'failed'),
            'postgres_count': len(pg_migrations),
            's3_count': len(s3_migrations),
            'postgres_s3_count': len(pg_s3_migrations)
        }
    })


# ============================================================================
# WebSocket-like streaming endpoint (Server-Sent Events)
# ============================================================================

@app.route('/api/stream-progress/<migration_id>', methods=['GET'])
def stream_progress(migration_id):
    """Stream progress updates for one migration as Server-Sent Events.

    The ``type`` query parameter selects the migrator: ``s3`` (default),
    ``postgres``, or anything else for PostgreSQL to S3. One event is sent per
    second until the reported status is no longer 'running'/'in_progress'.
    """
    # Read the migrator type here: the request context is not guaranteed to
    # be available once the generator is being consumed.
    migrator_type = request.args.get('type', 's3')

    def fetch_progress():
        """Ask the selected migrator for the current progress."""
        if migrator_type == 's3':
            return s3_to_s3_migrator.calculate_migration_progress(migration_id)
        migrator = postgres_migrator if migrator_type == 'postgres' else postgres_to_s3_migrator
        if hasattr(migrator, 'get_live_migration_progress'):
            return migrator.get_live_migration_progress(migration_id)
        return migrator.get_migration_status(migration_id)

    def generate():
        while True:
            progress = fetch_progress()

            # A migrator may return nothing for an unknown id; tell the client
            # and stop instead of failing inside the stream.
            if not isinstance(progress, dict):
                not_found = {
                    'type': 'error',
                    'success': False,
                    'error': f'Migration {migration_id} not found'
                }
                yield f"data: {json.dumps(not_found)}\n\n"
                break

            # Add a `type` field so every event has the same shape.
            if progress.get('status') == 'completed' or progress.get('success') == True:  # noqa: E712
                progress['type'] = 'completion'
            elif progress.get('status') == 'failed' or progress.get('error'):
                progress['type'] = 'error'
            else:
                progress['type'] = 'progress'

            yield f"data: {json.dumps(progress)}\n\n"

            if progress.get('status') not in ['running', 'in_progress']:
                break

            time.sleep(1)

    return app.response_class(
        generate(),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}
    )


@app.route('/api/inject-env', methods=['POST'])
def api_inject_env():
    """Store the supplied environment variables in the encrypted session vault.

    Reads ``environment_variables`` (mapping) and optional ``migration_id``
    from the JSON body; entries with an empty value are skipped.
    """
    data = _json_body()
    env_vars = data.get('environment_variables', {})
    migration_id = data.get('migration_id')

    if not env_vars:
        return jsonify({'success': False, 'error': 'No environment variables provided'})

    try:
        # Secure storage
        session_id = get_session_id()
        stored_keys = []
        for key, value in env_vars.items():
            if value:
                secure_store(key, value, session_id, migration_id)
                stored_keys.append(key)

        return jsonify({
            'success': True,
            'message': f'Securely stored {len(stored_keys)} environment variables',
            'session_id': session_id[:8] + '...',
            'expires_in_minutes': ENV_EXPIRY_MINUTES,
            'stored_keys': stored_keys
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/get-current-env', methods=['GET'])
def api_get_current_env():
    """List the current session's stored variables without exposing values."""
    session_id = get_session_id()
    env_vars = {
        key: {
            'exists': True,
            'masked': True,
            'expires_at': data['expires_at'],
            'migration_id': data.get('migration_id')
        }
        for key, data in secure_vault.get(session_id, {}).items()
    }

    return jsonify({
        'success': True,
        'session_id': session_id[:8] + '...',
        'environment_variables': env_vars,
        'count': len(env_vars),
        'expires_in_minutes': ENV_EXPIRY_MINUTES
    })


@app.route('/api/clear-session', methods=['POST'])
def api_clear_session():
    """Delete all stored variables of the current session and end the session."""
    session_id = get_session_id()
    deleted = secure_delete(session_id)

    # End the session.
    session.clear()

    return jsonify({
        'success': True,
        'message': f'Cleared {deleted} secure variables',
        'session_ended': True
    })


@app.route('/api/clear-migration/<migration_id>', methods=['POST'])
def api_clear_migration(migration_id):
    """Delete the current session's stored variables tied to one migration."""
    session_id = get_session_id()
    deleted = secure_delete(session_id, migration_id=migration_id)

    return jsonify({
        'success': True,
        'message': f'Cleared {deleted} secure variables for migration {migration_id}'
    })


@app.route('/api/security-status', methods=['GET'])
def api_security_status():
    """Report vault statistics and the security settings in effect."""
    session_id = get_session_id()

    return jsonify({
        'success': True,
        'security_status': {
            'active_sessions': len(secure_vault),
            'current_session_vars': len(secure_vault.get(session_id, {})),
            'expiry_minutes': ENV_EXPIRY_MINUTES,
            'encryption': 'AES-256 (Fernet)',
            'auto_cleanup': True,
            'session_isolation': True,
            'current_session_id': session_id[:8] + '...'
        }
    })


# ============================================================================
# Health Check
# ============================================================================

@app.route('/api/health', methods=['GET'])
def health():
    """Health check endpoint listing the service's advertised features."""
    return jsonify({
        'status': 'healthy',
        'service': 'Unified Migration API',
        'version': '3.1.0',
        'security_features': [
            'AES-256 encryption',
            'Session isolation',
            # Derived from the constant so the text cannot drift from the
            # configured expiry.
            f'Auto-cleanup after {ENV_EXPIRY_MINUTES} minutes',
            'Memory-only storage',
            'On-shutdown secure wipe',
            'Environment variable masking'
        ],
        'features': [
            'PostgreSQL to PostgreSQL migration',
            'S3 to S3 migration',
            'PostgreSQL to S3 migration',
            'Connection diagnostics',
            'Secure environment management'
        ]
    })


# ============================================================================
# Static Files
# ============================================================================

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve(path):
    """Serve a file from the static folder, falling back to ``index.html``.

    Args:
        path: Request path relative to the static folder ('' for the root).
    """
    if path != "" and os.path.exists(os.path.join(app.static_folder, path)):
        return send_from_directory(app.static_folder, path)
    return send_from_directory(app.static_folder, 'index.html')


# ============================================================================
# Main
# ============================================================================

if __name__ == '__main__':
    print("=" * 60)
    print("🔐 UNIFIED MIGRATION API - SECURE EDITION")
    print("=" * 60)

    # Create a directory readable only by the owner.
    try:
        secure_dir = os.path.join(str(Path(__file__).parent.parent), '.secure_env')
        os.makedirs(secure_dir, mode=0o700, exist_ok=True)
        print(f"📁 Secure directory: {secure_dir}")
    except Exception:
        print("📁 Using temporary directory for secure storage")

    # Print the security information.
    print("\n🛡️ SECURITY FEATURES ENABLED:")
    print(" - AES-256 encryption for sensitive data")
    print(" - Session-based isolation")
    print(f" - Auto-cleanup after {ENV_EXPIRY_MINUTES} minutes")
    print(" - Memory-only encrypted storage")
    print(" - On-shutdown secure wipe")

    print("\n🚀 Unified Migration API starting...")
    print("🌐 Access the UI at: http://localhost:8000")
    print("🔧 API endpoints available:")
    print(" 📋 PostgreSQL: /api/postgres/*")
    print(" 📋 S3 to S3: /api/s3/*")
    print(" 📋 PG to S3: /api/postgres-s3/*")
    print(" 🔐 Security: /api/security-status")
    print(" /api/clear-session")
    print(" /api/clear-migration/<migration_id>")

    # Run the application.
    app.run(host='0.0.0.0', port=8000, debug=False)