# (removed scraped page metadata: "4500 lines, 193 KiB, Python")
#!/usr/bin/env python3
"""
migrator_unified.py - Unified Migration Engine

Combined migrator for PostgreSQL, S3 to S3, and PostgreSQL to S3 migrations
"""

import subprocess
import signal
import json
import os
import sys
import time
import threading
import hashlib
import io
import csv
import gzip
from datetime import datetime
from urllib.parse import urlparse

import boto3
from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError


# ============================================================================
# Part 1: PostgreSQL Migrator (PostgresMigrator)
# ============================================================================


class PostgresMigrator:
    """PostgreSQL to PostgreSQL migration engine"""
def __init__(self):
|
|
self.migrations = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def parse_postgres_uri(self, uri):
|
|
"""Parse PostgreSQL URI and extract connection parameters"""
|
|
try:
|
|
if not uri.startswith(('postgresql://', 'postgres://')):
|
|
uri = 'postgresql://' + uri
|
|
|
|
parsed = urlparse(uri)
|
|
|
|
return {
|
|
'host': parsed.hostname,
|
|
'port': parsed.port or 5432,
|
|
'user': parsed.username or '',
|
|
'password': parsed.password or '',
|
|
'database': parsed.path.lstrip('/') if parsed.path else 'postgres',
|
|
'uri': uri
|
|
}
|
|
except Exception as e:
|
|
return None
|
|
def get_live_migration_progress(self, migration_id):
|
|
"""الحصول على التقدم المباشر للترحيل مع تفاصيل دقيقة"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return {'success': False, 'error': 'Migration not found'}
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
# إذا كان الترحيل مكتملاً
|
|
if mig_data.get('status') != 'running':
|
|
return self._get_detailed_completed_progress(migration_id)
|
|
|
|
# إذا كان هناك متتبع تقدم نشط
|
|
if 'progress_tracker' in mig_data:
|
|
return self._get_live_tracker_progress(migration_id)
|
|
|
|
# إذا كان الترحيل قيد التشغيل ولكن بدون متتبع (تقديري)
|
|
return self._get_estimated_live_progress(migration_id)
|
|
|
|
def _get_live_tracker_progress(self, migration_id):
|
|
"""الحصول على التقدم من المتتبع المباشر"""
|
|
mig_data = self.migrations[migration_id]
|
|
tracker = mig_data['progress_tracker']
|
|
|
|
current_time = time.time()
|
|
elapsed_time = current_time - tracker['start_time']
|
|
|
|
# حساب النسب المئوية
|
|
tables_percentage = (tracker['processed_tables'] / tracker['total_tables'] * 100) if tracker['total_tables'] > 0 else 0
|
|
size_percentage = (tracker['processed_size'] / tracker['total_size'] * 100) if tracker['total_size'] > 0 else 0
|
|
rows_percentage = (tracker['processed_rows'] / tracker['total_rows'] * 100) if tracker['total_rows'] > 0 else 0
|
|
|
|
# حساب السرعة الحالية
|
|
recent_samples = [s for s in tracker.get('speed_samples', []) if current_time - s['time'] < 10]
|
|
if recent_samples:
|
|
recent_size = sum(s['size'] for s in recent_samples)
|
|
recent_time = recent_samples[-1]['time'] - recent_samples[0]['time'] if len(recent_samples) > 1 else 1
|
|
current_speed = recent_size / recent_time if recent_time > 0 else 0
|
|
else:
|
|
current_speed = tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0
|
|
|
|
# ETA بناءً على السرعة الحالية
|
|
remaining_size = tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0)
|
|
eta_seconds = remaining_size / current_speed if current_speed > 0 else 0
|
|
|
|
# تفاصيل الجدول الحالي
|
|
current_table_info = None
|
|
if tracker.get('current_table'):
|
|
current_table_info = {
|
|
'name': tracker['current_table'],
|
|
'size': tracker.get('current_table_size', 0),
|
|
'size_formatted': self._format_size(tracker.get('current_table_size', 0)),
|
|
'rows_total': tracker.get('current_table_rows', 0),
|
|
'rows_processed': tracker.get('current_table_rows_processed', 0),
|
|
'rows_percentage': round((tracker.get('current_table_rows_processed', 0) / tracker.get('current_table_rows', 1)) * 100, 2) if tracker.get('current_table_rows', 0) > 0 else 0,
|
|
'estimated_remaining': tracker.get('current_table_remaining_time', 0)
|
|
}
|
|
|
|
return {
|
|
'success': True,
|
|
'migration_id': migration_id,
|
|
'status': 'running',
|
|
'type': 'live',
|
|
'timestamp': current_time,
|
|
|
|
# إحصائيات عامة
|
|
'total': {
|
|
'tables': tracker['total_tables'],
|
|
'size': tracker['total_size'],
|
|
'size_formatted': self._format_size(tracker['total_size']),
|
|
'rows': tracker.get('total_rows', 0)
|
|
},
|
|
|
|
# إحصائيات منجزة
|
|
'processed': {
|
|
'tables': tracker['processed_tables'],
|
|
'size': tracker['processed_size'],
|
|
'size_formatted': self._format_size(tracker['processed_size']),
|
|
'rows': tracker.get('processed_rows', 0)
|
|
},
|
|
|
|
# إحصائيات فاشلة
|
|
'failed': {
|
|
'tables': tracker.get('failed_tables', 0),
|
|
'size': tracker.get('failed_size', 0),
|
|
'size_formatted': self._format_size(tracker.get('failed_size', 0)),
|
|
'rows': tracker.get('failed_rows', 0)
|
|
},
|
|
|
|
# إحصائيات متبقية
|
|
'remaining': {
|
|
'tables': tracker['total_tables'] - tracker['processed_tables'] - tracker.get('failed_tables', 0),
|
|
'size': tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0),
|
|
'size_formatted': self._format_size(tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0))
|
|
},
|
|
|
|
# النسب المئوية
|
|
'percentages': {
|
|
'tables': round(tables_percentage, 2),
|
|
'size': round(size_percentage, 2),
|
|
'rows': round(rows_percentage, 2)
|
|
},
|
|
|
|
# معلومات الوقت والسرعة
|
|
'time': {
|
|
'elapsed': elapsed_time,
|
|
'elapsed_formatted': self._format_time(elapsed_time),
|
|
'eta': eta_seconds,
|
|
'eta_formatted': self._format_time(eta_seconds),
|
|
'estimated_completion': current_time + eta_seconds if eta_seconds > 0 else None
|
|
},
|
|
|
|
'speed': {
|
|
'current': current_speed,
|
|
'current_formatted': f"{self._format_size(current_speed)}/s",
|
|
'average': tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0,
|
|
'average_formatted': f"{self._format_size(tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0)}/s",
|
|
'peak': max((s['size'] for s in tracker.get('speed_samples', [])), default=0),
|
|
'peak_formatted': self._format_size(max((s['size'] for s in tracker.get('speed_samples', [])), default=0))
|
|
},
|
|
|
|
# معلومات الجدول الحالي
|
|
'current_table': current_table_info,
|
|
|
|
# آخر 5 جداول منجزة
|
|
'recent_tables': tracker.get('tables_details', [])[-5:],
|
|
|
|
# أشرطة التقدم
|
|
'progress_bars': {
|
|
'tables': self._format_progress_bar(tables_percentage),
|
|
'size': self._format_progress_bar(size_percentage),
|
|
'rows': self._format_progress_bar(rows_percentage) if tracker.get('total_rows', 0) > 0 else None
|
|
}
|
|
}
|
|
|
|
def _get_estimated_live_progress(self, migration_id):
|
|
"""تقدير التقدم للترحيل الجاري بدون متتبع"""
|
|
mig_data = self.migrations[migration_id]
|
|
elapsed_time = time.time() - mig_data['started_at']
|
|
|
|
# محاولة الحصول على تقديرات أفضل
|
|
estimated_total_tables = mig_data.get('estimated_tables', 10)
|
|
estimated_total_size = mig_data.get('estimated_size', 100 * 1024 * 1024)
|
|
|
|
# تقدير التقدم بناءً على وقت pg_dump المعتاد
|
|
# pg_dump عادة يستغرق 1-5 دقائق لكل GB حسب السرعة
|
|
estimated_duration = estimated_total_size / (5 * 1024 * 1024) # 5 MB/s كتقدير
|
|
progress_percentage = min((elapsed_time / estimated_duration) * 100, 99) if estimated_duration > 0 else 0
|
|
|
|
tables_migrated = int(estimated_total_tables * progress_percentage / 100)
|
|
size_migrated = int(estimated_total_size * progress_percentage / 100)
|
|
|
|
return {
|
|
'success': True,
|
|
'migration_id': migration_id,
|
|
'status': 'running',
|
|
'type': 'estimated',
|
|
'timestamp': time.time(),
|
|
'estimated': True,
|
|
|
|
'total': {
|
|
'tables': estimated_total_tables,
|
|
'size': estimated_total_size,
|
|
'size_formatted': self._format_size(estimated_total_size)
|
|
},
|
|
|
|
'processed': {
|
|
'tables': tables_migrated,
|
|
'size': size_migrated,
|
|
'size_formatted': self._format_size(size_migrated)
|
|
},
|
|
|
|
'percentages': {
|
|
'tables': round(progress_percentage, 2),
|
|
'size': round(progress_percentage, 2)
|
|
},
|
|
|
|
'time': {
|
|
'elapsed': elapsed_time,
|
|
'elapsed_formatted': self._format_time(elapsed_time)
|
|
},
|
|
|
|
'progress_bars': {
|
|
'main': self._format_progress_bar(progress_percentage)
|
|
},
|
|
|
|
'note': 'This is an estimated progress. For accurate tracking, use migrate_tables_individually()'
|
|
}
|
|
|
|
def _get_detailed_completed_progress(self, migration_id):
|
|
"""الحصول على تفاصيل دقيقة للترحيل المكتمل"""
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if 'success' not in mig_data:
|
|
return {'success': False, 'error': 'Migration not completed'}
|
|
|
|
stats = mig_data.get('stats', {})
|
|
execution_time = mig_data.get('execution_time', 0)
|
|
|
|
# إذا كان هناك متتبع تقدم، استخدم بياناته
|
|
if 'progress_tracker' in mig_data:
|
|
tracker = mig_data['progress_tracker']
|
|
total_size = tracker.get('total_size', 0)
|
|
processed_size = tracker.get('processed_size', 0)
|
|
total_tables = tracker.get('total_tables', 0)
|
|
processed_tables = tracker.get('processed_tables', 0)
|
|
tables_details = tracker.get('tables_details', [])
|
|
else:
|
|
# استخدام التقديرات
|
|
total_tables = stats.get('tables_created', 0) + stats.get('tables_altered', 0)
|
|
processed_tables = stats.get('tables_created', 0)
|
|
estimated_size_per_table = 10 * 1024 * 1024
|
|
total_size = total_tables * estimated_size_per_table
|
|
processed_size = processed_tables * estimated_size_per_table
|
|
tables_details = []
|
|
|
|
return {
|
|
'success': True,
|
|
'migration_id': migration_id,
|
|
'status': 'completed' if mig_data.get('success') else 'failed',
|
|
'type': 'completed',
|
|
'timestamp': time.time(),
|
|
|
|
'total': {
|
|
'tables': total_tables,
|
|
'size': total_size,
|
|
'size_formatted': self._format_size(total_size)
|
|
},
|
|
|
|
'processed': {
|
|
'tables': processed_tables,
|
|
'size': processed_size,
|
|
'size_formatted': self._format_size(processed_size),
|
|
'tables_created': stats.get('tables_created', 0),
|
|
'tables_altered': stats.get('tables_altered', 0),
|
|
'indexes_created': stats.get('indexes_created', 0)
|
|
},
|
|
|
|
'percentages': {
|
|
'tables': 100.0 if processed_tables == total_tables else round((processed_tables / total_tables * 100) if total_tables > 0 else 0, 2),
|
|
'size': 100.0 if processed_size == total_size else round((processed_size / total_size * 100) if total_size > 0 else 0, 2)
|
|
},
|
|
|
|
'time': {
|
|
'total': execution_time,
|
|
'total_formatted': self._format_time(execution_time),
|
|
'average_speed': processed_size / execution_time if execution_time > 0 else 0,
|
|
'average_speed_formatted': f"{self._format_size(processed_size / execution_time if execution_time > 0 else 0)}/s"
|
|
},
|
|
|
|
'tables_details': tables_details[-10:], # آخر 10 جداول
|
|
|
|
'error': mig_data.get('error') if not mig_data.get('success') else None
|
|
}
|
|
|
|
def migrate_tables_individually(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
|
|
"""ترحيل الجداول بشكل فردي مع تتبع دقيق للتقدم"""
|
|
|
|
# تهيئة متتبع التقدم
|
|
self._init_detailed_tracker(migration_id, source_uri, schemas, tables)
|
|
|
|
def migration_task():
|
|
try:
|
|
result = self._run_individual_migration(migration_id, source_uri, dest_uri, schemas, tables)
|
|
with self._lock:
|
|
self.migrations[migration_id].update(result)
|
|
self.migrations[migration_id]['status'] = 'completed' if result['success'] else 'failed'
|
|
self.migrations[migration_id]['completed_at'] = time.time()
|
|
except Exception as e:
|
|
with self._lock:
|
|
self.migrations[migration_id]['status'] = 'failed'
|
|
self.migrations[migration_id]['error'] = str(e)
|
|
|
|
thread = threading.Thread(target=migration_task, daemon=True)
|
|
thread.start()
|
|
|
|
return migration_id
|
|
|
|
def _init_detailed_tracker(self, migration_id, source_uri, schemas=None, tables=None):
|
|
"""تهيئة متتبع تفصيلي مع بيانات دقيقة"""
|
|
|
|
# الحصول على قائمة الجداول وأحجامها
|
|
tables_list = []
|
|
total_size = 0
|
|
total_rows = 0
|
|
|
|
if tables:
|
|
for table in tables:
|
|
if '.' in table:
|
|
schema, name = table.split('.', 1)
|
|
else:
|
|
schema = 'public'
|
|
name = table
|
|
|
|
size = self._get_table_size(source_uri, schema, name) or 0
|
|
rows = self._get_table_row_count(source_uri, schema, name) or 0
|
|
|
|
tables_list.append({
|
|
'schema': schema,
|
|
'name': name,
|
|
'size': size,
|
|
'rows': rows
|
|
})
|
|
total_size += size
|
|
total_rows += rows
|
|
|
|
elif schemas:
|
|
for schema in schemas:
|
|
result = self.get_tables(source_uri, schema)
|
|
if result['success']:
|
|
for table_info in result['tables']:
|
|
size = self._get_table_size(source_uri, schema, table_info['name']) or 0
|
|
rows = self._get_table_row_count(source_uri, schema, table_info['name']) or 0
|
|
|
|
tables_list.append({
|
|
'schema': schema,
|
|
'name': table_info['name'],
|
|
'size': size,
|
|
'rows': rows
|
|
})
|
|
total_size += size
|
|
total_rows += rows
|
|
|
|
else:
|
|
result = self.get_tables(source_uri)
|
|
if result['success']:
|
|
for table_info in result['tables']:
|
|
size = self._get_table_size(source_uri, table_info['schema'], table_info['name']) or 0
|
|
rows = self._get_table_row_count(source_uri, table_info['schema'], table_info['name']) or 0
|
|
|
|
tables_list.append({
|
|
'schema': table_info['schema'],
|
|
'name': table_info['name'],
|
|
'size': size,
|
|
'rows': rows
|
|
})
|
|
total_size += size
|
|
total_rows += rows
|
|
|
|
with self._lock:
|
|
self.migrations[migration_id] = {
|
|
'status': 'running',
|
|
'started_at': time.time(),
|
|
'source': source_uri,
|
|
'destination': dest_uri,
|
|
'schemas': schemas,
|
|
'tables': tables,
|
|
'progress_tracker': {
|
|
'total_tables': len(tables_list),
|
|
'total_size': total_size,
|
|
'total_rows': total_rows,
|
|
'processed_tables': 0,
|
|
'processed_size': 0,
|
|
'processed_rows': 0,
|
|
'failed_tables': 0,
|
|
'failed_size': 0,
|
|
'failed_rows': 0,
|
|
'current_table': None,
|
|
'current_table_size': 0,
|
|
'current_table_rows': 0,
|
|
'current_table_rows_processed': 0,
|
|
'current_table_start_time': None,
|
|
'start_time': time.time(),
|
|
'last_update': time.time(),
|
|
'tables_list': tables_list,
|
|
'tables_details': [],
|
|
'speed_samples': []
|
|
}
|
|
}
|
|
|
|
def _run_individual_migration(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
|
|
"""تشغيل الترحيل بشكل فردي مع تتبع كل جدول"""
|
|
|
|
logs = []
|
|
successful_tables = []
|
|
failed_tables = []
|
|
|
|
with self._lock:
|
|
tracker = self.migrations[migration_id]['progress_tracker']
|
|
tables_list = tracker['tables_list']
|
|
|
|
for i, table_info in enumerate(tables_list, 1):
|
|
table_name = f"{table_info['schema']}.{table_info['name']}"
|
|
|
|
# تحديث معلومات الجدول الحالي
|
|
with self._lock:
|
|
tracker = self.migrations[migration_id]['progress_tracker']
|
|
tracker['current_table'] = table_name
|
|
tracker['current_table_size'] = table_info['size']
|
|
tracker['current_table_rows'] = table_info['rows']
|
|
tracker['current_table_rows_processed'] = 0
|
|
tracker['current_table_start_time'] = time.time()
|
|
|
|
print(f"[{migration_id}] 📊 Migrating table {i}/{len(tables_list)}: {table_name} ({self._format_size(table_info['size'])})")
|
|
|
|
# ترحيل الجدول مع تتبع الصفوف
|
|
table_result = self._migrate_table_with_row_tracking(
|
|
migration_id, source_uri, dest_uri,
|
|
table_info['schema'], table_info['name']
|
|
)
|
|
|
|
if table_result['success']:
|
|
# تحديث الإحصائيات
|
|
with self._lock:
|
|
tracker = self.migrations[migration_id]['progress_tracker']
|
|
tracker['processed_tables'] += 1
|
|
tracker['processed_size'] += table_info['size']
|
|
tracker['processed_rows'] += table_result.get('rows_migrated', 0)
|
|
|
|
# إضافة تفاصيل الجدول
|
|
tracker['tables_details'].append({
|
|
'name': table_name,
|
|
'size': table_info['size'],
|
|
'size_formatted': self._format_size(table_info['size']),
|
|
'rows': table_result.get('rows_migrated', 0),
|
|
'time': table_result.get('migration_time', 0),
|
|
'time_formatted': self._format_time(table_result.get('migration_time', 0)),
|
|
'speed': table_result.get('speed', 0),
|
|
'speed_formatted': f"{self._format_size(table_result.get('speed', 0))}/s",
|
|
'timestamp': time.time()
|
|
})
|
|
|
|
# إضافة عينة سرعة
|
|
tracker['speed_samples'].append({
|
|
'time': time.time(),
|
|
'size': table_info['size'],
|
|
'tables': 1
|
|
})
|
|
|
|
successful_tables.append(table_name)
|
|
print(f"[{migration_id}] ✅ Completed {table_name} in {self._format_time(table_result.get('migration_time', 0))}")
|
|
|
|
else:
|
|
# تحديث الإحصائيات الفاشلة
|
|
with self._lock:
|
|
tracker = self.migrations[migration_id]['progress_tracker']
|
|
tracker['failed_tables'] += 1
|
|
tracker['failed_size'] += table_info['size']
|
|
|
|
failed_tables.append({
|
|
'table': table_name,
|
|
'error': table_result.get('error')
|
|
})
|
|
print(f"[{migration_id}] ❌ Failed {table_name}: {table_result.get('error')}")
|
|
|
|
execution_time = time.time() - self.migrations[migration_id]['started_at']
|
|
|
|
return {
|
|
'success': len(failed_tables) == 0,
|
|
'migration_id': migration_id,
|
|
'execution_time': execution_time,
|
|
'stats': {
|
|
'tables_created': len(successful_tables),
|
|
'tables_failed': len(failed_tables),
|
|
'total_tables': len(tables_list),
|
|
'total_size': sum(t['size'] for t in tables_list)
|
|
},
|
|
'successful_tables': successful_tables,
|
|
'failed_tables': failed_tables,
|
|
'logs': logs
|
|
}
|
|
|
|
def _migrate_table_with_row_tracking(self, migration_id, source_uri, dest_uri, schema, table):
|
|
"""ترحيل جدول مع تتبع عدد الصفوف المنقولة"""
|
|
try:
|
|
start_time = time.time()
|
|
|
|
# الحصول على إجمالي الصفوف
|
|
total_rows = self._get_table_row_count(source_uri, schema, table) or 0
|
|
|
|
# استخدام COPY مع تتبع التقدم (يتطلب تعديل حسب الإمكانيات)
|
|
# هذا مثال مبسط - في الواقع قد تحتاج لاستخدام COPY مع callback
|
|
|
|
# تنفيذ أمر pg_dump للجدول الواحد
|
|
dump_cmd = [
|
|
'pg_dump', '--dbname', source_uri,
|
|
'-t', f'"{schema}"."{table}"',
|
|
'--data-only', '--inserts'
|
|
]
|
|
|
|
psql_cmd = ['psql', '--dbname', dest_uri]
|
|
|
|
full_cmd = ' '.join(dump_cmd) + ' | ' + ' '.join(psql_cmd)
|
|
|
|
result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=300)
|
|
|
|
migration_time = time.time() - start_time
|
|
|
|
if result.returncode == 0:
|
|
# تقدير عدد الصفوف المنقولة (يمكن تحسينه)
|
|
rows_migrated = total_rows
|
|
|
|
return {
|
|
'success': True,
|
|
'rows_migrated': rows_migrated,
|
|
'migration_time': migration_time,
|
|
'speed': (table_size or 0) / migration_time if migration_time > 0 else 0
|
|
}
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'error': result.stderr[:500]
|
|
}
|
|
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def _get_table_row_count(self, uri, schema, table):
|
|
"""الحصول على عدد صفوف الجدول"""
|
|
try:
|
|
parsed = self.parse_postgres_uri(uri)
|
|
if not parsed:
|
|
return None
|
|
|
|
env = os.environ.copy()
|
|
env['PGPASSWORD'] = parsed['password']
|
|
|
|
sql = f'SELECT COUNT(*) FROM "{schema}"."{table}";'
|
|
|
|
cmd = [
|
|
'psql',
|
|
'-h', parsed['host'],
|
|
'-p', str(parsed['port']),
|
|
'-U', parsed['user'],
|
|
'-d', parsed['database'],
|
|
'-t',
|
|
'-c', sql
|
|
]
|
|
|
|
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode == 0:
|
|
count_line = result.stdout.strip()
|
|
if count_line and count_line.isdigit():
|
|
return int(count_line)
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
def format_live_status(self, migration_id):
|
|
"""تنسيق الحالة المباشرة كنص مقروء"""
|
|
progress = self.get_live_migration_progress(migration_id)
|
|
|
|
if not progress.get('success', True):
|
|
return f"❌ Error: {progress.get('error', 'Unknown error')}"
|
|
|
|
lines = []
|
|
lines.append("=" * 60)
|
|
lines.append(f"🚀 Migration {migration_id} - {progress['status'].upper()}")
|
|
lines.append("=" * 60)
|
|
|
|
if progress['type'] == 'estimated':
|
|
lines.append("⚠️ ESTIMATED PROGRESS (not real-time)")
|
|
|
|
# شريط التقدم الرئيسي
|
|
if 'progress_bars' in progress:
|
|
if 'size' in progress['progress_bars']:
|
|
lines.append(f"\n📊 Overall Progress: {progress['progress_bars']['size']}")
|
|
elif 'main' in progress['progress_bars']:
|
|
lines.append(f"\n📊 Progress: {progress['progress_bars']['main']}")
|
|
|
|
# إحصائيات الملفات
|
|
lines.append(f"\n📦 Tables:")
|
|
lines.append(f" Total: {progress['total']['tables']} tables")
|
|
lines.append(f" Processed: {progress['processed']['tables']} tables ({progress['percentages']['tables']}%)")
|
|
if progress.get('failed', {}).get('tables', 0) > 0:
|
|
lines.append(f" ❌ Failed: {progress['failed']['tables']} tables")
|
|
|
|
# إحصائيات الحجم
|
|
lines.append(f"\n💾 Size:")
|
|
lines.append(f" Total: {progress['total']['size_formatted']}")
|
|
lines.append(f" Transferred: {progress['processed']['size_formatted']} ({progress['percentages']['size']}%)")
|
|
if progress.get('remaining', {}).get('size', 0) > 0:
|
|
lines.append(f" Remaining: {progress['remaining']['size_formatted']}")
|
|
|
|
# معلومات السرعة والوقت
|
|
if 'time' in progress:
|
|
lines.append(f"\n⏱️ Time:")
|
|
lines.append(f" Elapsed: {progress['time']['elapsed_formatted']}")
|
|
if 'eta_formatted' in progress['time']:
|
|
lines.append(f" ETA: {progress['time']['eta_formatted']}")
|
|
|
|
if 'speed' in progress:
|
|
lines.append(f"\n⚡ Speed:")
|
|
lines.append(f" Current: {progress['speed']['current_formatted']}")
|
|
lines.append(f" Average: {progress['speed']['average_formatted']}")
|
|
if progress['speed']['peak'] > 0:
|
|
lines.append(f" Peak: {progress['speed']['peak_formatted']}")
|
|
|
|
# الجدول الحالي
|
|
if progress.get('current_table'):
|
|
ct = progress['current_table']
|
|
lines.append(f"\n📄 Current Table: {ct['name']}")
|
|
lines.append(f" Size: {ct['size_formatted']}")
|
|
if ct.get('rows_total', 0) > 0:
|
|
lines.append(f" Rows: {ct['rows_processed']}/{ct['rows_total']} ({ct['rows_percentage']}%)")
|
|
|
|
# آخر الجداول المنجزة
|
|
if progress.get('recent_tables'):
|
|
lines.append(f"\n✅ Recently Completed:")
|
|
for table in progress['recent_tables'][-3:]: # آخر 3 جداول
|
|
lines.append(f" • {table['name']} - {table['size_formatted']} in {table.get('time_formatted', 'N/A')}")
|
|
|
|
lines.append("\n" + "=" * 60)
|
|
|
|
return '\n'.join(lines)
|
|
def calculate_migration_progress(self, migration_id):
|
|
"""حساب نسبة التقدم في الترحيل بناءً على عدد وحجم الجداول"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return {'success': False, 'error': 'Migration not found'}
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if mig_data.get('status') == 'running':
|
|
# حساب التقديرات للترحيل الجاري
|
|
return self._estimate_running_progress(migration_id)
|
|
else:
|
|
# إحصائيات للترحيل المكتمل
|
|
return self._get_completed_stats(migration_id)
|
|
|
|
def _estimate_running_progress(self, migration_id):
|
|
"""تقدير تقدم الترحيل الجاري"""
|
|
mig_data = self.migrations[migration_id]
|
|
elapsed_time = time.time() - mig_data['started_at']
|
|
|
|
# تقدير عدد الجداول (إذا كانت متوفرة)
|
|
estimated_total_tables = mig_data.get('estimated_tables', 10)
|
|
estimated_total_size = mig_data.get('estimated_size', 100 * 1024 * 1024) # 100 MB افتراضي
|
|
|
|
# حساب التقدم بناءً على الوقت المنقضي (تقدير تقريبي)
|
|
# في الواقع العملي، ستحتاج لمراقبة خرج pg_dump
|
|
progress_percentage = min(elapsed_time / 60 * 10, 95) # 10% لكل دقيقة حتى 95%
|
|
|
|
return {
|
|
'migration_id': migration_id,
|
|
'status': 'running',
|
|
'elapsed_time': elapsed_time,
|
|
'elapsed_time_formatted': self._format_time(elapsed_time),
|
|
'progress_percentage': round(progress_percentage, 2),
|
|
'estimated': True,
|
|
'estimated_total_tables': estimated_total_tables,
|
|
'estimated_total_size': estimated_total_size,
|
|
'estimated_total_size_formatted': self._format_size(estimated_total_size),
|
|
'tables_migrated': int(estimated_total_tables * progress_percentage / 100),
|
|
'size_migrated': int(estimated_total_size * progress_percentage / 100),
|
|
'size_migrated_formatted': self._format_size(int(estimated_total_size * progress_percentage / 100))
|
|
}
|
|
|
|
def _get_completed_stats(self, migration_id):
|
|
"""الحصول على إحصائيات الترحيل المكتمل"""
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if not isinstance(mig_data, dict) or 'success' not in mig_data:
|
|
return {'success': False, 'error': 'Migration not completed'}
|
|
|
|
execution_time = mig_data.get('execution_time', 0)
|
|
stats = mig_data.get('stats', {})
|
|
|
|
# حساب عدد الجداول المنقولة
|
|
tables_created = stats.get('tables_created', 0)
|
|
tables_altered = stats.get('tables_altered', 0)
|
|
indexes_created = stats.get('indexes_created', 0)
|
|
|
|
# تقدير حجم البيانات المنقولة (صعب حسابه بدقة مع pg_dump)
|
|
# نستخدم عدد الجداول كمؤشر تقريبي
|
|
estimated_size_per_table = 10 * 1024 * 1024 # 10 MB لكل جدول تقديري
|
|
estimated_total_size = tables_created * estimated_size_per_table
|
|
|
|
return {
|
|
'migration_id': migration_id,
|
|
'status': 'completed' if mig_data.get('success') else 'failed',
|
|
'success': mig_data.get('success', False),
|
|
'execution_time': execution_time,
|
|
'execution_time_formatted': self._format_time(execution_time),
|
|
'progress_percentage': 100.0 if mig_data.get('success') else 0,
|
|
'tables_created': tables_created,
|
|
'tables_altered': tables_altered,
|
|
'indexes_created': indexes_created,
|
|
'total_tables_affected': tables_created + tables_altered,
|
|
'estimated_size_migrated': estimated_total_size,
|
|
'estimated_size_migrated_formatted': self._format_size(estimated_total_size),
|
|
'error': mig_data.get('error') if not mig_data.get('success') else None
|
|
}
|
|
|
|
def track_table_migration(self, table_name, total_rows=None, rows_processed=None):
|
|
"""تتبع ترحيل جدول معين (للاستخدام مع ترحيل مخصص)"""
|
|
# هذه الدالة يمكن استخدامها إذا كنت تقوم بترحيل الجداول بشكل فردي
|
|
tracker_key = f"table_tracker_{table_name}"
|
|
|
|
if not hasattr(self, '_table_trackers'):
|
|
self._table_trackers = {}
|
|
|
|
if tracker_key not in self._table_trackers:
|
|
self._table_trackers[tracker_key] = {
|
|
'table_name': table_name,
|
|
'total_rows': total_rows,
|
|
'rows_processed': 0,
|
|
'start_time': time.time(),
|
|
'status': 'in_progress'
|
|
}
|
|
|
|
tracker = self._table_trackers[tracker_key]
|
|
|
|
if rows_processed is not None:
|
|
tracker['rows_processed'] = rows_processed
|
|
|
|
# حساب التقدم
|
|
if tracker['total_rows'] and tracker['total_rows'] > 0:
|
|
progress = (tracker['rows_processed'] / tracker['total_rows']) * 100
|
|
else:
|
|
progress = 0
|
|
|
|
elapsed_time = time.time() - tracker['start_time']
|
|
|
|
return {
|
|
'table_name': table_name,
|
|
'total_rows': tracker['total_rows'],
|
|
'rows_processed': tracker['rows_processed'],
|
|
'progress_percentage': round(progress, 2),
|
|
'elapsed_time': elapsed_time,
|
|
'elapsed_time_formatted': self._format_time(elapsed_time),
|
|
'status': tracker['status']
|
|
}
|
|
|
|
def estimate_migration_size(self, source_uri, schemas=None, tables=None):
|
|
"""تقدير حجم الترحيل قبل البدء"""
|
|
try:
|
|
total_size = 0
|
|
table_count = 0
|
|
table_details = []
|
|
|
|
if tables:
|
|
# حساب حجم الجداول المحددة
|
|
for table in tables:
|
|
if '.' in table:
|
|
schema, table_name = table.split('.', 1)
|
|
else:
|
|
schema = 'public'
|
|
table_name = table
|
|
|
|
# الحصول على حجم الجدول من PostgreSQL
|
|
size = self._get_table_size(source_uri, schema, table_name)
|
|
if size:
|
|
total_size += size
|
|
table_count += 1
|
|
table_details.append({
|
|
'table': f"{schema}.{table_name}",
|
|
'size': size,
|
|
'size_formatted': self._format_size(size)
|
|
})
|
|
|
|
elif schemas:
|
|
# حساب حجم جميع الجداول في المخططات المحددة
|
|
for schema in schemas:
|
|
tables_result = self.get_tables(source_uri, schema)
|
|
if tables_result['success']:
|
|
for table_info in tables_result['tables']:
|
|
size = self._get_table_size(source_uri, schema, table_info['name'])
|
|
if size:
|
|
total_size += size
|
|
table_count += 1
|
|
table_details.append({
|
|
'table': f"{schema}.{table_info['name']}",
|
|
'size': size,
|
|
'size_formatted': self._format_size(size)
|
|
})
|
|
else:
|
|
# حساب حجم جميع الجداول
|
|
tables_result = self.get_tables(source_uri)
|
|
if tables_result['success']:
|
|
for table_info in tables_result['tables']:
|
|
size = self._get_table_size(source_uri, table_info['schema'], table_info['name'])
|
|
if size:
|
|
total_size += size
|
|
table_count += 1
|
|
table_details.append({
|
|
'table': f"{table_info['schema']}.{table_info['name']}",
|
|
'size': size,
|
|
'size_formatted': self._format_size(size)
|
|
})
|
|
|
|
return {
|
|
'success': True,
|
|
'total_size': total_size,
|
|
'total_size_formatted': self._format_size(total_size),
|
|
'table_count': table_count,
|
|
'table_details': table_details[:10], # أول 10 جداول فقط
|
|
'estimated_transfer_time': self._estimate_transfer_time(total_size)
|
|
}
|
|
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def _get_table_size(self, uri, schema, table_name):
|
|
"""الحصول على حجم جدول معين من PostgreSQL"""
|
|
try:
|
|
parsed = self.parse_postgres_uri(uri)
|
|
if not parsed:
|
|
return None
|
|
|
|
env = os.environ.copy()
|
|
env['PGPASSWORD'] = parsed['password']
|
|
|
|
# استعلام لجلب حجم الجدول
|
|
sql = f"""
|
|
SELECT pg_total_relation_size('"{schema}"."{table_name}"') as size;
|
|
"""
|
|
|
|
cmd = [
|
|
'psql',
|
|
'-h', parsed['host'],
|
|
'-p', str(parsed['port']),
|
|
'-U', parsed['user'],
|
|
'-d', parsed['database'],
|
|
'-t',
|
|
'-c', sql
|
|
]
|
|
|
|
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
|
|
|
|
if result.returncode == 0:
|
|
size_line = result.stdout.strip()
|
|
if size_line and size_line.isdigit():
|
|
return int(size_line)
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
def _estimate_transfer_time(self, size_bytes):
|
|
"""تقدير وقت النقل بناءً على حجم البيانات"""
|
|
# سرعات تقديرية للشبكة
|
|
speeds = {
|
|
'slow': 1024 * 1024, # 1 MB/s
|
|
'average': 5 * 1024 * 1024, # 5 MB/s
|
|
'fast': 20 * 1024 * 1024, # 20 MB/s
|
|
'very_fast': 100 * 1024 * 1024 # 100 MB/s
|
|
}
|
|
|
|
estimates = {}
|
|
for speed_name, speed_bps in speeds.items():
|
|
seconds = size_bytes / speed_bps if speed_bps > 0 else 0
|
|
estimates[speed_name] = {
|
|
'seconds': seconds,
|
|
'formatted': self._format_time(seconds)
|
|
}
|
|
|
|
return estimates
|
|
|
|
def get_migration_detailed_progress(self, migration_id):
|
|
"""الحصول على تقدم مفصل للترحيل"""
|
|
progress = self.calculate_migration_progress(migration_id)
|
|
|
|
if not progress.get('success', True):
|
|
return progress
|
|
|
|
# إضافة معلومات إضافية
|
|
with self._lock:
|
|
if migration_id in self.migrations:
|
|
mig_data = self.migrations[migration_id]
|
|
progress['source'] = mig_data.get('source', 'Unknown')
|
|
progress['destination'] = mig_data.get('destination', 'Unknown')
|
|
progress['started_at'] = mig_data.get('started_at')
|
|
|
|
if 'schemas' in mig_data:
|
|
progress['selected_schemas'] = mig_data['schemas']
|
|
if 'tables' in mig_data:
|
|
progress['selected_tables'] = mig_data['tables']
|
|
|
|
# إضافة شريط تقدم نصي
|
|
percentage = progress.get('progress_percentage', 0)
|
|
progress['progress_bar'] = self._format_progress_bar(percentage)
|
|
|
|
return progress
|
|
|
|
def _format_progress_bar(self, percentage, width=30):
|
|
"""تنسيق شريط التقدم كنص"""
|
|
filled = int(width * percentage / 100)
|
|
empty = width - filled
|
|
bar = '█' * filled + '░' * empty
|
|
return f"[{bar}] {percentage:.1f}%"
|
|
|
|
def _format_size(self, size_bytes):
|
|
"""تنسيق حجم الملف"""
|
|
if size_bytes == 0:
|
|
return '0 B'
|
|
|
|
size_names = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
|
|
i = 0
|
|
while size_bytes >= 1024 and i < len(size_names) - 1:
|
|
size_bytes /= 1024.0
|
|
i += 1
|
|
|
|
return f"{size_bytes:.2f} {size_names[i]}"
|
|
|
|
def _format_time(self, seconds):
|
|
"""تنسيق الوقت"""
|
|
if seconds < 60:
|
|
return f"{seconds:.0f}s"
|
|
elif seconds < 3600:
|
|
minutes = seconds / 60
|
|
return f"{minutes:.1f}m"
|
|
elif seconds < 86400:
|
|
hours = seconds / 3600
|
|
return f"{hours:.1f}h"
|
|
else:
|
|
days = seconds / 86400
|
|
return f"{days:.1f}d"
|
|
|
|
def set_migration_estimates(self, migration_id, estimated_tables, estimated_size):
|
|
"""تعيين التقديرات للترحيل الجاري"""
|
|
with self._lock:
|
|
if migration_id in self.migrations:
|
|
self.migrations[migration_id]['estimated_tables'] = estimated_tables
|
|
self.migrations[migration_id]['estimated_size'] = estimated_size
|
|
|
|
def compare_source_destination(self, migration_id):
|
|
"""مقارنة المصدر والوجهة بعد الترحيل"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return {'success': False, 'error': 'Migration not found'}
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if not mig_data.get('success'):
|
|
return {'success': False, 'error': 'Migration not completed successfully'}
|
|
|
|
# مقارنة بسيطة (يمكن توسيعها)
|
|
source_tables = mig_data.get('stats', {}).get('tables_created', 0)
|
|
|
|
return {
|
|
'success': True,
|
|
'source_tables_count': source_tables,
|
|
'destination_tables_count': source_tables, # نفس العدد إذا نجح الترحيل
|
|
'match': True,
|
|
'verification_time': datetime.utcnow().isoformat()
|
|
}
|
|
def parse_uri_to_env(self, uri, prefix=''):
    """Expand a PostgreSQL URI into a dict of environment-style variables.

    Each key is prefixed with *prefix* (e.g. 'SRC_').  Returns None when
    the URI cannot be parsed.
    """
    info = self.parse_postgres_uri(uri)
    if not info:
        return None

    env_vars = {
        f'{prefix}DB_HOST': info['host'],
        f'{prefix}DB_PORT': str(info['port']),
        f'{prefix}DB_USER': info['user'],
        f'{prefix}DB_PASSWORD': info['password'],
        f'{prefix}DB_NAME': info['database'],
        f'{prefix}DB_URI': info['uri'],
    }

    # libpq-style connection string plus a URL alias for convenience.
    env_vars[f'{prefix}DB_CONNECTION_STRING'] = (
        f"host={info['host']} port={info['port']} dbname={info['database']} "
        f"user={info['user']} password={info['password']}"
    )
    env_vars[f'{prefix}DB_URL'] = info['uri']

    return env_vars
|
|
|
|
def test_connection(self, uri_input):
    """Test a PostgreSQL connection by shelling out to psql.

    Accepts either a URI string or a dict carrying the URI under 'uri' or
    'url'.  Returns a dict with 'success' plus server details on success,
    or 'error' and 'connection_details' on failure.  Requires the psql
    client binary on PATH; each attempt times out after 10 seconds.
    """

    # Step 1: handle the case where the input is a JSON object (dict).
    if isinstance(uri_input, dict):
        print(f"⚠️ Received JSON object, extracting URI string...")

        # Extract the URI from the 'uri' or 'url' key.
        if 'uri' in uri_input:
            uri = uri_input['uri']
            print(f"✅ Extracted URI from 'uri' key")
        elif 'url' in uri_input:
            uri = uri_input['url']
            print(f"✅ Extracted URI from 'url' key")
        else:
            # Neither recognised key is present: report what was received.
            error_msg = f'JSON must contain "uri" or "url" key. Received keys: {list(uri_input.keys())}'
            print(f"❌ {error_msg}")
            return {'success': False, 'error': error_msg}
    else:
        # The input is already a plain string.
        uri = uri_input

    # Ensure uri is a string at this point.
    if not isinstance(uri, str):
        error_msg = f'URI must be a string. Got type: {type(uri)}'
        print(f"❌ {error_msg}")
        return {'success': False, 'error': error_msg}

    print(f"🔍 Testing connection to: {uri}")
    parsed = self.parse_postgres_uri(uri)

    if not parsed:
        error_msg = f'Invalid URI format: {uri}'
        print(f"❌ {error_msg}")
        return {'success': False, 'error': error_msg}

    # Connection details, reused by the failure reports below.
    host = parsed['host']
    port = parsed['port']
    user = parsed['user']
    database = parsed['database']

    env = os.environ.copy()
    # psql reads the password from the environment, not the command line.
    env['PGPASSWORD'] = parsed['password']

    cmd = [
        'psql',
        '-h', host,
        '-p', str(port),
        '-U', user,
        '-d', database,
        '-c', 'SELECT version(); SELECT current_database();',
        '-t'
    ]

    try:
        print(f" 🔗 Attempting connection to host: {host}:{port}")
        print(f" 👤 User: {user}, Database: {database}")

        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)

        if result.returncode == 0:
            # With -t, the first output line is the server version and the
            # second is the current database name.
            lines = result.stdout.strip().split('\n')
            version = lines[0] if len(lines) > 0 else ''
            db_name = lines[1] if len(lines) > 1 else ''

            print(f"✅ Connection successful to {host}:{port}")

            return {
                'success': True,
                'message': 'Connection successful',
                'version': version,
                'database': db_name,
                'connection': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }
        else:
            # psql exited non-zero (refused, auth failure, ...): log details.
            error_output = result.stderr.strip()
            print(f"❌ Connection FAILED to host: {host}:{port}")
            print(f" 🔸 User: {user}")
            print(f" 🔸 Database: {database}")
            print(f" 🔸 Error details: {error_output[:200]}")  # first 200 chars only

            return {
                'success': False,
                'error': error_output,
                'connection_details': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }

    except subprocess.TimeoutExpired:
        # Server did not respond within the 10-second subprocess timeout.
        print(f"❌ Connection TIMEOUT to host: {host}:{port}")
        print(f" 🔸 Server not responding after 10 seconds")
        print(f" 🔸 Please check: firewall, network, server status")

        return {
            'success': False,
            'error': 'Connection timeout',
            'connection_details': {
                'host': host,
                'port': port,
                'user': user,
                'database': database
            }
        }

    except FileNotFoundError:
        # The psql binary itself is missing from PATH.
        print(f"❌ PSQL CLIENT NOT FOUND")
        print(f" 🔸 The 'psql' command-line tool is not installed")
        print(f" 🔸 Install PostgreSQL client tools and try again")

        return {
            'success': False,
            'error': 'psql command not found. Install PostgreSQL client.',
            'connection_details': {
                'host': host,
                'port': port,
                'user': user,
                'database': database
            }
        }

    except Exception as e:
        # Catch-all boundary: report the exception type and message.
        print(f"❌ UNEXPECTED ERROR connecting to host: {host}:{port}")
        print(f" 🔸 Error type: {type(e).__name__}")
        print(f" 🔸 Error message: {str(e)}")

        return {
            'success': False,
            'error': str(e),
            'connection_details': {
                'host': host,
                'port': port,
                'user': user,
                'database': database
            }
        }
|
|
|
|
def get_schemas(self, uri):
    """List the non-system schemas of a PostgreSQL database via psql.

    Returns {'success': True, 'schemas': [...], 'count': n} on success,
    or {'success': False, 'error': msg} on failure.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {'success': False, 'error': 'Invalid URI format'}

    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']

    # Exclude PostgreSQL's internal schemas.
    sql = """
    SELECT schema_name
    FROM information_schema.schemata
    WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
    ORDER BY schema_name;
    """

    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql,
    ]

    try:
        proc = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)
    except Exception as e:
        return {'success': False, 'error': str(e)}

    if proc.returncode != 0:
        return {'success': False, 'error': proc.stderr.strip()}

    # -t output is one schema name per line, padded with whitespace.
    names = [row.strip() for row in proc.stdout.splitlines() if row.strip()]
    return {
        'success': True,
        'schemas': names,
        'count': len(names),
    }
