commit f04cf269261adaf503f095361910a55a7cab6c61
Author: ghaymah_dev
Date: Tue Nov 18 13:12:17 2025 +0000
Add main.py
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..337cdb9
--- /dev/null
+++ b/main.py
@@ -0,0 +1,489 @@
+# In the name of God, the Most Gracious, the Most Merciful
+
+import boto3
+import json
+import os
+import csv
+from datetime import datetime
+from decimal import Decimal
+
+class DynamoDBExporter:
+ def __init__(self, profile_name=None, region_name='us-east-1'):
+ """
+ Initialize DynamoDB Exporter
+
+ Args:
+ profile_name (str): AWS profile name (optional)
+ region_name (str): AWS region name
+ """
+ try:
+ if profile_name:
+ session = boto3.Session(profile_name=profile_name)
+ self.dynamodb = session.client('dynamodb', region_name=region_name)
+ self.dynamodb_resource = session.resource('dynamodb', region_name=region_name)
+ else:
+ self.dynamodb = boto3.client('dynamodb', region_name=region_name)
+ self.dynamodb_resource = boto3.resource('dynamodb', region_name=region_name)
+
+ print("DynamoDB client initialized successfully")
+ except Exception as e:
+ print(f"Error initializing DynamoDB client: {e}")
+ raise
+
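+    # Usage sketch (the profile and region values below are hypothetical):
+    #     exporter = DynamoDBExporter()                   # default credential chain
+    #     exporter = DynamoDBExporter(profile_name='dev',
+    #                                 region_name='eu-west-1')
+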
+ def get_all_tables(self):
+ """Retrieve all DynamoDB tables from the region"""
+ try:
+ tables = []
+ paginator = self.dynamodb.get_paginator('list_tables')
+
+ for page in paginator.paginate():
+ tables.extend(page['TableNames'])
+
+ print(f"Found {len(tables)} DynamoDB tables")
+ return tables
+ except Exception as e:
+ print(f"Error fetching tables: {e}")
+ return []
+
+ def get_table_details(self, table_name):
+ """Get detailed information about a specific table"""
+ try:
+ response = self.dynamodb.describe_table(TableName=table_name)
+ table_info = response['Table']
+
+ # Get additional metrics
+ try:
+ continuous_backups = self.dynamodb.describe_continuous_backups(TableName=table_name)
+ table_info['ContinuousBackups'] = continuous_backups['ContinuousBackupsDescription']
+ except Exception as e:
+ table_info['ContinuousBackups'] = f"Error: {str(e)}"
+
+ return table_info
+ except Exception as e:
+ print(f"Error getting details for table {table_name}: {e}")
+ return None
+
+ def get_table_schema(self, table_info):
+ """Extract schema information from table details"""
+ schema = {
+ 'table_name': table_info['TableName'],
+ 'table_arn': table_info['TableArn'],
+ 'table_status': table_info['TableStatus'],
+ 'creation_date': table_info['CreationDateTime'].isoformat(),
+            # ItemCount and TableSizeBytes are approximate; DynamoDB refreshes
+            # them roughly every six hours
+            'item_count': table_info.get('ItemCount', 0),
+            'table_size_bytes': table_info.get('TableSizeBytes', 0)
+ }
+
+ # Key schema
+ schema['key_schema'] = {
+ 'hash_key': None,
+ 'range_key': None
+ }
+
+ for key in table_info['KeySchema']:
+ if key['KeyType'] == 'HASH':
+ schema['key_schema']['hash_key'] = key['AttributeName']
+ elif key['KeyType'] == 'RANGE':
+ schema['key_schema']['range_key'] = key['AttributeName']
+
+ # Attribute definitions
+ schema['attribute_definitions'] = table_info['AttributeDefinitions']
+
+ # Throughput information
+ if 'ProvisionedThroughput' in table_info:
+ schema['provisioned_throughput'] = table_info['ProvisionedThroughput']
+
+ # Global Secondary Indexes
+ if 'GlobalSecondaryIndexes' in table_info:
+ schema['global_secondary_indexes'] = table_info['GlobalSecondaryIndexes']
+
+ # Local Secondary Indexes
+ if 'LocalSecondaryIndexes' in table_info:
+ schema['local_secondary_indexes'] = table_info['LocalSecondaryIndexes']
+
+ # Stream specification
+ if 'StreamSpecification' in table_info:
+ schema['stream_specification'] = table_info['StreamSpecification']
+
+        # Server-side encryption
+        if 'SSEDescription' in table_info:
+            schema['sse_description'] = table_info['SSEDescription']
+
+        # Billing mode (on-demand tables report PAY_PER_REQUEST here and zero
+        # units under ProvisionedThroughput)
+        if 'BillingModeSummary' in table_info:
+            schema['billing_mode_summary'] = table_info['BillingModeSummary']
+
+        return schema
+
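+    # Illustrative shape of the dict returned by get_table_schema (the table
+    # name and attributes below are hypothetical):
+    #     {
+    #         'table_name': 'Orders',
+    #         'table_status': 'ACTIVE',
+    #         'key_schema': {'hash_key': 'order_id', 'range_key': 'created_at'},
+    #         'attribute_definitions': [
+    #             {'AttributeName': 'order_id', 'AttributeType': 'S'},
+    #             {'AttributeName': 'created_at', 'AttributeType': 'S'}
+    #         ],
+    #         ...
+    #     }
+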
+ def export_table_data_sample(self, table_name, max_items=100):
+ """Export a sample of table data"""
+ try:
+ table = self.dynamodb_resource.Table(table_name)
+
+            # Scan once with Limit for a sample; Limit caps the items evaluated
+            # in this single request, so at most max_items are returned, and
+            # only from the first scan page
+            response = table.scan(Limit=max_items)
+ items = response.get('Items', [])
+
+ # Convert Decimal to float for JSON serialization
+ def convert_decimals(obj):
+ if isinstance(obj, Decimal):
+ return float(obj) if obj % 1 != 0 else int(obj)
+ elif isinstance(obj, dict):
+ return {k: convert_decimals(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [convert_decimals(v) for v in obj]
+ else:
+ return obj
+
+ items = convert_decimals(items)
+ return items
+
+ except Exception as e:
+ print(f"Error exporting data sample from {table_name}: {e}")
+ return []
+
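+    # A minimal sketch (hypothetical helper, not used by the export flow):
+    # Scan returns at most 1 MB of data per call and Limit only caps items
+    # evaluated per request, so a complete read must follow LastEvaluatedKey
+    # until it is absent.
+    def scan_table_fully(self, table_name):
+        """Read every item from a table via a paginated Scan (illustrative)"""
+        table = self.dynamodb_resource.Table(table_name)
+        items = []
+        scan_kwargs = {}
+        while True:
+            response = table.scan(**scan_kwargs)
+            items.extend(response.get('Items', []))
+            last_key = response.get('LastEvaluatedKey')
+            if not last_key:
+                break
+            # Resume the scan where the previous page stopped
+            scan_kwargs['ExclusiveStartKey'] = last_key
+        return items
+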
+ def export_table_to_json(self, table_name, table_info, schema, data_sample, export_dir):
+ """Export table information to JSON format"""
+ try:
+ json_file = os.path.join(export_dir, f"{table_name}.json")
+
+ table_data = {
+ 'export_date': datetime.now().isoformat(),
+ 'table_name': table_name,
+ 'table_details': table_info,
+ 'schema': schema,
+ 'data_sample': data_sample
+ }
+
+ with open(json_file, 'w') as f:
+ json.dump(table_data, f, indent=2, default=str)
+
+ print(f"JSON format exported: {json_file}")
+ return True
+ except Exception as e:
+ print(f"Error exporting JSON for {table_name}: {e}")
+ return False
+
+ def export_schema_to_csv(self, table_name, schema, export_dir):
+ """Export schema information to CSV format"""
+ try:
+ csv_file = os.path.join(export_dir, f"{table_name}_schema.csv")
+
+ with open(csv_file, 'w', newline='') as f:
+ writer = csv.writer(f)
+
+ # Write basic table info
+ writer.writerow(['Table Information'])
+ writer.writerow(['Table Name', table_name])
+ writer.writerow(['Table ARN', schema['table_arn']])
+ writer.writerow(['Status', schema['table_status']])
+ writer.writerow(['Item Count', schema['item_count']])
+ writer.writerow(['Table Size (Bytes)', schema['table_size_bytes']])
+ writer.writerow([])
+
+ # Write key schema
+ writer.writerow(['Key Schema'])
+ writer.writerow(['Key Type', 'Attribute Name'])
+ writer.writerow(['HASH', schema['key_schema']['hash_key']])
+ if schema['key_schema']['range_key']:
+ writer.writerow(['RANGE', schema['key_schema']['range_key']])
+ writer.writerow([])
+
+ # Write attribute definitions
+ writer.writerow(['Attribute Definitions'])
+ writer.writerow(['Attribute Name', 'Attribute Type'])
+ for attr in schema['attribute_definitions']:
+ writer.writerow([attr['AttributeName'], attr['AttributeType']])
+ writer.writerow([])
+
+ # Write throughput info
+ if 'provisioned_throughput' in schema:
+ writer.writerow(['Provisioned Throughput'])
+ throughput = schema['provisioned_throughput']
+ writer.writerow(['Read Capacity', throughput['ReadCapacityUnits']])
+ writer.writerow(['Write Capacity', throughput['WriteCapacityUnits']])
+ writer.writerow([])
+
+ print(f"CSV schema exported: {csv_file}")
+ return True
+ except Exception as e:
+ print(f"Error exporting CSV schema for {table_name}: {e}")
+ return False
+
+ def export_data_sample_to_csv(self, table_name, data_sample, export_dir):
+ """Export data sample to CSV format"""
+        if not data_sample:
+            print(f"No data to export for {table_name}")
+            return False
+
+        try:
+            csv_file = os.path.join(export_dir, f"{table_name}_data_sample.csv")
+
+            # Get all unique field names from the sample
+            fieldnames = set()
+            for item in data_sample:
+                fieldnames.update(item.keys())
+            fieldnames = sorted(fieldnames)
+
+            with open(csv_file, 'w', newline='') as f:
+                writer = csv.DictWriter(f, fieldnames=fieldnames)
+                writer.writeheader()
+
+                for item in data_sample:
+                    # Serialize nested values as JSON; stringify everything else
+                    row = {}
+                    for key in fieldnames:
+                        value = item.get(key, '')
+                        if isinstance(value, (dict, list)):
+                            value = json.dumps(value)
+                        row[key] = str(value)
+                    writer.writerow(row)
+
+            print(f"CSV data sample exported: {csv_file}")
+            return True
+
+ except Exception as e:
+ print(f"Error exporting CSV data sample for {table_name}: {e}")
+ return False
+
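+    # Illustrative conversion (hypothetical item): after Decimal conversion,
+    # an item such as
+    #     {'order_id': 'A1', 'tags': ['new', 'gift'], 'total': 42}
+    # is written to the CSV as
+    #     order_id,tags,total
+    #     A1,"[""new"", ""gift""]",42
+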
+ def export_global_secondary_indexes(self, table_name, schema, export_dir):
+ """Export Global Secondary Indexes information"""
+ try:
+ if 'global_secondary_indexes' in schema and schema['global_secondary_indexes']:
+ gsi_file = os.path.join(export_dir, f"{table_name}_gsi.json")
+
+ gsi_data = {
+ 'table_name': table_name,
+ 'global_secondary_indexes': schema['global_secondary_indexes'],
+ 'export_date': datetime.now().isoformat()
+ }
+
+ with open(gsi_file, 'w') as f:
+ json.dump(gsi_data, f, indent=2, default=str)
+
+ print(f"GSI information exported: {gsi_file}")
+ return True
+ else:
+ return False
+ except Exception as e:
+ print(f"Error exporting GSI for {table_name}: {e}")
+ return False
+
+ def create_export_summary(self, tables, schemas, export_dir):
+ """Create a comprehensive export summary"""
+ summary = {
+ 'export_date': datetime.now().isoformat(),
+ 'total_tables': len(tables),
+ 'tables': [],
+ 'total_items': 0,
+ 'total_size_bytes': 0
+ }
+
+ for schema in schemas:
+ table_info = {
+ 'table_name': schema['table_name'],
+ 'status': schema['table_status'],
+ 'item_count': schema['item_count'],
+ 'table_size_bytes': schema['table_size_bytes'],
+ 'key_schema': schema['key_schema'],
+ 'has_gsi': 'global_secondary_indexes' in schema and bool(schema['global_secondary_indexes']),
+ 'has_lsi': 'local_secondary_indexes' in schema and bool(schema['local_secondary_indexes'])
+ }
+ summary['tables'].append(table_info)
+ summary['total_items'] += schema['item_count']
+ summary['total_size_bytes'] += schema['table_size_bytes']
+
+ summary_file = os.path.join(export_dir, 'export_summary.json')
+ with open(summary_file, 'w') as f:
+ json.dump(summary, f, indent=2)
+
+ print(f"Export summary created: {summary_file}")
+ return summary
+
+ def export_all_dynamodb_data(self, base_export_dir='dynamodb_export', include_data_samples=True, sample_size=50):
+ """Main method to export all DynamoDB data"""
+
+ # Create export directory with timestamp
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ export_dir = os.path.join(base_export_dir, f"export_{timestamp}")
+ os.makedirs(export_dir, exist_ok=True)
+
+ print(f"Starting DynamoDB export to: {export_dir}")
+
+ # Get all tables
+ tables = self.get_all_tables()
+
+ if not tables:
+ print("No DynamoDB tables found or error retrieving tables")
+ return
+
+ successful_exports = 0
+ all_schemas = []
+
+ # Export each table
+ for table_name in tables:
+ print(f"\nProcessing table: {table_name}")
+
+ # Create table-specific directory
+ table_dir = os.path.join(export_dir, 'tables', table_name)
+ os.makedirs(table_dir, exist_ok=True)
+
+ # Get table details
+ table_info = self.get_table_details(table_name)
+ if not table_info:
+ print(f"Failed to get details for table: {table_name}")
+ continue
+
+ # Extract schema
+ schema = self.get_table_schema(table_info)
+ all_schemas.append(schema)
+
+ # Get data sample
+ data_sample = []
+ if include_data_samples:
+ data_sample = self.export_table_data_sample(table_name, sample_size)
+
+ # Export in multiple formats
+ json_success = self.export_table_to_json(table_name, table_info, schema, data_sample, table_dir)
+ csv_schema_success = self.export_schema_to_csv(table_name, schema, table_dir)
+ csv_data_success = self.export_data_sample_to_csv(table_name, data_sample, table_dir)
+            # GSI export is supplementary and does not affect the success count
+            self.export_global_secondary_indexes(table_name, schema, table_dir)
+
+ if json_success or csv_schema_success or csv_data_success:
+ successful_exports += 1
+ print(f"Successfully exported table: {table_name}")
+ else:
+ print(f"Failed to export table: {table_name}")
+
+ # Create export summary
+ summary = self.create_export_summary(tables, all_schemas, export_dir)
+
+ # Final report
+ print(f"\nDynamoDB Export Completed Successfully!")
+ print(f"Export Location: {os.path.abspath(export_dir)}")
+ print(f"Summary:")
+ print(f" Tables Exported: {successful_exports}/{len(tables)}")
+ print(f" Total Items: {summary['total_items']:,}")
+ print(f" Total Size: {summary['total_size_bytes']:,} bytes")
+ print(f" Export Formats: JSON, CSV")
+ if include_data_samples:
+ print(f" Data Samples: {sample_size} items per table")
+
+ return export_dir
+
+# Quick export function
+def export_dynamodb_simple(export_dir='dynamodb_simple_export'):
+ """Simple one-function export for quick use"""
+ try:
+ exporter = DynamoDBExporter()
+
+ # Create export directory
+ os.makedirs(export_dir, exist_ok=True)
+
+ # Get tables
+ tables = exporter.get_all_tables()
+
+ all_tables_data = {
+ 'export_date': datetime.now().isoformat(),
+ 'tables': []
+ }
+
+ for table_name in tables:
+ print(f"Exporting: {table_name}")
+ table_info = exporter.get_table_details(table_name)
+
+ if table_info:
+ schema = exporter.get_table_schema(table_info)
+ table_data = {
+ 'table_name': table_name,
+ 'schema': schema
+ }
+ all_tables_data['tables'].append(table_data)
+
+ # Save all data to a single file
+ output_file = os.path.join(export_dir, 'all_dynamodb_schemas.json')
+ with open(output_file, 'w') as f:
+ json.dump(all_tables_data, f, indent=2, default=str)
+
+ print(f"Simple export completed: {output_file}")
+ return output_file
+
+ except Exception as e:
+ print(f"Error in simple export: {e}")
+ return None
+
+# Standalone helper (not called by main()): export CloudFormation templates
+# that recreate each table's key structure
+def export_cloudformation_templates(schemas, export_dir):
+    """Export CloudFormation templates for tables"""
+    try:
+        templates_dir = os.path.join(export_dir, 'cloudformation_templates')
+        os.makedirs(templates_dir, exist_ok=True)
+
+        for schema in schemas:
+            table_name = schema['table_name']
+
+            # Rebuild the CloudFormation KeySchema list from the flattened
+            # hash/range keys stored in the exported schema
+            key_schema = [{'AttributeName': schema['key_schema']['hash_key'], 'KeyType': 'HASH'}]
+            if schema['key_schema']['range_key']:
+                key_schema.append({'AttributeName': schema['key_schema']['range_key'], 'KeyType': 'RANGE'})
+
+            # On-demand tables report PAY_PER_REQUEST and zero provisioned units
+            billing_mode = schema.get('billing_mode_summary', {}).get('BillingMode', 'PROVISIONED')
+
+            properties = {
+                'TableName': table_name,
+                'AttributeDefinitions': schema['attribute_definitions'],
+                'KeySchema': key_schema,
+                'BillingMode': billing_mode
+            }
+
+            if billing_mode == 'PROVISIONED':
+                # DescribeTable output carries extra fields (e.g.
+                # NumberOfDecreasesToday) that CloudFormation rejects,
+                # so keep only the capacity units
+                throughput = schema.get('provisioned_throughput', {})
+                properties['ProvisionedThroughput'] = {
+                    'ReadCapacityUnits': throughput.get('ReadCapacityUnits', 5),
+                    'WriteCapacityUnits': throughput.get('WriteCapacityUnits', 5)
+                }
+
+            # GSI descriptions likewise carry describe-only fields; keep only
+            # the properties CloudFormation accepts
+            if schema.get('global_secondary_indexes'):
+                gsis = []
+                for gsi in schema['global_secondary_indexes']:
+                    entry = {
+                        'IndexName': gsi['IndexName'],
+                        'KeySchema': gsi['KeySchema'],
+                        'Projection': gsi['Projection']
+                    }
+                    if billing_mode == 'PROVISIONED' and 'ProvisionedThroughput' in gsi:
+                        entry['ProvisionedThroughput'] = {
+                            'ReadCapacityUnits': gsi['ProvisionedThroughput']['ReadCapacityUnits'],
+                            'WriteCapacityUnits': gsi['ProvisionedThroughput']['WriteCapacityUnits']
+                        }
+                    gsis.append(entry)
+                properties['GlobalSecondaryIndexes'] = gsis
+
+            # CloudFormation logical IDs must be alphanumeric
+            logical_id = ''.join(ch for ch in table_name if ch.isalnum()) or 'DynamoDBTable'
+
+            template = {
+                'AWSTemplateFormatVersion': '2010-09-09',
+                'Description': f'CloudFormation template for DynamoDB table {table_name}',
+                'Resources': {
+                    logical_id: {
+                        'Type': 'AWS::DynamoDB::Table',
+                        'Properties': properties
+                    }
+                }
+            }
+
+            template_file = os.path.join(templates_dir, f"{table_name}_template.json")
+            with open(template_file, 'w') as f:
+                json.dump(template, f, indent=2)
+
+        print(f"CloudFormation templates exported to: {templates_dir}")
+        return True
+    except Exception as e:
+        print(f"Error exporting CloudFormation templates: {e}")
+        return False
+
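+# Example wiring (illustrative; main() below does not call the template
+# exporter itself):
+#     exporter = DynamoDBExporter()
+#     schemas = []
+#     for name in exporter.get_all_tables():
+#         info = exporter.get_table_details(name)
+#         if info:
+#             schemas.append(exporter.get_table_schema(info))
+#     export_cloudformation_templates(schemas, 'dynamodb_backup')
+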
+def main():
+ print("Starting DynamoDB Export...")
+
+ try:
+ # Full comprehensive export
+ exporter = DynamoDBExporter()
+ export_path = exporter.export_all_dynamodb_data(
+ base_export_dir='dynamodb_backup',
+ include_data_samples=True,
+ sample_size=100
+ )
+
+ print(f"\nExport completed successfully!")
+ print(f"All DynamoDB data has been saved to: {export_path}")
+
+ except Exception as e:
+ print(f"Export failed: {e}")
+ print("\nTrying simple export method...")
+
+ # Fallback to simple export
+ simple_export = export_dynamodb_simple()
+ if simple_export:
+ print(f"Simple export completed: {simple_export}")
+
+if __name__ == "__main__":
+ main()