# NOTE: web-export residue removed from the top of this file. The original
# lines were UI chrome from a file browser ("الملفات" = "Files",
# path "psqlmigrator/migrator.py", "4500 أسطر" = "4500 lines", "193 KiB",
# "Python") — not Python code. Kept here as a comment for provenance.
#!/usr/bin/env python3
"""
migrator_unified.py - Unified Migration Engine
Combined migrator for PostgreSQL, S3 to S3, and PostgreSQL to S3 migrations
"""
import subprocess
import signal
import json
import os
import sys
import time
import threading
import hashlib
import io
import csv
import gzip
from datetime import datetime
from urllib.parse import urlparse
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError
# ============================================================================
# Part 1: PostgreSQL Migrator (PostgresMigrator)
# ============================================================================
class PostgresMigrator:
"""PostgreSQL to PostgreSQL migration engine"""
def __init__(self):
self.migrations = {}
self._lock = threading.Lock()
def parse_postgres_uri(self, uri):
    """Parse a PostgreSQL URI into its connection parameters.

    Accepts URIs with or without the ``postgresql://``/``postgres://``
    scheme prefix (the prefix is added when missing).

    Returns a dict with keys ``host``, ``port``, ``user``, ``password``,
    ``database`` and ``uri`` (the possibly-prefixed URI), or None when
    the input cannot be parsed (e.g. non-string input).
    """
    try:
        if not uri.startswith(('postgresql://', 'postgres://')):
            uri = 'postgresql://' + uri
        parsed = urlparse(uri)
        return {
            'host': parsed.hostname,
            'port': parsed.port or 5432,
            'user': parsed.username or '',
            'password': parsed.password or '',
            # Fix: treat a bare trailing '/' the same as a missing path —
            # both default to the 'postgres' database (previously '/'
            # produced an empty database name).
            'database': (parsed.path or '').lstrip('/') or 'postgres',
            'uri': uri
        }
    except Exception:
        # Non-string input (AttributeError) or a malformed port lands here.
        return None
def get_live_migration_progress(self, migration_id):
"""الحصول على التقدم المباشر للترحيل مع تفاصيل دقيقة"""
with self._lock:
if migration_id not in self.migrations:
return {'success': False, 'error': 'Migration not found'}
mig_data = self.migrations[migration_id]
# إذا كان الترحيل مكتملاً
if mig_data.get('status') != 'running':
return self._get_detailed_completed_progress(migration_id)
# إذا كان هناك متتبع تقدم نشط
if 'progress_tracker' in mig_data:
return self._get_live_tracker_progress(migration_id)
# إذا كان الترحيل قيد التشغيل ولكن بدون متتبع (تقديري)
return self._get_estimated_live_progress(migration_id)
def _get_live_tracker_progress(self, migration_id):
    """Build the live progress report from the per-table tracker.

    Assumes the caller holds ``self._lock`` and that
    ``self.migrations[migration_id]['progress_tracker']`` exists.
    Returns a dict with totals, processed/failed/remaining counters,
    percentages, timing, speed, the current table, recent tables and
    textual progress bars. The exact key layout is consumed by clients.
    """
    mig_data = self.migrations[migration_id]
    tracker = mig_data['progress_tracker']
    current_time = time.time()
    elapsed_time = current_time - tracker['start_time']
    # Percentages (guarded against division by zero on empty migrations).
    tables_percentage = (tracker['processed_tables'] / tracker['total_tables'] * 100) if tracker['total_tables'] > 0 else 0
    size_percentage = (tracker['processed_size'] / tracker['total_size'] * 100) if tracker['total_size'] > 0 else 0
    rows_percentage = (tracker['processed_rows'] / tracker['total_rows'] * 100) if tracker['total_rows'] > 0 else 0
    # Current throughput from samples recorded in the last 10 seconds;
    # with a single sample the denominator falls back to 1 second.
    recent_samples = [s for s in tracker.get('speed_samples', []) if current_time - s['time'] < 10]
    if recent_samples:
        recent_size = sum(s['size'] for s in recent_samples)
        recent_time = recent_samples[-1]['time'] - recent_samples[0]['time'] if len(recent_samples) > 1 else 1
        current_speed = recent_size / recent_time if recent_time > 0 else 0
    else:
        # No recent samples: use the overall average since start.
        current_speed = tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0
    # ETA from the bytes still outstanding at the current speed.
    remaining_size = tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0)
    eta_seconds = remaining_size / current_speed if current_speed > 0 else 0
    # Details of the table currently being migrated, if any.
    current_table_info = None
    if tracker.get('current_table'):
        current_table_info = {
            'name': tracker['current_table'],
            'size': tracker.get('current_table_size', 0),
            'size_formatted': self._format_size(tracker.get('current_table_size', 0)),
            'rows_total': tracker.get('current_table_rows', 0),
            'rows_processed': tracker.get('current_table_rows_processed', 0),
            'rows_percentage': round((tracker.get('current_table_rows_processed', 0) / tracker.get('current_table_rows', 1)) * 100, 2) if tracker.get('current_table_rows', 0) > 0 else 0,
            'estimated_remaining': tracker.get('current_table_remaining_time', 0)
        }
    return {
        'success': True,
        'migration_id': migration_id,
        'status': 'running',
        'type': 'live',
        'timestamp': current_time,
        # Overall totals.
        'total': {
            'tables': tracker['total_tables'],
            'size': tracker['total_size'],
            'size_formatted': self._format_size(tracker['total_size']),
            'rows': tracker.get('total_rows', 0)
        },
        # Successfully processed so far.
        'processed': {
            'tables': tracker['processed_tables'],
            'size': tracker['processed_size'],
            'size_formatted': self._format_size(tracker['processed_size']),
            'rows': tracker.get('processed_rows', 0)
        },
        # Failed so far.
        'failed': {
            'tables': tracker.get('failed_tables', 0),
            'size': tracker.get('failed_size', 0),
            'size_formatted': self._format_size(tracker.get('failed_size', 0)),
            'rows': tracker.get('failed_rows', 0)
        },
        # Still outstanding (total minus processed minus failed).
        'remaining': {
            'tables': tracker['total_tables'] - tracker['processed_tables'] - tracker.get('failed_tables', 0),
            'size': tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0),
            'size_formatted': self._format_size(tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0))
        },
        # Completion percentages.
        'percentages': {
            'tables': round(tables_percentage, 2),
            'size': round(size_percentage, 2),
            'rows': round(rows_percentage, 2)
        },
        # Timing and ETA.
        'time': {
            'elapsed': elapsed_time,
            'elapsed_formatted': self._format_time(elapsed_time),
            'eta': eta_seconds,
            'eta_formatted': self._format_time(eta_seconds),
            'estimated_completion': current_time + eta_seconds if eta_seconds > 0 else None
        },
        'speed': {
            'current': current_speed,
            'current_formatted': f"{self._format_size(current_speed)}/s",
            'average': tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0,
            'average_formatted': f"{self._format_size(tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0)}/s",
            # Peak is the largest single sample's byte count, not a rate per se.
            'peak': max((s['size'] for s in tracker.get('speed_samples', [])), default=0),
            'peak_formatted': self._format_size(max((s['size'] for s in tracker.get('speed_samples', [])), default=0))
        },
        # Table currently in flight (None when idle between tables).
        'current_table': current_table_info,
        # Last 5 completed tables.
        'recent_tables': tracker.get('tables_details', [])[-5:],
        # Textual progress bars.
        'progress_bars': {
            'tables': self._format_progress_bar(tables_percentage),
            'size': self._format_progress_bar(size_percentage),
            'rows': self._format_progress_bar(rows_percentage) if tracker.get('total_rows', 0) > 0 else None
        }
    }
def _get_estimated_live_progress(self, migration_id):
"""تقدير التقدم للترحيل الجاري بدون متتبع"""
mig_data = self.migrations[migration_id]
elapsed_time = time.time() - mig_data['started_at']
# محاولة الحصول على تقديرات أفضل
estimated_total_tables = mig_data.get('estimated_tables', 10)
estimated_total_size = mig_data.get('estimated_size', 100 * 1024 * 1024)
# تقدير التقدم بناءً على وقت pg_dump المعتاد
# pg_dump عادة يستغرق 1-5 دقائق لكل GB حسب السرعة
estimated_duration = estimated_total_size / (5 * 1024 * 1024) # 5 MB/s كتقدير
progress_percentage = min((elapsed_time / estimated_duration) * 100, 99) if estimated_duration > 0 else 0
tables_migrated = int(estimated_total_tables * progress_percentage / 100)
size_migrated = int(estimated_total_size * progress_percentage / 100)
return {
'success': True,
'migration_id': migration_id,
'status': 'running',
'type': 'estimated',
'timestamp': time.time(),
'estimated': True,
'total': {
'tables': estimated_total_tables,
'size': estimated_total_size,
'size_formatted': self._format_size(estimated_total_size)
},
'processed': {
'tables': tables_migrated,
'size': size_migrated,
'size_formatted': self._format_size(size_migrated)
},
'percentages': {
'tables': round(progress_percentage, 2),
'size': round(progress_percentage, 2)
},
'time': {
'elapsed': elapsed_time,
'elapsed_formatted': self._format_time(elapsed_time)
},
'progress_bars': {
'main': self._format_progress_bar(progress_percentage)
},
'note': 'This is an estimated progress. For accurate tracking, use migrate_tables_individually()'
}
def _get_detailed_completed_progress(self, migration_id):
    """Detailed final report for a migration that is no longer running.

    Uses exact tracker figures when a progress tracker was recorded;
    otherwise estimates sizes from the stats (~10 MB per table).
    Returns {'success': False, ...} if the migration never finished
    (no 'success' key recorded yet).
    """
    mig_data = self.migrations[migration_id]
    if 'success' not in mig_data:
        return {'success': False, 'error': 'Migration not completed'}
    stats = mig_data.get('stats', {})
    execution_time = mig_data.get('execution_time', 0)
    # Prefer exact figures when a tracker was used...
    if 'progress_tracker' in mig_data:
        tracker = mig_data['progress_tracker']
        total_size = tracker.get('total_size', 0)
        processed_size = tracker.get('processed_size', 0)
        total_tables = tracker.get('total_tables', 0)
        processed_tables = tracker.get('processed_tables', 0)
        tables_details = tracker.get('tables_details', [])
    else:
        # ...otherwise estimate from recorded stats (10 MB per table).
        total_tables = stats.get('tables_created', 0) + stats.get('tables_altered', 0)
        processed_tables = stats.get('tables_created', 0)
        estimated_size_per_table = 10 * 1024 * 1024
        total_size = total_tables * estimated_size_per_table
        processed_size = processed_tables * estimated_size_per_table
        tables_details = []
    return {
        'success': True,
        'migration_id': migration_id,
        'status': 'completed' if mig_data.get('success') else 'failed',
        'type': 'completed',
        'timestamp': time.time(),
        'total': {
            'tables': total_tables,
            'size': total_size,
            'size_formatted': self._format_size(total_size)
        },
        'processed': {
            'tables': processed_tables,
            'size': processed_size,
            'size_formatted': self._format_size(processed_size),
            'tables_created': stats.get('tables_created', 0),
            'tables_altered': stats.get('tables_altered', 0),
            'indexes_created': stats.get('indexes_created', 0)
        },
        'percentages': {
            'tables': 100.0 if processed_tables == total_tables else round((processed_tables / total_tables * 100) if total_tables > 0 else 0, 2),
            'size': 100.0 if processed_size == total_size else round((processed_size / total_size * 100) if total_size > 0 else 0, 2)
        },
        'time': {
            'total': execution_time,
            'total_formatted': self._format_time(execution_time),
            'average_speed': processed_size / execution_time if execution_time > 0 else 0,
            'average_speed_formatted': f"{self._format_size(processed_size / execution_time if execution_time > 0 else 0)}/s"
        },
        'tables_details': tables_details[-10:],  # last 10 tables only
        'error': mig_data.get('error') if not mig_data.get('success') else None
    }