|
|
|
|
def get_tables(self, uri, schema=''):
    """List tables of a PostgreSQL database via psql.

    Args:
        uri: PostgreSQL connection URI.
        schema: optional schema name; when empty, every non-system schema
            is listed and each entry carries its own 'schema' key.

    Returns:
        {'success': True, 'tables': [...], 'count': n} on success,
        {'success': False, 'error': msg} on failure.

    Fixes: the error message typo 'Invalid URI fformat' -> 'format', and
    the brittle two-value unpack of split('|') which crashed on table
    names containing '|' (now split with maxsplit=1).
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {'success': False, 'error': 'Invalid URI format'}

    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']

    if schema:
        # NOTE(security): *schema* is interpolated into the SQL text; psql -c
        # cannot parameterize, so callers must pass trusted schema names.
        sql = f"""
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = '{schema}'
        ORDER BY table_name;
        """
    else:
        sql = """
        SELECT table_schema, table_name, table_type
        FROM information_schema.tables
        WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
        ORDER BY table_schema, table_name;
        """

    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql
    ]

    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)

        if result.returncode != 0:
            return {'success': False, 'error': result.stderr.strip()}

        lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
        tables = []

        if schema:
            # Two columns per row: name | type.
            for line in lines:
                if '|' in line:
                    table_name, table_type = line.split('|', 1)
                    tables.append({
                        'name': table_name.strip(),
                        'type': table_type.strip(),
                        'schema': schema
                    })
        else:
            # Three columns per row: schema | name | type.
            for line in lines:
                if '|' in line:
                    parts = line.split('|')
                    if len(parts) >= 3:
                        table_schema, table_name, table_type = parts[:3]
                        tables.append({
                            'schema': table_schema.strip(),
                            'name': table_name.strip(),
                            'type': table_type.strip()
                        })

        return {
            'success': True,
            'tables': tables,
            'count': len(tables)
        }

    except Exception as e:
        return {'success': False, 'error': str(e)}
|
|
|
|
def get_table_counts(self, uri, schema=None):
    """Return approximate row counts for tables, keyed by 'schema.table'.

    Uses pg_class.reltuples (the planner's estimate) instead of COUNT(*):
    one fast catalog query rather than a full scan per table.  Returns {}
    on any failure.

    Fixes two defects in the previous implementation: the per-schema SQL
    interpolated an undefined Python variable ``table_name`` (raising
    NameError before psql ever ran), and the all-schemas SQL counted rows
    of information_schema.tables instead of the data tables themselves.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {}

    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']

    # NOTE(security): *schema* is interpolated into SQL; psql -c cannot
    # parameterize, so callers must pass trusted schema names.
    if schema:
        schema_filter = f"AND n.nspname = '{schema}'"
    else:
        schema_filter = "AND n.nspname NOT IN ('information_schema', 'pg_catalog', 'pg_toast')"

    # reltuples is -1 for never-analyzed tables on PG 14+; clamp to 0.
    sql = f"""
    SELECT n.nspname, c.relname, GREATEST(c.reltuples, 0)::bigint
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE c.relkind = 'r'
    {schema_filter}
    ORDER BY n.nspname, c.relname;
    """

    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql
    ]

    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)

        table_counts = {}
        if result.returncode == 0:
            # Each row: schema | table | count (whitespace-padded by psql).
            for line in result.stdout.splitlines():
                if line.strip() and '|' in line:
                    parts = line.split('|')
                    if len(parts) >= 3:
                        table_schema, table_name, count = parts[:3]
                        key = f"{table_schema.strip()}.{table_name.strip()}"
                        table_counts[key] = int(count.strip()) if count.strip().isdigit() else 0
        return table_counts

    except Exception:
        return {}
