# In the name of Allah, the Most Gracious, the Most Merciful

import boto3
import json
import os
import csv
from datetime import datetime
from decimal import Decimal


class DynamoDBExporter:
    def __init__(self, profile_name=None, region_name='us-east-1'):
        """
        Initialize the DynamoDB exporter.

        Args:
            profile_name (str): AWS profile name (optional)
            region_name (str): AWS region name
        """
        try:
            if profile_name:
                session = boto3.Session(profile_name=profile_name)
                self.dynamodb = session.client('dynamodb', region_name=region_name)
                self.dynamodb_resource = session.resource('dynamodb', region_name=region_name)
            else:
                self.dynamodb = boto3.client('dynamodb', region_name=region_name)
                self.dynamodb_resource = boto3.resource('dynamodb', region_name=region_name)
            print("DynamoDB client initialized successfully")
        except Exception as e:
            print(f"Error initializing DynamoDB client: {e}")
            raise

    def get_all_tables(self):
        """Retrieve all DynamoDB tables in the region."""
        try:
            tables = []
            paginator = self.dynamodb.get_paginator('list_tables')
            for page in paginator.paginate():
                tables.extend(page['TableNames'])
            print(f"Found {len(tables)} DynamoDB tables")
            return tables
        except Exception as e:
            print(f"Error fetching tables: {e}")
            return []

    def get_table_details(self, table_name):
        """Get detailed information about a specific table."""
        try:
            response = self.dynamodb.describe_table(TableName=table_name)
            table_info = response['Table']

            # Continuous backups are described separately; failures here are
            # recorded but do not abort the export
            try:
                continuous_backups = self.dynamodb.describe_continuous_backups(TableName=table_name)
                table_info['ContinuousBackups'] = continuous_backups['ContinuousBackupsDescription']
            except Exception as e:
                table_info['ContinuousBackups'] = f"Error: {str(e)}"

            return table_info
        except Exception as e:
            print(f"Error getting details for table {table_name}: {e}")
            return None

    def get_table_schema(self, table_info):
        """Extract schema information from table details."""
        schema = {
            'table_name': table_info['TableName'],
            'table_arn': table_info['TableArn'],
            'table_status': table_info['TableStatus'],
            'creation_date': table_info['CreationDateTime'].isoformat(),
            'item_count': table_info.get('ItemCount', 0),
            'table_size_bytes': table_info.get('TableSizeBytes', 0)
        }

        # Key schema
        schema['key_schema'] = {'hash_key': None, 'range_key': None}
        for key in table_info['KeySchema']:
            if key['KeyType'] == 'HASH':
                schema['key_schema']['hash_key'] = key['AttributeName']
            elif key['KeyType'] == 'RANGE':
                schema['key_schema']['range_key'] = key['AttributeName']

        # Attribute definitions
        schema['attribute_definitions'] = table_info['AttributeDefinitions']

        # Throughput information
        if 'ProvisionedThroughput' in table_info:
            schema['provisioned_throughput'] = table_info['ProvisionedThroughput']

        # Global Secondary Indexes
        if 'GlobalSecondaryIndexes' in table_info:
            schema['global_secondary_indexes'] = table_info['GlobalSecondaryIndexes']

        # Local Secondary Indexes
        if 'LocalSecondaryIndexes' in table_info:
            schema['local_secondary_indexes'] = table_info['LocalSecondaryIndexes']

        # Stream specification
        if 'StreamSpecification' in table_info:
            schema['stream_specification'] = table_info['StreamSpecification']

        # Server-side encryption description
        if 'SSEDescription' in table_info:
            schema['sse_description'] = table_info['SSEDescription']

        return schema
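    # For illustration only (a hypothetical table, not taken from any real
    # export): get_table_schema() would flatten a table keyed on
    # user_id (HASH) and created_at (RANGE) into roughly this shape:
    #
    #   {
    #       'table_name': 'users',
    #       'table_status': 'ACTIVE',
    #       'key_schema': {'hash_key': 'user_id', 'range_key': 'created_at'},
    #       'attribute_definitions': [
    #           {'AttributeName': 'user_id', 'AttributeType': 'S'},
    #           {'AttributeName': 'created_at', 'AttributeType': 'N'},
    #       ],
    #       ...
    #   }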
    def export_table_data_sample(self, table_name, max_items=100):
        """Export a sample of table data."""
        try:
            table = self.dynamodb_resource.Table(table_name)

            # A single Scan page capped at max_items; Limit bounds the items
            # evaluated per request, so fewer items may come back
            response = table.scan(Limit=max_items)
            items = response.get('Items', [])

            # Convert Decimal to int/float for JSON serialization
            def convert_decimals(obj):
                if isinstance(obj, Decimal):
                    return float(obj) if obj % 1 != 0 else int(obj)
                elif isinstance(obj, dict):
                    return {k: convert_decimals(v) for k, v in obj.items()}
                elif isinstance(obj, list):
                    return [convert_decimals(v) for v in obj]
                else:
                    return obj

            return convert_decimals(items)
        except Exception as e:
            print(f"Error exporting data sample from {table_name}: {e}")
            return []

    def export_table_to_json(self, table_name, table_info, schema, data_sample, export_dir):
        """Export table information to JSON format."""
        try:
            json_file = os.path.join(export_dir, f"{table_name}.json")
            table_data = {
                'export_date': datetime.now().isoformat(),
                'table_name': table_name,
                'table_details': table_info,
                'schema': schema,
                'data_sample': data_sample
            }
            with open(json_file, 'w') as f:
                json.dump(table_data, f, indent=2, default=str)
            print(f"JSON format exported: {json_file}")
            return True
        except Exception as e:
            print(f"Error exporting JSON for {table_name}: {e}")
            return False

    def export_schema_to_csv(self, table_name, schema, export_dir):
        """Export schema information to CSV format."""
        try:
            csv_file = os.path.join(export_dir, f"{table_name}_schema.csv")
            with open(csv_file, 'w', newline='') as f:
                writer = csv.writer(f)

                # Basic table info
                writer.writerow(['Table Information'])
                writer.writerow(['Table Name', table_name])
                writer.writerow(['Table ARN', schema['table_arn']])
                writer.writerow(['Status', schema['table_status']])
                writer.writerow(['Item Count', schema['item_count']])
                writer.writerow(['Table Size (Bytes)', schema['table_size_bytes']])
                writer.writerow([])

                # Key schema
                writer.writerow(['Key Schema'])
                writer.writerow(['Key Type', 'Attribute Name'])
                writer.writerow(['HASH', schema['key_schema']['hash_key']])
                if schema['key_schema']['range_key']:
                    writer.writerow(['RANGE', schema['key_schema']['range_key']])
                writer.writerow([])

                # Attribute definitions
                writer.writerow(['Attribute Definitions'])
                writer.writerow(['Attribute Name', 'Attribute Type'])
                for attr in schema['attribute_definitions']:
                    writer.writerow([attr['AttributeName'], attr['AttributeType']])
                writer.writerow([])

                # Throughput info
                if 'provisioned_throughput' in schema:
                    writer.writerow(['Provisioned Throughput'])
                    throughput = schema['provisioned_throughput']
                    writer.writerow(['Read Capacity', throughput['ReadCapacityUnits']])
                    writer.writerow(['Write Capacity', throughput['WriteCapacityUnits']])
                    writer.writerow([])

            print(f"CSV schema exported: {csv_file}")
            return True
        except Exception as e:
            print(f"Error exporting CSV schema for {table_name}: {e}")
            return False

    def export_data_sample_to_csv(self, table_name, data_sample, export_dir):
        """Export a data sample to CSV format."""
        if not data_sample:
            print(f"No data to export for {table_name}")
            return False
        try:
            csv_file = os.path.join(export_dir, f"{table_name}_data_sample.csv")

            # Collect the union of field names across the sample
            fieldnames = set()
            for item in data_sample:
                fieldnames.update(item.keys())
            fieldnames = sorted(fieldnames)

            with open(csv_file, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for item in data_sample:
                    # Convert all values to strings for CSV; nested
                    # structures are serialized as JSON
                    row = {}
                    for key in fieldnames:
                        value = item.get(key, '')
                        if isinstance(value, (dict, list)):
                            value = json.dumps(value)
                        row[key] = str(value)
                    writer.writerow(row)

            print(f"CSV data sample exported: {csv_file}")
            return True
        except Exception as e:
            print(f"Error exporting CSV data sample for {table_name}: {e}")
            return False
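    # A minimal sketch, not part of the original exporter: a full-table export
    # (rather than a sample) must follow LastEvaluatedKey across Scan pages.
    # This hypothetical helper shows the pagination pattern with the same
    # resource-level Table object used above.
    def scan_all_items(self, table_name):
        """Hypothetical helper: scan an entire table, following pagination."""
        table = self.dynamodb_resource.Table(table_name)
        items = []
        scan_kwargs = {}
        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            # Resume the scan where the previous page stopped
            scan_kwargs['ExclusiveStartKey'] = last_key
        return items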
    def export_global_secondary_indexes(self, table_name, schema, export_dir):
        """Export Global Secondary Index information."""
        try:
            if schema.get('global_secondary_indexes'):
                gsi_file = os.path.join(export_dir, f"{table_name}_gsi.json")
                gsi_data = {
                    'table_name': table_name,
                    'global_secondary_indexes': schema['global_secondary_indexes'],
                    'export_date': datetime.now().isoformat()
                }
                with open(gsi_file, 'w') as f:
                    json.dump(gsi_data, f, indent=2, default=str)
                print(f"GSI information exported: {gsi_file}")
                return True
            else:
                return False
        except Exception as e:
            print(f"Error exporting GSI for {table_name}: {e}")
            return False

    def create_export_summary(self, tables, schemas, export_dir):
        """Create a comprehensive export summary."""
        summary = {
            'export_date': datetime.now().isoformat(),
            'total_tables': len(tables),
            'tables': [],
            'total_items': 0,
            'total_size_bytes': 0
        }

        for schema in schemas:
            table_info = {
                'table_name': schema['table_name'],
                'status': schema['table_status'],
                'item_count': schema['item_count'],
                'table_size_bytes': schema['table_size_bytes'],
                'key_schema': schema['key_schema'],
                'has_gsi': bool(schema.get('global_secondary_indexes')),
                'has_lsi': bool(schema.get('local_secondary_indexes'))
            }
            summary['tables'].append(table_info)
            summary['total_items'] += schema['item_count']
            summary['total_size_bytes'] += schema['table_size_bytes']

        summary_file = os.path.join(export_dir, 'export_summary.json')
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"Export summary created: {summary_file}")
        return summary

    def export_all_dynamodb_data(self, base_export_dir='dynamodb_export', include_data_samples=True, sample_size=50):
        """Main method to export all DynamoDB data."""
        # Create a timestamped export directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        export_dir = os.path.join(base_export_dir, f"export_{timestamp}")
        os.makedirs(export_dir, exist_ok=True)
        print(f"Starting DynamoDB export to: {export_dir}")

        # Get all tables
        tables = self.get_all_tables()
        if not tables:
            print("No DynamoDB tables found or error retrieving tables")
            return

        successful_exports = 0
        all_schemas = []

        # Export each table
        for table_name in tables:
            print(f"\nProcessing table: {table_name}")

            # Create a table-specific directory
            table_dir = os.path.join(export_dir, 'tables', table_name)
            os.makedirs(table_dir, exist_ok=True)

            # Get table details
            table_info = self.get_table_details(table_name)
            if not table_info:
                print(f"Failed to get details for table: {table_name}")
                continue

            # Extract schema
            schema = self.get_table_schema(table_info)
            all_schemas.append(schema)

            # Get data sample
            data_sample = []
            if include_data_samples:
                data_sample = self.export_table_data_sample(table_name, sample_size)

            # Export in multiple formats; the GSI export is best-effort and
            # does not affect the success count
            json_success = self.export_table_to_json(table_name, table_info, schema, data_sample, table_dir)
            csv_schema_success = self.export_schema_to_csv(table_name, schema, table_dir)
            csv_data_success = self.export_data_sample_to_csv(table_name, data_sample, table_dir)
            self.export_global_secondary_indexes(table_name, schema, table_dir)

            if json_success or csv_schema_success or csv_data_success:
                successful_exports += 1
                print(f"Successfully exported table: {table_name}")
            else:
                print(f"Failed to export table: {table_name}")

        # Create export summary
        summary = self.create_export_summary(tables, all_schemas, export_dir)

        # Final report
        print("\nDynamoDB Export Completed Successfully!")
        print(f"Export Location: {os.path.abspath(export_dir)}")
        print("Summary:")
        print(f"  Tables Exported: {successful_exports}/{len(tables)}")
        print(f"  Total Items: {summary['total_items']:,}")
        print(f"  Total Size: {summary['total_size_bytes']:,} bytes")
        print("  Export Formats: JSON, CSV")
        if include_data_samples:
            print(f"  Data Samples: up to {sample_size} items per table")

        return export_dir
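# Example usage, a minimal sketch (the profile name, region, and directory
# below are placeholders, not values from this script):
#
#   exporter = DynamoDBExporter(profile_name='my-profile', region_name='eu-west-1')
#   exporter.export_all_dynamodb_data(base_export_dir='backups', sample_size=25)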
print(f"Export Location: {os.path.abspath(export_dir)}") print(f"Summary:") print(f" Tables Exported: {successful_exports}/{len(tables)}") print(f" Total Items: {summary['total_items']:,}") print(f" Total Size: {summary['total_size_bytes']:,} bytes") print(f" Export Formats: JSON, CSV") if include_data_samples: print(f" Data Samples: {sample_size} items per table") return export_dir # Quick export function def export_dynamodb_simple(export_dir='dynamodb_simple_export'): """Simple one-function export for quick use""" try: exporter = DynamoDBExporter() # Create export directory os.makedirs(export_dir, exist_ok=True) # Get tables tables = exporter.get_all_tables() all_tables_data = { 'export_date': datetime.now().isoformat(), 'tables': [] } for table_name in tables: print(f"Exporting: {table_name}") table_info = exporter.get_table_details(table_name) if table_info: schema = exporter.get_table_schema(table_info) table_data = { 'table_name': table_name, 'schema': schema } all_tables_data['tables'].append(table_data) # Save all data to a single file output_file = os.path.join(export_dir, 'all_dynamodb_schemas.json') with open(output_file, 'w') as f: json.dump(all_tables_data, f, indent=2, default=str) print(f"Simple export completed: {output_file}") return output_file except Exception as e: print(f"Error in simple export: {e}") return None # Function to export table creation templates def export_cloudformation_templates(schemas, export_dir): """Export CloudFormation templates for tables""" try: templates_dir = os.path.join(export_dir, 'cloudformation_templates') os.makedirs(templates_dir, exist_ok=True) for schema in schemas: table_name = schema['table_name'] template = { 'AWSTemplateFormatVersion': '2010-09-09', 'Description': f'CloudFormation template for DynamoDB table {table_name}', 'Resources': { table_name: { 'Type': 'AWS::DynamoDB::Table', 'Properties': { 'TableName': table_name, 'AttributeDefinitions': schema['attribute_definitions'], 'KeySchema': schema.get('key_schema', {}), 'BillingMode': 'PROVISIONED', 'ProvisionedThroughput': schema.get('provisioned_throughput', { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }) } } } } # Add Global Secondary Indexes if present if 'global_secondary_indexes' in schema: template['Resources'][table_name]['Properties']['GlobalSecondaryIndexes'] = schema['global_secondary_indexes'] template_file = os.path.join(templates_dir, f"{table_name}_template.json") with open(template_file, 'w') as f: json.dump(template, f, indent=2) print(f"CloudFormation templates exported to: {templates_dir}") return True except Exception as e: print(f"Error exporting CloudFormation templates: {e}") return False def main(): print("Starting DynamoDB Export...") try: # Full comprehensive export exporter = DynamoDBExporter() export_path = exporter.export_all_dynamodb_data( base_export_dir='dynamodb_backup', include_data_samples=True, sample_size=100 ) print(f"\nExport completed successfully!") print(f"All DynamoDB data has been saved to: {export_path}") except Exception as e: print(f"Export failed: {e}") print("\nTrying simple export method...") # Fallback to simple export simple_export = export_dynamodb_simple() if simple_export: print(f"Simple export completed: {simple_export}") if __name__ == "__main__": main()