def migrate_tables_individually(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
    """Start a per-table migration with precise progress tracking.

    Initializes the detailed tracker synchronously, then performs the
    actual migration on a daemon thread, updating the migration record
    with the outcome (or the raised error). Returns migration_id
    immediately.
    """
    self._init_detailed_tracker(migration_id, source_uri, schemas, tables)

    def _worker():
        try:
            outcome = self._run_individual_migration(
                migration_id, source_uri, dest_uri, schemas, tables)
            with self._lock:
                entry = self.migrations[migration_id]
                entry.update(outcome)
                entry['status'] = 'completed' if outcome['success'] else 'failed'
                entry['completed_at'] = time.time()
        except Exception as exc:
            with self._lock:
                self.migrations[migration_id]['status'] = 'failed'
                self.migrations[migration_id]['error'] = str(exc)

    threading.Thread(target=_worker, daemon=True).start()
    return migration_id
def _init_detailed_tracker(self, migration_id, source_uri, schemas=None, tables=None):
"""تهيئة متتبع تفصيلي مع بيانات دقيقة"""
# الحصول على قائمة الجداول وأحجامها
tables_list = []
total_size = 0
total_rows = 0
if tables:
for table in tables:
if '.' in table:
schema, name = table.split('.', 1)
else:
schema = 'public'
name = table
size = self._get_table_size(source_uri, schema, name) or 0
rows = self._get_table_row_count(source_uri, schema, name) or 0
tables_list.append({
'schema': schema,
'name': name,
'size': size,
'rows': rows
})
total_size += size
total_rows += rows
elif schemas:
for schema in schemas:
result = self.get_tables(source_uri, schema)
if result['success']:
for table_info in result['tables']:
size = self._get_table_size(source_uri, schema, table_info['name']) or 0
rows = self._get_table_row_count(source_uri, schema, table_info['name']) or 0
tables_list.append({
'schema': schema,
'name': table_info['name'],
'size': size,
'rows': rows
})
total_size += size
total_rows += rows
else:
result = self.get_tables(source_uri)
if result['success']:
for table_info in result['tables']:
size = self._get_table_size(source_uri, table_info['schema'], table_info['name']) or 0
rows = self._get_table_row_count(source_uri, table_info['schema'], table_info['name']) or 0
tables_list.append({
'schema': table_info['schema'],
'name': table_info['name'],
'size': size,
'rows': rows
})
total_size += size
total_rows += rows
with self._lock:
self.migrations[migration_id] = {
'status': 'running',
'started_at': time.time(),
'source': source_uri,
'destination': dest_uri,
'schemas': schemas,
'tables': tables,
'progress_tracker': {
'total_tables': len(tables_list),
'total_size': total_size,
'total_rows': total_rows,
'processed_tables': 0,
'processed_size': 0,
'processed_rows': 0,
'failed_tables': 0,
'failed_size': 0,
'failed_rows': 0,
'current_table': None,
'current_table_size': 0,
'current_table_rows': 0,
'current_table_rows_processed': 0,
'current_table_start_time': None,
'start_time': time.time(),
'last_update': time.time(),
'tables_list': tables_list,
'tables_details': [],
'speed_samples': []
}
}
def _run_individual_migration(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
    """Run the migration table-by-table, updating the tracker per table.

    Iterates the tracker's prepared tables_list, migrating each table
    via _migrate_table_with_row_tracking and recording success/failure
    counters, per-table details and speed samples under self._lock.
    Returns the summary dict that the caller merges into the migration
    record; 'success' is True only when no table failed.
    """
    logs = []
    successful_tables = []
    failed_tables = []
    with self._lock:
        tracker = self.migrations[migration_id]['progress_tracker']
        tables_list = tracker['tables_list']
    # NOTE(review): tables_list is read outside the lock below; assumes no
    # other thread mutates it while the migration runs — confirm.
    for i, table_info in enumerate(tables_list, 1):
        table_name = f"{table_info['schema']}.{table_info['name']}"
        # Publish which table is currently in flight.
        with self._lock:
            tracker = self.migrations[migration_id]['progress_tracker']
            tracker['current_table'] = table_name
            tracker['current_table_size'] = table_info['size']
            tracker['current_table_rows'] = table_info['rows']
            tracker['current_table_rows_processed'] = 0
            tracker['current_table_start_time'] = time.time()
        print(f"[{migration_id}] 📊 Migrating table {i}/{len(tables_list)}: {table_name} ({self._format_size(table_info['size'])})")
        # Migrate this one table (pg_dump | psql under the hood).
        table_result = self._migrate_table_with_row_tracking(
            migration_id, source_uri, dest_uri,
            table_info['schema'], table_info['name']
        )
        if table_result['success']:
            # Fold the success into the tracker counters.
            with self._lock:
                tracker = self.migrations[migration_id]['progress_tracker']
                tracker['processed_tables'] += 1
                tracker['processed_size'] += table_info['size']
                tracker['processed_rows'] += table_result.get('rows_migrated', 0)
                # Per-table detail record (feeds 'recent_tables' reporting).
                tracker['tables_details'].append({
                    'name': table_name,
                    'size': table_info['size'],
                    'size_formatted': self._format_size(table_info['size']),
                    'rows': table_result.get('rows_migrated', 0),
                    'time': table_result.get('migration_time', 0),
                    'time_formatted': self._format_time(table_result.get('migration_time', 0)),
                    'speed': table_result.get('speed', 0),
                    'speed_formatted': f"{self._format_size(table_result.get('speed', 0))}/s",
                    'timestamp': time.time()
                })
                # Speed sample used for the rolling-throughput estimate.
                tracker['speed_samples'].append({
                    'time': time.time(),
                    'size': table_info['size'],
                    'tables': 1
                })
            successful_tables.append(table_name)
            print(f"[{migration_id}] ✅ Completed {table_name} in {self._format_time(table_result.get('migration_time', 0))}")
        else:
            # Fold the failure into the tracker counters.
            with self._lock:
                tracker = self.migrations[migration_id]['progress_tracker']
                tracker['failed_tables'] += 1
                tracker['failed_size'] += table_info['size']
            failed_tables.append({
                'table': table_name,
                'error': table_result.get('error')
            })
            print(f"[{migration_id}] ❌ Failed {table_name}: {table_result.get('error')}")
    execution_time = time.time() - self.migrations[migration_id]['started_at']
    return {
        'success': len(failed_tables) == 0,
        'migration_id': migration_id,
        'execution_time': execution_time,
        'stats': {
            'tables_created': len(successful_tables),
            'tables_failed': len(failed_tables),
            'total_tables': len(tables_list),
            'total_size': sum(t['size'] for t in tables_list)
        },
        'successful_tables': successful_tables,
        'failed_tables': failed_tables,
        'logs': logs
    }
def _migrate_table_with_row_tracking(self, migration_id, source_uri, dest_uri, schema, table):
"""ترحيل جدول مع تتبع عدد الصفوف المنقولة"""
try:
start_time = time.time()
# الحصول على إجمالي الصفوف
total_rows = self._get_table_row_count(source_uri, schema, table) or 0
# استخدام COPY مع تتبع التقدم (يتطلب تعديل حسب الإمكانيات)
# هذا مثال مبسط - في الواقع قد تحتاج لاستخدام COPY مع callback
# تنفيذ أمر pg_dump للجدول الواحد
dump_cmd = [
'pg_dump', '--dbname', source_uri,
'-t', f'"{schema}"."{table}"',
'--data-only', '--inserts'
]
psql_cmd = ['psql', '--dbname', dest_uri]
full_cmd = ' '.join(dump_cmd) + ' | ' + ' '.join(psql_cmd)
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=300)
migration_time = time.time() - start_time
if result.returncode == 0:
# تقدير عدد الصفوف المنقولة (يمكن تحسينه)
rows_migrated = total_rows
return {
'success': True,
'rows_migrated': rows_migrated,
'migration_time': migration_time,
'speed': (table_size or 0) / migration_time if migration_time > 0 else 0
}
else:
return {
'success': False,
'error': result.stderr[:500]
}
except Exception as e:
return {'success': False, 'error': str(e)}
def _get_table_row_count(self, uri, schema, table):
"""الحصول على عدد صفوف الجدول"""
try:
parsed = self.parse_postgres_uri(uri)
if not parsed:
return None
env = os.environ.copy()
env['PGPASSWORD'] = parsed['password']
sql = f'SELECT COUNT(*) FROM "{schema}"."{table}";'
cmd = [
'psql',
'-h', parsed['host'],
'-p', str(parsed['port']),
'-U', parsed['user'],
'-d', parsed['database'],
'-t',
'-c', sql
]
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
count_line = result.stdout.strip()
if count_line and count_line.isdigit():
return int(count_line)
return None
except Exception:
return None
def format_live_status(self, migration_id):
    """Render the live progress report as a human-readable text panel.

    Pulls the structured report from get_live_migration_progress and
    formats it section by section (progress bars, tables, size, time,
    speed, current table, recent completions). Returns the error line
    when the migration is unknown.
    """
    progress = self.get_live_migration_progress(migration_id)
    if not progress.get('success', True):
        return f"❌ Error: {progress.get('error', 'Unknown error')}"
    lines = []
    lines.append("=" * 60)
    lines.append(f"🚀 Migration {migration_id} - {progress['status'].upper()}")
    lines.append("=" * 60)
    if progress['type'] == 'estimated':
        lines.append("⚠️ ESTIMATED PROGRESS (not real-time)")
    # Main progress bar (size-based when available, else the estimate bar).
    if 'progress_bars' in progress:
        if 'size' in progress['progress_bars']:
            lines.append(f"\n📊 Overall Progress: {progress['progress_bars']['size']}")
        elif 'main' in progress['progress_bars']:
            lines.append(f"\n📊 Progress: {progress['progress_bars']['main']}")
    # Table counters.
    lines.append(f"\n📦 Tables:")
    lines.append(f" Total: {progress['total']['tables']} tables")
    lines.append(f" Processed: {progress['processed']['tables']} tables ({progress['percentages']['tables']}%)")
    if progress.get('failed', {}).get('tables', 0) > 0:
        lines.append(f" ❌ Failed: {progress['failed']['tables']} tables")
    # Size counters.
    lines.append(f"\n💾 Size:")
    lines.append(f" Total: {progress['total']['size_formatted']}")
    lines.append(f" Transferred: {progress['processed']['size_formatted']} ({progress['percentages']['size']}%)")
    if progress.get('remaining', {}).get('size', 0) > 0:
        lines.append(f" Remaining: {progress['remaining']['size_formatted']}")
    # Timing and throughput.
    if 'time' in progress:
        lines.append(f"\n⏱️ Time:")
        lines.append(f" Elapsed: {progress['time']['elapsed_formatted']}")
        if 'eta_formatted' in progress['time']:
            lines.append(f" ETA: {progress['time']['eta_formatted']}")
    if 'speed' in progress:
        lines.append(f"\n⚡ Speed:")
        lines.append(f" Current: {progress['speed']['current_formatted']}")
        lines.append(f" Average: {progress['speed']['average_formatted']}")
        if progress['speed']['peak'] > 0:
            lines.append(f" Peak: {progress['speed']['peak_formatted']}")
    # Table currently in flight.
    if progress.get('current_table'):
        ct = progress['current_table']
        lines.append(f"\n📄 Current Table: {ct['name']}")
        lines.append(f" Size: {ct['size_formatted']}")
        if ct.get('rows_total', 0) > 0:
            lines.append(f" Rows: {ct['rows_processed']}/{ct['rows_total']} ({ct['rows_percentage']}%)")
    # Most recently completed tables (last 3).
    if progress.get('recent_tables'):
        lines.append(f"\n✅ Recently Completed:")
        for table in progress['recent_tables'][-3:]:
            lines.append(f"{table['name']} - {table['size_formatted']} in {table.get('time_formatted', 'N/A')}")
    lines.append("\n" + "=" * 60)
    return '\n'.join(lines)
def calculate_migration_progress(self, migration_id):
"""حساب نسبة التقدم في الترحيل بناءً على عدد وحجم الجداول"""
with self._lock:
if migration_id not in self.migrations:
return {'success': False, 'error': 'Migration not found'}
mig_data = self.migrations[migration_id]
if mig_data.get('status') == 'running':
# حساب التقديرات للترحيل الجاري
return self._estimate_running_progress(migration_id)
else:
# إحصائيات للترحيل المكتمل
return self._get_completed_stats(migration_id)
def _estimate_running_progress(self, migration_id):
"""تقدير تقدم الترحيل الجاري"""
mig_data = self.migrations[migration_id]
elapsed_time = time.time() - mig_data['started_at']
# تقدير عدد الجداول (إذا كانت متوفرة)
estimated_total_tables = mig_data.get('estimated_tables', 10)
estimated_total_size = mig_data.get('estimated_size', 100 * 1024 * 1024) # 100 MB افتراضي
# حساب التقدم بناءً على الوقت المنقضي (تقدير تقريبي)
# في الواقع العملي، ستحتاج لمراقبة خرج pg_dump
progress_percentage = min(elapsed_time / 60 * 10, 95) # 10% لكل دقيقة حتى 95%
return {
'migration_id': migration_id,
'status': 'running',
'elapsed_time': elapsed_time,
'elapsed_time_formatted': self._format_time(elapsed_time),
'progress_percentage': round(progress_percentage, 2),
'estimated': True,
'estimated_total_tables': estimated_total_tables,
'estimated_total_size': estimated_total_size,
'estimated_total_size_formatted': self._format_size(estimated_total_size),
'tables_migrated': int(estimated_total_tables * progress_percentage / 100),
'size_migrated': int(estimated_total_size * progress_percentage / 100),
'size_migrated_formatted': self._format_size(int(estimated_total_size * progress_percentage / 100))
}
def _get_completed_stats(self, migration_id):
"""الحصول على إحصائيات الترحيل المكتمل"""
mig_data = self.migrations[migration_id]
if not isinstance(mig_data, dict) or 'success' not in mig_data:
return {'success': False, 'error': 'Migration not completed'}
execution_time = mig_data.get('execution_time', 0)
stats = mig_data.get('stats', {})
# حساب عدد الجداول المنقولة
tables_created = stats.get('tables_created', 0)
tables_altered = stats.get('tables_altered', 0)
indexes_created = stats.get('indexes_created', 0)
# تقدير حجم البيانات المنقولة (صعب حسابه بدقة مع pg_dump)
# نستخدم عدد الجداول كمؤشر تقريبي
estimated_size_per_table = 10 * 1024 * 1024 # 10 MB لكل جدول تقديري
estimated_total_size = tables_created * estimated_size_per_table
return {
'migration_id': migration_id,
'status': 'completed' if mig_data.get('success') else 'failed',
'success': mig_data.get('success', False),
'execution_time': execution_time,
'execution_time_formatted': self._format_time(execution_time),
'progress_percentage': 100.0 if mig_data.get('success') else 0,
'tables_created': tables_created,
'tables_altered': tables_altered,
'indexes_created': indexes_created,
'total_tables_affected': tables_created + tables_altered,
'estimated_size_migrated': estimated_total_size,
'estimated_size_migrated_formatted': self._format_size(estimated_total_size),
'error': mig_data.get('error') if not mig_data.get('success') else None
}
def track_table_migration(self, table_name, total_rows=None, rows_processed=None):
"""تتبع ترحيل جدول معين (للاستخدام مع ترحيل مخصص)"""
# هذه الدالة يمكن استخدامها إذا كنت تقوم بترحيل الجداول بشكل فردي
tracker_key = f"table_tracker_{table_name}"
if not hasattr(self, '_table_trackers'):
self._table_trackers = {}
if tracker_key not in self._table_trackers:
self._table_trackers[tracker_key] = {
'table_name': table_name,
'total_rows': total_rows,
'rows_processed': 0,
'start_time': time.time(),
'status': 'in_progress'
}
tracker = self._table_trackers[tracker_key]
if rows_processed is not None:
tracker['rows_processed'] = rows_processed
# حساب التقدم
if tracker['total_rows'] and tracker['total_rows'] > 0:
progress = (tracker['rows_processed'] / tracker['total_rows']) * 100
else:
progress = 0
elapsed_time = time.time() - tracker['start_time']
return {
'table_name': table_name,
'total_rows': tracker['total_rows'],
'rows_processed': tracker['rows_processed'],
'progress_percentage': round(progress, 2),
'elapsed_time': elapsed_time,
'elapsed_time_formatted': self._format_time(elapsed_time),
'status': tracker['status']
}
def estimate_migration_size(self, source_uri, schemas=None, tables=None):
    """Estimate the migration's total size before starting it.

    Sums pg_total_relation_size over the selected tables (explicit
    list, whole schemas, or the entire database). Tables whose size
    cannot be read are skipped entirely (not counted). Relies on
    self.get_tables (defined elsewhere in this class) for schema-wide
    and database-wide enumeration.
    Returns totals, the first 10 per-table details, and transfer-time
    estimates at several assumed network speeds.
    """
    try:
        total_size = 0
        table_count = 0
        table_details = []
        if tables:
            # Explicit table list: unqualified names default to 'public'.
            for table in tables:
                if '.' in table:
                    schema, table_name = table.split('.', 1)
                else:
                    schema = 'public'
                    table_name = table
                # Size lookup returns None on failure; such tables are skipped.
                size = self._get_table_size(source_uri, schema, table_name)
                if size:
                    total_size += size
                    table_count += 1
                    table_details.append({
                        'table': f"{schema}.{table_name}",
                        'size': size,
                        'size_formatted': self._format_size(size)
                    })
        elif schemas:
            # All tables in the selected schemas.
            for schema in schemas:
                tables_result = self.get_tables(source_uri, schema)
                if tables_result['success']:
                    for table_info in tables_result['tables']:
                        size = self._get_table_size(source_uri, schema, table_info['name'])
                        if size:
                            total_size += size
                            table_count += 1
                            table_details.append({
                                'table': f"{schema}.{table_info['name']}",
                                'size': size,
                                'size_formatted': self._format_size(size)
                            })
        else:
            # Every table in the database.
            tables_result = self.get_tables(source_uri)
            if tables_result['success']:
                for table_info in tables_result['tables']:
                    size = self._get_table_size(source_uri, table_info['schema'], table_info['name'])
                    if size:
                        total_size += size
                        table_count += 1
                        table_details.append({
                            'table': f"{table_info['schema']}.{table_info['name']}",
                            'size': size,
                            'size_formatted': self._format_size(size)
                        })
        return {
            'success': True,
            'total_size': total_size,
            'total_size_formatted': self._format_size(total_size),
            'table_count': table_count,
            'table_details': table_details[:10],  # first 10 tables only
            'estimated_transfer_time': self._estimate_transfer_time(total_size)
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}
def _get_table_size(self, uri, schema, table_name):
"""الحصول على حجم جدول معين من PostgreSQL"""
try:
parsed = self.parse_postgres_uri(uri)
if not parsed:
return None
env = os.environ.copy()
env['PGPASSWORD'] = parsed['password']
# استعلام لجلب حجم الجدول
sql = f"""
SELECT pg_total_relation_size('"{schema}"."{table_name}"') as size;
"""
cmd = [
'psql',
'-h', parsed['host'],
'-p', str(parsed['port']),
'-U', parsed['user'],
'-d', parsed['database'],
'-t',
'-c', sql
]
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
size_line = result.stdout.strip()
if size_line and size_line.isdigit():
return int(size_line)
return None
except Exception:
return None
def _estimate_transfer_time(self, size_bytes):
"""تقدير وقت النقل بناءً على حجم البيانات"""
# سرعات تقديرية للشبكة
speeds = {
'slow': 1024 * 1024, # 1 MB/s
'average': 5 * 1024 * 1024, # 5 MB/s
'fast': 20 * 1024 * 1024, # 20 MB/s
'very_fast': 100 * 1024 * 1024 # 100 MB/s
}
estimates = {}
for speed_name, speed_bps in speeds.items():
seconds = size_bytes / speed_bps if speed_bps > 0 else 0
estimates[speed_name] = {
'seconds': seconds,
'formatted': self._format_time(seconds)
}
return estimates
def get_migration_detailed_progress(self, migration_id):
"""الحصول على تقدم مفصل للترحيل"""
progress = self.calculate_migration_progress(migration_id)
if not progress.get('success', True):
return progress
# إضافة معلومات إضافية
with self._lock:
if migration_id in self.migrations:
mig_data = self.migrations[migration_id]
progress['source'] = mig_data.get('source', 'Unknown')
progress['destination'] = mig_data.get('destination', 'Unknown')
progress['started_at'] = mig_data.get('started_at')
if 'schemas' in mig_data:
progress['selected_schemas'] = mig_data['schemas']
if 'tables' in mig_data:
progress['selected_tables'] = mig_data['tables']
# إضافة شريط تقدم نصي
percentage = progress.get('progress_percentage', 0)
progress['progress_bar'] = self._format_progress_bar(percentage)
return progress
def _format_progress_bar(self, percentage, width=30):
"""تنسيق شريط التقدم كنص"""
filled = int(width * percentage / 100)
empty = width - filled
bar = '' * filled + '' * empty
return f"[{bar}] {percentage:.1f}%"
def _format_size(self, size_bytes):
"""تنسيق حجم الملف"""
if size_bytes == 0:
return '0 B'
size_names = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.2f} {size_names[i]}"
def _format_time(self, seconds):
"""تنسيق الوقت"""
if seconds < 60:
return f"{seconds:.0f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
elif seconds < 86400:
hours = seconds / 3600
return f"{hours:.1f}h"
else:
days = seconds / 86400
return f"{days:.1f}d"
def set_migration_estimates(self, migration_id, estimated_tables, estimated_size):
"""تعيين التقديرات للترحيل الجاري"""
with self._lock:
if migration_id in self.migrations:
self.migrations[migration_id]['estimated_tables'] = estimated_tables
self.migrations[migration_id]['estimated_size'] = estimated_size
def compare_source_destination(self, migration_id):
"""مقارنة المصدر والوجهة بعد الترحيل"""
with self._lock:
if migration_id not in self.migrations:
return {'success': False, 'error': 'Migration not found'}
mig_data = self.migrations[migration_id]
if not mig_data.get('success'):
return {'success': False, 'error': 'Migration not completed successfully'}
# مقارنة بسيطة (يمكن توسيعها)
source_tables = mig_data.get('stats', {}).get('tables_created', 0)
return {
'success': True,
'source_tables_count': source_tables,
'destination_tables_count': source_tables, # نفس العدد إذا نجح الترحيل
'match': True,
'verification_time': datetime.utcnow().isoformat()
}
def parse_uri_to_env(self, uri, prefix=''):
"""Parse URI to environment variables format"""
parsed = self.parse_postgres_uri(uri)
if not parsed:
return None
env_vars = {
f'{prefix}DB_HOST': parsed['host'],
f'{prefix}DB_PORT': str(parsed['port']),
f'{prefix}DB_USER': parsed['user'],
f'{prefix}DB_PASSWORD': parsed['password'],
f'{prefix}DB_NAME': parsed['database'],
f'{prefix}DB_URI': parsed['uri']
}
# Create connection string variants
env_vars[f'{prefix}DB_CONNECTION_STRING'] = f"host={parsed['host']} port={parsed['port']} dbname={parsed['database']} user={parsed['user']} password={parsed['password']}"
env_vars[f'{prefix}DB_URL'] = parsed['uri']
return env_vars
def test_connection(self, uri_input):
"""Test PostgreSQL connection with detailed error info"""
# 🔴 الخطوة 1: معالجة إذا كان المدخل JSON (dict)
if isinstance(uri_input, dict):
print(f"⚠️ Received JSON object, extracting URI string...")
# استخراج الـ URI من المفتاح 'uri' أو 'url'
if 'uri' in uri_input:
uri = uri_input['uri']
print(f"✅ Extracted URI from 'uri' key")
elif 'url' in uri_input:
uri = uri_input['url']
print(f"✅ Extracted URI from 'url' key")
else:
# إذا لم يكن هناك مفتاح واضح، ابحث في كل المفاتيح
error_msg = f'JSON must contain "uri" or "url" key. Received keys: {list(uri_input.keys())}'
print(f"{error_msg}")
return {'success': False, 'error': error_msg}
else:
# إذا كان string مباشرة
uri = uri_input
# 🔴 تحقق أن uri هو string الآن
if not isinstance(uri, str):
error_msg = f'URI must be a string. Got type: {type(uri)}'
print(f"{error_msg}")
return {'success': False, 'error': error_msg}
print(f"🔍 Testing connection to: {uri}")
parsed = self.parse_postgres_uri(uri)
if not parsed:
error_msg = f'Invalid URI format: {uri}'
print(f"{error_msg}")
return {'success': False, 'error': error_msg}
# استخراج معلومات الاتصال للطباعة عند الفشل
host = parsed['host']
port = parsed['port']
user = parsed['user']
database = parsed['database']
env = os.environ.copy()
env['PGPASSWORD'] = parsed['password']
cmd = [
'psql',
'-h', host,
'-p', str(port),
'-U', user,
'-d', database,
'-c', 'SELECT version(); SELECT current_database();',
'-t'
]
try:
print(f" 🔗 Attempting connection to host: {host}:{port}")
print(f" 👤 User: {user}, Database: {database}")
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
version = lines[0] if len(lines) > 0 else ''
db_name = lines[1] if len(lines) > 1 else ''
print(f"✅ Connection successful to {host}:{port}")
return {
'success': True,
'message': 'Connection successful',
'version': version,
'database': db_name,
'connection': {
'host': host,
'port': port,
'user': user,
'database': database
}
}
else:
# 🔴 هنا طباعة تفاصيل الفشل مع الـ host
error_output = result.stderr.strip()
print(f"❌ Connection FAILED to host: {host}:{port}")
print(f" 🔸 User: {user}")
print(f" 🔸 Database: {database}")
print(f" 🔸 Error details: {error_output[:200]}") # أول 200 حرف فقط
return {
'success': False,
'error': error_output,
'connection_details': {
'host': host,
'port': port,
'user': user,
'database': database
}
}
except subprocess.TimeoutExpired:
print(f"❌ Connection TIMEOUT to host: {host}:{port}")
print(f" 🔸 Server not responding after 10 seconds")
print(f" 🔸 Please check: firewall, network, server status")
return {
'success': False,
'error': 'Connection timeout',
'connection_details': {
'host': host,
'port': port,
'user': user,
'database': database
}
}
except FileNotFoundError:
print(f"❌ PSQL CLIENT NOT FOUND")
print(f" 🔸 The 'psql' command-line tool is not installed")
print(f" 🔸 Install PostgreSQL client tools and try again")
return {
'success': False,
'error': 'psql command not found. Install PostgreSQL client.',
'connection_details': {
'host': host,
'port': port,
'user': user,
'database': database
}
}
except Exception as e:
print(f"❌ UNEXPECTED ERROR connecting to host: {host}:{port}")
print(f" 🔸 Error type: {type(e).__name__}")
print(f" 🔸 Error message: {str(e)}")
return {
'success': False,
'error': str(e),
'connection_details': {
'host': host,
'port': port,
'user': user,
'database': database
}
}
def get_schemas(self, uri):
    """Return the non-system schemas of a PostgreSQL database.

    Runs a psql query against information_schema and returns
    {'success', 'schemas', 'count'} or {'success': False, 'error'}.
    """
    parsed = self.parse_postgres_uri(uri)
    if parsed is None:
        return {'success': False, 'error': 'Invalid URI format'}
    query = """
    SELECT schema_name
    FROM information_schema.schemata
    WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
    ORDER BY schema_name;
    """
    # psql picks the password up from the environment, not argv.
    env = dict(os.environ, PGPASSWORD=parsed['password'])
    command = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', query,
    ]
    try:
        proc = subprocess.run(command, env=env, capture_output=True, text=True, timeout=10)
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
    if proc.returncode != 0:
        return {'success': False, 'error': proc.stderr.strip()}
    found = [name.strip() for name in proc.stdout.splitlines() if name.strip()]
    return {'success': True, 'schemas': found, 'count': len(found)}