|
|
|
|
def run_migration(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
    """Run a pg_dump | psql migration synchronously and return a result dict.

    Args:
        migration_id: identifier used to prefix log lines and tag the result.
        source_uri: PostgreSQL URI of the source database.
        dest_uri: PostgreSQL URI of the destination database.
        schemas: optional list of schema names to restrict the dump to.
        tables: optional list of 'schema.table' names; takes precedence over
            *schemas* when both are given.

    Returns:
        dict with 'success', 'migration_id' and 'logs'; on success also
        'execution_time', 'stats' and 'environment_variables', on failure
        an 'error' message.  The whole run is capped at 5 minutes.
    """
    logs = []

    def log(msg, level='info'):
        # Collect structured log entries and mirror them to stdout.
        log_entry = {
            'timestamp': time.time(),
            'message': msg,
            'level': level
        }
        logs.append(log_entry)
        print(f"[{migration_id}] {msg}")

    try:
        log(f"🔧 Starting PostgreSQL Migration {migration_id}", 'info')
        log(f"Source: {source_uri}", 'info')
        log(f"Destination: {dest_uri}", 'info')

        if schemas:
            log(f"Selected schemas: {', '.join(schemas)}", 'info')
        if tables:
            log(f"Selected tables: {', '.join(tables)}", 'info')

        # Build pg_dump command: --clean --if-exists drops destination
        # objects before recreating them; --no-owner skips ownership DDL.
        dump_cmd = ['pg_dump', '--dbname', source_uri, '--clean', '--if-exists', '--no-owner']

        # Add schema/table filters (tables win over schemas).
        if tables:
            for table in tables:
                if '.' in table:
                    dump_cmd.extend(['-t', table])
                else:
                    log(f"⚠️ Table '{table}' should include schema (e.g., public.table)", 'warning')
        elif schemas:
            for schema in schemas:
                dump_cmd.extend(['-n', schema])

        # Build psql command; --single-transaction makes the restore atomic.
        psql_cmd = ['psql', '--dbname', dest_uri, '--single-transaction']

        # Create pipe command.
        # NOTE(security): shell=True with URIs joined into the command line —
        # URIs containing shell metacharacters would be interpreted by the
        # shell; only pass trusted connection strings.
        full_cmd = ' '.join(dump_cmd) + ' | ' + ' '.join(psql_cmd)

        log(f"Executing: {full_cmd[:100]}...", 'info')

        # Run the migration (hard 5-minute timeout).
        start_time = time.time()
        result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=300)
        end_time = time.time()

        execution_time = end_time - start_time

        if result.returncode == 0:
            log(f"✅ Migration completed successfully!", 'success')
            log(f"⏱️ Execution time: {execution_time:.2f} seconds", 'info')

            # Parse statistics by counting DDL echoes in psql's stdout.
            create_tables = result.stdout.count('CREATE TABLE')
            if create_tables > 0:
                log(f"📊 Tables created: {create_tables}", 'info')

            alter_tables = result.stdout.count('ALTER TABLE')
            create_indexes = result.stdout.count('CREATE INDEX')

            if alter_tables > 0:
                log(f"🔄 Tables altered: {alter_tables}", 'info')
            if create_indexes > 0:
                log(f"📈 Indexes created: {create_indexes}", 'info')

            # Extract environment variables from URIs for downstream use.
            source_env = self.parse_uri_to_env(source_uri, 'SRC_')
            dest_env = self.parse_uri_to_env(dest_uri, 'DEST_')
            all_env = {}
            if source_env:
                all_env.update(source_env)
            if dest_env:
                all_env.update(dest_env)

            return {
                'success': True,
                'migration_id': migration_id,
                'execution_time': execution_time,
                'stats': {
                    'tables_created': create_tables,
                    'tables_altered': alter_tables,
                    'indexes_created': create_indexes
                },
                'environment_variables': all_env,
                'logs': logs
            }
        else:
            # Truncate stderr to keep the result payload bounded.
            error_msg = result.stderr[:1000] if result.stderr else 'Unknown error'
            log(f"❌ Migration failed: {error_msg}", 'error')

            return {
                'success': False,
                'migration_id': migration_id,
                'error': error_msg,
                'logs': logs
            }

    except subprocess.TimeoutExpired:
        log("❌ Migration timeout (5 minutes)", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': 'Migration timeout',
            'logs': logs
        }
    except Exception as e:
        log(f"❌ Error: {str(e)}", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': str(e),
            'logs': logs
        }
|
|
|
|
def start_migration_async(self, migration_id, source_uri, dest_uri, schemas=None, tables=None):
|
|
"""Start migration in background thread"""
|
|
def migration_task():
|
|
result = self.run_migration(migration_id, source_uri, dest_uri, schemas, tables)
|
|
with self._lock:
|
|
self.migrations[migration_id] = result
|
|
|
|
thread = threading.Thread(target=migration_task, daemon=True)
|
|
thread.start()
|
|
|
|
# Initialize migration entry
|
|
with self._lock:
|
|
self.migrations[migration_id] = {
|
|
'status': 'running',
|
|
'started_at': time.time(),
|
|
'source': source_uri,
|
|
'destination': dest_uri
|
|
}
|
|
|
|
return migration_id
|
|
|
|
def get_migration_status(self, migration_id):
|
|
"""Get status of a migration"""
|
|
with self._lock:
|
|
return self.migrations.get(migration_id)
|
|
|
|
def list_migrations(self):
|
|
"""List all migrations"""
|
|
with self._lock:
|
|
migrations_list = []
|
|
for mig_id, mig_data in self.migrations.items():
|
|
if isinstance(mig_data, dict) and 'success' in mig_data:
|
|
status = 'completed' if mig_data['success'] else 'failed'
|
|
else:
|
|
status = 'running'
|
|
|
|
migrations_list.append({
|
|
'id': mig_id,
|
|
'status': status,
|
|
'data': mig_data
|
|
})
|
|
|
|
return migrations_list
|
|
|
|
|
|
# ============================================================================
|
|
# Part 2: S3 to S3 Migrator (S3ToS3Migrator)
|
|
# ============================================================================
|
|
|
|
class S3ToS3Migrator:
|
|
"""S3 to S3 migration engine"""
|
|
|
|
def __init__(self):
|
|
self.migrations = {}
|
|
self._lock = threading.Lock()
|
|
self.source_clients = {}
|
|
self.dest_clients = {}
|
|
|
|
# ==================== S3 Client Management ====================
|
|
|
|
def get_source_client(self, access_key_id, secret_access_key, region='us-east-1',
                      endpoint_url=None, session_token=None):
    """Return a cached or freshly built boto3 S3 client for the source side.

    Clients are cached per credentials/region/endpoint combination.
    Returns None when client construction fails.
    """
    cache_key = f"src:{access_key_id}:{secret_access_key}:{region}:{endpoint_url}"

    cached = self.source_clients.get(cache_key)
    if cached is not None:
        return cached

    try:
        kwargs = {
            'aws_access_key_id': access_key_id,
            'aws_secret_access_key': secret_access_key,
            'region_name': region,
        }

        if session_token:
            kwargs['aws_session_token'] = session_token

        if endpoint_url:
            kwargs['endpoint_url'] = endpoint_url
            # Local development endpoints (e.g. minio) lack valid TLS certs.
            if 'localhost' in endpoint_url or '127.0.0.1' in endpoint_url:
                kwargs['verify'] = False

        client = boto3.client('s3', **kwargs)
        self.source_clients[cache_key] = client
        return client

    except Exception as e:
        print(f"❌ Failed to create source S3 client: {str(e)}")
        return None
|
|
def calculate_migration_progress(self, migration_id):
    """Compute a progress report (object/size counts and percentages).

    Dispatches to the running-state or completed-state reporter depending
    on the migration's recorded status.
    """
    with self._lock:
        record = self.migrations.get(migration_id)
        if record is None:
            return {'success': False, 'error': 'Migration not found'}

        if record.get('status') == 'running':
            return self._get_running_progress(migration_id)
        return self._get_completed_progress(migration_id)
|
|
|
|
def _get_running_progress(self, migration_id):
    """Build a live progress snapshot for a migration still in progress.

    Returns a dict of raw and formatted counters: totals, processed and
    failed objects/sizes, percentages, rolling/average speed, ETA and the
    last objects handled.

    NOTE(review): reads and mutates self.migrations[migration_id] without
    taking self._lock — the caller (calculate_migration_progress) holds
    it; confirm every call site does.
    """
    mig_data = self.migrations[migration_id]

    # Lazily create the tracker if progress is requested before
    # update_progress() has ever been called for this migration.
    if 'progress_tracker' not in mig_data:
        mig_data['progress_tracker'] = {
            'total_objects': mig_data.get('total_objects', 0),
            'total_size': mig_data.get('total_size', 0),
            'processed_objects': 0,
            'processed_size': 0,
            'failed_objects': 0,
            'failed_size': 0,
            'current_object': None,
            'current_object_size': 0,
            'current_object_progress': 0,
            'start_time': mig_data.get('started_at', time.time()),
            'last_update': time.time(),
            'objects_details': [],
            'speed_samples': [],
            'estimated_time_remaining': 0
        }

    tracker = mig_data['progress_tracker']
    elapsed_time = time.time() - tracker['start_time']

    # Percent complete by object count and by byte count.
    objects_percentage = (tracker['processed_objects'] / tracker['total_objects'] * 100) if tracker['total_objects'] > 0 else 0
    size_percentage = (tracker['processed_size'] / tracker['total_size'] * 100) if tracker['total_size'] > 0 else 0

    # Rolling speed: keep only samples from the last 10 seconds.
    current_time = time.time()
    tracker['speed_samples'] = [s for s in tracker.get('speed_samples', []) if current_time - s['time'] < 10]

    if tracker['speed_samples']:
        recent_size = sum(s['size'] for s in tracker['speed_samples'])
        # With a single sample there is no time span; assume 1 second.
        recent_time = tracker['speed_samples'][-1]['time'] - tracker['speed_samples'][0]['time'] if len(tracker['speed_samples']) > 1 else 1
        current_speed = recent_size / recent_time if recent_time > 0 else 0
    else:
        # No recent samples: fall back to the lifetime average.
        current_speed = tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0

    # ETA from the bytes still outstanding at the current speed.
    remaining_size = tracker['total_size'] - tracker['processed_size'] - tracker['failed_size']
    if current_speed > 0:
        eta_seconds = remaining_size / current_speed
    else:
        eta_seconds = 0

    # Last ten per-object detail entries, for UI display.
    recent_objects = tracker['objects_details'][-10:] if tracker['objects_details'] else []

    return {
        'migration_id': migration_id,
        'status': 'running',
        'timestamp': time.time(),
        'total_objects': tracker['total_objects'],
        'total_size': tracker['total_size'],
        'total_size_formatted': self._format_size(tracker['total_size']),
        'processed_objects': tracker['processed_objects'],
        'processed_size': tracker['processed_size'],
        'processed_size_formatted': self._format_size(tracker['processed_size']),
        'failed_objects': tracker['failed_objects'],
        'failed_size': tracker['failed_size'],
        'failed_size_formatted': self._format_size(tracker['failed_size']),
        'remaining_objects': tracker['total_objects'] - tracker['processed_objects'] - tracker['failed_objects'],
        'remaining_size': tracker['total_size'] - tracker['processed_size'] - tracker['failed_size'],
        'remaining_size_formatted': self._format_size(tracker['total_size'] - tracker['processed_size'] - tracker['failed_size']),
        'objects_percentage': round(objects_percentage, 2),
        'size_percentage': round(size_percentage, 2),
        'elapsed_time': elapsed_time,
        'elapsed_time_formatted': self._format_time(elapsed_time),
        'current_speed': current_speed,
        'current_speed_formatted': f"{self._format_size(current_speed)}/s",
        'average_speed': tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0,
        'average_speed_formatted': f"{self._format_size(tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0)}/s",
        'eta_seconds': eta_seconds,
        'eta_formatted': self._format_time(eta_seconds),
        'current_object': tracker['current_object'],
        'current_object_size': tracker['current_object_size'],
        'current_object_size_formatted': self._format_size(tracker['current_object_size']),
        'current_object_progress': tracker['current_object_progress'],
        'recent_objects': recent_objects,
        'progress_bar_objects': self._format_progress_bar(objects_percentage),
        'progress_bar_size': self._format_progress_bar(size_percentage),
        'estimated_completion_time': time.time() + eta_seconds if eta_seconds > 0 else None
    }
|
|
|
|
def _get_completed_progress(self, migration_id):
    """Build a final statistics report for a finished migration.

    Summarizes totals, migrated and failed counts/sizes, percentages,
    wall-clock time and average speed, plus up to ten successful and ten
    failed object entries.

    NOTE(review): reads self.migrations[migration_id] without taking
    self._lock — the caller (calculate_migration_progress) holds it.
    """
    mig_data = self.migrations[migration_id]

    stats = mig_data.get('stats', {})
    successful = mig_data.get('successful', [])
    failed = mig_data.get('failed', [])

    total_objects = stats.get('total_objects', 0)
    migrated = stats.get('migrated', 0)
    failed_count = stats.get('failed', 0)
    total_size = stats.get('total_size', 0)
    migrated_size = stats.get('migrated_size', 0)

    # Wall-clock duration; falls back to zero when timestamps are missing.
    completion_time = mig_data.get('completed_at', time.time())
    start_time = mig_data.get('started_at', completion_time)
    total_time = completion_time - start_time

    return {
        'migration_id': migration_id,
        'status': 'completed' if mig_data.get('success') else 'failed',
        'success': mig_data.get('success', False),
        'timestamp': time.time(),
        'total_objects': total_objects,
        'total_size': total_size,
        'total_size_formatted': self._format_size(total_size),
        'migrated_objects': migrated,
        'migrated_size': migrated_size,
        'migrated_size_formatted': self._format_size(migrated_size),
        'failed_objects': failed_count,
        'failed_size': total_size - migrated_size,
        'failed_size_formatted': self._format_size(total_size - migrated_size),
        # Report exactly 100% when everything matched; otherwise compute.
        'objects_percentage': 100.0 if migrated == total_objects else round((migrated / total_objects * 100) if total_objects > 0 else 0, 2),
        'size_percentage': 100.0 if migrated_size == total_size else round((migrated_size / total_size * 100) if total_size > 0 else 0, 2),
        'total_time': total_time,
        'total_time_formatted': self._format_time(total_time),
        'average_speed': migrated_size / total_time if total_time > 0 else 0,
        'average_speed_formatted': f"{self._format_size(migrated_size / total_time if total_time > 0 else 0)}/s",
        'successful_objects': successful[:10] if successful else [],
        'failed_objects_list': failed[:10] if failed else [],
        'progress_bar': self._format_progress_bar(100.0 if mig_data.get('success') else 0)
    }
|
|
|
|
def update_progress(self, migration_id, processed_objects=1, processed_size=0,
|
|
failed_objects=0, failed_size=0, current_object=None,
|
|
object_details=None):
|
|
"""تحديث تقدم الترحيل (للاستخدام أثناء الترحيل)"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return False
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if 'progress_tracker' not in mig_data:
|
|
mig_data['progress_tracker'] = {
|
|
'total_objects': mig_data.get('total_objects', 0),
|
|
'total_size': mig_data.get('total_size', 0),
|
|
'processed_objects': 0,
|
|
'processed_size': 0,
|
|
'failed_objects': 0,
|
|
'failed_size': 0,
|
|
'start_time': mig_data.get('started_at', time.time()),
|
|
'speed_samples': []
|
|
}
|
|
|
|
tracker = mig_data['progress_tracker']
|
|
|
|
# تحديث الإحصائيات
|
|
tracker['processed_objects'] += processed_objects
|
|
tracker['processed_size'] += processed_size
|
|
tracker['failed_objects'] += failed_objects
|
|
tracker['failed_size'] += failed_size
|
|
tracker['last_update'] = time.time()
|
|
|
|
# تحديث العنصر الحالي
|
|
if current_object:
|
|
tracker['current_object'] = current_object
|
|
|
|
# إضافة عينة سرعة
|
|
tracker['speed_samples'].append({
|
|
'time': time.time(),
|
|
'size': processed_size
|
|
})
|
|
|
|
# إضافة تفاصيل الكائن إذا وجدت
|
|
if object_details:
|
|
if 'objects_details' not in tracker:
|
|
tracker['objects_details'] = []
|
|
tracker['objects_details'].append({
|
|
'timestamp': time.time(),
|
|
**object_details
|
|
})
|
|
|
|
return True
|
|
|
|
def get_live_progress_stream(self, migration_id, interval=1):
    """Yield progress snapshots every *interval* seconds until the
    migration leaves the 'running' state; the final snapshot is yielded
    before the generator stops.

    Intended for streaming consumers (WebSockets, SSE).

    Fix: removed the redundant function-local ``import time`` — the
    module already imports time at the top, and the local import shadowed
    it needlessly on every call.
    """
    while True:
        progress = self.calculate_migration_progress(migration_id)
        yield progress
        if progress.get('status') != 'running':
            break
        time.sleep(interval)
|
|
|
|
def format_live_status(self, migration_id):
    """Render the migration's current progress as a multi-line, human-
    readable status string (running view or completed summary)."""
    progress = self.calculate_migration_progress(migration_id)

    # Lookup failures carry success=False without a 'running' status.
    if not progress.get('success', True) and progress.get('status') != 'running':
        return f"❌ Error: {progress.get('error', 'Unknown error')}"

    if progress['status'] == 'running':
        # Live view: counters, speed, elapsed time and ETA.
        status_lines = [
            f"🚀 Migration {migration_id} - {progress['status'].upper()}",
            f"📊 Progress: {progress['progress_bar_objects']}",
            f"📦 Objects: {progress['processed_objects']}/{progress['total_objects']} ({progress['objects_percentage']}%)",
            f"💾 Size: {progress['processed_size_formatted']}/{progress['total_size_formatted']} ({progress['size_percentage']}%)",
            f"⚡ Speed: {progress['current_speed_formatted']} (Avg: {progress['average_speed_formatted']})",
            f"⏱️ Elapsed: {progress['elapsed_time_formatted']} | ETA: {progress['eta_formatted']}",
        ]

        if progress['current_object']:
            status_lines.append(f"📄 Current: {progress['current_object']} ({progress['current_object_size_formatted']})")

        if progress['failed_objects'] > 0:
            status_lines.append(f"❌ Failed: {progress['failed_objects']} objects ({progress['failed_size_formatted']})")

        return '\n'.join(status_lines)

    else:
        # Completed-migration summary view.
        status_lines = [
            f"{'✅' if progress['success'] else '❌'} Migration {migration_id} - {progress['status'].upper()}",
            f"📊 Summary:",
            f"📦 Objects: {progress['migrated_objects']}/{progress['total_objects']} ({progress['objects_percentage']}%)",
            f"💾 Size: {progress['migrated_size_formatted']}/{progress['total_size_formatted']} ({progress['size_percentage']}%)",
            f"⏱️ Total time: {progress['total_time_formatted']}",
            f"⚡ Average speed: {progress['average_speed_formatted']}",
        ]

        if progress.get('failed_objects', 0) > 0:
            status_lines.append(f"❌ Failed: {progress['failed_objects']} objects")

        return '\n'.join(status_lines)
|
|
|
|
def estimate_transfer_time(self, total_size, current_speed=None):
    """Estimate how long *total_size* bytes would take to transfer.

    Produces one estimate per canned network profile, plus a 'current'
    entry when a measured speed (bytes/second) is supplied.
    """
    # Canned network throughput profiles (bytes per second).
    profiles = (
        ('slow', 1 * 1024 * 1024),          # 1 MB/s
        ('average', 5 * 1024 * 1024),       # 5 MB/s
        ('fast', 20 * 1024 * 1024),         # 20 MB/s
        ('very_fast', 100 * 1024 * 1024),   # 100 MB/s
        ('gigabit', 125 * 1024 * 1024),     # 1 Gbps ~ 125 MB/s
    )

    def build_entry(speed):
        duration = total_size / speed if speed > 0 else 0
        return {
            'speed': speed,
            'speed_formatted': f"{self._format_size(speed)}/s",
            'seconds': duration,
            'formatted': self._format_time(duration),
        }

    estimates = {name: build_entry(speed) for name, speed in profiles}

    # Add an estimate based on the measured speed, when available.
    if current_speed and current_speed > 0:
        estimates['current'] = build_entry(current_speed)

    return estimates
|
|
|
|
def _format_progress_bar(self, percentage, width=30):
|
|
"""تنسيق شريط التقدم كنص"""
|
|
filled = int(width * percentage / 100)
|
|
empty = width - filled
|
|
bar = '█' * filled + '░' * empty
|
|
return f"[{bar}] {percentage:.1f}%"
|
|
|
|
def _format_time(self, seconds):
|
|
"""تنسيق الوقت"""
|
|
if seconds < 60:
|
|
return f"{seconds:.0f}s"
|
|
elif seconds < 3600:
|
|
minutes = seconds / 60
|
|
return f"{minutes:.1f}m"
|
|
elif seconds < 86400:
|
|
hours = seconds / 3600
|
|
return f"{hours:.1f}h"
|
|
else:
|
|
days = seconds / 86400
|
|
return f"{days:.1f}d"
|
|
|
|
def get_migration_speed_history(self, migration_id):
|
|
"""الحصول على تاريخ السرعة للترحيل"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return {'success': False, 'error': 'Migration not found'}
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
tracker = mig_data.get('progress_tracker', {})
|
|
speed_samples = tracker.get('speed_samples', [])
|
|
|
|
return {
|
|
'success': True,
|
|
'migration_id': migration_id,
|
|
'speed_samples': speed_samples,
|
|
'average_speed': sum(s['size'] for s in speed_samples) / len(speed_samples) if speed_samples else 0,
|
|
'peak_speed': max((s['size'] for s in speed_samples), default=0),
|
|
'samples_count': len(speed_samples)
|
|
}
|
|
|
|
def compare_source_destination(self, migration_id):
|
|
"""مقارنة المصدر والوجهة بعد الترحيل"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return {'success': False, 'error': 'Migration not found'}
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if not mig_data.get('success'):
|
|
return {'success': False, 'error': 'Migration not completed successfully'}
|
|
|
|
stats = mig_data.get('stats', {})
|
|
|
|
return {
|
|
'success': True,
|
|
'source_objects': stats.get('total_objects', 0),
|
|
'destination_objects': stats.get('migrated', 0),
|
|
'source_size': stats.get('total_size', 0),
|
|
'destination_size': stats.get('migrated_size', 0),
|
|
'match': stats.get('total_objects', 0) == stats.get('migrated', 0),
|
|
'verification_time': datetime.utcnow().isoformat()
|
|
}
|
|
def get_destination_client(self, access_key_id, secret_access_key, region='us-east-1',
                           endpoint_url=None, session_token=None):
    """Return a cached or freshly built boto3 S3 client for the destination.

    Clients are cached per credentials/region/endpoint combination.
    Returns None when client construction fails.
    """
    cache_key = f"dst:{access_key_id}:{secret_access_key}:{region}:{endpoint_url}"

    cached = self.dest_clients.get(cache_key)
    if cached is not None:
        return cached

    try:
        kwargs = {
            'aws_access_key_id': access_key_id,
            'aws_secret_access_key': secret_access_key,
            'region_name': region,
        }

        if session_token:
            kwargs['aws_session_token'] = session_token

        if endpoint_url:
            kwargs['endpoint_url'] = endpoint_url
            # Local development endpoints (e.g. minio) lack valid TLS certs.
            if 'localhost' in endpoint_url or '127.0.0.1' in endpoint_url:
                kwargs['verify'] = False

        client = boto3.client('s3', **kwargs)
        self.dest_clients[cache_key] = client
        return client

    except Exception as e:
        print(f"❌ Failed to create destination S3 client: {str(e)}")
        return None
|
|
|
|
# ==================== Connection Testing ====================
|
|
|
|
def test_source_connection(self, access_key_id, secret_access_key, region='us-east-1',
|
|
endpoint_url=None, session_token=None):
|
|
"""Test source S3 connection"""
|
|
return self._test_connection('Source', access_key_id, secret_access_key,
|
|
region, endpoint_url, session_token)
|
|
|
|
def test_destination_connection(self, access_key_id, secret_access_key, region='us-east-1',
|
|
endpoint_url=None, session_token=None):
|
|
"""Test destination S3 connection"""
|
|
return self._test_connection('Destination', access_key_id, secret_access_key,
|
|
region, endpoint_url, session_token)
|
|
|
|
    def _test_connection(self, conn_type, access_key_id, secret_access_key, region,
                         endpoint_url, session_token):
        """Internal method to test S3 connection.

        conn_type is 'Source' or 'Destination'; it selects which cached client
        factory is used and is echoed in every message. Connectivity is probed
        with a timed list_buckets call.

        Returns:
            dict with 'success' plus either connection details (bucket names,
            response time) or an error description. Progress is also printed
            to stdout.
        """
        if not access_key_id or not secret_access_key:
            print(f"❌ {conn_type} AWS credentials are required")
            return {
                'success': False,
                'error': f'{conn_type} AWS credentials are required',
                'details': {
                    'access_key_id_provided': bool(access_key_id),
                    'secret_key_provided': bool(secret_access_key)
                }
            }

        print(f"🔍 Testing {conn_type} S3 connection...")
        print(f" 🔸 Access Key ID: {access_key_id[:10]}...")
        print(f" 🔸 Region: {region}")
        print(f" 🔸 Endpoint URL: {endpoint_url or 'AWS S3 (default)'}")

        try:
            # Reuse the cached per-credential client for the requested side.
            if conn_type == 'Source':
                client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
            else:
                client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)

            if not client:
                return {
                    'success': False,
                    'error': f'Failed to create {conn_type} S3 client'
                }

            # Test connection by listing buckets
            start_time = time.time()
            response = client.list_buckets()
            end_time = time.time()

            response_time = end_time - start_time
            buckets = [bucket['Name'] for bucket in response.get('Buckets', [])]

            print(f"✅ {conn_type} S3 Connection Successful!")
            print(f" 🔸 Response time: {response_time:.2f} seconds")
            print(f" 🔸 Available buckets: {len(buckets)}")

            return {
                'success': True,
                'message': f'{conn_type} S3 connection successful',
                'buckets': buckets[:10],  # First 10 buckets only
                'bucket_count': len(buckets),
                'response_time': response_time,
                'connection_details': {
                    'type': conn_type,
                    'region': region,
                    'endpoint': endpoint_url or 'AWS S3 (default)',
                    'has_credentials': True
                }
            }

        except NoCredentialsError:
            error_msg = f"{conn_type} No credentials provided or credentials are invalid"
            print(f"❌ {error_msg}")
            return {'success': False, 'error': error_msg}

        except EndpointConnectionError as e:
            error_msg = f"{conn_type} Cannot connect to endpoint: {endpoint_url}"
            print(f"❌ {error_msg}")
            return {'success': False, 'error': error_msg, 'details': str(e)}

        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_msg = e.response['Error']['Message']
            print(f"❌ {conn_type} AWS Error: {error_code} - {error_msg}")

            # 404 usually means list_buckets permission denied, but connection works
            # NOTE(review): S3 permission failures commonly surface as
            # 'AccessDenied'/'403'; confirm this 404 special-case matches the
            # behavior of the endpoints actually targeted.
            if error_code == '404':
                return {
                    'success': True,
                    'message': f'{conn_type} S3 connection successful (credentials valid, but list_buckets not allowed)',
                    'connection_details': {
                        'type': conn_type,
                        'region': region,
                        'endpoint': endpoint_url or 'AWS S3 (default)',
                        'note': '404: Not Found - This is usually permission denied for list_buckets'
                    }
                }

            return {
                'success': False,
                'error': f"{error_code}: {error_msg}",
                'aws_error': error_code
            }

        except Exception as e:
            error_msg = f"{conn_type} Unexpected error: {str(e)}"
            print(f"❌ {error_msg}")
            return {'success': False, 'error': error_msg}
|
|
|
|
# ==================== Bucket Operations ====================
|
|
|
|
def list_buckets(self, access_key_id, secret_access_key, region='us-east-1',
|
|
endpoint_url=None, session_token=None, is_source=True):
|
|
"""List all S3 buckets with details"""
|
|
try:
|
|
if is_source:
|
|
client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
else:
|
|
client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
|
|
if not client:
|
|
return {'success': False, 'error': 'Failed to create S3 client'}
|
|
|
|
response = client.list_buckets()
|
|
buckets_info = []
|
|
|
|
for bucket in response.get('Buckets', []):
|
|
bucket_name = bucket['Name']
|
|
creation_date = bucket['CreationDate']
|
|
|
|
# Get bucket location (region)
|
|
try:
|
|
location = client.get_bucket_location(Bucket=bucket_name)
|
|
bucket_region = location.get('LocationConstraint', 'us-east-1')
|
|
if not bucket_region:
|
|
bucket_region = 'us-east-1'
|
|
except:
|
|
bucket_region = region
|
|
|
|
# Get bucket size and object count (optional, may be slow)
|
|
try:
|
|
size_response = client.list_objects_v2(Bucket=bucket_name, MaxKeys=1000)
|
|
object_count = len(size_response.get('Contents', []))
|
|
total_size = sum(obj.get('Size', 0) for obj in size_response.get('Contents', []))
|
|
except:
|
|
object_count = 0
|
|
total_size = 0
|
|
|
|
buckets_info.append({
|
|
'name': bucket_name,
|
|
'creation_date': creation_date.isoformat() if hasattr(creation_date, 'isoformat') else str(creation_date),
|
|
'region': bucket_region,
|
|
'object_count': object_count,
|
|
'total_size': total_size
|
|
})
|
|
|
|
return {
|
|
'success': True,
|
|
'buckets': buckets_info,
|
|
'count': len(buckets_info)
|
|
}
|
|
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def list_objects(self, bucket_name, prefix='', max_keys=1000, is_source=True,
|
|
access_key_id=None, secret_access_key=None, region='us-east-1',
|
|
endpoint_url=None, session_token=None):
|
|
"""List objects in a bucket with pagination support"""
|
|
try:
|
|
if is_source:
|
|
client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
else:
|
|
client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
|
|
if not client:
|
|
return {'success': False, 'error': 'Failed to create S3 client'}
|
|
|
|
all_objects = []
|
|
continuation_token = None
|
|
|
|
while True:
|
|
list_kwargs = {
|
|
'Bucket': bucket_name,
|
|
'Prefix': prefix,
|
|
'MaxKeys': max_keys
|
|
}
|
|
|
|
if continuation_token:
|
|
list_kwargs['ContinuationToken'] = continuation_token
|
|
|
|
response = client.list_objects_v2(**list_kwargs)
|
|
|
|
for obj in response.get('Contents', []):
|
|
all_objects.append({
|
|
'key': obj['Key'],
|
|
'size': obj['Size'],
|
|
'last_modified': obj['LastModified'].isoformat() if hasattr(obj['LastModified'], 'isoformat') else str(obj['LastModified']),
|
|
'etag': obj['ETag'].strip('"')
|
|
})
|
|
|
|
if response.get('IsTruncated'):
|
|
continuation_token = response.get('NextContinuationToken')
|
|
else:
|
|
break
|
|
|
|
return {
|
|
'success': True,
|
|
'objects': all_objects,
|
|
'count': len(all_objects),
|
|
'bucket': bucket_name,
|
|
'prefix': prefix,
|
|
'total_size': sum(obj['size'] for obj in all_objects)
|
|
}
|
|
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def bucket_exists(self, bucket_name, is_source=True, access_key_id=None,
|
|
secret_access_key=None, region='us-east-1', endpoint_url=None,
|
|
session_token=None):
|
|
"""Check if bucket exists and is accessible"""
|
|
try:
|
|
if is_source:
|
|
client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
else:
|
|
client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
|
|
if not client:
|
|
return {'success': False, 'error': 'Failed to create S3 client'}
|
|
|
|
client.head_bucket(Bucket=bucket_name)
|
|
|
|
# Get bucket location
|
|
try:
|
|
location = client.get_bucket_location(Bucket=bucket_name)
|
|
bucket_region = location.get('LocationConstraint', 'us-east-1')
|
|
if not bucket_region:
|
|
bucket_region = 'us-east-1'
|
|
except:
|
|
bucket_region = region
|
|
|
|
return {
|
|
'success': True,
|
|
'exists': True,
|
|
'bucket': bucket_name,
|
|
'region': bucket_region
|
|
}
|
|
|
|
except ClientError as e:
|
|
error_code = e.response['Error']['Code']
|
|
if error_code == '404':
|
|
return {'success': True, 'exists': False, 'bucket': bucket_name}
|
|
elif error_code == '403':
|
|
return {'success': False, 'error': 'Access denied to bucket', 'exists': False}
|
|
else:
|
|
return {'success': False, 'error': str(e)}
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def create_bucket(self, bucket_name, region='us-east-1', endpoint_url=None,
|
|
access_key_id=None, secret_access_key=None, session_token=None):
|
|
"""Create a new S3 bucket in destination"""
|
|
try:
|
|
client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
if not client:
|
|
return {'success': False, 'error': 'Failed to create S3 client'}
|
|
|
|
# Check if bucket already exists
|
|
exists_check = self.bucket_exists(bucket_name, is_source=False,
|
|
access_key_id=access_key_id,
|
|
secret_access_key=secret_access_key,
|
|
region=region, endpoint_url=endpoint_url,
|
|
session_token=session_token)
|
|
|
|
if exists_check.get('exists'):
|
|
return {
|
|
'success': True,
|
|
'message': f'Bucket already exists: {bucket_name}',
|
|
'bucket': bucket_name,
|
|
'created': False
|
|
}
|
|
|
|
# Create bucket
|
|
create_kwargs = {'Bucket': bucket_name}
|
|
|
|
# Special handling for us-east-1
|
|
if region != 'us-east-1':
|
|
create_kwargs['CreateBucketConfiguration'] = {
|
|
'LocationConstraint': region
|
|
}
|
|
|
|
client.create_bucket(**create_kwargs)
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Bucket created successfully: {bucket_name}',
|
|
'bucket': bucket_name,
|
|
'region': region,
|
|
'created': True
|
|
}
|
|
|
|
except ClientError as e:
|
|
error_code = e.response['Error']['Code']
|
|
error_msg = e.response['Error']['Message']
|
|
return {
|
|
'success': False,
|
|
'error': f"{error_code}: {error_msg}",
|
|
'bucket': bucket_name
|
|
}
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e), 'bucket': bucket_name}
|
|
|
|
# ==================== Object Migration ====================
|
|
|
|
    def migrate_object(self, source_bucket, source_key, dest_bucket, dest_key=None,
                       source_credentials=None, dest_credentials=None,
                       preserve_metadata=True, preserve_acl=False,
                       metadata=None, storage_class='STANDARD'):
        """
        Migrate a single object from source S3 to destination S3.

        Issues a server-side copy_object through the DESTINATION client,
        verifies the result with head_object, and returns a result dict with
        source/destination descriptors, timing, the merged metadata and a
        structured log trail.

        Args:
            source_bucket / source_key: object to copy.
            dest_bucket / dest_key: target location; dest_key defaults to
                source_key.
            source_credentials / dest_credentials: dicts with access_key_id,
                secret_access_key and optional region/endpoint_url/session_token.
            preserve_metadata: copy source user metadata into the new object
                (merged with `metadata` and migration bookkeeping keys).
            preserve_acl: accepted but not used by this implementation.
            metadata: extra user metadata to merge in.
            storage_class: destination StorageClass, applied when truthy.

        Returns:
            dict with 'success', details and 'logs'.

        NOTE(review): server-side copy_object requires the destination
        credentials to also have read access to the source bucket; a
        cross-account or cross-provider copy without that access will fail —
        confirm for the target deployment.
        """
        logs = []

        def log(msg, level='info'):
            # Collect structured entries and mirror each message to stdout.
            log_entry = {
                'timestamp': time.time(),
                'message': msg,
                'level': level
            }
            logs.append(log_entry)
            print(f"[Migration] {msg}")

        try:
            if not dest_key:
                dest_key = source_key

            log(f"📦 Migrating: s3://{source_bucket}/{source_key} → s3://{dest_bucket}/{dest_key}")

            # Get source client
            source_client = self.get_source_client(
                source_credentials.get('access_key_id'),
                source_credentials.get('secret_access_key'),
                source_credentials.get('region', 'us-east-1'),
                source_credentials.get('endpoint_url'),
                source_credentials.get('session_token')
            )

            if not source_client:
                return {
                    'success': False,
                    'error': 'Failed to create source S3 client',
                    'logs': logs
                }

            # Get destination client
            dest_client = self.get_destination_client(
                dest_credentials.get('access_key_id'),
                dest_credentials.get('secret_access_key'),
                dest_credentials.get('region', 'us-east-1'),
                dest_credentials.get('endpoint_url'),
                dest_credentials.get('session_token')
            )

            if not dest_client:
                return {
                    'success': False,
                    'error': 'Failed to create destination S3 client',
                    'logs': logs
                }

            # Get object metadata from source
            log("🔍 Getting source object metadata...")
            head_response = source_client.head_object(
                Bucket=source_bucket,
                Key=source_key
            )

            source_size = head_response['ContentLength']
            source_etag = head_response['ETag'].strip('"')
            source_last_modified = head_response['LastModified']

            log(f" 📊 Size: {self._format_size(source_size)}")
            log(f" 🏷️ ETag: {source_etag[:16]}...")

            # Prepare metadata: source user metadata (optional), then caller
            # overrides, then migration bookkeeping keys (always written).
            final_metadata = {}
            if preserve_metadata:
                final_metadata = head_response.get('Metadata', {}).copy()

            if metadata:
                final_metadata.update(metadata)

            # Add migration metadata
            final_metadata['migration_source_bucket'] = source_bucket
            final_metadata['migration_source_key'] = source_key
            final_metadata['migration_timestamp'] = datetime.utcnow().isoformat()
            final_metadata['migration_tool'] = 'S3ToS3Migrator'

            # Check if destination bucket exists, create if not
            dest_bucket_check = self.bucket_exists(
                dest_bucket, is_source=False,
                access_key_id=dest_credentials.get('access_key_id'),
                secret_access_key=dest_credentials.get('secret_access_key'),
                region=dest_credentials.get('region', 'us-east-1'),
                endpoint_url=dest_credentials.get('endpoint_url'),
                session_token=dest_credentials.get('session_token')
            )

            if not dest_bucket_check.get('exists'):
                log(f"⚠️ Destination bucket '{dest_bucket}' does not exist. Creating...")
                create_result = self.create_bucket(
                    dest_bucket,
                    region=dest_credentials.get('region', 'us-east-1'),
                    endpoint_url=dest_credentials.get('endpoint_url'),
                    access_key_id=dest_credentials.get('access_key_id'),
                    secret_access_key=dest_credentials.get('secret_access_key'),
                    session_token=dest_credentials.get('session_token')
                )

                if not create_result['success']:
                    return {
                        'success': False,
                        'error': f"Failed to create destination bucket: {create_result['error']}",
                        'logs': logs
                    }
                log(f"✅ Created destination bucket: {dest_bucket}")

            # Perform copy operation
            log("⬆️ Copying object to destination...")
            copy_start = time.time()

            copy_source = {
                'Bucket': source_bucket,
                'Key': source_key
            }

            copy_kwargs = {
                'Bucket': dest_bucket,
                'Key': dest_key,
                'CopySource': copy_source,
                'Metadata': final_metadata,
                # REPLACE writes the merged metadata built above.
                # NOTE(review): with preserve_metadata=False the directive
                # falls back to COPY, which still carries the SOURCE metadata
                # over (and ignores `final_metadata`) — verify this is the
                # intended behavior.
                'MetadataDirective': 'REPLACE' if preserve_metadata else 'COPY'
            }

            if storage_class:
                copy_kwargs['StorageClass'] = storage_class

            dest_client.copy_object(**copy_kwargs)

            copy_time = time.time() - copy_start

            # Verify copy
            log("🔍 Verifying destination object...")
            dest_head = dest_client.head_object(
                Bucket=dest_bucket,
                Key=dest_key
            )

            dest_size = dest_head['ContentLength']
            dest_etag = dest_head['ETag'].strip('"')

            # Generate URL (custom endpoint path-style, else AWS virtual-hosted style)
            if dest_credentials.get('endpoint_url'):
                dest_url = f"{dest_credentials['endpoint_url']}/{dest_bucket}/{dest_key}"
            else:
                dest_url = f"https://{dest_bucket}.s3.{dest_credentials.get('region', 'us-east-1')}.amazonaws.com/{dest_key}"

            log(f"✅ Migration completed in {copy_time:.2f} seconds")
            log(f" 📍 Destination: {dest_url}")

            return {
                'success': True,
                'message': 'Object migrated successfully',
                'source': {
                    'bucket': source_bucket,
                    'key': source_key,
                    'size': source_size,
                    'etag': source_etag,
                    'last_modified': source_last_modified.isoformat() if hasattr(source_last_modified, 'isoformat') else str(source_last_modified)
                },
                'destination': {
                    'bucket': dest_bucket,
                    'key': dest_key,
                    'size': dest_size,
                    'etag': dest_etag,
                    'url': dest_url
                },
                'copy_time': copy_time,
                'metadata': final_metadata,
                'logs': logs
            }

        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_msg = e.response['Error']['Message']
            log(f"❌ AWS Error: {error_code} - {error_msg}", 'error')
            return {
                'success': False,
                'error': f"{error_code}: {error_msg}",
                'aws_error': error_code,
                'logs': logs
            }
        except Exception as e:
            log(f"❌ Error: {str(e)}", 'error')
            return {
                'success': False,
                'error': str(e),
                'logs': logs
            }
|
|
|
|
    def migrate_objects_batch(self, objects, source_bucket, dest_bucket,
                              source_credentials, dest_credentials,
                              preserve_metadata=True, storage_class='STANDARD',
                              max_concurrent=5):
        """
        Migrate multiple objects in batch.

        Fans migrate_object calls out over a thread pool of up to
        max_concurrent workers. Items of `objects` are either plain key
        strings or dicts with 'key' and an optional 'dest_key'.

        Returns:
            dict with overall 'success' (True only when nothing failed), a
            human-readable 'message', per-object 'results'
            (successful/failed lists, total_size, total_time) and 'logs'.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed

        logs = []
        results = {
            'successful': [],
            'failed': [],
            'total_size': 0,
            'total_time': 0
        }

        def log(msg, level='info'):
            # Collect structured entries and mirror each message to stdout.
            log_entry = {
                'timestamp': time.time(),
                'message': msg,
                'level': level
            }
            logs.append(log_entry)
            print(f"[Batch] {msg}")

        log(f"🚀 Starting batch migration of {len(objects)} objects")
        log(f" 📦 Source: s3://{source_bucket}")
        log(f" 📦 Destination: s3://{dest_bucket}")

        start_time = time.time()

        def migrate_single(obj):
            # Accept either a bare key string or a {'key', 'dest_key'} dict.
            source_key = obj['key'] if isinstance(obj, dict) else obj
            dest_key = obj.get('dest_key', source_key) if isinstance(obj, dict) else source_key

            result = self.migrate_object(
                source_bucket=source_bucket,
                source_key=source_key,
                dest_bucket=dest_bucket,
                dest_key=dest_key,
                source_credentials=source_credentials,
                dest_credentials=dest_credentials,
                preserve_metadata=preserve_metadata,
                storage_class=storage_class
            )

            return {
                'source_key': source_key,
                'dest_key': dest_key,
                'result': result
            }

        # Execute migrations in parallel; migrate_object reports failures via
        # its result dict, so exceptions here indicate unexpected errors.
        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            futures = [executor.submit(migrate_single, obj) for obj in objects]

            for future in as_completed(futures):
                try:
                    migration = future.result()
                    if migration['result']['success']:
                        results['successful'].append(migration)
                        results['total_size'] += migration['result']['destination']['size']
                    else:
                        results['failed'].append(migration)
                except Exception as e:
                    # The originating key is not recoverable from the future.
                    results['failed'].append({
                        'source_key': 'unknown',
                        'dest_key': 'unknown',
                        'error': str(e)
                    })

        results['total_time'] = time.time() - start_time

        log(f"✅ Batch migration completed in {results['total_time']:.2f} seconds")
        log(f" ✅ Successful: {len(results['successful'])}")
        log(f" ❌ Failed: {len(results['failed'])}")
        log(f" 📦 Total size: {self._format_size(results['total_size'])}")

        return {
            'success': len(results['failed']) == 0,
            'message': f"Migrated {len(results['successful'])}/{len(objects)} objects",
            'results': results,
            'logs': logs
        }