def get_tables(self, uri, schema=''):
    """List tables/views of a PostgreSQL database via psql.

    With `schema`, returns only that schema's tables; otherwise every
    non-system table. Each entry carries 'schema', 'name' and 'type'.

    Fixes: error-message typo ('fformat' -> 'format'); the schema name
    is now single-quote-escaped before being embedded in the SQL literal
    (psql -c has no parameter binding); table names containing '|' no
    longer crash the result parser.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {'success': False, 'error': 'Invalid URI format'}
    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']
    if schema:
        # Double any single quote so the name cannot terminate the literal.
        safe_schema = schema.replace("'", "''")
        sql = f"""
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = '{safe_schema}'
        ORDER BY table_name;
        """
    else:
        sql = """
        SELECT table_schema, table_name, table_type
        FROM information_schema.tables
        WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
        ORDER BY table_schema, table_name;
        """
    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql
    ]
    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            return {'success': False, 'error': result.stderr.strip()}
        lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
        tables = []
        if schema:
            for line in lines:
                if '|' not in line:
                    continue
                # maxsplit=1 keeps a '|' inside the table name intact.
                table_name, table_type = line.split('|', 1)
                tables.append({
                    'name': table_name.strip(),
                    'type': table_type.strip(),
                    'schema': schema
                })
        else:
            for line in lines:
                parts = line.split('|')
                if len(parts) >= 3:
                    tables.append({
                        'schema': parts[0].strip(),
                        'name': parts[1].strip(),
                        'type': parts[2].strip()
                    })
        return {
            'success': True,
            'tables': tables,
            'count': len(tables)
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}
def get_table_counts(self, uri, schema=None):
    """Return exact row counts as {'schema.table': count}; {} on failure.

    Fixes two defects in the previous version: the schema branch
    referenced an undefined Python variable `table_name` inside an
    f-string (NameError on every call with a schema), and the
    schema-less branch counted rows of information_schema.tables itself
    rather than of the user tables. A single query now runs
    COUNT(*) per base table server-side via query_to_xml(), with
    format('%I.%I') quoting identifiers safely; the schema filter is
    single-quote-escaped.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {}
    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']
    if schema:
        schema_filter = "t.table_schema = '%s'" % schema.replace("'", "''")
    else:
        schema_filter = ("t.table_schema NOT IN "
                        "('information_schema', 'pg_catalog', 'pg_toast')")
    sql = f"""
    SELECT t.table_schema, t.table_name,
           (xpath('/row/cnt/text()', query_to_xml(
                format('SELECT COUNT(*) AS cnt FROM %I.%I',
                       t.table_schema, t.table_name),
                false, true, '')))[1]::text AS row_count
    FROM information_schema.tables t
    WHERE {schema_filter}
      AND t.table_type = 'BASE TABLE'
    ORDER BY t.table_schema, t.table_name;
    """
    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql
    ]
    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
        table_counts = {}
        if result.returncode == 0:
            # Both branches now emit three columns: schema | table | count.
            for line in result.stdout.splitlines():
                if '|' not in line:
                    continue
                parts = line.split('|')
                if len(parts) >= 3:
                    sch, tbl, cnt = (p.strip() for p in parts[:3])
                    if sch and tbl:
                        table_counts[f"{sch}.{tbl}"] = int(cnt) if cnt.isdigit() else 0
        return table_counts
    except Exception:
        # Best-effort contract: any failure yields an empty mapping.
        return {}
def run_migration(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
    """Run a pg_dump -> psql migration and return a result dict.

    Fix: the previous version joined both commands into one shell string
    (`shell=True`), so URIs containing spaces, quotes or shell
    metacharacters could break the pipeline or inject commands. The two
    processes are now connected with an explicit pipe built from
    argument lists; processes are killed on timeout/error. Result
    shape, log format and the 300s timeout are unchanged.

    Returns {'success', 'migration_id', 'logs'} plus, on success,
    'execution_time', 'stats' and 'environment_variables'.
    """
    logs = []

    def log(msg, level='info'):
        # Collect structured log entries and echo them to stdout.
        logs.append({'timestamp': time.time(), 'message': msg, 'level': level})
        print(f"[{migration_id}] {msg}")

    dump_proc = None
    psql_proc = None
    try:
        log(f"🔧 Starting PostgreSQL Migration {migration_id}", 'info')
        log(f"Source: {source_uri}", 'info')
        log(f"Destination: {dest_uri}", 'info')
        if schemas:
            log(f"Selected schemas: {', '.join(schemas)}", 'info')
        if tables:
            log(f"Selected tables: {', '.join(tables)}", 'info')
        # Build the pg_dump argument list (no shell quoting involved).
        dump_cmd = ['pg_dump', '--dbname', source_uri, '--clean', '--if-exists', '--no-owner']
        if tables:
            for table in tables:
                if '.' in table:
                    dump_cmd.extend(['-t', table])
                else:
                    log(f"⚠️ Table '{table}' should include schema (e.g., public.table)", 'warning')
        elif schemas:
            for schema in schemas:
                dump_cmd.extend(['-n', schema])
        psql_cmd = ['psql', '--dbname', dest_uri, '--single-transaction']
        full_cmd = ' '.join(dump_cmd) + ' | ' + ' '.join(psql_cmd)
        log(f"Executing: {full_cmd[:100]}...", 'info')
        start_time = time.time()
        # pg_dump stdout feeds psql stdin directly.
        dump_proc = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE, text=True)
        psql_proc = subprocess.Popen(psql_cmd, stdin=dump_proc.stdout,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE, text=True)
        # Close our copy so pg_dump receives SIGPIPE if psql exits early.
        dump_proc.stdout.close()
        psql_out, psql_err = psql_proc.communicate(timeout=300)
        dump_err = dump_proc.stderr.read()
        dump_proc.stderr.close()
        dump_rc = dump_proc.wait(timeout=30)
        execution_time = time.time() - start_time
        if psql_proc.returncode == 0 and dump_rc == 0:
            log(f"✅ Migration completed successfully!", 'success')
            log(f"⏱️ Execution time: {execution_time:.2f} seconds", 'info')
            # Rough statistics parsed from psql's echoed command tags.
            create_tables = psql_out.count('CREATE TABLE')
            alter_tables = psql_out.count('ALTER TABLE')
            create_indexes = psql_out.count('CREATE INDEX')
            if create_tables > 0:
                log(f"📊 Tables created: {create_tables}", 'info')
            if alter_tables > 0:
                log(f"🔄 Tables altered: {alter_tables}", 'info')
            if create_indexes > 0:
                log(f"📈 Indexes created: {create_indexes}", 'info')
            # Expose both URIs as environment-variable maps for callers.
            all_env = {}
            for uri, prefix in ((source_uri, 'SRC_'), (dest_uri, 'DEST_')):
                env_map = self.parse_uri_to_env(uri, prefix)
                if env_map:
                    all_env.update(env_map)
            return {
                'success': True,
                'migration_id': migration_id,
                'execution_time': execution_time,
                'stats': {
                    'tables_created': create_tables,
                    'tables_altered': alter_tables,
                    'indexes_created': create_indexes
                },
                'environment_variables': all_env,
                'logs': logs
            }
        # Either process failed: surface combined stderr, truncated.
        error_msg = ((psql_err or '') + (dump_err or '')).strip()[:1000] or 'Unknown error'
        log(f"❌ Migration failed: {error_msg}", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': error_msg,
            'logs': logs
        }
    except subprocess.TimeoutExpired:
        # Reap both children so a hung migration leaves no zombies.
        for proc in (dump_proc, psql_proc):
            if proc is not None:
                proc.kill()
        log("❌ Migration timeout (5 minutes)", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': 'Migration timeout',
            'logs': logs
        }
    except Exception as e:
        for proc in (dump_proc, psql_proc):
            if proc is not None:
                proc.kill()
        log(f"❌ Error: {str(e)}", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': str(e),
            'logs': logs
        }