|
|
|
|
# ==================== Full Migration ====================
|
|
|
|
def start_migration(self, migration_id, source_config, dest_config,
|
|
source_bucket, dest_bucket, prefix='',
|
|
include_patterns=None, exclude_patterns=None,
|
|
preserve_metadata=True, storage_class='STANDARD',
|
|
create_dest_bucket=True, max_concurrent=5):
|
|
"""
|
|
Start full S3 to S3 migration in background
|
|
"""
|
|
|
|
def migration_task():
|
|
result = self.migrate_bucket(
|
|
migration_id=migration_id,
|
|
source_config=source_config,
|
|
dest_config=dest_config,
|
|
source_bucket=source_bucket,
|
|
dest_bucket=dest_bucket,
|
|
prefix=prefix,
|
|
include_patterns=include_patterns,
|
|
exclude_patterns=exclude_patterns,
|
|
preserve_metadata=preserve_metadata,
|
|
storage_class=storage_class,
|
|
create_dest_bucket=create_dest_bucket,
|
|
max_concurrent=max_concurrent
|
|
)
|
|
with self._lock:
|
|
self.migrations[migration_id] = result
|
|
|
|
thread = threading.Thread(target=migration_task, daemon=True)
|
|
thread.start()
|
|
|
|
# Initialize migration entry
|
|
with self._lock:
|
|
self.migrations[migration_id] = {
|
|
'status': 'running',
|
|
'started_at': time.time(),
|
|
'source_bucket': source_bucket,
|
|
'dest_bucket': dest_bucket,
|
|
'prefix': prefix,
|
|
'source_config': {k: v[:10] + '...' if k in ['access_key_id', 'secret_access_key'] and v else None
|
|
for k, v in source_config.items()},
|
|
'dest_config': {k: v[:10] + '...' if k in ['access_key_id', 'secret_access_key'] and v else None
|
|
for k, v in dest_config.items()}
|
|
}
|
|
|
|
return migration_id
|
|
|
|
    def migrate_bucket(self, migration_id, source_config, dest_config,
                       source_bucket, dest_bucket, prefix='',
                       include_patterns=None, exclude_patterns=None,
                       preserve_metadata=True, storage_class='STANDARD',
                       create_dest_bucket=True, max_concurrent=5):
        """
        Migrate entire bucket from source to destination.

        Pipeline: optionally create the destination bucket, list all objects
        under `prefix` on the source, filter them with fnmatch-style
        include/exclude patterns, then migrate in chunks of
        max_concurrent * 10 objects via migrate_objects_batch.

        Returns:
            dict with 'success' (True only when no object failed),
            'migration_id', 'stats' (counts and byte totals), the first 100
            successful/failed per-object results, and a structured log trail.
        """
        logs = []

        def log(msg, level='info'):
            # Collect structured entries and mirror each message to stdout.
            log_entry = {
                'timestamp': time.time(),
                'message': msg,
                'level': level
            }
            logs.append(log_entry)
            print(f"[{migration_id}] {msg}")

        try:
            log(f"🔧 Starting S3 to S3 Migration {migration_id}")
            log(f" 📤 Source: s3://{source_bucket}/{prefix}")
            log(f" 📥 Destination: s3://{dest_bucket}/{prefix}")
            log(f" ⚙️ Preserve metadata: {preserve_metadata}, Storage class: {storage_class}")

            # Create destination bucket if needed
            if create_dest_bucket:
                dest_client = self.get_destination_client(
                    dest_config.get('access_key_id'),
                    dest_config.get('secret_access_key'),
                    dest_config.get('region', 'us-east-1'),
                    dest_config.get('endpoint_url'),
                    dest_config.get('session_token')
                )

                # Note: if the destination client cannot be created here, the
                # pre-creation step is silently skipped and the failure will
                # surface later during the actual copy.
                if dest_client:
                    exists_check = self.bucket_exists(
                        dest_bucket, is_source=False,
                        access_key_id=dest_config.get('access_key_id'),
                        secret_access_key=dest_config.get('secret_access_key'),
                        region=dest_config.get('region', 'us-east-1'),
                        endpoint_url=dest_config.get('endpoint_url'),
                        session_token=dest_config.get('session_token')
                    )

                    if not exists_check.get('exists'):
                        log(f"📦 Creating destination bucket: {dest_bucket}")
                        create_result = self.create_bucket(
                            dest_bucket,
                            region=dest_config.get('region', 'us-east-1'),
                            endpoint_url=dest_config.get('endpoint_url'),
                            access_key_id=dest_config.get('access_key_id'),
                            secret_access_key=dest_config.get('secret_access_key'),
                            session_token=dest_config.get('session_token')
                        )

                        if not create_result['success']:
                            log(f"❌ Failed to create destination bucket: {create_result['error']}", 'error')
                            return {
                                'success': False,
                                'migration_id': migration_id,
                                'error': f"Failed to create destination bucket: {create_result['error']}",
                                'logs': logs
                            }
                        log(f"✅ Destination bucket created/verified")

            # List objects from source (list_objects follows pagination, so
            # this returns every key under the prefix, not just 1000).
            log(f"🔍 Listing objects from source bucket: {source_bucket}/{prefix}")
            list_result = self.list_objects(
                bucket_name=source_bucket,
                prefix=prefix,
                max_keys=1000,
                is_source=True,
                access_key_id=source_config.get('access_key_id'),
                secret_access_key=source_config.get('secret_access_key'),
                region=source_config.get('region', 'us-east-1'),
                endpoint_url=source_config.get('endpoint_url'),
                session_token=source_config.get('session_token')
            )

            if not list_result['success']:
                log(f"❌ Failed to list source bucket: {list_result['error']}", 'error')
                return {
                    'success': False,
                    'migration_id': migration_id,
                    'error': f"Failed to list source bucket: {list_result['error']}",
                    'logs': logs
                }

            objects = list_result['objects']
            total_objects = len(objects)
            total_size = list_result['total_size']

            log(f"📊 Found {total_objects} objects, total size: {self._format_size(total_size)}")

            if total_objects == 0:
                log(f"⚠️ No objects to migrate")
                return {
                    'success': True,
                    'migration_id': migration_id,
                    'message': 'No objects to migrate',
                    'stats': {
                        'total_objects': 0,
                        'migrated': 0,
                        'failed': 0,
                        'total_size': 0
                    },
                    'logs': logs
                }

            # Filter objects if patterns provided (glob-style matching; an
            # object must match at least one include pattern, if given, and
            # no exclude pattern).
            if include_patterns or exclude_patterns:
                import fnmatch
                filtered_objects = []

                for obj in objects:
                    key = obj['key']
                    include = True

                    if include_patterns:
                        include = any(fnmatch.fnmatch(key, pattern) for pattern in include_patterns)

                    if exclude_patterns:
                        include = include and not any(fnmatch.fnmatch(key, pattern) for pattern in exclude_patterns)

                    if include:
                        filtered_objects.append(obj)

                objects = filtered_objects
                log(f"🎯 After filtering: {len(objects)} objects")

            # Migrate objects in batches
            successful = []
            failed = []
            total_bytes = 0

            log(f"🚀 Starting migration of {len(objects)} objects...")

            # Process in chunks so logs/progress update between batches.
            chunk_size = max_concurrent * 10
            for i in range(0, len(objects), chunk_size):
                chunk = objects[i:i + chunk_size]
                log(f"📦 Processing chunk {i//chunk_size + 1}/{(len(objects)-1)//chunk_size + 1}")

                batch_result = self.migrate_objects_batch(
                    objects=chunk,
                    source_bucket=source_bucket,
                    dest_bucket=dest_bucket,
                    source_credentials=source_config,
                    dest_credentials=dest_config,
                    preserve_metadata=preserve_metadata,
                    storage_class=storage_class,
                    max_concurrent=max_concurrent
                )

                for success in batch_result['results']['successful']:
                    successful.append(success)
                    total_bytes += success['result']['destination']['size']

                for fail in batch_result['results']['failed']:
                    failed.append(fail)

                log(f" ✅ Progress: {len(successful)}/{len(objects)} objects, "
                    f"📦 {self._format_size(total_bytes)}")

            # Summary
            log(f"📊 Migration Summary:")
            log(f" ✅ Successful: {len(successful)} objects")
            log(f" ❌ Failed: {len(failed)} objects")
            log(f" 📦 Total size: {self._format_size(total_size)}")
            # NOTE(review): this assumes start_migration registered this id
            # (with 'started_at') before the run; otherwise the lookup raises
            # KeyError and is reported as a migration failure below.
            log(f" ⏱️ Total time: {time.time() - self.migrations[migration_id]['started_at']:.2f} seconds")

            return {
                'success': len(failed) == 0,
                'migration_id': migration_id,
                'message': f"Migration completed: {len(successful)}/{total_objects} objects migrated",
                'stats': {
                    'total_objects': total_objects,
                    'migrated': len(successful),
                    'failed': len(failed),
                    'total_size': total_size,
                    'migrated_size': total_bytes
                },
                'successful': successful[:100],  # First 100 only
                'failed': failed[:100],  # First 100 only
                'logs': logs
            }

        except Exception as e:
            log(f"❌ Migration failed: {str(e)}", 'error')
            return {
                'success': False,
                'migration_id': migration_id,
                'error': str(e),
                'logs': logs
            }
|
|
|
|
# ==================== Migration Status ====================
|
|
|
|
def get_migration_status(self, migration_id):
|
|
"""Get status of a migration"""
|
|
with self._lock:
|
|
return self.migrations.get(migration_id)
|
|
"""List all migrations"""
|
|
with self._lock:
|
|
migrations_list = []
|
|
for mig_id, mig_data in self.migrations.items():
|
|
if isinstance(mig_data, dict) and 'success' in mig_data:
|
|
status = 'completed' if mig_data['success'] else 'failed'
|
|
elif isinstance(mig_data, dict) and mig_data.get('status') == 'running':
|
|
status = 'running'
|
|
else:
|
|
status = 'unknown'
|
|
|
|
migrations_list.append({
|
|
'id': mig_id,
|
|
'status': status,
|
|
'started_at': mig_data.get('started_at') if isinstance(mig_data, dict) else None,
|
|
'source_bucket': mig_data.get('source_bucket') if isinstance(mig_data, dict) else None,
|
|
'dest_bucket': mig_data.get('dest_bucket') if isinstance(mig_data, dict) else None,
|
|
'data': mig_data
|
|
})
|
|
|
|
# Sort by started_at descending
|
|
migrations_list.sort(key=lambda x: x.get('started_at', 0), reverse=True)
|
|
return migrations_list
|
|
def list_migrations(self):
|
|
"""List all migrations"""
|
|
with self._lock:
|
|
migrations_list = []
|
|
for mig_id, mig_data in self.migrations.items():
|
|
if isinstance(mig_data, dict) and 'success' in mig_data:
|
|
status = 'completed' if mig_data['success'] else 'failed'
|
|
elif isinstance(mig_data, dict) and mig_data.get('status') == 'running':
|
|
status = 'running'
|
|
else:
|
|
status = 'unknown'
|
|
|
|
migrations_list.append({
|
|
'id': mig_id,
|
|
'status': status,
|
|
'started_at': mig_data.get('started_at') if isinstance(mig_data, dict) else None,
|
|
'source_bucket': mig_data.get('source_bucket') if isinstance(mig_data, dict) else None,
|
|
'dest_bucket': mig_data.get('dest_bucket') if isinstance(mig_data, dict) else None,
|
|
'data': mig_data
|
|
})
|
|
|
|
# Sort by started_at descending
|
|
migrations_list.sort(key=lambda x: x.get('started_at', 0), reverse=True)
|
|
return migrations_list
|
|
def cancel_migration(self, migration_id):
|
|
"""Cancel a running migration"""
|
|
with self._lock:
|
|
if migration_id in self.migrations:
|
|
mig_data = self.migrations[migration_id]
|
|
if isinstance(mig_data, dict) and mig_data.get('status') == 'running':
|
|
mig_data['status'] = 'cancelled'
|
|
mig_data['cancelled_at'] = time.time()
|
|
return {'success': True, 'message': 'Migration cancelled'}
|
|
return {'success': False, 'error': 'Migration not found or not running'}
|
|
|
|
# ==================== Helper Functions ====================
|
|
|
|
def _format_size(self, size_bytes):
|
|
"""Format file size in human-readable format"""
|
|
if size_bytes == 0:
|
|
return '0 B'
|
|
|
|
size_names = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
|
|
i = 0
|
|
while size_bytes >= 1024 and i < len(size_names) - 1:
|
|
size_bytes /= 1024.0
|
|
i += 1
|
|
|
|
return f"{size_bytes:.2f} {size_names[i]}"
|
|
|
|
def parse_s3_uri(self, s3_uri):
|
|
"""Parse S3 URI into bucket and key"""
|
|
try:
|
|
if not s3_uri.startswith('s3://'):
|
|
s3_uri = 's3://' + s3_uri
|
|
|
|
parsed = urlparse(s3_uri)
|
|
bucket = parsed.netloc
|
|
key = parsed.path.lstrip('/') if parsed.path else ''
|
|
|
|
return {
|
|
'bucket': bucket,
|
|
'key': key,
|
|
'uri': s3_uri
|
|
}
|
|
except Exception as e:
|
|
return None
|
|
|
|
def generate_presigned_url(self, bucket, key, expiration=3600, is_source=True,
|
|
access_key_id=None, secret_access_key=None,
|
|
region='us-east-1', endpoint_url=None,
|
|
session_token=None):
|
|
"""Generate presigned URL for temporary access"""
|
|
try:
|
|
if is_source:
|
|
client = self.get_source_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
else:
|
|
client = self.get_destination_client(access_key_id, secret_access_key, region, endpoint_url, session_token)
|
|
|
|
if not client:
|
|
return {'success': False, 'error': 'Failed to create S3 client'}
|
|
|
|
url = client.generate_presigned_url(
|
|
'get_object',
|
|
Params={'Bucket': bucket, 'Key': key},
|
|
ExpiresIn=expiration
|
|
)
|
|
|
|
return {
|
|
'success': True,
|
|
'url': url,
|
|
'expires_in': expiration,
|
|
'bucket': bucket,
|
|
'key': key
|
|
}
|
|
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
|
|
# ============================================================================
|
|
# Part 3: PostgreSQL to S3 Migrator (PostgresToS3Migrator)
|
|
# ============================================================================
|
|
|
|
class PostgresToS3Migrator:
|
|
"""PostgreSQL to S3 migration engine"""
|
|
|
|
def __init__(self):
|
|
self.migrations = {}
|
|
self._lock = threading.Lock()
|
|
self.s3_clients = {}
|
|
|
|
def parse_postgres_uri(self, uri):
|
|
"""Parse PostgreSQL URI and extract connection parameters"""
|
|
try:
|
|
if not uri.startswith(('postgresql://', 'postgres://')):
|
|
uri = 'postgresql://' + uri
|
|
|
|
parsed = urlparse(uri)
|
|
|
|
return {
|
|
'host': parsed.hostname,
|
|
'port': parsed.port or 5432,
|
|
'user': parsed.username or '',
|
|
'password': parsed.password or '',
|
|
'database': parsed.path.lstrip('/') if parsed.path else 'postgres',
|
|
'uri': uri
|
|
}
|
|
except Exception as e:
|
|
return None
|
|
|
|
def parse_uri_to_env(self, uri, prefix=''):
|
|
"""Parse URI to environment variables format"""
|
|
parsed = self.parse_postgres_uri(uri)
|
|
if not parsed:
|
|
return None
|
|
|
|
env_vars = {
|
|
f'{prefix}DB_HOST': parsed['host'],
|
|
f'{prefix}DB_PORT': str(parsed['port']),
|
|
f'{prefix}DB_USER': parsed['user'],
|
|
f'{prefix}DB_PASSWORD': parsed['password'],
|
|
f'{prefix}DB_NAME': parsed['database'],
|
|
f'{prefix}DB_URI': parsed['uri']
|
|
}
|
|
|
|
# Create connection string variants
|
|
env_vars[f'{prefix}DB_CONNECTION_STRING'] = f"host={parsed['host']} port={parsed['port']} dbname={parsed['database']} user={parsed['user']} password={parsed['password']}"
|
|
env_vars[f'{prefix}DB_URL'] = parsed['uri']
|
|
|
|
return env_vars
|
|
def get_live_migration_progress(self, migration_id):
|
|
"""الحصول على التقدم المباشر للترحيل من PostgreSQL إلى S3"""
|
|
with self._lock:
|
|
if migration_id not in self.migrations:
|
|
return {'success': False, 'error': 'Migration not found'}
|
|
|
|
mig_data = self.migrations[migration_id]
|
|
|
|
if mig_data.get('status') != 'running':
|
|
return self._get_completed_migration_stats(migration_id)
|
|
|
|
if 'progress_tracker' in mig_data:
|
|
return self._get_live_tracker_progress(migration_id)
|
|
|
|
return self._get_estimated_progress(migration_id)
|
|
|
|
    def _get_live_tracker_progress(self, migration_id):
        """Build a detailed 'live' progress snapshot from the active tracker.

        Expects ``self.migrations[migration_id]['progress_tracker']`` to exist;
        the public entry point (get_live_migration_progress) guarantees that
        and already holds ``self._lock`` when calling in here. Returns a large
        dict of totals, per-category percentages, timing, speed and the table
        currently being exported.
        """
        mig_data = self.migrations[migration_id]
        tracker = mig_data['progress_tracker']

        current_time = time.time()
        elapsed_time = current_time - tracker['start_time']

        # Percentages by table count, byte size and row count
        # (0 whenever the corresponding total is zero/unknown).
        tables_percentage = (tracker['processed_tables'] / tracker['total_tables'] * 100) if tracker['total_tables'] > 0 else 0
        size_percentage = (tracker['processed_size'] / tracker['total_size'] * 100) if tracker['total_size'] > 0 else 0
        rows_percentage = (tracker['processed_rows'] / tracker['total_rows'] * 100) if tracker['total_rows'] > 0 else 0

        # Current throughput from samples collected within the last 10 seconds;
        # fall back to the overall average when no recent samples exist.
        recent_samples = [s for s in tracker.get('speed_samples', []) if current_time - s['time'] < 10]
        if recent_samples:
            recent_size = sum(s['size'] for s in recent_samples)
            recent_time = recent_samples[-1]['time'] - recent_samples[0]['time'] if len(recent_samples) > 1 else 1
            current_speed = recent_size / recent_time if recent_time > 0 else 0
        else:
            current_speed = tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0

        # ETA derived from the current speed and the bytes still outstanding.
        remaining_size = tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0)
        eta_seconds = remaining_size / current_speed if current_speed > 0 else 0

        # Details of the table currently being exported, if any.
        current_table_info = None
        if tracker.get('current_table'):
            ct = tracker['current_table']
            current_table_info = {
                'name': ct['name'],
                'size': ct.get('size', 0),
                'size_formatted': self._format_size(ct.get('size', 0)),
                'rows': ct.get('rows', 0),
                'rows_processed': ct.get('rows_processed', 0),
                'rows_percentage': round((ct.get('rows_processed', 0) / max(ct.get('rows', 1), 1)) * 100, 2),
                'stage': ct.get('stage', 'querying'),  # querying, compressing, uploading
                'progress': ct.get('progress', 0),
                'elapsed': current_time - ct.get('start_time', current_time),
                'elapsed_formatted': self._format_time(current_time - ct.get('start_time', current_time))
            }

        # Assemble the snapshot consumed by format_live_status / the API.
        return {
            'success': True,
            'migration_id': migration_id,
            'status': 'running',
            'type': 'live',
            'timestamp': current_time,

            'total': {
                'tables': tracker['total_tables'],
                'size': tracker['total_size'],
                'size_formatted': self._format_size(tracker['total_size']),
                'rows': tracker.get('total_rows', 0)
            },

            'processed': {
                'tables': tracker['processed_tables'],
                'size': tracker['processed_size'],
                'size_formatted': self._format_size(tracker['processed_size']),
                'rows': tracker.get('processed_rows', 0)
            },

            'failed': {
                'tables': tracker.get('failed_tables', 0),
                'size': tracker.get('failed_size', 0),
                'size_formatted': self._format_size(tracker.get('failed_size', 0)),
                'rows': tracker.get('failed_rows', 0)
            },

            'remaining': {
                'tables': tracker['total_tables'] - tracker['processed_tables'] - tracker.get('failed_tables', 0),
                'size': tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0),
                'size_formatted': self._format_size(tracker['total_size'] - tracker['processed_size'] - tracker.get('failed_size', 0))
            },

            'percentages': {
                'tables': round(tables_percentage, 2),
                'size': round(size_percentage, 2),
                'rows': round(rows_percentage, 2) if tracker.get('total_rows', 0) > 0 else 0
            },

            'time': {
                'elapsed': elapsed_time,
                'elapsed_formatted': self._format_time(elapsed_time),
                'eta': eta_seconds,
                'eta_formatted': self._format_time(eta_seconds)
            },

            'speed': {
                'current': current_speed,
                'current_formatted': f"{self._format_size(current_speed)}/s",
                'average': tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0,
                'average_formatted': f"{self._format_size(tracker['processed_size'] / elapsed_time if elapsed_time > 0 else 0)}/s",
                # NOTE(review): 'peak' is the largest single sample size, not a rate.
                'peak': max((s['size'] for s in tracker.get('speed_samples', [])), default=0),
                'peak_formatted': self._format_size(max((s['size'] for s in tracker.get('speed_samples', [])), default=0))
            },

            'current_table': current_table_info,

            # Last five per-table export records.
            'recent_exports': tracker.get('tables_details', [])[-5:],

            's3_info': {
                'bucket': mig_data.get('s3_bucket'),
                'prefix': mig_data.get('s3_prefix'),
                'region': mig_data.get('region', 'us-east-1'),
                'endpoint': mig_data.get('endpoint_url')
            },

            'format': {
                'type': tracker.get('format', 'csv'),
                'compressed': tracker.get('compress', True)
            },

            'progress_bars': {
                'tables': self._format_progress_bar(tables_percentage),
                'size': self._format_progress_bar(size_percentage),
                'rows': self._format_progress_bar(rows_percentage) if tracker.get('total_rows', 0) > 0 else None
            }
        }
|
|
|
|
    def _get_estimated_progress(self, migration_id):
        """Rough progress estimate for a running migration without a tracker.

        Assumes an average throughput of 5 MB/s and caps the reported
        percentage at 99 so the estimate never claims completion.
        """
        mig_data = self.migrations[migration_id]
        elapsed_time = time.time() - mig_data['started_at']

        # Estimated table count and total size (defaults: 10 tables, 100 MB).
        estimated_total_tables = mig_data.get('estimated_tables', 10)
        estimated_total_size = mig_data.get('estimated_size', 100 * 1024 * 1024)

        # Estimate progress assuming ~5 MB/s average throughput.
        estimated_duration = estimated_total_size / (5 * 1024 * 1024)
        progress_percentage = min((elapsed_time / estimated_duration) * 100, 99) if estimated_duration > 0 else 0

        tables_exported = int(estimated_total_tables * progress_percentage / 100)
        size_exported = int(estimated_total_size * progress_percentage / 100)

        # NOTE(review): this snapshot intentionally omits the 'speed', 'failed'
        # and 'remaining' sections that the 'live' snapshot carries.
        return {
            'success': True,
            'migration_id': migration_id,
            'status': 'running',
            'type': 'estimated',
            'timestamp': time.time(),
            'estimated': True,

            'total': {
                'tables': estimated_total_tables,
                'size': estimated_total_size,
                'size_formatted': self._format_size(estimated_total_size)
            },

            'processed': {
                'tables': tables_exported,
                'size': size_exported,
                'size_formatted': self._format_size(size_exported)
            },

            'percentages': {
                'tables': round(progress_percentage, 2),
                'size': round(progress_percentage, 2)
            },

            'time': {
                'elapsed': elapsed_time,
                'elapsed_formatted': self._format_time(elapsed_time)
            },

            'progress_bars': {
                'main': self._format_progress_bar(progress_percentage)
            },

            'note': 'تقدير تقريبي - يتم جمع معلومات دقيقة عن الجداول...'
        }
|
|
|
|
    def _get_completed_migration_stats(self, migration_id):
        """Summarise a finished (completed or failed) migration.

        NOTE(review): 'failed.size' is hard-coded to 0 and 'percentages.size'
        to 100.0 because per-table failure sizes are not recorded; the 'time'
        section uses 'total'/'total_formatted' keys, unlike the live
        snapshot's 'elapsed'/'elapsed_formatted'.
        """
        mig_data = self.migrations[migration_id]

        stats = mig_data.get('stats', {})
        successful_exports = mig_data.get('successful_exports', [])
        failed_exports = mig_data.get('failed_exports', [])

        total_tables = stats.get('total_tables', 0)
        successful = stats.get('successful_exports', 0)
        failed = stats.get('failed_exports', 0)
        total_size = stats.get('total_size', 0)

        # Fall back to wall-clock elapsed time when no execution_time was recorded.
        execution_time = mig_data.get('execution_time', time.time() - mig_data.get('started_at', time.time()))

        return {
            'success': True,
            'migration_id': migration_id,
            'status': 'completed' if mig_data.get('success') else 'failed',
            'type': 'completed',
            'timestamp': time.time(),

            'total': {
                'tables': total_tables,
                'size': total_size,
                'size_formatted': self._format_size(total_size)
            },

            'processed': {
                'tables': successful,
                'size': total_size,
                'size_formatted': self._format_size(total_size)
            },

            'failed': {
                'tables': failed,
                'size': 0,
                'size_formatted': '0 B'
            },

            'percentages': {
                'tables': 100.0 if successful == total_tables else round((successful / total_tables * 100) if total_tables > 0 else 0, 2),
                'size': 100.0
            },

            'time': {
                'total': execution_time,
                'total_formatted': self._format_time(execution_time),
                'average_speed': total_size / execution_time if execution_time > 0 else 0,
                'average_speed_formatted': f"{self._format_size(total_size / execution_time if execution_time > 0 else 0)}/s"
            },

            # Only the ten most recent successes are returned.
            'successful_exports': successful_exports[-10:],
            'failed_exports': failed_exports,

            's3_config': mig_data.get('s3_config', {})
        }
|
|
|
|
def start_migration_with_progress(self, migration_id, postgres_uri, s3_bucket, s3_prefix='',
|
|
schemas=None, tables=None, compress=True, format='csv',
|
|
access_key_id=None, secret_access_key=None,
|
|
region='us-east-1', endpoint_url=None):
|
|
"""بدء الترحيل مع تتبع التقدم المباشر"""
|
|
|
|
# تهيئة متتبع التقدم
|
|
self._init_progress_tracker(migration_id, postgres_uri, schemas, tables,
|
|
compress, format, region, endpoint_url)
|
|
|
|
def migration_task():
|
|
try:
|
|
result = self._run_migration_with_progress(
|
|
migration_id, postgres_uri, s3_bucket, s3_prefix,
|
|
schemas, tables, compress, format,
|
|
access_key_id, secret_access_key, region, endpoint_url
|
|
)
|
|
with self._lock:
|
|
self.migrations[migration_id].update(result)
|
|
self.migrations[migration_id]['status'] = 'completed' if result['success'] else 'failed'
|
|
self.migrations[migration_id]['completed_at'] = time.time()
|
|
except Exception as e:
|
|
with self._lock:
|
|
self.migrations[migration_id]['status'] = 'failed'
|
|
self.migrations[migration_id]['error'] = str(e)
|
|
|
|
thread = threading.Thread(target=migration_task, daemon=True)
|
|
thread.start()
|
|
|
|
return migration_id
|
|
|
|
def _init_progress_tracker(self, migration_id, postgres_uri, schemas, tables, compress, format, region, endpoint_url):
|
|
"""تهيئة متتبع التقدم بالبيانات الفعلية"""
|
|
|
|
# الحصول على قائمة الجداول وأحجامها
|
|
tables_list = []
|
|
total_size = 0
|
|
total_rows = 0
|
|
|
|
if tables:
|
|
for table in tables:
|
|
if '.' in table:
|
|
schema, name = table.split('.', 1)
|
|
else:
|
|
schema = 'public'
|
|
name = table
|
|
|
|
# تقدير حجم الجدول (صعب مع PostgreSQL)
|
|
size = 10 * 1024 * 1024 # تقدير افتراضي 10MB
|
|
rows = self._get_table_row_count(postgres_uri, schema, name) or 0
|
|
|
|
tables_list.append({
|
|
'schema': schema,
|
|
'name': name,
|
|
'size': size,
|
|
'rows': rows,
|
|
'status': 'pending'
|
|
})
|
|
total_size += size
|
|
total_rows += rows
|
|
|
|
elif schemas:
|
|
for schema in schemas:
|
|
result = self.get_tables(postgres_uri, schema)
|
|
if result['success']:
|
|
for table_info in result['tables']:
|
|
rows = self._get_table_row_count(postgres_uri, schema, table_info['name']) or 0
|
|
tables_list.append({
|
|
'schema': schema,
|
|
'name': table_info['name'],
|
|
'size': 10 * 1024 * 1024, # تقدير افتراضي
|
|
'rows': rows,
|
|
'status': 'pending'
|
|
})
|
|
total_size += 10 * 1024 * 1024
|
|
total_rows += rows
|
|
|
|
else:
|
|
result = self.get_tables(postgres_uri)
|
|
if result['success']:
|
|
for table_info in result['tables']:
|
|
rows = self._get_table_row_count(postgres_uri, table_info['schema'], table_info['name']) or 0
|
|
tables_list.append({
|
|
'schema': table_info['schema'],
|
|
'name': table_info['name'],
|
|
'size': 10 * 1024 * 1024, # تقدير افتراضي
|
|
'rows': rows,
|
|
'status': 'pending'
|
|
})
|
|
total_size += 10 * 1024 * 1024
|
|
total_rows += rows
|
|
|
|
with self._lock:
|
|
self.migrations[migration_id] = {
|
|
'status': 'running',
|
|
'started_at': time.time(),
|
|
'postgres_uri': postgres_uri,
|
|
's3_bucket': s3_bucket,
|
|
's3_prefix': s3_prefix,
|
|
'region': region,
|
|
'endpoint_url': endpoint_url,
|
|
'estimated_tables': len(tables_list),
|
|
'estimated_size': total_size,
|
|
'progress_tracker': {
|
|
'total_tables': len(tables_list),
|
|
'total_size': total_size,
|
|
'total_rows': total_rows,
|
|
'processed_tables': 0,
|
|
'processed_size': 0,
|
|
'processed_rows': 0,
|
|
'failed_tables': 0,
|
|
'failed_size': 0,
|
|
'failed_rows': 0,
|
|
'current_table': None,
|
|
'start_time': time.time(),
|
|
'last_update': time.time(),
|
|
'tables_list': tables_list,
|
|
'tables_details': [],
|
|
'speed_samples': [],
|
|
'format': format,
|
|
'compress': compress
|
|
}
|
|
}
|
|
|
|
    def _run_migration_with_progress(self, migration_id, postgres_uri, s3_bucket, s3_prefix,
                                     schemas, tables, compress, format,
                                     access_key_id, secret_access_key, region, endpoint_url):
        """Export every tracked table to S3, updating the tracker as it goes.

        Runs on the background thread created by start_migration_with_progress
        and assumes _init_progress_tracker already populated the tracker.
        Returns the final result dict that the caller merges into
        ``self.migrations[migration_id]``.
        """
        successful_exports = []
        failed_exports = []

        # Snapshot the table list under the lock; the list itself is only
        # read from this thread afterwards.
        with self._lock:
            tracker = self.migrations[migration_id]['progress_tracker']
            tables_list = tracker['tables_list']

        for i, table_info in enumerate(tables_list, 1):
            table_name = f"{table_info['schema']}.{table_info['name']}"

            # Publish which table is currently being worked on.
            current_table = {
                'name': table_name,
                'size': table_info['size'],
                'rows': table_info['rows'],
                'rows_processed': 0,
                'stage': 'querying',
                'progress': 0,
                'start_time': time.time()
            }

            with self._lock:
                self.migrations[migration_id]['progress_tracker']['current_table'] = current_table

            print(f"[{migration_id}] 📊 Exporting table {i}/{len(tables_list)}: {table_name}")

            # Export the table; stage changes are reported to the tracker
            # from inside the helper.
            result = self._export_table_with_progress(
                migration_id, postgres_uri,
                table_info['schema'], table_info['name'],
                s3_bucket, s3_prefix, compress, format,
                access_key_id, secret_access_key, region, endpoint_url,
                table_info['rows']
            )

            if result['success']:
                # Fold the export into the running totals.
                with self._lock:
                    tracker = self.migrations[migration_id]['progress_tracker']
                    tracker['processed_tables'] += 1
                    tracker['processed_size'] += result['file_size']
                    tracker['processed_rows'] += result.get('rows_exported', 0)

                    # Per-table export record (recent ones are shown in the UI).
                    tracker['tables_details'].append({
                        'name': table_name,
                        'size': result['file_size'],
                        'size_formatted': self._format_size(result['file_size']),
                        'rows': result.get('rows_exported', 0),
                        'query_time': result.get('query_time', 0),
                        'upload_time': result.get('upload_time', 0),
                        'total_time': result.get('query_time', 0) + result.get('upload_time', 0),
                        'speed': result['file_size'] / (result.get('upload_time', 1) or 1),
                        'compressed': result.get('compressed', False),
                        'format': format,
                        'timestamp': time.time()
                    })

                    # Sample for the rolling-speed computation.
                    tracker['speed_samples'].append({
                        'time': time.time(),
                        'size': result['file_size'],
                        'tables': 1
                    })

                successful_exports.append({
                    'table': table_name,
                    's3_key': result['s3_key'],
                    'size': result['file_size'],
                    's3_url': result['s3_url']
                })

                print(f"[{migration_id}] ✅ Exported {table_name} - {self._format_size(result['file_size'])}")

            else:
                # Count the failure (using the estimated size) against the totals.
                with self._lock:
                    tracker = self.migrations[migration_id]['progress_tracker']
                    tracker['failed_tables'] += 1
                    tracker['failed_size'] += table_info['size']

                failed_exports.append({
                    'table': table_name,
                    'error': result.get('error', 'Unknown error')
                })

                print(f"[{migration_id}] ❌ Failed {table_name}: {result.get('error')}")

        # Final statistics.
        total_size = sum(e['size'] for e in successful_exports)

        return {
            'success': len(failed_exports) == 0,
            'migration_id': migration_id,
            # NOTE(review): read without the lock; 'started_at' is write-once.
            'execution_time': time.time() - self.migrations[migration_id]['started_at'],
            'stats': {
                'total_tables': len(tables_list),
                'successful_exports': len(successful_exports),
                'failed_exports': len(failed_exports),
                'total_size': total_size
            },
            'successful_exports': successful_exports,
            'failed_exports': failed_exports,
            's3_config': {
                'bucket': s3_bucket,
                'prefix': s3_prefix,
                'region': region,
                'endpoint': endpoint_url
            }
        }