def start_migration_async(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
    """Launch run_migration on a daemon thread and return the id.

    Fix: the 'running' placeholder is registered BEFORE the thread
    starts. Previously the thread was started first, so a fast-finishing
    migration could store its result and then have it overwritten by the
    placeholder, leaving the migration reported as 'running' forever.
    """
    with self._lock:
        self.migrations[migration_id] = {
            'status': 'running',
            'started_at': time.time(),
            'source': source_uri,
            'destination': dest_uri
        }

    def migration_task():
        # Replace the placeholder with the final result when done.
        result = self.run_migration(migration_id, source_uri, dest_uri, schemas, tables)
        with self._lock:
            self.migrations[migration_id] = result

    threading.Thread(target=migration_task, daemon=True).start()
    return migration_id
def get_migration_status(self, migration_id):
    """Look up a migration record by id (None when not tracked)."""
    with self._lock:
        record = self.migrations.get(migration_id)
    return record
def list_migrations(self):
    """Summarize every tracked migration as {'id', 'status', 'data'}.

    A record containing a 'success' key is finished ('completed' or
    'failed'); anything else is still 'running'.
    """
    def classify(record):
        if isinstance(record, dict) and 'success' in record:
            return 'completed' if record['success'] else 'failed'
        return 'running'

    with self._lock:
        return [
            {'id': mig_id, 'status': classify(record), 'data': record}
            for mig_id, record in self.migrations.items()
        ]
# ============================================================================
# Part 2: S3 to S3 Migrator (S3ToS3Migrator)
# ============================================================================
class S3ToS3Migrator:
"""S3 to S3 migration engine"""
def __init__(self):
self.migrations = {}
self._lock = threading.Lock()
self.source_clients = {}
self.dest_clients = {}
# ==================== S3 Client Management ====================
def get_source_client(self, access_key_id, secret_access_key, region='us-east-1',
                      endpoint_url=None, session_token=None):
    """Return a cached or freshly built boto3 S3 client for the source side.

    Returns None (after printing the error) when client construction fails.
    """
    cache_key = f"src:{access_key_id}:{secret_access_key}:{region}:{endpoint_url}"
    cached = self.source_clients.get(cache_key)
    if cached is not None:
        return cached
    try:
        client_kwargs = {
            'aws_access_key_id': access_key_id,
            'aws_secret_access_key': secret_access_key,
            'region_name': region,
        }
        if session_token:
            client_kwargs['aws_session_token'] = session_token
        if endpoint_url:
            client_kwargs['endpoint_url'] = endpoint_url
            # Local endpoints (minio etc.) rarely present valid certificates.
            if 'localhost' in endpoint_url or '127.0.0.1' in endpoint_url:
                client_kwargs['verify'] = False
        client = boto3.client('s3', **client_kwargs)
    except Exception as e:
        print(f"❌ Failed to create source S3 client: {str(e)}")
        return None
    self.source_clients[cache_key] = client
    return client
def calculate_migration_progress(self, migration_id):
    """Compute progress (objects and bytes) for a migration.

    Dispatches to the live-tracker path while the migration is running,
    otherwise to the completed-run summary. Unknown ids yield an error
    dict.
    """
    with self._lock:
        record = self.migrations.get(migration_id)
        if record is None:
            return {'success': False, 'error': 'Migration not found'}
        if record.get('status') == 'running':
            return self._get_running_progress(migration_id)
        return self._get_completed_progress(migration_id)
def _get_running_progress(self, migration_id):
    """Build the live progress snapshot for a running migration.

    Expects self._lock to already be held by the caller
    (calculate_migration_progress). Lazily creates the tracker dict,
    then derives object/size percentages, a 10-second rolling transfer
    speed, and an ETA from the tracked counters.
    """
    mig_data = self.migrations[migration_id]
    # Create a tracker on demand if this migration was registered without one.
    if 'progress_tracker' not in mig_data:
        mig_data['progress_tracker'] = {
            'total_objects': mig_data.get('total_objects', 0),
            'total_size': mig_data.get('total_size', 0),
            'processed_objects': 0,
            'processed_size': 0,
            'failed_objects': 0,
            'failed_size': 0,
            'current_object': None,
            'current_object_size': 0,
            'current_object_progress': 0,
            'start_time': mig_data.get('started_at', time.time()),
            'last_update': time.time(),
            'objects_details': [],
            'speed_samples': [],
            'estimated_time_remaining': 0
        }
    tracker = mig_data['progress_tracker']
    elapsed_time = time.time() - tracker['start_time']
    # Completion percentages by object count and by byte volume.
    objects_percentage = (tracker['processed_objects'] / tracker['total_objects'] * 100) if tracker['total_objects'] > 0 else 0
    size_percentage = (tracker['processed_size'] / tracker['total_size'] * 100) if tracker['total_size'] > 0 else 0
    # Current speed: average over samples from the last 10 seconds only.
    current_time = time.time()
    tracker['speed_samples'] = [s for s in tracker.get('speed_samples', []) if current_time - s['time'] < 10]
    if tracker['speed_samples']:
        recent_size = sum(s['size'] for s in tracker['speed_samples'])
        recent_time = tracker['speed_samples'][-1]['time'] - tracker['speed_samples'][0]['time'] if len(tracker['speed_samples']) > 1 else 1
        current_speed = recent_size / recent_time if recent_time > 0 else 0
    else:
        # No recent samples: fall back to the overall average rate.
        current_speed = tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0
    # Estimated time remaining at the current transfer speed.
    remaining_size = tracker['total_size'] - tracker['processed_size'] - tracker['failed_size']
    if current_speed > 0:
        eta_seconds = remaining_size / current_speed
    else:
        eta_seconds = 0
    # Only the ten most recently finished objects are included in the report.
    recent_objects = tracker['objects_details'][-10:] if tracker['objects_details'] else []
    return {
        'migration_id': migration_id,
        'status': 'running',
        'timestamp': time.time(),
        'total_objects': tracker['total_objects'],
        'total_size': tracker['total_size'],
        'total_size_formatted': self._format_size(tracker['total_size']),
        'processed_objects': tracker['processed_objects'],
        'processed_size': tracker['processed_size'],
        'processed_size_formatted': self._format_size(tracker['processed_size']),
        'failed_objects': tracker['failed_objects'],
        'failed_size': tracker['failed_size'],
        'failed_size_formatted': self._format_size(tracker['failed_size']),
        'remaining_objects': tracker['total_objects'] - tracker['processed_objects'] - tracker['failed_objects'],
        'remaining_size': tracker['total_size'] - tracker['processed_size'] - tracker['failed_size'],
        'remaining_size_formatted': self._format_size(tracker['total_size'] - tracker['processed_size'] - tracker['failed_size']),
        'objects_percentage': round(objects_percentage, 2),
        'size_percentage': round(size_percentage, 2),
        'elapsed_time': elapsed_time,
        'elapsed_time_formatted': self._format_time(elapsed_time),
        'current_speed': current_speed,
        'current_speed_formatted': f"{self._format_size(current_speed)}/s",
        'average_speed': tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0,
        'average_speed_formatted': f"{self._format_size(tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0)}/s",
        'eta_seconds': eta_seconds,
        'eta_formatted': self._format_time(eta_seconds),
        'current_object': tracker['current_object'],
        'current_object_size': tracker['current_object_size'],
        'current_object_size_formatted': self._format_size(tracker['current_object_size']),
        'current_object_progress': tracker['current_object_progress'],
        'recent_objects': recent_objects,
        'progress_bar_objects': self._format_progress_bar(objects_percentage),
        'progress_bar_size': self._format_progress_bar(size_percentage),
        'estimated_completion_time': time.time() + eta_seconds if eta_seconds > 0 else None
    }
def _get_completed_progress(self, migration_id):
    """Build the final statistics report for a finished migration.

    Expects self._lock to already be held by the caller. Summarizes the
    stored stats (counts, sizes, duration, average speed) and includes
    up to ten successful and failed object entries.
    """
    mig_data = self.migrations[migration_id]
    stats = mig_data.get('stats', {})
    successful = mig_data.get('successful', [])
    failed = mig_data.get('failed', [])
    total_objects = stats.get('total_objects', 0)
    migrated = stats.get('migrated', 0)
    failed_count = stats.get('failed', 0)
    total_size = stats.get('total_size', 0)
    migrated_size = stats.get('migrated_size', 0)
    # Wall-clock duration; defaults keep the math safe if timestamps are missing.
    completion_time = mig_data.get('completed_at', time.time())
    start_time = mig_data.get('started_at', completion_time)
    total_time = completion_time - start_time
    return {
        'migration_id': migration_id,
        'status': 'completed' if mig_data.get('success') else 'failed',
        'success': mig_data.get('success', False),
        'timestamp': time.time(),
        'total_objects': total_objects,
        'total_size': total_size,
        'total_size_formatted': self._format_size(total_size),
        'migrated_objects': migrated,
        'migrated_size': migrated_size,
        'migrated_size_formatted': self._format_size(migrated_size),
        'failed_objects': failed_count,
        # Failure volume is inferred as the size that never arrived.
        'failed_size': total_size - migrated_size,
        'failed_size_formatted': self._format_size(total_size - migrated_size),
        'objects_percentage': 100.0 if migrated == total_objects else round((migrated / total_objects * 100) if total_objects > 0 else 0, 2),
        'size_percentage': 100.0 if migrated_size == total_size else round((migrated_size / total_size * 100) if total_size > 0 else 0, 2),
        'total_time': total_time,
        'total_time_formatted': self._format_time(total_time),
        'average_speed': migrated_size / total_time if total_time > 0 else 0,
        'average_speed_formatted': f"{self._format_size(migrated_size / total_time if total_time > 0 else 0)}/s",
        # Only a sample of per-object results is returned.
        'successful_objects': successful[:10] if successful else [],
        'failed_objects_list': failed[:10] if failed else [],
        'progress_bar': self._format_progress_bar(100.0 if mig_data.get('success') else 0)
    }
def update_progress(self, migration_id, processed_objects=1, processed_size=0,
                    failed_objects=0, failed_size=0, current_object=None,
                    object_details=None):
    """Record migration progress (called from the transfer loop).

    Returns False for unknown migration ids, True otherwise.

    Fix: a tracker created here used to be missing the keys that
    _get_running_progress reads unconditionally ('current_object',
    'current_object_size', 'current_object_progress', 'objects_details',
    ...), so the next progress read raised KeyError. The lazy tracker is
    now initialized with the same full key set used elsewhere.
    """
    with self._lock:
        if migration_id not in self.migrations:
            return False
        mig_data = self.migrations[migration_id]
        if 'progress_tracker' not in mig_data:
            # Full key set, consistent with _get_running_progress.
            mig_data['progress_tracker'] = {
                'total_objects': mig_data.get('total_objects', 0),
                'total_size': mig_data.get('total_size', 0),
                'processed_objects': 0,
                'processed_size': 0,
                'failed_objects': 0,
                'failed_size': 0,
                'current_object': None,
                'current_object_size': 0,
                'current_object_progress': 0,
                'start_time': mig_data.get('started_at', time.time()),
                'last_update': time.time(),
                'objects_details': [],
                'speed_samples': [],
                'estimated_time_remaining': 0
            }
        tracker = mig_data['progress_tracker']
        # Accumulate counters.
        tracker['processed_objects'] += processed_objects
        tracker['processed_size'] += processed_size
        tracker['failed_objects'] += failed_objects
        tracker['failed_size'] += failed_size
        tracker['last_update'] = time.time()
        if current_object:
            tracker['current_object'] = current_object
        # One sample per update feeds the rolling-speed estimate.
        tracker['speed_samples'].append({
            'time': time.time(),
            'size': processed_size
        })
        # Optional per-object detail entry for the 'recent objects' view.
        if object_details:
            tracker.setdefault('objects_details', []).append({
                'timestamp': time.time(),
                **object_details
            })
        return True
def get_live_progress_stream(self, migration_id, interval=1):
    """Yield progress snapshots until the migration stops running.

    Generator intended for WebSocket-style streaming: emits one snapshot
    every `interval` seconds and a final snapshot once the status is no
    longer 'running' (including error dicts for unknown ids).

    Fix: removed the redundant function-local `import time`, which
    shadowed the module-level import; also collapsed the duplicated
    yield into a single yield-then-check.
    """
    while True:
        progress = self.calculate_migration_progress(migration_id)
        yield progress
        # Anything other than 'running' (done, failed, not found) ends the stream.
        if progress.get('status') != 'running':
            return
        time.sleep(interval)
def format_live_status(self, migration_id):
    """Render the current migration progress as a human-readable text block."""
    progress = self.calculate_migration_progress(migration_id)
    # Lookup failures (e.g. unknown id) short-circuit to a single error line.
    if not progress.get('success', True) and progress.get('status') != 'running':
        return f"❌ Error: {progress.get('error', 'Unknown error')}"
    if progress['status'] != 'running':
        # Completed/failed summary view.
        lines = [
            f"{'' if progress['success'] else ''} Migration {migration_id} - {progress['status'].upper()}",  # NOTE(review): both branches are '' — status icons were likely lost; confirm against history
            f"📊 Summary:",
            f"📦 Objects: {progress['migrated_objects']}/{progress['total_objects']} ({progress['objects_percentage']}%)",
            f"💾 Size: {progress['migrated_size_formatted']}/{progress['total_size_formatted']} ({progress['size_percentage']}%)",
            f"⏱️ Total time: {progress['total_time_formatted']}",
            f"⚡ Average speed: {progress['average_speed_formatted']}",
        ]
        if progress.get('failed_objects', 0) > 0:
            lines.append(f"❌ Failed: {progress['failed_objects']} objects")
        return '\n'.join(lines)
    # Live view for a running migration.
    lines = [
        f"🚀 Migration {migration_id} - {progress['status'].upper()}",
        f"📊 Progress: {progress['progress_bar_objects']}",
        f"📦 Objects: {progress['processed_objects']}/{progress['total_objects']} ({progress['objects_percentage']}%)",
        f"💾 Size: {progress['processed_size_formatted']}/{progress['total_size_formatted']} ({progress['size_percentage']}%)",
        f"⚡ Speed: {progress['current_speed_formatted']} (Avg: {progress['average_speed_formatted']})",
        f"⏱️ Elapsed: {progress['elapsed_time_formatted']} | ETA: {progress['eta_formatted']}",
    ]
    if progress['current_object']:
        lines.append(f"📄 Current: {progress['current_object']} ({progress['current_object_size_formatted']})")
    if progress['failed_objects'] > 0:
        lines.append(f"❌ Failed: {progress['failed_objects']} objects ({progress['failed_size_formatted']})")
    return '\n'.join(lines)
def estimate_transfer_time(self, total_size, current_speed=None):
    """Estimate how long transferring `total_size` bytes would take.

    Produces one estimate per canned network profile (1 MB/s .. 1 Gbps)
    and, when `current_speed` (bytes/s) is supplied and positive, an
    extra 'current' entry based on the observed rate.
    """
    mb = 1024 * 1024
    profiles = (
        ('slow', 1 * mb),         # 1 MB/s
        ('average', 5 * mb),      # 5 MB/s
        ('fast', 20 * mb),        # 20 MB/s
        ('very_fast', 100 * mb),  # 100 MB/s
        ('gigabit', 125 * mb),    # 1 Gbps ~ 125 MB/s
    )

    def build_entry(speed):
        duration = total_size / speed if speed > 0 else 0
        return {
            'speed': speed,
            'speed_formatted': f"{self._format_size(speed)}/s",
            'seconds': duration,
            'formatted': self._format_time(duration),
        }

    estimates = {name: build_entry(speed) for name, speed in profiles}
    if current_speed and current_speed > 0:
        estimates['current'] = build_entry(current_speed)
    return estimates
def _format_progress_bar(self, percentage, width=30):
"""تنسيق شريط التقدم كنص"""
filled = int(width * percentage / 100)
empty = width - filled
bar = '' * filled + '' * empty
return f"[{bar}] {percentage:.1f}%"
def _format_time(self, seconds):
"""تنسيق الوقت"""
if seconds < 60:
return f"{seconds:.0f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
elif seconds < 86400:
hours = seconds / 3600
return f"{hours:.1f}h"
else:
days = seconds / 86400
return f"{days:.1f}d"
def get_migration_speed_history(self, migration_id):
    """Return the recorded speed samples plus simple aggregates."""
    with self._lock:
        record = self.migrations.get(migration_id)
        if record is None:
            return {'success': False, 'error': 'Migration not found'}
        samples = record.get('progress_tracker', {}).get('speed_samples', [])
        sizes = [sample['size'] for sample in samples]
        return {
            'success': True,
            'migration_id': migration_id,
            'speed_samples': samples,
            # NOTE: mean sample size, matching original behaviour
            # (not strictly bytes-per-second).
            'average_speed': sum(sizes) / len(sizes) if sizes else 0,
            'peak_speed': max(sizes, default=0),
            'samples_count': len(samples),
        }
def compare_source_destination(self, migration_id):
    """Compare source/destination object totals after a finished migration."""
    with self._lock:
        record = self.migrations.get(migration_id)
        if record is None:
            return {'success': False, 'error': 'Migration not found'}
        if not record.get('success'):
            return {'success': False, 'error': 'Migration not completed successfully'}
        stats = record.get('stats', {})
        total = stats.get('total_objects', 0)
        moved = stats.get('migrated', 0)
        return {
            'success': True,
            'source_objects': total,
            'destination_objects': moved,
            'source_size': stats.get('total_size', 0),
            'destination_size': stats.get('migrated_size', 0),
            'match': total == moved,
            'verification_time': datetime.utcnow().isoformat(),
        }
def get_destination_client(self, access_key_id, secret_access_key, region='us-east-1',
                           endpoint_url=None, session_token=None):
    """Return a cached or freshly built boto3 S3 client for the destination side.

    Returns None (after printing the error) when client construction fails.
    """
    cache_key = f"dst:{access_key_id}:{secret_access_key}:{region}:{endpoint_url}"
    cached = self.dest_clients.get(cache_key)
    if cached is not None:
        return cached
    try:
        client_kwargs = {
            'aws_access_key_id': access_key_id,
            'aws_secret_access_key': secret_access_key,
            'region_name': region,
        }
        if session_token:
            client_kwargs['aws_session_token'] = session_token
        if endpoint_url:
            client_kwargs['endpoint_url'] = endpoint_url
            # Local endpoints (minio etc.) rarely present valid certificates.
            if 'localhost' in endpoint_url or '127.0.0.1' in endpoint_url:
                client_kwargs['verify'] = False
        client = boto3.client('s3', **client_kwargs)
    except Exception as e:
        print(f"❌ Failed to create destination S3 client: {str(e)}")
        return None
    self.dest_clients[cache_key] = client
    return client
# ==================== Connection Testing ====================
def test_source_connection(self, access_key_id, secret_access_key, region='us-east-1',
endpoint_url=None, session_token=None):
"""Test source S3 connection"""
return self._test_connection('Source', access_key_id, secret_access_key,
region, endpoint_url, session_token)
def test_destination_connection(self, access_key_id, secret_access_key, region='us-east-1',
endpoint_url=None, session_token=None):
"""Test destination S3 connection"""
return self._test_connection('Destination', access_key_id, secret_access_key,
region, endpoint_url, session_token)
def _test_connection(self, conn_type, access_key_id, secret_access_key, region,
                     endpoint_url, session_token):
    """Internal method to test S3 connection.

    `conn_type` is 'Source' or 'Destination' and selects which cached
    client factory is used. Returns a dict with 'success' plus either
    bucket info or error details. A 404 from list_buckets is treated as
    success (credentials valid, listing merely forbidden).
    """
    # Both credentials are mandatory; the details say which one is missing.
    if not access_key_id or not secret_access_key:
        print(f"{conn_type} AWS credentials are required")
        return {
            'success': False,
            'error': f'{conn_type} AWS credentials are required',
            'details': {
                'access_key_id_provided': bool(access_key_id),
                'secret_key_provided': bool(secret_access_key)
            }
        }
    print(f"🔍 Testing {conn_type} S3 connection...")
    print(f" 🔸 Access Key ID: {access_key_id[:10]}...")
    print(f" 🔸 Region: {region}")
    print(f" 🔸 Endpoint URL: {endpoint_url or 'AWS S3 (default)'}")
    try:
        # Route through the side-specific cached client factory.
        if conn_type == 'Source':
            client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
        else:
            client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
        if not client:
            return {
                'success': False,
                'error': f'Failed to create {conn_type} S3 client'
            }
        # Test connection by listing buckets; also measures round-trip latency.
        start_time = time.time()
        response = client.list_buckets()
        end_time = time.time()
        response_time = end_time - start_time
        buckets = [bucket['Name'] for bucket in response.get('Buckets', [])]
        print(f"{conn_type} S3 Connection Successful!")
        print(f" 🔸 Response time: {response_time:.2f} seconds")
        print(f" 🔸 Available buckets: {len(buckets)}")
        return {
            'success': True,
            'message': f'{conn_type} S3 connection successful',
            'buckets': buckets[:10],  # First 10 buckets only
            'bucket_count': len(buckets),
            'response_time': response_time,
            'connection_details': {
                'type': conn_type,
                'region': region,
                'endpoint': endpoint_url or 'AWS S3 (default)',
                'has_credentials': True
            }
        }
    except NoCredentialsError:
        error_msg = f"{conn_type} No credentials provided or credentials are invalid"
        print(f"{error_msg}")
        return {'success': False, 'error': error_msg}
    except EndpointConnectionError as e:
        # Network-level failure reaching a custom endpoint.
        error_msg = f"{conn_type} Cannot connect to endpoint: {endpoint_url}"
        print(f"{error_msg}")
        return {'success': False, 'error': error_msg, 'details': str(e)}
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_msg = e.response['Error']['Message']
        print(f"{conn_type} AWS Error: {error_code} - {error_msg}")
        # 404 usually means list_buckets permission denied, but connection works
        if error_code == '404':
            return {
                'success': True,
                'message': f'{conn_type} S3 connection successful (credentials valid, but list_buckets not allowed)',
                'connection_details': {
                    'type': conn_type,
                    'region': region,
                    'endpoint': endpoint_url or 'AWS S3 (default)',
                    'note': '404: Not Found - This is usually permission denied for list_buckets'
                }
            }
        return {
            'success': False,
            'error': f"{error_code}: {error_msg}",
            'aws_error': error_code
        }
    except Exception as e:
        error_msg = f"{conn_type} Unexpected error: {str(e)}"
        print(f"{error_msg}")
        return {'success': False, 'error': error_msg}
# ==================== Bucket Operations ====================
def list_buckets(self, access_key_id, secret_access_key, region='us-east-1',
endpoint_url=None, session_token=None, is_source=True):
"""List all S3 buckets with details"""
try:
if is_source:
client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
else:
client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
if not client:
return {'success': False, 'error': 'Failed to create S3 client'}
response = client.list_buckets()
buckets_info = []
for bucket in response.get('Buckets', []):
bucket_name = bucket['Name']
creation_date = bucket['CreationDate']
# Get bucket location (region)
try:
location = client.get_bucket_location(Bucket=bucket_name)
bucket_region = location.get('LocationConstraint', 'us-east-1')
if not bucket_region:
bucket_region = 'us-east-1'
except:
bucket_region = region
# Get bucket size and object count (optional, may be slow)
try:
size_response = client.list_objects_v2(Bucket=bucket_name, MaxKeys=1000)
object_count = len(size_response.get('Contents', []))
total_size = sum(obj.get('Size', 0) for obj in size_response.get('Contents', []))
except:
object_count = 0
total_size = 0
buckets_info.append({
'name': bucket_name,
'creation_date': creation_date.isoformat() if hasattr(creation_date, 'isoformat') else str(creation_date),
'region': bucket_region,
'object_count': object_count,
'total_size': total_size
})
return {
'success': True,
'buckets': buckets_info,
'count': len(buckets_info)
}
except Exception as e:
return {'success': False, 'error': str(e)}
def list_objects(self, bucket_name, prefix='', max_keys=1000, is_source=True,
                 access_key_id=None, secret_access_key=None, region='us-east-1',
                 endpoint_url=None, session_token=None):
    """List every object under `prefix`, following S3 pagination.

    `max_keys` is the page size per request; all pages are fetched.
    Returns {'success', 'objects', 'count', 'bucket', 'prefix',
    'total_size'} or an error dict.
    """
    try:
        factory = self.get_source_client if is_source else self.get_destination_client
        client = factory(access_key_id, secret_access_key, region, endpoint_url, session_token)
        if not client:
            return {'success': False, 'error': 'Failed to create S3 client'}
        collected = []
        token = None
        while True:
            request = {'Bucket': bucket_name, 'Prefix': prefix, 'MaxKeys': max_keys}
            if token:
                request['ContinuationToken'] = token
            page = client.list_objects_v2(**request)
            for item in page.get('Contents', []):
                stamp = item['LastModified']
                collected.append({
                    'key': item['Key'],
                    'size': item['Size'],
                    'last_modified': stamp.isoformat() if hasattr(stamp, 'isoformat') else str(stamp),
                    'etag': item['ETag'].strip('"'),
                })
            # Keep paging while S3 reports a truncated listing.
            if not page.get('IsTruncated'):
                break
            token = page.get('NextContinuationToken')
        return {
            'success': True,
            'objects': collected,
            'count': len(collected),
            'bucket': bucket_name,
            'prefix': prefix,
            'total_size': sum(entry['size'] for entry in collected),
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}
def bucket_exists(self, bucket_name, is_source=True, access_key_id=None,
                  secret_access_key=None, region='us-east-1', endpoint_url=None,
                  session_token=None):
    """Probe *bucket_name* with HeadBucket and report reachability.

    Returns:
        {'success': True, 'exists': True/False, ...} when the probe gave a
        definitive answer; {'success': False, 'error': ...} on access
        denial or unexpected errors.
    """
    try:
        make_client = self.get_source_client if is_source else self.get_destination_client
        s3 = make_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
        if not s3:
            return {'success': False, 'error': 'Failed to create S3 client'}
        s3.head_bucket(Bucket=bucket_name)
        # Best-effort lookup of the bucket's home region; fall back to the
        # caller-supplied region when GetBucketLocation is not permitted.
        try:
            constraint = s3.get_bucket_location(Bucket=bucket_name).get('LocationConstraint')
            # An empty/None constraint is how S3 reports us-east-1.
            bucket_region = constraint or 'us-east-1'
        except:
            bucket_region = region
        return {
            'success': True,
            'exists': True,
            'bucket': bucket_name,
            'region': bucket_region
        }
    except ClientError as e:
        code = e.response['Error']['Code']
        if code == '404':
            # Definitive: the bucket does not exist.
            return {'success': True, 'exists': False, 'bucket': bucket_name}
        if code == '403':
            return {'success': False, 'error': 'Access denied to bucket', 'exists': False}
        return {'success': False, 'error': str(e)}
    except Exception as e:
        return {'success': False, 'error': str(e)}
def create_bucket(self, bucket_name, region='us-east-1', endpoint_url=None,
                  access_key_id=None, secret_access_key=None, session_token=None):
    """Create *bucket_name* on the destination endpoint (idempotent).

    If the bucket already exists the call succeeds with 'created': False.

    Returns:
        {'success': True, 'bucket', 'message', 'created', ...} on success;
        {'success': False, 'error', 'bucket'} on failure.
    """
    try:
        s3 = self.get_destination_client(access_key_id, secret_access_key, region,
                                         endpoint_url, session_token)
        if not s3:
            return {'success': False, 'error': 'Failed to create S3 client'}
        # Idempotency: report success without creating when it already exists.
        probe = self.bucket_exists(bucket_name, is_source=False,
                                   access_key_id=access_key_id,
                                   secret_access_key=secret_access_key,
                                   region=region, endpoint_url=endpoint_url,
                                   session_token=session_token)
        if probe.get('exists'):
            return {
                'success': True,
                'message': f'Bucket already exists: {bucket_name}',
                'bucket': bucket_name,
                'created': False
            }
        params = {'Bucket': bucket_name}
        # us-east-1 must NOT carry a LocationConstraint; other regions must.
        if region != 'us-east-1':
            params['CreateBucketConfiguration'] = {'LocationConstraint': region}
        s3.create_bucket(**params)
        return {
            'success': True,
            'message': f'Bucket created successfully: {bucket_name}',
            'bucket': bucket_name,
            'region': region,
            'created': True
        }
    except ClientError as e:
        err = e.response['Error']
        return {
            'success': False,
            'error': f"{err['Code']}: {err['Message']}",
            'bucket': bucket_name
        }
    except Exception as e:
        return {'success': False, 'error': str(e), 'bucket': bucket_name}
# ==================== Object Migration ====================
def migrate_object(self, source_bucket, source_key, dest_bucket, dest_key=None,
                   source_credentials=None, dest_credentials=None,
                   preserve_metadata=True, preserve_acl=False,
                   metadata=None, storage_class='STANDARD'):
    """
    Migrate a single object from source S3 to destination S3.

    The object is copied server-side with copy_object, verified with a
    HeadObject on the destination, and tagged with migration audit
    metadata (source bucket/key, timestamp, tool name).

    Args:
        source_bucket, source_key: object to copy.
        dest_bucket, dest_key: target location; dest_key defaults to source_key.
        source_credentials, dest_credentials: dicts with access_key_id,
            secret_access_key and optional region/endpoint_url/session_token.
        preserve_metadata: when True the source object's user metadata is
            carried over (merged with `metadata` and the audit keys); when
            False only `metadata` plus the audit keys are written.
        preserve_acl: accepted for interface compatibility; ACLs are not
            copied by this implementation.
        metadata: extra user metadata merged on top of any preserved set.
        storage_class: destination storage class for the copy.

    Returns:
        dict with 'success', 'source'/'destination' descriptors,
        'copy_time', 'metadata' and 'logs' on success; on failure
        'success': False plus 'error' (and 'aws_error' for ClientError).
    """
    logs = []

    def log(msg, level='info'):
        # Keep a structured trail for the caller and mirror it to stdout.
        log_entry = {
            'timestamp': time.time(),
            'message': msg,
            'level': level
        }
        logs.append(log_entry)
        print(f"[Migration] {msg}")

    try:
        if not dest_key:
            dest_key = source_key
        log(f"📦 Migrating: s3://{source_bucket}/{source_key} → s3://{dest_bucket}/{dest_key}")
        # Build the two clients; bail out early if either cannot be created.
        source_client = self.get_source_client(
            source_credentials.get('access_key_id'),
            source_credentials.get('secret_access_key'),
            source_credentials.get('region', 'us-east-1'),
            source_credentials.get('endpoint_url'),
            source_credentials.get('session_token')
        )
        if not source_client:
            return {
                'success': False,
                'error': 'Failed to create source S3 client',
                'logs': logs
            }
        dest_client = self.get_destination_client(
            dest_credentials.get('access_key_id'),
            dest_credentials.get('secret_access_key'),
            dest_credentials.get('region', 'us-east-1'),
            dest_credentials.get('endpoint_url'),
            dest_credentials.get('session_token')
        )
        if not dest_client:
            return {
                'success': False,
                'error': 'Failed to create destination S3 client',
                'logs': logs
            }
        # Inspect the source object so we can report size and verify later.
        log("🔍 Getting source object metadata...")
        head_response = source_client.head_object(
            Bucket=source_bucket,
            Key=source_key
        )
        source_size = head_response['ContentLength']
        source_etag = head_response['ETag'].strip('"')
        source_last_modified = head_response['LastModified']
        log(f" 📊 Size: {self._format_size(source_size)}")
        log(f" 🏷️ ETag: {source_etag[:16]}...")
        # Assemble the user metadata that will be written on the destination.
        final_metadata = {}
        if preserve_metadata:
            final_metadata = head_response.get('Metadata', {}).copy()
        if metadata:
            final_metadata.update(metadata)
        # Audit trail: record where this object came from and when.
        # (datetime.utcnow() is deprecated in Python 3.12+; kept here for
        # backward-compatible timestamp formatting.)
        final_metadata['migration_source_bucket'] = source_bucket
        final_metadata['migration_source_key'] = source_key
        final_metadata['migration_timestamp'] = datetime.utcnow().isoformat()
        final_metadata['migration_tool'] = 'S3ToS3Migrator'
        # Make sure the destination bucket exists; create it on demand.
        dest_bucket_check = self.bucket_exists(
            dest_bucket, is_source=False,
            access_key_id=dest_credentials.get('access_key_id'),
            secret_access_key=dest_credentials.get('secret_access_key'),
            region=dest_credentials.get('region', 'us-east-1'),
            endpoint_url=dest_credentials.get('endpoint_url'),
            session_token=dest_credentials.get('session_token')
        )
        if not dest_bucket_check.get('exists'):
            log(f"⚠️ Destination bucket '{dest_bucket}' does not exist. Creating...")
            create_result = self.create_bucket(
                dest_bucket,
                region=dest_credentials.get('region', 'us-east-1'),
                endpoint_url=dest_credentials.get('endpoint_url'),
                access_key_id=dest_credentials.get('access_key_id'),
                secret_access_key=dest_credentials.get('secret_access_key'),
                session_token=dest_credentials.get('session_token')
            )
            if not create_result['success']:
                return {
                    'success': False,
                    'error': f"Failed to create destination bucket: {create_result['error']}",
                    'logs': logs
                }
            log(f"✅ Created destination bucket: {dest_bucket}")
        # Server-side copy.
        # NOTE(review): copy_object supports source objects up to 5 GB;
        # larger objects require a multipart copy (upload_part_copy) —
        # confirm expected object sizes.
        log("⬆️ Copying object to destination...")
        copy_start = time.time()
        copy_source = {
            'Bucket': source_bucket,
            'Key': source_key
        }
        copy_kwargs = {
            'Bucket': dest_bucket,
            'Key': dest_key,
            'CopySource': copy_source,
            'Metadata': final_metadata,
            # BUGFIX: always REPLACE. The previous code used 'COPY' when
            # preserve_metadata was False, which made S3 keep ALL of the
            # source's user metadata (the opposite of the intent) and
            # silently ignored both the caller-supplied `metadata` and the
            # migration audit keys above. final_metadata already reflects
            # the preserve_metadata choice, so REPLACE is correct either way.
            'MetadataDirective': 'REPLACE'
        }
        if storage_class:
            copy_kwargs['StorageClass'] = storage_class
        dest_client.copy_object(**copy_kwargs)
        copy_time = time.time() - copy_start
        # Verify the copy landed by reading it back.
        log("🔍 Verifying destination object...")
        dest_head = dest_client.head_object(
            Bucket=dest_bucket,
            Key=dest_key
        )
        dest_size = dest_head['ContentLength']
        dest_etag = dest_head['ETag'].strip('"')
        # Build a human-usable URL (custom endpoint vs. AWS virtual-host style).
        if dest_credentials.get('endpoint_url'):
            dest_url = f"{dest_credentials['endpoint_url']}/{dest_bucket}/{dest_key}"
        else:
            dest_url = f"https://{dest_bucket}.s3.{dest_credentials.get('region', 'us-east-1')}.amazonaws.com/{dest_key}"
        log(f"✅ Migration completed in {copy_time:.2f} seconds")
        log(f" 📍 Destination: {dest_url}")
        return {
            'success': True,
            'message': 'Object migrated successfully',
            'source': {
                'bucket': source_bucket,
                'key': source_key,
                'size': source_size,
                'etag': source_etag,
                'last_modified': source_last_modified.isoformat() if hasattr(source_last_modified, 'isoformat') else str(source_last_modified)
            },
            'destination': {
                'bucket': dest_bucket,
                'key': dest_key,
                'size': dest_size,
                'etag': dest_etag,
                'url': dest_url
            },
            'copy_time': copy_time,
            'metadata': final_metadata,
            'logs': logs
        }
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_msg = e.response['Error']['Message']
        log(f"❌ AWS Error: {error_code} - {error_msg}", 'error')
        return {
            'success': False,
            'error': f"{error_code}: {error_msg}",
            'aws_error': error_code,
            'logs': logs
        }
    except Exception as e:
        log(f"❌ Error: {str(e)}", 'error')
        return {
            'success': False,
            'error': str(e),
            'logs': logs
        }
def migrate_objects_batch(self, objects, source_bucket, dest_bucket,
                          source_credentials, dest_credentials,
                          preserve_metadata=True, storage_class='STANDARD',
                          max_concurrent=5):
    """
    Migrate multiple objects in parallel via a thread pool.

    Each entry in *objects* is either a plain key string or a dict with
    'key' and an optional 'dest_key'. Returns a dict with 'success'
    (True only when nothing failed), a per-object 'results' breakdown
    and the collected 'logs'.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    logs = []
    outcome = {
        'successful': [],
        'failed': [],
        'total_size': 0,
        'total_time': 0
    }

    def log(msg, level='info'):
        # Structured entry for the caller plus a console echo.
        logs.append({'timestamp': time.time(), 'message': msg, 'level': level})
        print(f"[Batch] {msg}")

    log(f"🚀 Starting batch migration of {len(objects)} objects")
    log(f" 📦 Source: s3://{source_bucket}")
    log(f" 📦 Destination: s3://{dest_bucket}")
    began = time.time()

    def copy_one(item):
        # Accept both bare key strings and {'key', 'dest_key'} dicts.
        if isinstance(item, dict):
            src = item['key']
            dst = item.get('dest_key', src)
        else:
            src = item
            dst = item
        single = self.migrate_object(
            source_bucket=source_bucket,
            source_key=src,
            dest_bucket=dest_bucket,
            dest_key=dst,
            source_credentials=source_credentials,
            dest_credentials=dest_credentials,
            preserve_metadata=preserve_metadata,
            storage_class=storage_class
        )
        return {'source_key': src, 'dest_key': dst, 'result': single}

    # Fan out to the pool and gather results as they finish.
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        pending = [pool.submit(copy_one, item) for item in objects]
        for done in as_completed(pending):
            try:
                entry = done.result()
            except Exception as e:
                # The worker itself blew up before producing a result dict.
                outcome['failed'].append({
                    'source_key': 'unknown',
                    'dest_key': 'unknown',
                    'error': str(e)
                })
                continue
            if entry['result']['success']:
                outcome['successful'].append(entry)
                outcome['total_size'] += entry['result']['destination']['size']
            else:
                outcome['failed'].append(entry)

    outcome['total_time'] = time.time() - began
    log(f"✅ Batch migration completed in {outcome['total_time']:.2f} seconds")
    log(f" ✅ Successful: {len(outcome['successful'])}")
    log(f" ❌ Failed: {len(outcome['failed'])}")
    log(f" 📦 Total size: {self._format_size(outcome['total_size'])}")
    return {
        'success': len(outcome['failed']) == 0,
        'message': f"Migrated {len(outcome['successful'])}/{len(objects)} objects",
        'results': outcome,
        'logs': logs
    }
# ==================== Full Migration ====================
def start_migration(self, migration_id, source_config, dest_config,
                    source_bucket, dest_bucket, prefix='',
                    include_patterns=None, exclude_patterns=None,
                    preserve_metadata=True, storage_class='STANDARD',
                    create_dest_bucket=True, max_concurrent=5):
    """
    Start full S3 to S3 migration in background.

    Registers a 'running' tracking entry in self.migrations, then launches
    migrate_bucket on a daemon thread and returns *migration_id*
    immediately. Poll get_migration_status()/list_migrations() for the
    outcome.

    BUGFIX: the tracking entry is now registered BEFORE the worker thread
    starts. Previously the thread was started first, so (a) migrate_bucket
    could read self.migrations[migration_id]['started_at'] before the
    entry existed (KeyError), and (b) a fast-finishing migration's final
    result could be overwritten by the 'running' placeholder written
    afterwards.
    """
    # Register the tracking entry first. Credential values are truncated
    # so the status record never stores a full secret.
    with self._lock:
        self.migrations[migration_id] = {
            'status': 'running',
            'started_at': time.time(),
            'source_bucket': source_bucket,
            'dest_bucket': dest_bucket,
            'prefix': prefix,
            'source_config': {k: v[:10] + '...' if k in ['access_key_id', 'secret_access_key'] and v else None
                              for k, v in source_config.items()},
            'dest_config': {k: v[:10] + '...' if k in ['access_key_id', 'secret_access_key'] and v else None
                            for k, v in dest_config.items()}
        }

    def migration_task():
        # Run the full migration and publish its final result atomically.
        result = self.migrate_bucket(
            migration_id=migration_id,
            source_config=source_config,
            dest_config=dest_config,
            source_bucket=source_bucket,
            dest_bucket=dest_bucket,
            prefix=prefix,
            include_patterns=include_patterns,
            exclude_patterns=exclude_patterns,
            preserve_metadata=preserve_metadata,
            storage_class=storage_class,
            create_dest_bucket=create_dest_bucket,
            max_concurrent=max_concurrent
        )
        with self._lock:
            self.migrations[migration_id] = result

    thread = threading.Thread(target=migration_task, daemon=True)
    thread.start()
    return migration_id
def migrate_bucket(self, migration_id, source_config, dest_config,
                   source_bucket, dest_bucket, prefix='',
                   include_patterns=None, exclude_patterns=None,
                   preserve_metadata=True, storage_class='STANDARD',
                   create_dest_bucket=True, max_concurrent=5):
    """
    Migrate entire bucket from source to destination.

    Flow: optionally create the destination bucket, list every object
    under *prefix* on the source, filter by include/exclude glob patterns,
    then copy the objects in chunks of max_concurrent * 10 via
    migrate_objects_batch.

    Args:
        migration_id: identifier used in log prefixes and to look up the
            tracking entry in self.migrations (see note near the summary).
        source_config / dest_config: credential dicts (access_key_id,
            secret_access_key, optional region/endpoint_url/session_token).
        prefix: only keys starting with this prefix are migrated.
        include_patterns / exclude_patterns: fnmatch-style glob lists.
        preserve_metadata, storage_class: passed through per object.
        create_dest_bucket: create the destination bucket when missing.
        max_concurrent: parallel copies per batch.

    Returns:
        dict with 'success', 'migration_id', 'stats', truncated
        'successful'/'failed' lists (first 100 each) and 'logs'.
    """
    logs = []
    def log(msg, level='info'):
        # Structured log entry for the caller plus a console echo tagged
        # with the migration id.
        log_entry = {
            'timestamp': time.time(),
            'message': msg,
            'level': level
        }
        logs.append(log_entry)
        print(f"[{migration_id}] {msg}")
    try:
        log(f"🔧 Starting S3 to S3 Migration {migration_id}")
        log(f" 📤 Source: s3://{source_bucket}/{prefix}")
        log(f" 📥 Destination: s3://{dest_bucket}/{prefix}")
        log(f" ⚙️ Preserve metadata: {preserve_metadata}, Storage class: {storage_class}")
        # Create destination bucket if needed
        if create_dest_bucket:
            dest_client = self.get_destination_client(
                dest_config.get('access_key_id'),
                dest_config.get('secret_access_key'),
                dest_config.get('region', 'us-east-1'),
                dest_config.get('endpoint_url'),
                dest_config.get('session_token')
            )
            # NOTE(review): if the client cannot be created we silently
            # skip bucket creation and rely on the later copy to fail.
            if dest_client:
                exists_check = self.bucket_exists(
                    dest_bucket, is_source=False,
                    access_key_id=dest_config.get('access_key_id'),
                    secret_access_key=dest_config.get('secret_access_key'),
                    region=dest_config.get('region', 'us-east-1'),
                    endpoint_url=dest_config.get('endpoint_url'),
                    session_token=dest_config.get('session_token')
                )
                if not exists_check.get('exists'):
                    log(f"📦 Creating destination bucket: {dest_bucket}")
                    create_result = self.create_bucket(
                        dest_bucket,
                        region=dest_config.get('region', 'us-east-1'),
                        endpoint_url=dest_config.get('endpoint_url'),
                        access_key_id=dest_config.get('access_key_id'),
                        secret_access_key=dest_config.get('secret_access_key'),
                        session_token=dest_config.get('session_token')
                    )
                    if not create_result['success']:
                        log(f"❌ Failed to create destination bucket: {create_result['error']}", 'error')
                        return {
                            'success': False,
                            'migration_id': migration_id,
                            'error': f"Failed to create destination bucket: {create_result['error']}",
                            'logs': logs
                        }
                log(f"✅ Destination bucket created/verified")
        # List objects from source (list_objects paginates internally,
        # so this returns the complete listing, not just 1000 keys).
        log(f"🔍 Listing objects from source bucket: {source_bucket}/{prefix}")
        list_result = self.list_objects(
            bucket_name=source_bucket,
            prefix=prefix,
            max_keys=1000,
            is_source=True,
            access_key_id=source_config.get('access_key_id'),
            secret_access_key=source_config.get('secret_access_key'),
            region=source_config.get('region', 'us-east-1'),
            endpoint_url=source_config.get('endpoint_url'),
            session_token=source_config.get('session_token')
        )
        if not list_result['success']:
            log(f"❌ Failed to list source bucket: {list_result['error']}", 'error')
            return {
                'success': False,
                'migration_id': migration_id,
                'error': f"Failed to list source bucket: {list_result['error']}",
                'logs': logs
            }
        objects = list_result['objects']
        total_objects = len(objects)
        total_size = list_result['total_size']
        log(f"📊 Found {total_objects} objects, total size: {self._format_size(total_size)}")
        if total_objects == 0:
            # Nothing to do: report success with empty stats.
            log(f"⚠️ No objects to migrate")
            return {
                'success': True,
                'migration_id': migration_id,
                'message': 'No objects to migrate',
                'stats': {
                    'total_objects': 0,
                    'migrated': 0,
                    'failed': 0,
                    'total_size': 0
                },
                'logs': logs
            }
        # Filter objects if patterns provided: an object survives when it
        # matches at least one include pattern (if any are given) and no
        # exclude pattern.
        if include_patterns or exclude_patterns:
            import fnmatch
            filtered_objects = []
            for obj in objects:
                key = obj['key']
                include = True
                if include_patterns:
                    include = any(fnmatch.fnmatch(key, pattern) for pattern in include_patterns)
                if exclude_patterns:
                    include = include and not any(fnmatch.fnmatch(key, pattern) for pattern in exclude_patterns)
                if include:
                    filtered_objects.append(obj)
            objects = filtered_objects
            log(f"🎯 After filtering: {len(objects)} objects")
        # Migrate objects in batches
        successful = []
        failed = []
        total_bytes = 0
        log(f"🚀 Starting migration of {len(objects)} objects...")
        # Process in chunks so progress can be logged between batches.
        chunk_size = max_concurrent * 10
        for i in range(0, len(objects), chunk_size):
            chunk = objects[i:i + chunk_size]
            log(f"📦 Processing chunk {i//chunk_size + 1}/{(len(objects)-1)//chunk_size + 1}")
            batch_result = self.migrate_objects_batch(
                objects=chunk,
                source_bucket=source_bucket,
                dest_bucket=dest_bucket,
                source_credentials=source_config,
                dest_credentials=dest_config,
                preserve_metadata=preserve_metadata,
                storage_class=storage_class,
                max_concurrent=max_concurrent
            )
            for success in batch_result['results']['successful']:
                successful.append(success)
                total_bytes += success['result']['destination']['size']
            for fail in batch_result['results']['failed']:
                failed.append(fail)
            log(f" ✅ Progress: {len(successful)}/{len(objects)} objects, "
                f"📦 {self._format_size(total_bytes)}")
        # Summary
        log(f"📊 Migration Summary:")
        log(f" ✅ Successful: {len(successful)} objects")
        log(f" ❌ Failed: {len(failed)} objects")
        # NOTE(review): this reports the pre-filter listing size
        # (total_size), not the bytes actually copied (total_bytes).
        log(f" 📦 Total size: {self._format_size(total_size)}")
        # NOTE(review): assumes start_migration registered
        # self.migrations[migration_id] before this runs; calling
        # migrate_bucket directly without that entry raises KeyError here.
        log(f" ⏱️ Total time: {time.time() - self.migrations[migration_id]['started_at']:.2f} seconds")
        return {
            'success': len(failed) == 0,
            'migration_id': migration_id,
            'message': f"Migration completed: {len(successful)}/{total_objects} objects migrated",
            'stats': {
                'total_objects': total_objects,
                'migrated': len(successful),
                'failed': len(failed),
                'total_size': total_size,
                'migrated_size': total_bytes
            },
            'successful': successful[:100],  # First 100 only
            'failed': failed[:100],  # First 100 only
            'logs': logs
        }
    except Exception as e:
        log(f"❌ Migration failed: {str(e)}", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': str(e),
            'logs': logs
        }
# ==================== Migration Status ====================
def get_migration_status(self, migration_id):
    """Return the raw tracking/result dict for *migration_id*, or None.

    While the migration runs this is the placeholder written by
    start_migration; afterwards it is the result dict from migrate_bucket.

    BUGFIX: removed an unreachable duplicate of list_migrations() that sat
    after the return statement (dead code that could never execute).
    """
    with self._lock:
        return self.migrations.get(migration_id)
def list_migrations(self):
    """Summarize every known migration, most recently started first.

    Each summary carries id/status/started_at/source_bucket/dest_bucket
    plus the full raw record under 'data'. Status is derived from the
    record: a 'success' key means finished, a 'status' of 'running'
    means in flight, anything else is 'unknown'.
    """
    with self._lock:
        summaries = []
        for ident, record in self.migrations.items():
            is_dict = isinstance(record, dict)
            if is_dict and 'success' in record:
                state = 'completed' if record['success'] else 'failed'
            elif is_dict and record.get('status') == 'running':
                state = 'running'
            else:
                state = 'unknown'
            summaries.append({
                'id': ident,
                'status': state,
                'started_at': record.get('started_at') if is_dict else None,
                'source_bucket': record.get('source_bucket') if is_dict else None,
                'dest_bucket': record.get('dest_bucket') if is_dict else None,
                'data': record
            })
        # Newest first by start time.
        summaries.sort(key=lambda item: item.get('started_at', 0), reverse=True)
        return summaries
def cancel_migration(self, migration_id):
    """Flag a running migration as cancelled.

    Only flips the status field on the tracking record; it does not
    interrupt the worker thread. Returns a success dict, or an error
    dict when the id is unknown or the migration is not running.
    """
    with self._lock:
        record = self.migrations.get(migration_id)
        if isinstance(record, dict) and record.get('status') == 'running':
            record['status'] = 'cancelled'
            record['cancelled_at'] = time.time()
            return {'success': True, 'message': 'Migration cancelled'}
        return {'success': False, 'error': 'Migration not found or not running'}
# ==================== Helper Functions ====================
def _format_size(self, size_bytes):
"""Format file size in human-readable format"""
if size_bytes == 0:
return '0 B'
size_names = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.2f} {size_names[i]}"
def parse_s3_uri(self, s3_uri):
    """Split an S3 URI into bucket and key parts.

    A missing 's3://' scheme is prepended automatically. Returns a dict
    with 'bucket', 'key' and the normalized 'uri', or None when the
    input cannot be parsed (e.g. not a string).
    """
    try:
        uri = s3_uri if s3_uri.startswith('s3://') else 's3://' + s3_uri
        parts = urlparse(uri)
        return {
            'bucket': parts.netloc,
            'key': parts.path.lstrip('/') if parts.path else '',
            'uri': uri
        }
    except Exception:
        return None
def generate_presigned_url(self, bucket, key, expiration=3600, is_source=True,
                           access_key_id=None, secret_access_key=None,
                           region='us-east-1', endpoint_url=None,
                           session_token=None):
    """Create a time-limited GET URL for *key* in *bucket*.

    Returns {'success': True, 'url', 'expires_in', 'bucket', 'key'} on
    success, or {'success': False, 'error': ...} on failure.
    """
    try:
        make_client = self.get_source_client if is_source else self.get_destination_client
        s3 = make_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
        if not s3:
            return {'success': False, 'error': 'Failed to create S3 client'}
        signed = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket, 'Key': key},
            ExpiresIn=expiration
        )
        return {
            'success': True,
            'url': signed,
            'expires_in': expiration,
            'bucket': bucket,
            'key': key
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}
# ============================================================================
# Part 3: PostgreSQL to S3 Migrator (PostgresToS3Migrator)
# ============================================================================
class PostgresToS3Migrator:
"""PostgreSQL to S3 migration engine"""
def __init__(self):
    """Initialize in-memory state for the PostgreSQL-to-S3 engine."""
    # migration_id -> tracking/result dict, shared with worker threads.
    self.migrations = {}
    # Guards self.migrations against concurrent access from worker threads.
    self._lock = threading.Lock()
    # Cache for S3 clients; population is not visible in this chunk —
    # TODO confirm the keying scheme.
    self.s3_clients = {}
def parse_postgres_uri(self, uri):
    """Parse a PostgreSQL connection URI into its components.

    A missing 'postgresql://'/'postgres://' scheme is prepended. Defaults:
    port 5432, empty user/password, database 'postgres' when no path.
    Returns a dict with host/port/user/password/database/uri, or None
    when the input cannot be parsed (e.g. not a string).
    """
    try:
        if uri.startswith(('postgresql://', 'postgres://')):
            normalized = uri
        else:
            normalized = 'postgresql://' + uri
        parts = urlparse(normalized)
        return {
            'host': parts.hostname,
            'port': parts.port or 5432,
            'user': parts.username or '',
            'password': parts.password or '',
            'database': parts.path.lstrip('/') if parts.path else 'postgres',
            'uri': normalized
        }
    except Exception:
        return None
def parse_uri_to_env(self, uri, prefix=''):
    """Expand a PostgreSQL URI into environment-variable style settings.

    Produces {prefix}DB_HOST/PORT/USER/PASSWORD/NAME/URI plus the
    libpq-style {prefix}DB_CONNECTION_STRING and {prefix}DB_URL aliases.
    Returns None when the URI cannot be parsed.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return None
    env = {
        f'{prefix}DB_HOST': parsed['host'],
        f'{prefix}DB_PORT': str(parsed['port']),
        f'{prefix}DB_USER': parsed['user'],
        f'{prefix}DB_PASSWORD': parsed['password'],
        f'{prefix}DB_NAME': parsed['database'],
        f'{prefix}DB_URI': parsed['uri'],
        # libpq keyword/value form plus a plain URL alias.
        f'{prefix}DB_CONNECTION_STRING': (
            f"host={parsed['host']} port={parsed['port']} dbname={parsed['database']} "
            f"user={parsed['user']} password={parsed['password']}"
        ),
        f'{prefix}DB_URL': parsed['uri']
    }
    return env
def get_live_migration_progress(self, migration_id):
    """Return a live progress snapshot for a PostgreSQL-to-S3 migration.

    Dispatches to the completed-stats view when the migration is no
    longer running, to the live tracker view when a progress tracker
    exists, and otherwise to a rough time-based estimate.
    """
    with self._lock:
        record = self.migrations.get(migration_id)
        if record is None:
            return {'success': False, 'error': 'Migration not found'}
        if record.get('status') != 'running':
            return self._get_completed_migration_stats(migration_id)
        if 'progress_tracker' in record:
            return self._get_live_tracker_progress(migration_id)
        return self._get_estimated_progress(migration_id)
def _get_live_tracker_progress(self, migration_id):
    """Build a detailed live progress snapshot from the in-memory tracker.

    Expects self.migrations[migration_id]['progress_tracker'] to exist;
    called from get_live_migration_progress() while self._lock is held.
    Returns a dict with totals, processed/failed/remaining counters,
    percentages, timing/ETA, speed figures, current-table detail and
    text progress bars.
    """
    mig_data = self.migrations[migration_id]
    tracker = mig_data['progress_tracker']
    current_time = time.time()
    elapsed_time = current_time - tracker['start_time']
    # Completion percentages by tables / bytes / rows (0 when totals unknown).
    tables_percentage = (tracker['processed_tables'] / tracker['total_tables'] * 100) if tracker['total_tables'] > 0 else 0
    size_percentage = (tracker['processed_size'] / tracker['total_size'] * 100) if tracker['total_size'] > 0 else 0
    rows_percentage = (tracker['processed_rows'] / tracker['total_rows'] * 100) if tracker['total_rows'] > 0 else 0
    # Current throughput from samples recorded within the last 10 seconds;
    # falls back to the overall average when no recent samples exist.
    recent_samples = [s for s in tracker.get('speed_samples', []) if current_time - s['time'] < 10]
    if recent_samples:
        recent_size = sum(s['size'] for s in recent_samples)
        recent_time = recent_samples[-1]['time'] - recent_samples[0]['time'] if len(recent_samples) > 1 else 1
        current_speed = recent_size / recent_time if recent_time > 0 else 0
    else:
        current_speed = tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0
    # ETA derived from the current speed over the remaining (non-failed) bytes.
    remaining_size = tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0)
    eta_seconds = remaining_size / current_speed if current_speed > 0 else 0
    # Detail for the table currently being exported, if any.
    current_table_info = None
    if tracker.get('current_table'):
        ct = tracker['current_table']
        current_table_info = {
            'name': ct['name'],
            'size': ct.get('size', 0),
            'size_formatted': self._format_size(ct.get('size', 0)),
            'rows': ct.get('rows', 0),
            'rows_processed': ct.get('rows_processed', 0),
            'rows_percentage': round((ct.get('rows_processed', 0) / max(ct.get('rows', 1), 1)) * 100, 2),
            'stage': ct.get('stage', 'querying'),  # querying, compressing, uploading
            'progress': ct.get('progress', 0),
            'elapsed': current_time - ct.get('start_time', current_time),
            'elapsed_formatted': self._format_time(current_time - ct.get('start_time', current_time))
        }
    # Assemble the snapshot returned to the status API.
    return {
        'success': True,
        'migration_id': migration_id,
        'status': 'running',
        'type': 'live',
        'timestamp': current_time,
        'total': {
            'tables': tracker['total_tables'],
            'size': tracker['total_size'],
            'size_formatted': self._format_size(tracker['total_size']),
            'rows': tracker.get('total_rows', 0)
        },
        'processed': {
            'tables': tracker['processed_tables'],
            'size': tracker['processed_size'],
            'size_formatted': self._format_size(tracker['processed_size']),
            'rows': tracker.get('processed_rows', 0)
        },
        'failed': {
            'tables': tracker.get('failed_tables', 0),
            'size': tracker.get('failed_size', 0),
            'size_formatted': self._format_size(tracker.get('failed_size', 0)),
            'rows': tracker.get('failed_rows', 0)
        },
        'remaining': {
            'tables': tracker['total_tables'] - tracker['processed_tables'] - tracker.get('failed_tables', 0),
            'size': tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0),
            'size_formatted': self._format_size(tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0))
        },
        'percentages': {
            'tables': round(tables_percentage, 2),
            'size': round(size_percentage, 2),
            'rows': round(rows_percentage, 2) if tracker.get('total_rows', 0) > 0 else 0
        },
        'time': {
            'elapsed': elapsed_time,
            'elapsed_formatted': self._format_time(elapsed_time),
            'eta': eta_seconds,
            'eta_formatted': self._format_time(eta_seconds)
        },
        'speed': {
            'current': current_speed,
            'current_formatted': f"{self._format_size(current_speed)}/s",
            'average': tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0,
            'average_formatted': f"{self._format_size(tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0)}/s",
            # NOTE(review): 'peak' is the largest single sample's byte
            # count, not a peak transfer rate — confirm the intent.
            'peak': max((s['size'] for s in tracker.get('speed_samples', [])), default=0),
            'peak_formatted': self._format_size(max((s['size'] for s in tracker.get('speed_samples', [])), default=0))
        },
        'current_table': current_table_info,
        'recent_exports': tracker.get('tables_details', [])[-5:],
        's3_info': {
            'bucket': mig_data.get('s3_bucket'),
            'prefix': mig_data.get('s3_prefix'),
            'region': mig_data.get('region', 'us-east-1'),
            'endpoint': mig_data.get('endpoint_url')
        },
        'format': {
            'type': tracker.get('format', 'csv'),
            'compressed': tracker.get('compress', True)
        },
        'progress_bars': {
            'tables': self._format_progress_bar(tables_percentage),
            'size': self._format_progress_bar(size_percentage),
            'rows': self._format_progress_bar(rows_percentage) if tracker.get('total_rows', 0) > 0 else None
        }
    }