|
|
|
|
def _export_table_with_progress(self, migration_id, postgres_uri, schema, table,
|
|
s3_bucket, s3_prefix, compress, format,
|
|
access_key_id, secret_access_key, region, endpoint_url,
|
|
estimated_rows=0):
|
|
"""تصدير جدول مع تتبع التقدم"""
|
|
|
|
try:
|
|
start_time = time.time()
|
|
|
|
# تحديث المرحلة: الاستعلام
|
|
self._update_table_stage(migration_id, 'querying', 10)
|
|
|
|
# تصدير الجدول
|
|
result = self.export_table_to_s3(
|
|
postgres_uri=postgres_uri,
|
|
schema=schema,
|
|
table=table,
|
|
s3_bucket=s3_bucket,
|
|
s3_key=f"{s3_prefix}{schema}_{table}_{int(time.time())}.{format}{'.gz' if compress else ''}",
|
|
compress=compress,
|
|
format=format,
|
|
access_key_id=access_key_id,
|
|
secret_access_key=secret_access_key,
|
|
region=region,
|
|
endpoint_url=endpoint_url
|
|
)
|
|
|
|
if result['success']:
|
|
result['rows_exported'] = estimated_rows
|
|
return result
|
|
else:
|
|
return result
|
|
|
|
except Exception as e:
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def _update_table_stage(self, migration_id, stage, progress):
|
|
"""تحديث مرحلة تصدير الجدول الحالي"""
|
|
with self._lock:
|
|
if migration_id in self.migrations:
|
|
tracker = self.migrations[migration_id].get('progress_tracker')
|
|
if tracker and tracker.get('current_table'):
|
|
tracker['current_table']['stage'] = stage
|
|
tracker['current_table']['progress'] = progress
|
|
tracker['last_update'] = time.time()
|
|
|
|
def _get_table_row_count(self, uri, schema, table):
|
|
"""الحصول على عدد صفوف الجدول"""
|
|
try:
|
|
parsed = self.parse_postgres_uri(uri)
|
|
if not parsed:
|
|
return None
|
|
|
|
env = os.environ.copy()
|
|
env['PGPASSWORD'] = parsed['password']
|
|
|
|
sql = f'SELECT COUNT(*) FROM "{schema}"."{table}";'
|
|
|
|
cmd = [
|
|
'psql',
|
|
'-h', parsed['host'],
|
|
'-p', str(parsed['port']),
|
|
'-U', parsed['user'],
|
|
'-d', parsed['database'],
|
|
'-t',
|
|
'-c', sql
|
|
]
|
|
|
|
result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode == 0:
|
|
count_line = result.stdout.strip()
|
|
if count_line and count_line.isdigit():
|
|
return int(count_line)
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
def format_live_status(self, migration_id):
|
|
"""تنسيق الحالة المباشرة كنص مقروء"""
|
|
progress = self.get_live_migration_progress(migration_id)
|
|
|
|
if not progress.get('success', True):
|
|
return f"❌ Error: {progress.get('error', 'Unknown error')}"
|
|
|
|
lines = []
|
|
lines.append("=" * 70)
|
|
lines.append(f"🚀 PostgreSQL to S3 Migration {migration_id} - {progress['status'].upper()}")
|
|
lines.append("=" * 70)
|
|
|
|
if progress.get('type') == 'estimated':
|
|
lines.append("⚠️ تقدير تقريبي - يتم جمع معلومات دقيقة...")
|
|
|
|
# معلومات S3
|
|
if 's3_info' in progress:
|
|
s3 = progress['s3_info']
|
|
lines.append(f"\n📦 S3 Destination: s3://{s3['bucket']}/{s3['prefix']}")
|
|
lines.append(f" Region: {s3['region']}")
|
|
|
|
# شريط التقدم الرئيسي
|
|
if 'progress_bars' in progress:
|
|
if 'size' in progress['progress_bars']:
|
|
lines.append(f"\n📊 Progress: {progress['progress_bars']['size']}")
|
|
|
|
# إحصائيات الجداول
|
|
lines.append(f"\n📋 Tables:")
|
|
lines.append(f" Total: {progress['total']['tables']} tables")
|
|
lines.append(f" Exported: {progress['processed']['tables']} tables ({progress['percentages']['tables']}%)")
|
|
if progress.get('failed', {}).get('tables', 0) > 0:
|
|
lines.append(f" ❌ Failed: {progress['failed']['tables']} tables")
|
|
|
|
# إحصائيات الحجم
|
|
lines.append(f"\n💾 Size:")
|
|
lines.append(f" Total: {progress['total']['size_formatted']}")
|
|
lines.append(f" Exported: {progress['processed']['size_formatted']} ({progress['percentages']['size']}%)")
|
|
if progress.get('remaining', {}).get('size', 0) > 0:
|
|
lines.append(f" Remaining: {progress['remaining']['size_formatted']}")
|
|
|
|
# معلومات الصفوف
|
|
if progress['total'].get('rows', 0) > 0:
|
|
lines.append(f"\n📊 Rows:")
|
|
lines.append(f" Total: {progress['total']['rows']:,} rows")
|
|
lines.append(f" Exported: {progress['processed']['rows']:,} rows ({progress['percentages']['rows']}%)")
|
|
|
|
# معلومات السرعة والوقت
|
|
lines.append(f"\n⏱️ Time:")
|
|
lines.append(f" Elapsed: {progress['time']['elapsed_formatted']}")
|
|
if 'eta' in progress['time']:
|
|
lines.append(f" ETA: {progress['time']['eta_formatted']}")
|
|
|
|
lines.append(f"\n⚡ Speed:")
|
|
lines.append(f" Current: {progress['speed']['current_formatted']}")
|
|
lines.append(f" Average: {progress['speed']['average_formatted']}")
|
|
|
|
# الجدول الحالي
|
|
if progress.get('current_table'):
|
|
ct = progress['current_table']
|
|
lines.append(f"\n📄 Current Table: {ct['name']}")
|
|
lines.append(f" Size: {ct['size_formatted']}")
|
|
lines.append(f" Stage: {ct['stage'].upper()}")
|
|
if ct.get('rows', 0) > 0:
|
|
lines.append(f" Rows: {ct['rows_processed']}/{ct['rows']} ({ct['rows_percentage']}%)")
|
|
lines.append(f" Elapsed: {ct['elapsed_formatted']}")
|
|
|
|
# آخر التصديرات الناجحة
|
|
if progress.get('recent_exports'):
|
|
lines.append(f"\n✅ Recent Exports:")
|
|
for exp in progress['recent_exports'][-3:]:
|
|
lines.append(f" • {exp['name']} - {exp['size_formatted']}")
|
|
|
|
lines.append("\n" + "=" * 70)
|
|
|
|
return '\n'.join(lines)
|
|
|
|
def _format_size(self, size_bytes):
|
|
"""تنسيق حجم الملف"""
|
|
if size_bytes == 0:
|
|
return '0 B'
|
|
|
|
size_names = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
|
|
i = 0
|
|
while size_bytes >= 1024 and i < len(size_names) - 1:
|
|
size_bytes /= 1024.0
|
|
i += 1
|
|
|
|
return f"{size_bytes:.2f} {size_names[i]}"
|
|
|
|
def _format_time(self, seconds):
|
|
"""تنسيق الوقت"""
|
|
if seconds < 60:
|
|
return f"{seconds:.0f}s"
|
|
elif seconds < 3600:
|
|
minutes = seconds / 60
|
|
return f"{minutes:.1f}m"
|
|
elif seconds < 86400:
|
|
hours = seconds / 3600
|
|
return f"{hours:.1f}h"
|
|
else:
|
|
days = seconds / 86400
|
|
return f"{days:.1f}d"
|
|
|
|
def _format_progress_bar(self, percentage, width=30):
|
|
"""تنسيق شريط التقدم كنص"""
|
|
filled = int(width * percentage / 100)
|
|
empty = width - filled
|
|
bar = '█' * filled + '░' * empty
|
|
return f"[{bar}] {percentage:.1f}%"
|
|
def get_s3_client(self, access_key_id, secret_access_key, region='us-east-1', endpoint_url=None):
|
|
"""Get or create S3 client with given credentials"""
|
|
cache_key = f"{access_key_id}:{secret_access_key}:{region}:{endpoint_url}"
|
|
|
|
if cache_key in self.s3_clients:
|
|
return self.s3_clients[cache_key]
|
|
|
|
try:
|
|
# Create S3 client configuration
|
|
s3_config = {
|
|
'aws_access_key_id': access_key_id,
|
|
'aws_secret_access_key': secret_access_key,
|
|
'region_name': region
|
|
}
|
|
|
|
# Add endpoint if provided (for S3-compatible services like MinIO)
|
|
if endpoint_url:
|
|
s3_config['endpoint_url'] = endpoint_url
|
|
|
|
# Create S3 client
|
|
client = boto3.client('s3', **s3_config)
|
|
|
|
# Cache the client
|
|
self.s3_clients[cache_key] = client
|
|
return client
|
|
|
|
except Exception as e:
|
|
print(f"❌ Failed to create S3 client: {str(e)}")
|
|
return None
|
|
|
|
    def test_s3_connection(self, access_key_id, secret_access_key, region='us-east-1', endpoint_url=None):
        """Verify S3 connectivity by listing buckets with the given credentials.

        Prints a human-readable trace as it goes and returns a result dict:
        on success it includes the bucket names and the list_buckets
        round-trip time; on failure an 'error' message, with extra detail for
        missing-credential, endpoint and AWS client errors.
        """
        # Validate that both credential parts were supplied before touching boto3.
        if not access_key_id or not secret_access_key:
            print(f"❌ AWS credentials are required")
            print(f" 🔸 Access Key ID: {'Missing' if not access_key_id else 'Provided'}")
            print(f" 🔸 Secret Key: {'Missing' if not secret_access_key else 'Provided'}")
            return {
                'success': False,
                'error': 'AWS credentials are required',
                'details': {
                    'access_key_id_provided': bool(access_key_id),
                    'secret_key_provided': bool(secret_access_key)
                }
            }

        print(f"🔍 Testing S3 connection...")
        print(f" 🔸 Access Key ID: {access_key_id[:10]}...")
        print(f" 🔸 Secret Key: {secret_access_key[:10]}...")
        print(f" 🔸 Region: {region}")
        print(f" 🔸 Endpoint URL: {endpoint_url or 'AWS S3 (default)'}")

        try:
            # Get (or build) a cached client for these credentials.
            s3_client = self.get_s3_client(access_key_id, secret_access_key, region, endpoint_url)
            if not s3_client:
                return {
                    'success': False,
                    'error': 'Failed to create S3 client'
                }

            # Test connection by listing buckets
            start_time = time.time()
            response = s3_client.list_buckets()
            end_time = time.time()

            response_time = end_time - start_time

            buckets = [bucket['Name'] for bucket in response.get('Buckets', [])]

            print(f"✅ S3 Connection Successful!")
            print(f" 🔸 Response time: {response_time:.2f} seconds")
            print(f" 🔸 Available buckets: {len(buckets)}")

            if buckets:
                # Show at most five bucket names, with an ellipsis when truncated.
                print(f" 🔸 Buckets: {', '.join(buckets[:5])}{'...' if len(buckets) > 5 else ''}")

            return {
                'success': True,
                'message': 'S3 connection successful',
                'buckets': buckets,
                'bucket_count': len(buckets),
                'response_time': response_time,
                'connection_details': {
                    'region': region,
                    'endpoint': endpoint_url or 'AWS S3 (default)',
                    'has_credentials': bool(access_key_id and secret_access_key)
                }
            }

        except NoCredentialsError:
            error_msg = "No credentials provided or credentials are invalid"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg
            }

        except EndpointConnectionError as e:
            error_msg = f"Cannot connect to endpoint: {endpoint_url}"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'details': str(e)
            }

        except ClientError as e:
            # AWS-side rejection (bad key, denied action, etc.).
            error_code = e.response['Error']['Code']
            error_msg = e.response['Error']['Message']
            print(f"❌ AWS Error: {error_code} - {error_msg}")
            return {
                'success': False,
                'error': f"{error_code}: {error_msg}",
                'aws_error': error_code
            }

        except Exception as e:
            error_msg = f"Unexpected error: {str(e)}"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg
            }
|
|
|
|
def list_s3_buckets(self, access_key_id, secret_access_key, region='us-east-1', endpoint_url=None):
|
|
"""List all S3 buckets with details"""
|
|
try:
|
|
s3_client = self.get_s3_client(access_key_id, secret_access_key, region, endpoint_url)
|
|
if not s3_client:
|
|
return {
|
|
'success': False,
|
|
'error': 'Failed to create S3 client'
|
|
}
|
|
|
|
response = s3_client.list_buckets()
|
|
buckets_info = []
|
|
|
|
for bucket in response.get('Buckets', []):
|
|
bucket_name = bucket['Name']
|
|
creation_date = bucket['CreationDate']
|
|
|
|
# Get bucket location (region)
|
|
try:
|
|
location = s3_client.get_bucket_location(Bucket=bucket_name)
|
|
bucket_region = location.get('LocationConstraint', 'us-east-1')
|
|
if not bucket_region:
|
|
bucket_region = 'us-east-1'
|
|
except:
|
|
bucket_region = region
|
|
|
|
buckets_info.append({
|
|
'name': bucket_name,
|
|
'creation_date': creation_date.isoformat() if hasattr(creation_date, 'isoformat') else str(creation_date),
|
|
'region': bucket_region
|
|
})
|
|
|
|
return {
|
|
'success': True,
|
|
'buckets': buckets_info,
|
|
'count': len(buckets_info)
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'error': str(e)
|
|
}
|
|
|
|
    def test_postgres_connection(self, uri_input):
        """Test a PostgreSQL connection via the `psql` CLI with detailed errors.

        Accepts either a URI string or a dict containing a 'uri'/'url' key.
        Runs ``psql -c 'SELECT version(); SELECT current_database();'`` with a
        10-second timeout and returns a dict describing success or the
        failure mode (bad input, connect error, timeout, psql missing).
        """
        # Tolerate callers that pass a JSON object instead of a plain string.
        if isinstance(uri_input, dict):
            print(f"⚠️ Received JSON object, extracting URI string...")

            if 'uri' in uri_input:
                uri = uri_input['uri']
                print(f"✅ Extracted URI from 'uri' key")
            elif 'url' in uri_input:
                uri = uri_input['url']
                print(f"✅ Extracted URI from 'url' key")
            else:
                error_msg = f'JSON must contain "uri" or "url" key. Received keys: {list(uri_input.keys())}'
                print(f"❌ {error_msg}")
                return {'success': False, 'error': error_msg}
        else:
            uri = uri_input

        if not isinstance(uri, str):
            error_msg = f'URI must be a string. Got type: {type(uri)}'
            print(f"❌ {error_msg}")
            return {'success': False, 'error': error_msg}

        print(f"🔍 Testing PostgreSQL connection to: {uri}")
        parsed = self.parse_postgres_uri(uri)

        if not parsed:
            error_msg = f'Invalid URI format: {uri}'
            print(f"❌ {error_msg}")
            return {'success': False, 'error': error_msg}

        host = parsed['host']
        port = parsed['port']
        user = parsed['user']
        database = parsed['database']

        # psql reads the password from the environment, not the command line.
        env = os.environ.copy()
        env['PGPASSWORD'] = parsed['password']

        cmd = [
            'psql',
            '-h', host,
            '-p', str(port),
            '-U', user,
            '-d', database,
            '-c', 'SELECT version(); SELECT current_database();',
            '-t'
        ]

        try:
            print(f" 🔗 Attempting connection to host: {host}:{port}")
            print(f" 👤 User: {user}, Database: {database}")

            result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=10)

            if result.returncode == 0:
                # With `-t` each SELECT comes back as a bare value line.
                lines = result.stdout.strip().split('\n')
                version = lines[0] if len(lines) > 0 else ''
                db_name = lines[1] if len(lines) > 1 else ''

                print(f"✅ Connection successful to {host}:{port}")

                return {
                    'success': True,
                    'message': 'Connection successful',
                    'version': version,
                    'database': db_name,
                    'connection': {
                        'host': host,
                        'port': port,
                        'user': user,
                        'database': database
                    }
                }
            else:
                error_output = result.stderr.strip()
                print(f"❌ Connection FAILED to host: {host}:{port}")
                print(f" 🔸 User: {user}")
                print(f" 🔸 Database: {database}")
                # Truncate the server error to keep the log readable.
                print(f" 🔸 Error details: {error_output[:200]}")

                return {
                    'success': False,
                    'error': error_output,
                    'connection_details': {
                        'host': host,
                        'port': port,
                        'user': user,
                        'database': database
                    }
                }

        except subprocess.TimeoutExpired:
            print(f"❌ Connection TIMEOUT to host: {host}:{port}")
            print(f" 🔸 Server not responding after 10 seconds")
            print(f" 🔸 Please check: firewall, network, server status")

            return {
                'success': False,
                'error': 'Connection timeout',
                'connection_details': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }

        except FileNotFoundError:
            # The psql binary itself is missing from PATH.
            print(f"❌ PSQL CLIENT NOT FOUND")
            print(f" 🔸 The 'psql' command-line tool is not installed")
            print(f" 🔸 Install PostgreSQL client tools and try again")

            return {
                'success': False,
                'error': 'psql command not found. Install PostgreSQL client.',
                'connection_details': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }

        except Exception as e:
            print(f"❌ UNEXPECTED ERROR connecting to host: {host}:{port}")
            print(f" 🔸 Error type: {type(e).__name__}")
            print(f" 🔸 Error message: {str(e)}")

            return {
                'success': False,
                'error': str(e),
                'connection_details': {
                    'host': host,
                    'port': port,
                    'user': user,
                    'database': database
                }
            }