def _get_estimated_progress(self, migration_id):
    """Return a rough, time-based progress guess for a running migration.

    Used before a real progress tracker exists: assumes a nominal 5 MB/s
    throughput against the estimated table count/size stored on the
    migration record and caps the guess at 99%.
    """
    record = self.migrations[migration_id]
    elapsed = time.time() - record['started_at']
    # Fall back to 10 tables / 100 MB when no estimates were recorded.
    table_estimate = record.get('estimated_tables', 10)
    size_estimate = record.get('estimated_size', 100 * 1024 * 1024)
    expected_duration = size_estimate / (5 * 1024 * 1024)
    if expected_duration > 0:
        pct = min((elapsed / expected_duration) * 100, 99)
    else:
        pct = 0
    done_tables = int(table_estimate * pct / 100)
    done_size = int(size_estimate * pct / 100)
    return {
        'success': True,
        'migration_id': migration_id,
        'status': 'running',
        'type': 'estimated',
        'timestamp': time.time(),
        'estimated': True,
        'total': {
            'tables': table_estimate,
            'size': size_estimate,
            'size_formatted': self._format_size(size_estimate)
        },
        'processed': {
            'tables': done_tables,
            'size': done_size,
            'size_formatted': self._format_size(done_size)
        },
        'percentages': {
            'tables': round(pct, 2),
            'size': round(pct, 2)
        },
        'time': {
            'elapsed': elapsed,
            'elapsed_formatted': self._format_time(elapsed)
        },
        'progress_bars': {
            'main': self._format_progress_bar(pct)
        },
        'note': 'تقدير تقريبي - يتم جمع معلومات دقيقة عن الجداول...'
    }
def _get_completed_migration_stats(self, migration_id):
    """Summarize a finished (completed or failed) migration.

    Reads the 'stats' block written by the export run and reshapes it
    into the same snapshot schema the live views use, including the
    last 10 successful exports and all failed ones.
    """
    record = self.migrations[migration_id]
    stats = record.get('stats', {})
    ok_exports = record.get('successful_exports', [])
    bad_exports = record.get('failed_exports', [])
    table_total = stats.get('total_tables', 0)
    ok_count = stats.get('successful_exports', 0)
    bad_count = stats.get('failed_exports', 0)
    byte_total = stats.get('total_size', 0)
    # Prefer the recorded execution time; otherwise derive it from start.
    duration = record.get('execution_time', time.time() - record.get('started_at', time.time()))
    if ok_count == table_total:
        table_pct = 100.0
    else:
        table_pct = round((ok_count / table_total * 100) if table_total > 0 else 0, 2)
    avg_speed = byte_total / duration if duration > 0 else 0
    return {
        'success': True,
        'migration_id': migration_id,
        'status': 'completed' if record.get('success') else 'failed',
        'type': 'completed',
        'timestamp': time.time(),
        'total': {
            'tables': table_total,
            'size': byte_total,
            'size_formatted': self._format_size(byte_total)
        },
        'processed': {
            'tables': ok_count,
            'size': byte_total,
            'size_formatted': self._format_size(byte_total)
        },
        'failed': {
            'tables': bad_count,
            'size': 0,
            'size_formatted': '0 B'
        },
        'percentages': {
            'tables': table_pct,
            'size': 100.0
        },
        'time': {
            'total': duration,
            'total_formatted': self._format_time(duration),
            'average_speed': avg_speed,
            'average_speed_formatted': f"{self._format_size(avg_speed)}/s"
        },
        'successful_exports': ok_exports[-10:],
        'failed_exports': bad_exports,
        's3_config': record.get('s3_config', {})
    }
def start_migration_with_progress(self, migration_id, postgres_uri, s3_bucket, s3_prefix='',
                                  schemas=None, tables=None, compress=True, format='csv',
                                  access_key_id=None, secret_access_key=None,
                                  region='us-east-1', endpoint_url=None):
    """Launch a PostgreSQL-to-S3 migration with live progress tracking.

    Seeds the progress tracker synchronously, then runs the export on a
    daemon thread and returns *migration_id* immediately; poll
    get_live_migration_progress() for status.
    """
    # Seed the tracker before the worker starts so status queries have
    # something to report immediately.
    # NOTE(review): _init_progress_tracker is not passed s3_bucket /
    # s3_prefix here — confirm it obtains them through another path.
    self._init_progress_tracker(migration_id, postgres_uri, schemas, tables,
                                compress, format, region, endpoint_url)

    def worker():
        try:
            outcome = self._run_migration_with_progress(
                migration_id, postgres_uri, s3_bucket, s3_prefix,
                schemas, tables, compress, format,
                access_key_id, secret_access_key, region, endpoint_url
            )
            with self._lock:
                entry = self.migrations[migration_id]
                entry.update(outcome)
                entry['status'] = 'completed' if outcome['success'] else 'failed'
                entry['completed_at'] = time.time()
        except Exception as e:
            with self._lock:
                self.migrations[migration_id]['status'] = 'failed'
                self.migrations[migration_id]['error'] = str(e)

    threading.Thread(target=worker, daemon=True).start()
    return migration_id