|
|
|
|
def get_schemas(self, uri):
    """List the non-system schemas of a PostgreSQL database.

    Runs a query through the ``psql`` CLI and returns either
    ``{'success': True, 'schemas': [...], 'count': N}`` or
    ``{'success': False, 'error': <message>}``.
    """
    conn = self.parse_postgres_uri(uri)
    if not conn:
        return {'success': False, 'error': 'Invalid URI format'}

    # Password goes through the environment, never on the command line.
    environment = dict(os.environ, PGPASSWORD=conn['password'])

    query = (
        "SELECT schema_name "
        "FROM information_schema.schemata "
        "WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast') "
        "ORDER BY schema_name;"
    )

    # -t: tuples-only output (no headers/footers), one schema per line.
    command = [
        'psql',
        '-h', conn['host'],
        '-p', str(conn['port']),
        '-U', conn['user'],
        '-d', conn['database'],
        '-t',
        '-c', query,
    ]

    try:
        proc = subprocess.run(command, env=environment, capture_output=True,
                              text=True, timeout=10)
        if proc.returncode != 0:
            return {'success': False, 'error': proc.stderr.strip()}

        names = [row.strip() for row in proc.stdout.splitlines() if row.strip()]
        return {'success': True, 'schemas': names, 'count': len(names)}
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
|
|
|
|
def get_tables(self, uri, schema=''):
    """List tables of a PostgreSQL database via the ``psql`` CLI.

    With a non-empty ``schema`` only that schema is queried and each
    entry is ``{'name', 'type', 'schema'}``; otherwise every non-system
    schema is scanned and each entry is ``{'schema', 'name', 'type'}``.
    Returns ``{'success': True, 'tables': [...], 'count': N}`` or
    ``{'success': False, 'error': <message>}``.
    """
    conn = self.parse_postgres_uri(uri)
    if not conn:
        return {'success': False, 'error': 'Invalid URI format'}

    environment = dict(os.environ, PGPASSWORD=conn['password'])

    if schema:
        query = (
            f"SELECT table_name, table_type "
            f"FROM information_schema.tables "
            f"WHERE table_schema = '{schema}' "
            f"ORDER BY table_name;"
        )
    else:
        query = (
            "SELECT table_schema, table_name, table_type "
            "FROM information_schema.tables "
            "WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'pg_toast') "
            "ORDER BY table_schema, table_name;"
        )

    command = [
        'psql',
        '-h', conn['host'],
        '-p', str(conn['port']),
        '-U', conn['user'],
        '-d', conn['database'],
        '-t',
        '-c', query,
    ]

    try:
        proc = subprocess.run(command, env=environment, capture_output=True,
                              text=True, timeout=10)
        if proc.returncode != 0:
            return {'success': False, 'error': proc.stderr.strip()}

        tables = []
        for raw in proc.stdout.splitlines():
            row = raw.strip()
            if not row or '|' not in row:
                continue
            if schema:
                # Two columns: name | type (schema is already known).
                name, kind = row.split('|')
                tables.append({
                    'name': name.strip(),
                    'type': kind.strip(),
                    'schema': schema,
                })
            else:
                cols = row.split('|')
                if len(cols) >= 3:
                    tables.append({
                        'schema': cols[0].strip(),
                        'name': cols[1].strip(),
                        'type': cols[2].strip(),
                    })

        return {'success': True, 'tables': tables, 'count': len(tables)}
    except Exception as exc:
        return {'success': False, 'error': str(exc)}
|
|
|
|
def get_table_counts(self, uri, schema=None):
    """Return approximate row counts for user tables.

    Queries ``pg_stat_user_tables`` (the statistics collector view),
    whose ``n_live_tup`` is a fast, close-to-exact live-row estimate
    that avoids scanning every table.

    Args:
        uri: PostgreSQL connection URI (parsed by parse_postgres_uri).
        schema: optional schema name to restrict the result.

    Returns:
        dict mapping 'schema.table' -> row count; {} on any failure.

    BUG FIX: the previous implementation (a) interpolated an undefined
    Python variable ``table_name`` into the SQL when a schema was given,
    raising NameError, (b) in the schema-less branch counted rows of
    ``information_schema.tables`` itself (always 1 per table) instead of
    the actual table, and (c) parsed the schema branch expecting three
    '|'-separated columns while its query returned two.
    """
    parsed = self.parse_postgres_uri(uri)
    if not parsed:
        return {}

    env = os.environ.copy()
    env['PGPASSWORD'] = parsed['password']

    # pg_stat_user_tables already excludes system catalogs.
    if schema:
        sql = f"""
        SELECT schemaname, relname, n_live_tup
        FROM pg_stat_user_tables
        WHERE schemaname = '{schema}'
        ORDER BY relname;
        """
    else:
        sql = """
        SELECT schemaname, relname, n_live_tup
        FROM pg_stat_user_tables
        ORDER BY schemaname, relname;
        """

    cmd = [
        'psql',
        '-h', parsed['host'],
        '-p', str(parsed['port']),
        '-U', parsed['user'],
        '-d', parsed['database'],
        '-t',
        '-c', sql
    ]

    try:
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)

        table_counts = {}
        if result.returncode == 0:
            for line in result.stdout.splitlines():
                if line.strip() and '|' in line:
                    parts = line.split('|')
                    if len(parts) >= 3:
                        tbl_schema, tbl_name, count = parts[:3]
                        key = f"{tbl_schema.strip()}.{tbl_name.strip()}"
                        # Non-numeric (or negative/blank) counts degrade to 0.
                        table_counts[key] = int(count.strip()) if count.strip().isdigit() else 0
        return table_counts

    except Exception:
        # Best-effort helper: callers treat {} as "counts unavailable".
        return {}
|
|
|
|
def export_table_to_s3(self, postgres_uri, schema, table, s3_bucket, s3_key,
                       compress=True, format='csv',
                       access_key_id=None, secret_access_key=None,
                       region='us-east-1', endpoint_url=None):
    """Export a single PostgreSQL table to one S3 object.

    The table is dumped through the ``psql`` CLI (CSV via COPY ... TO
    STDOUT, or JSON via json_agg), optionally gzip-compressed in memory,
    and uploaded with a single put_object call.

    Args:
        postgres_uri: source connection URI (parsed by parse_postgres_uri).
        schema: schema of the table to export.
        table: name of the table to export.
        s3_bucket: destination bucket.
        s3_key: destination object key, used exactly as given.
        compress: gzip the payload before uploading.
        format: 'csv' (default); any other value selects JSON output.
        access_key_id, secret_access_key, region, endpoint_url: forwarded
            to get_s3_client() to build the S3 client.

    Returns:
        dict with 'success' and either upload metadata (s3_url, sizes,
        timings, ...) or an 'error' message; always carries 'logs'.
    """
    logs = []

    def log(msg, level='info'):
        # Collect a structured log entry and mirror it to stdout.
        log_entry = {
            'timestamp': time.time(),
            'message': msg,
            'level': level
        }
        logs.append(log_entry)
        print(f"[Export] {msg}")

    try:
        log(f"🔧 Starting export of {schema}.{table} to S3", 'info')
        log(f"PostgreSQL: {postgres_uri}", 'info')
        log(f"S3 Destination: s3://{s3_bucket}/{s3_key}", 'info')
        log(f"Format: {format}, Compress: {compress}", 'info')

        # Parse PostgreSQL URI
        parsed = self.parse_postgres_uri(postgres_uri)
        if not parsed:
            return {
                'success': False,
                'error': 'Invalid PostgreSQL URI',
                'logs': logs
            }

        # Set environment for psql (password via PGPASSWORD, not argv)
        env = os.environ.copy()
        env['PGPASSWORD'] = parsed['password']

        # Build SQL command based on format.
        # NOTE(review): schema/table are interpolated without escaping —
        # safe only for trusted identifiers; confirm callers validate them.
        if format.lower() == 'csv':
            sql = f"COPY (SELECT * FROM \"{schema}\".\"{table}\") TO STDOUT WITH CSV HEADER"
        else:  # json: one aggregated JSON array for the whole table
            sql = f"""
            SELECT json_agg(row_to_json(t))
            FROM (SELECT * FROM "{schema}"."{table}") t
            """

        # Build psql command (-t: tuples-only output, no headers/footers)
        cmd = [
            'psql',
            '-h', parsed['host'],
            '-p', str(parsed['port']),
            '-U', parsed['user'],
            '-d', parsed['database'],
            '-t',
            '-c', sql
        ]

        # Execute query and capture output (5-minute hard limit)
        log(f"📊 Querying PostgreSQL table...", 'info')
        start_time = time.time()
        result = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=300)

        if result.returncode != 0:
            # Truncate stderr so the log entry stays readable.
            error_msg = result.stderr[:500]
            log(f"❌ Failed to query table: {error_msg}", 'error')
            return {
                'success': False,
                'error': f"Query failed: {error_msg}",
                'logs': logs
            }

        query_time = time.time() - start_time
        log(f"✅ Query completed in {query_time:.2f} seconds", 'success')

        # Get data — the entire dump is buffered in memory here.
        data = result.stdout

        # Prepare data for upload
        if format.lower() == 'csv':
            file_content = data.encode('utf-8')
            content_type = 'text/csv'
            file_extension = 'csv'
        else:  # json: round-trip through json to pretty-print; empty table -> []
            file_content = json.dumps(json.loads(data), indent=2).encode('utf-8') if data.strip() else b'[]'
            content_type = 'application/json'
            file_extension = 'json'

        # Compress if requested.
        # NOTE(review): file_extension is updated but never used — the
        # caller-supplied s3_key alone decides the object name.
        if compress:
            log(f"📦 Compressing data...", 'info')
            buffer = io.BytesIO()
            with gzip.GzipFile(fileobj=buffer, mode='wb') as f:
                f.write(file_content)
            file_content = buffer.getvalue()
            file_extension = f"{file_extension}.gz"
            content_type = 'application/gzip'
            log(f"✅ Compression complete: {len(data)} → {len(file_content)} bytes", 'success')

        # Upload to S3
        log(f"⬆️ Uploading to S3...", 'info')

        # Get S3 client (helper defined elsewhere in this class)
        s3_client = self.get_s3_client(access_key_id, secret_access_key, region, endpoint_url)
        if not s3_client:
            return {
                'success': False,
                'error': 'Failed to create S3 client',
                'logs': logs
            }

        # Upload file, tagging the object with provenance metadata.
        upload_start = time.time()
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=file_content,
            ContentType=content_type,
            Metadata={
                'source_schema': schema,
                'source_table': table,
                'export_time': datetime.utcnow().isoformat(),
                'compressed': str(compress),
                'format': format
            }
        )
        upload_time = time.time() - upload_start

        # Generate S3 URL: path-style for custom endpoints,
        # virtual-hosted style for AWS proper.
        if endpoint_url:
            s3_url = f"{endpoint_url}/{s3_bucket}/{s3_key}"
        else:
            s3_url = f"https://{s3_bucket}.s3.{region}.amazonaws.com/{s3_key}"

        log(f"✅ Upload completed in {upload_time:.2f} seconds", 'success')
        log(f"🔗 S3 URL: {s3_url}", 'info')

        return {
            'success': True,
            'message': f"Exported {schema}.{table} to S3 successfully",
            's3_url': s3_url,
            's3_bucket': s3_bucket,
            's3_key': s3_key,
            'file_size': len(file_content),
            # NOTE(review): len(data) is a character count, not bytes.
            'original_size': len(data) if compress else None,
            'compressed': compress,
            'format': format,
            'upload_time': upload_time,
            'query_time': query_time,
            'logs': logs
        }

    except subprocess.TimeoutExpired:
        log("❌ Query timeout (5 minutes)", 'error')
        return {
            'success': False,
            'error': 'Query timeout',
            'logs': logs
        }
    except Exception as e:
        log(f"❌ Error: {str(e)}", 'error')
        return {
            'success': False,
            'error': str(e),
            'logs': logs
        }
|
|
|
|
def migrate_to_s3(self, migration_id, postgres_uri, s3_bucket, s3_prefix='',
                  schemas=None, tables=None, compress=True, format='csv',
                  access_key_id=None, secret_access_key=None,
                  region='us-east-1', endpoint_url=None):
    """Migrate PostgreSQL data to S3, one timestamped object per table.

    Resolves the table list — explicit ``tables`` ('schema.table' strings),
    all tables of the given ``schemas``, or every non-system table — then
    exports each via export_table_to_s3().

    Args:
        migration_id: identifier used in logs and in the returned dict.
        postgres_uri: source connection URI.
        s3_bucket, s3_prefix: destination; keys are
            '{prefix}{schema}_{table}_{UTC timestamp}.{ext}'.
        schemas, tables: optional selection; ``tables`` wins over ``schemas``.
        compress, format: forwarded to export_table_to_s3().
        access_key_id, secret_access_key, region, endpoint_url: S3 creds.

    Returns:
        Summary dict: 'success' is True when the run itself completed
        (even with per-table failures — inspect 'failed_exports'), plus
        'stats', per-table results, a redacted 's3_config', and 'logs'.

    BUG FIX: the object-key extension was computed as
    ``'csv.gz' if compress else ...``, so compressed JSON exports were
    named ``.csv.gz``. The format and compression flags are now combined
    independently ('csv'/'json' + optional '.gz').
    """
    logs = []

    def log(msg, level='info'):
        # Collect a structured log entry and mirror it to stdout.
        log_entry = {
            'timestamp': time.time(),
            'message': msg,
            'level': level
        }
        logs.append(log_entry)
        print(f"[{migration_id}] {msg}")

    try:
        log(f"🔧 Starting PostgreSQL to S3 Migration {migration_id}", 'info')
        log(f"PostgreSQL: {postgres_uri}", 'info')
        log(f"S3 Destination: s3://{s3_bucket}/{s3_prefix}", 'info')
        log(f"Format: {format}, Compress: {compress}", 'info')
        log(f"S3 Region: {region}, Endpoint: {endpoint_url or 'AWS S3 (default)'}", 'info')

        # Resolve the list of 'schema.table' references to export.
        tables_to_export = []
        if tables:
            tables_to_export = tables
            log(f"📋 Selected tables: {len(tables_to_export)} tables", 'info')
        elif schemas:
            # Expand each selected schema into its tables.
            for schema in schemas:
                result = self.get_tables(postgres_uri, schema)
                if result['success']:
                    for table_info in result['tables']:
                        tables_to_export.append(f"{table_info['schema']}.{table_info['name']}")
            log(f"📋 Selected schemas: {', '.join(schemas)} → {len(tables_to_export)} tables", 'info')
        else:
            # No selection: export every non-system table.
            result = self.get_tables(postgres_uri)
            if not result['success']:
                return {
                    'success': False,
                    'migration_id': migration_id,
                    'error': f"Failed to get tables: {result['error']}",
                    'logs': logs
                }
            for table_info in result['tables']:
                tables_to_export.append(f"{table_info['schema']}.{table_info['name']}")
            log(f"📋 All tables: {len(tables_to_export)} tables", 'info')

        if not tables_to_export:
            return {
                'success': False,
                'migration_id': migration_id,
                'error': 'No tables to export',
                'logs': logs
            }

        # Export each table; failures are recorded but do not abort the run.
        successful_exports = []
        failed_exports = []

        total_tables = len(tables_to_export)
        log(f"🚀 Starting export of {total_tables} tables...", 'info')

        for i, table_ref in enumerate(tables_to_export, 1):
            if '.' in table_ref:
                schema, table = table_ref.split('.', 1)
            else:
                # Bare table names default to the 'public' schema.
                schema = 'public'
                table = table_ref

            # Create S3 key: prefix + schema_table_timestamp.ext
            timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
            # BUG FIX: combine format and compression independently so
            # compressed JSON no longer gets a '.csv.gz' name.
            base_ext = 'csv' if format == 'csv' else 'json'
            extension = f"{base_ext}.gz" if compress else base_ext
            s3_key = f"{s3_prefix}{schema}_{table}_{timestamp}.{extension}" if s3_prefix else f"{schema}_{table}_{timestamp}.{extension}"

            log(f"📊 Exporting table {i}/{total_tables}: {schema}.{table}", 'info')

            # Export table
            result = self.export_table_to_s3(
                postgres_uri=postgres_uri,
                schema=schema,
                table=table,
                s3_bucket=s3_bucket,
                s3_key=s3_key,
                compress=compress,
                format=format,
                access_key_id=access_key_id,
                secret_access_key=secret_access_key,
                region=region,
                endpoint_url=endpoint_url
            )

            if result['success']:
                successful_exports.append({
                    'table': f"{schema}.{table}",
                    's3_key': s3_key,
                    'size': result.get('file_size', 0),
                    's3_url': result.get('s3_url')
                })
                log(f"✅ Successfully exported {schema}.{table}", 'success')
            else:
                failed_exports.append({
                    'table': f"{schema}.{table}",
                    'error': result.get('error', 'Unknown error')
                })
                log(f"❌ Failed to export {schema}.{table}: {result.get('error')}", 'error')

        # Calculate statistics
        total_size = sum(export['size'] for export in successful_exports)

        log(f"📊 Migration Summary:", 'info')
        log(f"   ✅ Successful: {len(successful_exports)} tables", 'success')
        log(f"   ❌ Failed: {len(failed_exports)} tables", 'error' if failed_exports else 'info')
        log(f"   📦 Total size: {total_size:,} bytes", 'info')

        # S3 configuration echo with credentials redacted.
        s3_config = {
            'access_key_id': access_key_id[:10] + '...' if access_key_id else None,
            'secret_access_key': '***' if secret_access_key else None,
            'region': region,
            'endpoint_url': endpoint_url,
            'bucket': s3_bucket,
            'prefix': s3_prefix
        }

        return {
            'success': True,
            'migration_id': migration_id,
            'message': f"Migration completed: {len(successful_exports)}/{total_tables} tables exported",
            'stats': {
                'total_tables': total_tables,
                'successful_exports': len(successful_exports),
                'failed_exports': len(failed_exports),
                'total_size': total_size
            },
            'successful_exports': successful_exports,
            'failed_exports': failed_exports,
            's3_config': s3_config,
            'logs': logs
        }

    except Exception as e:
        log(f"❌ Migration failed: {str(e)}", 'error')
        return {
            'success': False,
            'migration_id': migration_id,
            'error': str(e),
            'logs': logs
        }
|
|
|
|
def start_migration_async(self, migration_id, postgres_uri, s3_bucket, s3_prefix='',
                          schemas=None, tables=None, compress=True, format='csv',
                          access_key_id=None, secret_access_key=None,
                          region='us-east-1', endpoint_url=None):
    """Run migrate_to_s3() in a background daemon thread.

    Registers a 'running' placeholder under ``migration_id`` in
    ``self.migrations`` and returns the id immediately; the worker
    replaces the placeholder with the full result dict when finished.
    Poll with get_migration_status().

    Args: identical to migrate_to_s3(), all forwarded unchanged.

    Returns:
        migration_id (str), immediately.

    BUG FIX: the placeholder is now stored BEFORE the worker thread
    starts. Previously the thread was started first, so a fast-finishing
    migration could write its final result and then have it overwritten
    by the 'running' placeholder — leaving the migration stuck in
    'running' forever.
    """
    # Register the placeholder first so the worker's final write always
    # happens after (and therefore replaces) it.
    with self._lock:
        self.migrations[migration_id] = {
            'status': 'running',
            'started_at': time.time(),
            'postgres_uri': postgres_uri,
            's3_bucket': s3_bucket,
            's3_prefix': s3_prefix
        }

    def migration_task():
        # The full result dict replaces the placeholder when done.
        result = self.migrate_to_s3(
            migration_id=migration_id,
            postgres_uri=postgres_uri,
            s3_bucket=s3_bucket,
            s3_prefix=s3_prefix,
            schemas=schemas,
            tables=tables,
            compress=compress,
            format=format,
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            region=region,
            endpoint_url=endpoint_url
        )
        with self._lock:
            self.migrations[migration_id] = result

    # Daemon thread: does not block interpreter shutdown.
    thread = threading.Thread(target=migration_task, daemon=True)
    thread.start()

    return migration_id
|
|
|
|
def get_migration_status(self, migration_id):
    """Return the stored record for *migration_id*, or None if unknown."""
    with self._lock:
        record = self.migrations.get(migration_id)
    return record
|
|
|
|
def list_migrations(self):
    """Snapshot every known migration.

    A finished migration is stored as a result dict carrying a
    'success' key ('completed'/'failed'); anything else is still
    'running'. Returns a list of {'id', 'status', 'data'} dicts.
    """
    with self._lock:
        snapshot = []
        for identifier, record in self.migrations.items():
            finished = isinstance(record, dict) and 'success' in record
            if finished:
                state = 'completed' if record['success'] else 'failed'
            else:
                state = 'running'
            snapshot.append({
                'id': identifier,
                'status': state,
                'data': record,
            })
        return snapshot
|
|
|
|
|
|
# ============================================================================
# Global instances for backward compatibility
# ============================================================================

# Module-level singletons, one per migration direction. S3ToS3Migrator and
# PostgresToS3Migrator are defined earlier in this file; all three share no
# state with each other.
postgres_migrator = PostgresMigrator()
s3_to_s3_migrator = S3ToS3Migrator()
postgres_to_s3_migrator = PostgresToS3Migrator()

# For backward compatibility with existing code that imports 'migrator'
# You can choose which one to export as 'migrator' based on your needs
# migrator = postgres_migrator        # For PostgreSQL to PostgreSQL
# migrator = s3_to_s3_migrator        # For S3 to S3
# migrator = postgres_to_s3_migrator  # For PostgreSQL to S3

# By default, export all three classes and their singletons explicitly.
__all__ = [
    'PostgresMigrator',
    'S3ToS3Migrator',
    'PostgresToS3Migrator',
    'postgres_migrator',
    's3_to_s3_migrator',
    'postgres_to_s3_migrator'
]