def _init_progress_tracker(self, migration_id, postgres_uri, schemas, tables, compress, format, region, endpoint_url):
"""تهيئة متتبع التقدم بالبيانات الفعلية"""
# الحصول على قائمة الجداول وأحجامها
tables_list = []
total_size = 0
total_rows = 0
if tables:
for table in tables:
if '.' in table:
schema, name = table.split('.', 1)
else:
schema = 'public'
name = table
# تقدير حجم الجدول (صعب مع PostgreSQL)
size = 10 * 1024 * 1024 # تقدير افتراضي 10MB
rows = self._get_table_row_count(postgres_uri, schema, name) or 0
tables_list.append({
'schema': schema,
'name': name,
'size': size,
'rows': rows,
'status': 'pending'
})
total_size += size
total_rows += rows
elif schemas:
for schema in schemas:
result = self.get_tables(postgres_uri, schema)
if result['success']:
for table_info in result['tables']:
rows = self._get_table_row_count(postgres_uri, schema, table_info['name']) or 0
tables_list.append({
'schema': schema,
'name': table_info['name'],
'size': 10 * 1024 * 1024, # تقدير افتراضي
'rows': rows,
'status': 'pending'
})
total_size += 10 * 1024 * 1024
total_rows += rows
else:
result = self.get_tables(postgres_uri)
if result['success']:
for table_info in result['tables']:
rows = self._get_table_row_count(postgres_uri, table_info['schema'], table_info['name']) or 0
tables_list.append({
'schema': table_info['schema'],
'name': table_info['name'],
'size': 10 * 1024 * 1024, # تقدير افتراضي
'rows': rows,
'status': 'pending'
})
total_size += 10 * 1024 * 1024
total_rows += rows
with self._lock:
self.migrations[migration_id] = {
'status': 'running',
'started_at': time.time(),
'postgres_uri': postgres_uri,
's3_bucket': s3_bucket,
's3_prefix': s3_prefix,
'region': region,
'endpoint_url': endpoint_url,
'estimated_tables': len(tables_list),
'estimated_size': total_size,
'progress_tracker': {
'total_tables': len(tables_list),
'total_size': total_size,
'total_rows': total_rows,
'processed_tables': 0,
'processed_size': 0,
'processed_rows': 0,
'failed_tables': 0,
'failed_size': 0,
'failed_rows': 0,
'current_table': None,
'start_time': time.time(),
'last_update': time.time(),
'tables_list': tables_list,
'tables_details': [],
'speed_samples': [],
'format': format,
'compress': compress
}
}
def _run_migration_with_progress(self, migration_id, postgres_uri, s3_bucket, s3_prefix,
                                 schemas, tables, compress, format,
                                 access_key_id, secret_access_key, region, endpoint_url):
    """Export every table in the migration's tracker list to S3.

    Walks ``progress_tracker['tables_list']`` (built beforehand by
    ``_init_progress_tracker``), exports each table via
    ``_export_table_with_progress``, and updates the shared tracker counters
    under ``self._lock`` as it goes.

    Returns a summary dict whose ``success`` is True only if no table failed.
    """
    successful_exports = []
    failed_exports = []
    # Grab the shared table list; only this worker thread iterates it.
    with self._lock:
        tracker = self.migrations[migration_id]['progress_tracker']
        tables_list = tracker['tables_list']
    for i, table_info in enumerate(tables_list, 1):
        table_name = f"{table_info['schema']}.{table_info['name']}"
        # Publish the table currently being exported so pollers can show it.
        current_table = {
            'name': table_name,
            'size': table_info['size'],
            'rows': table_info['rows'],
            'rows_processed': 0,
            'stage': 'querying',
            'progress': 0,
            'start_time': time.time()
        }
        with self._lock:
            self.migrations[migration_id]['progress_tracker']['current_table'] = current_table
        print(f"[{migration_id}] 📊 Exporting table {i}/{len(tables_list)}: {table_name}")
        # Export the table (query, optional gzip, S3 upload) with stage updates.
        result = self._export_table_with_progress(
            migration_id, postgres_uri,
            table_info['schema'], table_info['name'],
            s3_bucket, s3_prefix, compress, format,
            access_key_id, secret_access_key, region, endpoint_url,
            table_info['rows']
        )
        if result['success']:
            # Fold the result into the shared counters under the lock.
            with self._lock:
                tracker = self.migrations[migration_id]['progress_tracker']
                tracker['processed_tables'] += 1
                tracker['processed_size'] += result['file_size']
                tracker['processed_rows'] += result.get('rows_exported', 0)
                # Per-table detail record for the UI / final report.
                tracker['tables_details'].append({
                    'name': table_name,
                    'size': result['file_size'],
                    'size_formatted': self._format_size(result['file_size']),
                    'rows': result.get('rows_exported', 0),
                    'query_time': result.get('query_time', 0),
                    'upload_time': result.get('upload_time', 0),
                    'total_time': result.get('query_time', 0) + result.get('upload_time', 0),
                    # "or 1" guards against a zero/missing upload_time.
                    'speed': result['file_size'] / (result.get('upload_time', 1) or 1),
                    'compressed': result.get('compressed', False),
                    'format': format,
                    'timestamp': time.time()
                })
                # Sample used for the rolling transfer-speed estimate.
                tracker['speed_samples'].append({
                    'time': time.time(),
                    'size': result['file_size'],
                    'tables': 1
                })
            successful_exports.append({
                'table': table_name,
                's3_key': result['s3_key'],
                'size': result['file_size'],
                's3_url': result['s3_url']
            })
            print(f"[{migration_id}] ✅ Exported {table_name} - {self._format_size(result['file_size'])}")
        else:
            # Record the failure; size here is the pre-export estimate.
            with self._lock:
                tracker = self.migrations[migration_id]['progress_tracker']
                tracker['failed_tables'] += 1
                tracker['failed_size'] += table_info['size']
            failed_exports.append({
                'table': table_name,
                'error': result.get('error', 'Unknown error')
            })
            print(f"[{migration_id}] ❌ Failed {table_name}: {result.get('error')}")
    # Final statistics.
    total_size = sum(e['size'] for e in successful_exports)
    return {
        'success': len(failed_exports) == 0,
        'migration_id': migration_id,
        # NOTE(review): read without holding self._lock — 'started_at' is
        # written once before this thread starts, so this looks safe; confirm.
        'execution_time': time.time() - self.migrations[migration_id]['started_at'],
        'stats': {
            'total_tables': len(tables_list),
            'successful_exports': len(successful_exports),
            'failed_exports': len(failed_exports),
            'total_size': total_size
        },
        'successful_exports': successful_exports,
        'failed_exports': failed_exports,
        's3_config': {
            'bucket': s3_bucket,
            'prefix': s3_prefix,
            'region': region,
            'endpoint': endpoint_url
        }
    }
def _export_table_with_progress(self, migration_id, postgres_uri, schema, table,
s3_bucket, s3_prefix, compress, format,
access_key_id, secret_access_key, region, endpoint_url,
estimated_rows=0):
"""تصدير جدول مع تتبع التقدم"""
try:
start_time = time.time()
# تحديث المرحلة: الاستعلام
self._update_table_stage(migration_id, 'querying', 10)
# تصدير الجدول
result = self.export_table_to_s3(
postgres_uri=postgres_uri,
schema=schema,
table=table,
s3_bucket=s3_bucket,
s3_key=f"{s3_prefix}{schema}_{table}_{int(time.time())}.{format}{'.gz' if compress else ''}",
compress=compress,
format=format,
access_key_id=access_key_id,
secret_access_key=secret_access_key,
region=region,
endpoint_url=endpoint_url
)
if result['success']:
result['rows_exported'] = estimated_rows
return result
else:
return result
except Exception as e:
return {'success': False, 'error': str(e)}
def _update_table_stage(self, migration_id, stage, progress):
"""تحديث مرحلة تصدير الجدول الحالي"""
with self._lock:
if migration_id in self.migrations:
tracker = self.migrations[migration_id].get('progress_tracker')
if tracker and tracker.get('current_table'):
tracker['current_table']['stage'] = stage
tracker['current_table']['progress'] = progress
tracker['last_update'] = time.time()
def _get_table_row_count(self, uri, schema, table):
"""الحصول على عدد صفوف الجدول"""
try:
parsed = self.parse_postgres_uri(uri)
if not parsed:
return None
env = os.environ.copy()
env['PGPASSWORD'] = parsed['password']
sql = f'SELECT COUNT(*) FROM "{schema}"."{table}";'
cmd = [
'psql',
'-h', parsed['host'],
'-p', str(parsed['port']),
'-U', parsed['user'],
'-d', parsed['database'],
'-t',
'-c', sql
]
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
count_line = result.stdout.strip()
if count_line and count_line.isdigit():
return int(count_line)
return None
except Exception:
return None
def format_live_status(self, migration_id):
    """Render the live progress of a migration as a human-readable text report.

    Pulls the structured progress dict from ``get_live_migration_progress``
    and formats it section by section (S3 target, tables, size, rows, timing,
    speed, current table, recent exports).  Returns one multi-line string; on
    lookup failure returns a single-line error message.
    """
    progress = self.get_live_migration_progress(migration_id)
    if not progress.get('success', True):
        return f"❌ Error: {progress.get('error', 'Unknown error')}"
    lines = []
    lines.append("=" * 70)
    lines.append(f"🚀 PostgreSQL to S3 Migration {migration_id} - {progress['status'].upper()}")
    lines.append("=" * 70)
    # 'estimated' means no live tracker is available; values are approximate.
    # (The appended string is user-facing Arabic: "rough estimate - gathering
    # accurate information...")
    if progress.get('type') == 'estimated':
        lines.append("⚠️ تقدير تقريبي - يتم جمع معلومات دقيقة...")
    # S3 destination section.
    if 's3_info' in progress:
        s3 = progress['s3_info']
        lines.append(f"\n📦 S3 Destination: s3://{s3['bucket']}/{s3['prefix']}")
        lines.append(f" Region: {s3['region']}")
    # Main (size-based) progress bar.
    if 'progress_bars' in progress:
        if 'size' in progress['progress_bars']:
            lines.append(f"\n📊 Progress: {progress['progress_bars']['size']}")
    # Table statistics.
    lines.append(f"\n📋 Tables:")
    lines.append(f" Total: {progress['total']['tables']} tables")
    lines.append(f" Exported: {progress['processed']['tables']} tables ({progress['percentages']['tables']}%)")
    if progress.get('failed', {}).get('tables', 0) > 0:
        lines.append(f" ❌ Failed: {progress['failed']['tables']} tables")
    # Size statistics.
    lines.append(f"\n💾 Size:")
    lines.append(f" Total: {progress['total']['size_formatted']}")
    lines.append(f" Exported: {progress['processed']['size_formatted']} ({progress['percentages']['size']}%)")
    if progress.get('remaining', {}).get('size', 0) > 0:
        lines.append(f" Remaining: {progress['remaining']['size_formatted']}")
    # Row counts (only when the tracker knows them).
    if progress['total'].get('rows', 0) > 0:
        lines.append(f"\n📊 Rows:")
        lines.append(f" Total: {progress['total']['rows']:,} rows")
        lines.append(f" Exported: {progress['processed']['rows']:,} rows ({progress['percentages']['rows']}%)")
    # Timing and speed.
    lines.append(f"\n⏱️ Time:")
    lines.append(f" Elapsed: {progress['time']['elapsed_formatted']}")
    if 'eta' in progress['time']:
        lines.append(f" ETA: {progress['time']['eta_formatted']}")
    lines.append(f"\n⚡ Speed:")
    lines.append(f" Current: {progress['speed']['current_formatted']}")
    lines.append(f" Average: {progress['speed']['average_formatted']}")
    # Table currently being exported.
    if progress.get('current_table'):
        ct = progress['current_table']
        lines.append(f"\n📄 Current Table: {ct['name']}")
        lines.append(f" Size: {ct['size_formatted']}")
        lines.append(f" Stage: {ct['stage'].upper()}")
        if ct.get('rows', 0) > 0:
            lines.append(f" Rows: {ct['rows_processed']}/{ct['rows']} ({ct['rows_percentage']}%)")
        lines.append(f" Elapsed: {ct['elapsed_formatted']}")
    # Tail of the successful-exports list.
    if progress.get('recent_exports'):
        lines.append(f"\n✅ Recent Exports:")
        for exp in progress['recent_exports'][-3:]:
            lines.append(f"{exp['name']} - {exp['size_formatted']}")
    lines.append("\n" + "=" * 70)
    return '\n'.join(lines)
def _format_size(self, size_bytes):
"""تنسيق حجم الملف"""
if size_bytes == 0:
return '0 B'
size_names = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.2f} {size_names[i]}"
def _format_time(self, seconds):
"""تنسيق الوقت"""
if seconds < 60:
return f"{seconds:.0f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
elif seconds < 86400:
hours = seconds / 3600
return f"{hours:.1f}h"
else:
days = seconds / 86400
return f"{days:.1f}d"
def _format_progress_bar(self, percentage, width=30):
"""تنسيق شريط التقدم كنص"""
filled = int(width * percentage / 100)
empty = width - filled
bar = '' * filled + '' * empty
return f"[{bar}] {percentage:.1f}%"
def get_s3_client(self, access_key_id, secret_access_key, region='us-east-1', endpoint_url=None):
"""Get or create S3 client with given credentials"""
cache_key = f"{access_key_id}:{secret_access_key}:{region}:{endpoint_url}"
if cache_key in self.s3_clients:
return self.s3_clients[cache_key]
try:
# Create S3 client configuration
s3_config = {
'aws_access_key_id': access_key_id,
'aws_secret_access_key': secret_access_key,
'region_name': region
}
# Add endpoint if provided (for S3-compatible services like MinIO)
if endpoint_url:
s3_config['endpoint_url'] = endpoint_url
# Create S3 client
client = boto3.client('s3', **s3_config)
# Cache the client
self.s3_clients[cache_key] = client
return client
except Exception as e:
print(f"❌ Failed to create S3 client: {str(e)}")
return None
def test_s3_connection(self, access_key_id, secret_access_key, region='us-east-1', endpoint_url=None):
"""Test S3 connection with the four parameters"""
# التحقق من بيانات الاعتماد
if not access_key_id or not secret_access_key:
print(f"❌ AWS credentials are required")
print(f" 🔸 Access Key ID: {'Missing' if not access_key_id else 'Provided'}")
print(f" 🔸 Secret Key: {'Missing' if not secret_access_key else 'Provided'}")
return {
'success': False,
'error': 'AWS credentials are required',
'details': {
'access_key_id_provided': bool(access_key_id),
'secret_key_provided': bool(secret_access_key)
}
}
print(f"🔍 Testing S3 connection...")
print(f" 🔸 Access Key ID: {access_key_id[:10]}...")
print(f" 🔸 Secret Key: {secret_access_key[:10]}...")
print(f" 🔸 Region: {region}")
print(f" 🔸 Endpoint URL: {endpoint_url or 'AWS S3 (default)'}")
try:
# Get S3 client
s3_client = self.get_s3_client(access_key_id, secret_access_key, region, endpoint_url)
if not s3_client:
return {
'success': False,
'error': 'Failed to create S3 client'
}
# Test connection by listing buckets
start_time = time.time()
response = s3_client.list_buckets()
end_time = time.time()
response_time = end_time - start_time
buckets = [bucket['Name'] for bucket in response.get('Buckets', [])]
print(f"✅ S3 Connection Successful!")
print(f" 🔸 Response time: {response_time:.2f} seconds")
print(f" 🔸 Available buckets: {len(buckets)}")
if buckets:
print(f" 🔸 Buckets: {', '.join(buckets[:5])}{'...' if len(buckets) > 5 else ''}")
return {
'success': True,
'message': 'S3 connection successful',
'buckets': buckets,
'bucket_count': len(buckets),
'response_time': response_time,
'connection_details': {
'region': region,
'endpoint': endpoint_url or 'AWS S3 (default)',
'has_credentials': bool(access_key_id and secret_access_key)
}
}
except NoCredentialsError:
error_msg = "No credentials provided or credentials are invalid"
print(f"{error_msg}")
return {
'success': False,
'error': error_msg
}
except EndpointConnectionError as e:
error_msg = f"Cannot connect to endpoint: {endpoint_url}"
print(f"{error_msg}")
return {
'success': False,
'error': error_msg,
'details': str(e)
}
except ClientError as e:
error_code = e.response['Error']['Code']
error_msg = e.response['Error']['Message']
print(f"❌ AWS Error: {error_code} - {error_msg}")
return {
'success': False,
'error': f"{error_code}: {error_msg}",
'aws_error': error_code
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
print(f"{error_msg}")
return {
'success': False,
'error': error_msg
}
def list_s3_buckets(self, access_key_id, secret_access_key, region='us-east-1', endpoint_url=None):
"""List all S3 buckets with details"""
try:
s3_client = self.get_s3_client(access_key_id, secret_access_key, region, endpoint_url)
if not s3_client:
return {
'success': False,
'error': 'Failed to create S3 client'
}
response = s3_client.list_buckets()
buckets_info = []
for bucket in response.get('Buckets', []):
bucket_name = bucket['Name']
creation_date = bucket['CreationDate']
# Get bucket location (region)
try:
location = s3_client.get_bucket_location(Bucket=bucket_name)
bucket_region = location.get('LocationConstraint', 'us-east-1')
if not bucket_region:
bucket_region = 'us-east-1'
except:
bucket_region = region
buckets_info.append({
'name': bucket_name,
'creation_date': creation_date.isoformat() if hasattr(creation_date, 'isoformat') else str(creation_date),
'region': bucket_region
})
return {
'success': True,
'buckets': buckets_info,
'count': len(buckets_info)
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def test_postgres_connection(self, uri_input):
    """Test a PostgreSQL connection via the ``psql`` CLI.

    ``uri_input`` may be a URI string or a dict carrying the URI under a
    ``'uri'`` or ``'url'`` key (tolerates callers that pass the raw JSON
    request body).  Runs ``SELECT version(); SELECT current_database();``
    with a 10-second timeout and returns a result dict with ``success``,
    server details on success, or ``error`` plus ``connection_details`` on
    failure.
    """
    # Accept a dict payload and pull the URI string out of it.
    if isinstance(uri_input, dict):
        print(f"⚠️ Received JSON object, extracting URI string...")
        if 'uri' in uri_input:
            uri = uri_input['uri']
            print(f"✅ Extracted URI from 'uri' key")
        elif 'url' in uri_input:
            uri = uri_input['url']
            print(f"✅ Extracted URI from 'url' key")
        else:
            error_msg = f'JSON must contain "uri" or "url" key. Received keys: {list(uri_input.keys())}'
            print(f"{error_msg}")
            return {'success': False, 'error': error_msg}
    else:
        uri = uri_input
    if not isinstance(uri, str):
        error_msg = f'URI must be a string. Got type: {type(uri)}'
        print(f"{error_msg}")
        return {'success': False, 'error': error_msg}
    print(f"🔍 Testing PostgreSQL connection to: {uri}")
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        error_msg = f'Invalid URI format: {uri}'
        print(f"{error_msg}")
        return {'success': False, 'error': error_msg}
    host = parsed['host']
    port = parsed['port']
    user = parsed['user']
    database = parsed['database']
    # psql reads the password from the environment, never from argv.
    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']
    cmd = [
        'psql',
        '-h', host,
        '-p', str(port),
        '-U', user,
        '-d', database,
        '-c', 'SELECT version(); SELECT current_database();',
        '-t'
    ]
    try:
        print(f" 🔗 Attempting connection to host: {host}:{port}")
        print(f" 👤 User: {user}, Database: {database}")
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            # With -t, the two SELECTs print one value per line.
            lines = result.stdout.strip().split('\n')
            version = lines[0] if len(lines) > 0 else ''
            db_name = lines[1] if len(lines) > 1 else ''
            print(f"✅ Connection successful to {host}:{port}")
            return {
                'success': True,
                'message': 'Connection successful',
                'version': version,
                'database': db_name,
                'connection': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }
        else:
            error_output = result.stderr.strip()
            print(f"❌ Connection FAILED to host: {host}:{port}")
            print(f" 🔸 User: {user}")
            print(f" 🔸 Database: {database}")
            print(f" 🔸 Error details: {error_output[:200]}")
            return {
                'success': False,
                'error': error_output,
                'connection_details': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }
    except subprocess.TimeoutExpired:
        print(f"❌ Connection TIMEOUT to host: {host}:{port}")
        print(f" 🔸 Server not responding after 10 seconds")
        print(f" 🔸 Please check: firewall, network, server status")
        return {
            'success': False,
            'error': 'Connection timeout',
            'connection_details': {
                'host': host,
                'port': port,
                'user': user,
                'database': database
            }
        }
    except FileNotFoundError:
        # psql binary itself is missing from PATH.
        print(f"❌ PSQL CLIENT NOT FOUND")
        print(f" 🔸 The 'psql' command-line tool is not installed")
        print(f" 🔸 Install PostgreSQL client tools and try again")
        return {
            'success': False,
            'error': 'psql command not found. Install PostgreSQL client.',
            'connection_details': {
                'host': host,
                'port': port,
                'user': user,
                'database': database
            }
        }
    except Exception as e:
        print(f"❌ UNEXPECTED ERROR connecting to host: {host}:{port}")
        print(f" 🔸 Error type: {type(e).__name__}")
        print(f" 🔸 Error message: {str(e)}")
        return {
            'success': False,
            'error': str(e),
            'connection_details': {
                'host': host,
                'port': port,
                'user': user,
                'database': database
            }
        }
def get_schemas(self, uri):
    """Return the user-visible schemas of the target database.

    Queries ``information_schema.schemata`` through the ``psql`` CLI,
    excluding PostgreSQL system schemas.  Returns ``{'success': True,
    'schemas': [...], 'count': n}`` or ``{'success': False, 'error': ...}``.
    """
    conn = self.parse_postgres_uri(uri)
    if not conn:
        return {'success': False, 'error': 'Invalid URI format'}
    env = os.environ.copy()
    env['PGPASSWORD'] = conn['password']
    sql = """
    SELECT schema_name
    FROM information_schema.schemata
    WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
    ORDER BY schema_name;
    """
    cmd = ['psql',
           '-h', conn['host'],
           '-p', str(conn['port']),
           '-U', conn['user'],
           '-d', conn['database'],
           '-t',
           '-c', sql]
    try:
        proc = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
        if proc.returncode != 0:
            return {'success': False, 'error': proc.stderr.strip()}
        # -t output is one schema per line, padded with whitespace.
        schemas = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
        return {'success': True, 'schemas': schemas, 'count': len(schemas)}
    except Exception as e:
        return {'success': False, 'error': str(e)}
def get_tables(self, uri, schema=''):
    """List tables, optionally restricted to one schema.

    Returns ``{'success': True, 'tables': [{'schema', 'name', 'type'}, ...],
    'count': n}`` or an error dict.  NOTE(review): ``schema`` is interpolated
    into the SQL unescaped — callers must not pass untrusted input.
    """
    conn = self.parse_postgres_uri(uri)
    if not conn:
        return {'success': False, 'error': 'Invalid URI format'}
    env = os.environ.copy()
    env['PGPASSWORD'] = conn['password']
    if schema:
        sql = f"""
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = '{schema}'
        ORDER BY table_name;
        """
    else:
        sql = """
        SELECT table_schema, table_name, table_type
        FROM information_schema.tables
        WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
        ORDER BY table_schema, table_name;
        """
    cmd = ['psql',
           '-h', conn['host'],
           '-p', str(conn['port']),
           '-U', conn['user'],
           '-d', conn['database'],
           '-t',
           '-c', sql]
    try:
        proc = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
        if proc.returncode != 0:
            return {'success': False, 'error': proc.stderr.strip()}
        tables = []
        for raw in proc.stdout.splitlines():
            line = raw.strip()
            if not line or '|' not in line:
                continue
            if schema:
                # psql -t prints: table_name | table_type
                name_part, type_part = line.split('|')
                tables.append({
                    'name': name_part.strip(),
                    'type': type_part.strip(),
                    'schema': schema
                })
            else:
                # psql -t prints: table_schema | table_name | table_type
                parts = line.split('|')
                if len(parts) >= 3:
                    tables.append({
                        'schema': parts[0].strip(),
                        'name': parts[1].strip(),
                        'type': parts[2].strip()
                    })
        return {'success': True, 'tables': tables, 'count': len(tables)}
    except Exception as e:
        return {'success': False, 'error': str(e)}
def get_table_counts(self, uri, schema=None):
    """Return approximate row counts keyed by ``"schema.table"``.

    Uses ``pg_stat_user_tables.n_live_tup`` (the statistics collector's
    live-tuple estimate) rather than a per-table ``COUNT(*)``, so one psql
    round-trip covers every table.

    Bug fixes vs the previous version:
    * the schema branch interpolated an undefined Python name
      (``{table_name}``) into its f-string SQL, raising ``NameError`` before
      psql was ever invoked;
    * the no-schema branch counted matching rows of
      ``information_schema.tables`` (always 1), not actual table rows;
    * a failed psql run fell off the end of the function and returned
      ``None`` instead of the empty dict the callers expect.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {}
    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']
    if schema:
        sql = f"""
        SELECT schemaname, relname, n_live_tup
        FROM pg_stat_user_tables
        WHERE schemaname = '{schema}'
        ORDER BY relname;
        """
    else:
        sql = """
        SELECT schemaname, relname, n_live_tup
        FROM pg_stat_user_tables
        ORDER BY schemaname, relname;
        """
    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql
    ]
    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
        table_counts = {}
        if result.returncode == 0:
            # Each line: schemaname | relname | n_live_tup
            for line in result.stdout.splitlines():
                if line.strip() and '|' in line:
                    parts = line.split('|')
                    if len(parts) >= 3:
                        table_schema, table_name, count = parts[:3]
                        key = f"{table_schema.strip()}.{table_name.strip()}"
                        table_counts[key] = int(count.strip()) if count.strip().isdigit() else 0
        return table_counts
    except Exception:
        return {}
def export_table_to_s3(self, postgres_uri, schema, table, s3_bucket, s3_key,
compress=True, format='csv',
access_key_id=None, secret_access_key=None,
region='us-east-1', endpoint_url=None):
"""Export a single PostgreSQL table to S3"""
logs = []
def log(msg, level='info'):
log_entry = {
'timestamp': time.time(),
'message': msg,
'level': level
}
logs.append(log_entry)
print(f"[Export] {msg}")
try:
log(f"🔧 Starting export of {schema}.{table} to S3", 'info')
log(f"PostgreSQL: {postgres_uri}", 'info')
log(f"S3 Destination: s3://{s3_bucket}/{s3_key}", 'info')
log(f"Format: {format}, Compress: {compress}", 'info')
# Parse PostgreSQL URI
parsed = self.parse_postgres_uri(postgres_uri)
if not parsed:
return {
'success': False,
'error': 'Invalid PostgreSQL URI',
'logs': logs
}
# Set environment for psql
env = os.environ.copy()
env['PGPASSWORD'] = parsed['password']
# Build SQL command based on format
if format.lower() == 'csv':
sql = f"COPY (SELECT * FROM \"{schema}\".\"{table}\") TO STDOUT WITH CSV HEADER"
else: # json
sql = f"""
SELECT json_agg(row_to_json(t))
FROM (SELECT * FROM "{schema}"."{table}") t
"""
# Build psql command
cmd = [
'psql',
'-h', parsed['host'],
'-p', str(parsed['port']),
'-U', parsed['user'],
'-d', parsed['database'],
'-t',
'-c', sql
]
# Execute query and capture output
log(f"📊 Querying PostgreSQL table...", 'info')
start_time = time.time()
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=300)
if result.returncode != 0:
error_msg = result.stderr[:500]
log(f"❌ Failed to query table: {error_msg}", 'error')
return {
'success': False,
'error': f"Query failed: {error_msg}",
'logs': logs
}
query_time = time.time() - start_time
log(f"✅ Query completed in {query_time:.2f} seconds", 'success')
# Get data
data = result.stdout
# Prepare data for upload
if format.lower() == 'csv':
file_content = data.encode('utf-8')
content_type = 'text/csv'
file_extension = 'csv'
else: # json
file_content = json.dumps(json.loads(data), indent=2).encode('utf-8') if data.strip() else b'[]'
content_type = 'application/json'
file_extension = 'json'
# Compress if requested
if compress:
log(f"📦 Compressing data...", 'info')
buffer = io.BytesIO()
with gzip.GzipFile(fileobj=buffer, mode='wb') as f:
f.write(file_content)
file_content = buffer.getvalue()
file_extension = f"{file_extension}.gz"
content_type = 'application/gzip'
log(f"✅ Compression complete: {len(data)}{len(file_content)} bytes", 'success')
# Upload to S3
log(f"⬆️ Uploading to S3...", 'info')
# Get S3 client
s3_client = self.get_s3_client(access_key_id, secret_access_key, region, endpoint_url)
if not s3_client:
return {
'success': False,
'error': 'Failed to create S3 client',
'logs': logs
}
# Upload file
upload_start = time.time()
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=file_content,
ContentType=content_type,
Metadata={
'source_schema': schema,
'source_table': table,
'export_time': datetime.utcnow().isoformat(),
'compressed': str(compress),
'format': format
}
)
upload_time = time.time() - upload_start
# Generate S3 URL
if endpoint_url:
s3_url = f"{endpoint_url}/{s3_bucket}/{s3_key}"
else:
s3_url = f"https://{s3_bucket}.s3.{region}.amazonaws.com/{s3_key}"
log(f"✅ Upload completed in {upload_time:.2f} seconds", 'success')
log(f"🔗 S3 URL: {s3_url}", 'info')
return {
'success': True,
'message': f"Exported {schema}.{table} to S3 successfully",
's3_url': s3_url,
's3_bucket': s3_bucket,
's3_key': s3_key,
'file_size': len(file_content),
'original_size': len(data) if compress else None,
'compressed': compress,
'format': format,
'upload_time': upload_time,
'query_time': query_time,
'logs': logs
}
except subprocess.TimeoutExpired:
log("❌ Query timeout (5 minutes)", 'error')
return {
'success': False,
'error': 'Query timeout',
'logs': logs
}
except Exception as e:
log(f"❌ Error: {str(e)}", 'error')
return {
'success': False,
'error': str(e),
'logs': logs
}
def migrate_to_s3(self, migration_id, postgres_uri, s3_bucket, s3_prefix='',
schemas=None, tables=None, compress=True, format='csv',
access_key_id=None, secret_access_key=None,
region='us-east-1', endpoint_url=None):
"""Migrate PostgreSQL data to S3"""
logs = []
def log(msg, level='info'):
log_entry = {
'timestamp': time.time(),
'message': msg,
'level': level
}
logs.append(log_entry)
print(f"[{migration_id}] {msg}")
try:
log(f"🔧 Starting PostgreSQL to S3 Migration {migration_id}", 'info')
log(f"PostgreSQL: {postgres_uri}", 'info')
log(f"S3 Destination: s3://{s3_bucket}/{s3_prefix}", 'info')
log(f"Format: {format}, Compress: {compress}", 'info')
log(f"S3 Region: {region}, Endpoint: {endpoint_url or 'AWS S3 (default)'}", 'info')
# Get tables to export
tables_to_export = []
if tables:
tables_to_export = tables
log(f"📋 Selected tables: {len(tables_to_export)} tables", 'info')
elif schemas:
# Get all tables from selected schemas
for schema in schemas:
result = self.get_tables(postgres_uri, schema)
if result['success']:
for table_info in result['tables']:
tables_to_export.append(f"{table_info['schema']}.{table_info['name']}")
log(f"📋 Selected schemas: {', '.join(schemas)}{len(tables_to_export)} tables", 'info')
else:
# Get all tables
result = self.get_tables(postgres_uri)
if not result['success']:
return {
'success': False,
'migration_id': migration_id,
'error': f"Failed to get tables: {result['error']}",
'logs': logs
}
for table_info in result['tables']:
tables_to_export.append(f"{table_info['schema']}.{table_info['name']}")
log(f"📋 All tables: {len(tables_to_export)} tables", 'info')
if not tables_to_export:
return {
'success': False,
'migration_id': migration_id,
'error': 'No tables to export',
'logs': logs
}
# Export each table
successful_exports = []
failed_exports = []
total_tables = len(tables_to_export)
log(f"🚀 Starting export of {total_tables} tables...", 'info')
for i, table_ref in enumerate(tables_to_export, 1):
if '.' in table_ref:
schema, table = table_ref.split('.', 1)
else:
schema = 'public'
table = table_ref
# Create S3 key
timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
extension = 'csv.gz' if compress else 'csv' if format == 'csv' else 'json'
s3_key = f"{s3_prefix}{schema}_{table}_{timestamp}.{extension}" if s3_prefix else f"{schema}_{table}_{timestamp}.{extension}"
log(f"📊 Exporting table {i}/{total_tables}: {schema}.{table}", 'info')
# Export table
result = self.export_table_to_s3(
postgres_uri=postgres_uri,
schema=schema,
table=table,
s3_bucket=s3_bucket,
s3_key=s3_key,
compress=compress,
format=format,
access_key_id=access_key_id,
secret_access_key=secret_access_key,
region=region,
endpoint_url=endpoint_url
)
if result['success']:
successful_exports.append({
'table': f"{schema}.{table}",
's3_key': s3_key,
'size': result.get('file_size', 0),
's3_url': result.get('s3_url')
})
log(f"✅ Successfully exported {schema}.{table}", 'success')
else:
failed_exports.append({
'table': f"{schema}.{table}",
'error': result.get('error', 'Unknown error')
})
log(f"❌ Failed to export {schema}.{table}: {result.get('error')}", 'error')
# Calculate statistics
total_size = sum(export['size'] for export in successful_exports)
log(f"📊 Migration Summary:", 'info')
log(f" ✅ Successful: {len(successful_exports)} tables", 'success')
log(f" ❌ Failed: {len(failed_exports)} tables", 'error' if failed_exports else 'info')
log(f" 📦 Total size: {total_size:,} bytes", 'info')
# Generate S3 configuration
s3_config = {
'access_key_id': access_key_id[:10] + '...' if access_key_id else None,
'secret_access_key': '***' if secret_access_key else None,
'region': region,
'endpoint_url': endpoint_url,
'bucket': s3_bucket,
'prefix': s3_prefix
}
return {
'success': True,
'migration_id': migration_id,
'message': f"Migration completed: {len(successful_exports)}/{total_tables} tables exported",
'stats': {
'total_tables': total_tables,
'successful_exports': len(successful_exports),
'failed_exports': len(failed_exports),
'total_size': total_size
},
'successful_exports': successful_exports,
'failed_exports': failed_exports,
's3_config': s3_config,
'logs': logs
}
except Exception as e:
log(f"❌ Migration failed: {str(e)}", 'error')
return {
'success': False,
'migration_id': migration_id,
'error': str(e),
'logs': logs
}
def start_migration_async(self, migration_id, postgres_uri, s3_bucket, s3_prefix='',
schemas=None, tables=None, compress=True, format='csv',
access_key_id=None, secret_access_key=None,
region='us-east-1', endpoint_url=None):
"""Start migration in background thread"""
def migration_task():
result = self.migrate_to_s3(
migration_id=migration_id,
postgres_uri=postgres_uri,
s3_bucket=s3_bucket,
s3_prefix=s3_prefix,
schemas=schemas,
tables=tables,
compress=compress,
format=format,
access_key_id=access_key_id,
secret_access_key=secret_access_key,
region=region,
endpoint_url=endpoint_url
)
with self._lock:
self.migrations[migration_id] = result
thread = threading.Thread(target=migration_task, daemon=True)
thread.start()
# Initialize migration entry
with self._lock:
self.migrations[migration_id] = {
'status': 'running',
'started_at': time.time(),
'postgres_uri': postgres_uri,
's3_bucket': s3_bucket,
's3_prefix': s3_prefix
}
return migration_id
def get_migration_status(self, migration_id):
"""Get status of a migration"""
with self._lock:
return self.migrations.get(migration_id)
def list_migrations(self):
"""List all migrations"""
with self._lock:
migrations_list = []
for mig_id, mig_data in self.migrations.items():
if isinstance(mig_data, dict) and 'success' in mig_data:
status = 'completed' if mig_data['success'] else 'failed'
else:
status = 'running'
migrations_list.append({
'id': mig_id,
'status': status,
'data': mig_data
})
return migrations_list
# ============================================================================
# Global instances for backward compatibility
# ============================================================================
# Module-level singletons, one per migration direction.  S3ToS3Migrator and
# PostgresToS3Migrator are defined earlier in this file.
postgres_migrator = PostgresMigrator()
s3_to_s3_migrator = S3ToS3Migrator()
postgres_to_s3_migrator = PostgresToS3Migrator()
# For backward compatibility with existing code that imports 'migrator',
# uncomment exactly one of the aliases below depending on the desired default:
# migrator = postgres_migrator # For PostgreSQL to PostgreSQL
# migrator = s3_to_s3_migrator # For S3 to S3
# migrator = postgres_to_s3_migrator # For PostgreSQL to S3
# By default, export all three classes and their singletons under explicit names.
__all__ = [
    'PostgresMigrator',
    'S3ToS3Migrator',
    'PostgresToS3Migrator',
    'postgres_migrator',
    's3_to_s3_migrator',
    'postgres_to_s3_migrator'
]