# In the name of Allah, the Most Gracious, the Most Merciful
"""Export DynamoDB table schemas, sample data, and CloudFormation templates
to timestamped JSON/CSV files."""

import boto3
import json
import os
import csv
from datetime import datetime
from decimal import Decimal


class DynamoDBExporter:
    def __init__(self, profile_name=None, region_name='us-east-1'):
        """
        Initialize DynamoDB Exporter

        Args:
            profile_name (str): AWS profile name (optional)
            region_name (str): AWS region name
        """
        try:
            if profile_name:
                session = boto3.Session(profile_name=profile_name)
                self.dynamodb = session.client('dynamodb', region_name=region_name)
                self.dynamodb_resource = session.resource('dynamodb', region_name=region_name)
            else:
                self.dynamodb = boto3.client('dynamodb', region_name=region_name)
                self.dynamodb_resource = boto3.resource('dynamodb', region_name=region_name)

            print("DynamoDB client initialized successfully")
        except Exception as e:
            print(f"Error initializing DynamoDB client: {e}")
            raise

    def get_all_tables(self):
        """Retrieve the names of all DynamoDB tables in the region"""
        try:
            tables = []
            paginator = self.dynamodb.get_paginator('list_tables')

            for page in paginator.paginate():
                tables.extend(page['TableNames'])

            print(f"Found {len(tables)} DynamoDB tables")
            return tables
        except Exception as e:
            print(f"Error fetching tables: {e}")
            return []

    def get_table_details(self, table_name):
        """Get detailed information about a specific table"""
        try:
            response = self.dynamodb.describe_table(TableName=table_name)
            table_info = response['Table']

            # Add continuous backup (point-in-time recovery) status
            try:
                continuous_backups = self.dynamodb.describe_continuous_backups(TableName=table_name)
                table_info['ContinuousBackups'] = continuous_backups['ContinuousBackupsDescription']
            except Exception as e:
                table_info['ContinuousBackups'] = f"Error: {str(e)}"

            return table_info
        except Exception as e:
            print(f"Error getting details for table {table_name}: {e}")
            return None

    def get_table_schema(self, table_info):
        """Extract schema information from table details"""
        schema = {
            'table_name': table_info['TableName'],
            'table_arn': table_info['TableArn'],
            'table_status': table_info['TableStatus'],
            'creation_date': table_info['CreationDateTime'].isoformat(),
            # ItemCount and TableSizeBytes are approximate; DynamoDB updates
            # them roughly every six hours
            'item_count': table_info.get('ItemCount', 0),
            'table_size_bytes': table_info.get('TableSizeBytes', 0)
        }

        # Key schema
        schema['key_schema'] = {
            'hash_key': None,
            'range_key': None
        }

        for key in table_info['KeySchema']:
            if key['KeyType'] == 'HASH':
                schema['key_schema']['hash_key'] = key['AttributeName']
            elif key['KeyType'] == 'RANGE':
                schema['key_schema']['range_key'] = key['AttributeName']

        # Attribute definitions
        schema['attribute_definitions'] = table_info['AttributeDefinitions']

        # Throughput information (absent on on-demand tables)
        if 'ProvisionedThroughput' in table_info:
            schema['provisioned_throughput'] = table_info['ProvisionedThroughput']

        # Global Secondary Indexes
        if 'GlobalSecondaryIndexes' in table_info:
            schema['global_secondary_indexes'] = table_info['GlobalSecondaryIndexes']

        # Local Secondary Indexes
        if 'LocalSecondaryIndexes' in table_info:
            schema['local_secondary_indexes'] = table_info['LocalSecondaryIndexes']

        # Stream specification
        if 'StreamSpecification' in table_info:
            schema['stream_specification'] = table_info['StreamSpecification']

        # Server-side encryption settings
        if 'SSEDescription' in table_info:
            schema['sse_description'] = table_info['SSEDescription']

        return schema

    def export_table_data_sample(self, table_name, max_items=100):
        """Export a sample of table data"""
        try:
            table = self.dynamodb_resource.Table(table_name)

            # Scan with limit to get sample data
            response = table.scan(Limit=max_items)
            items = response.get('Items', [])

            # boto3 deserializes DynamoDB numbers as Decimal; convert them
            # to int/float so the sample is JSON-serializable
            def convert_decimals(obj):
                if isinstance(obj, Decimal):
                    return float(obj) if obj % 1 != 0 else int(obj)
                elif isinstance(obj, dict):
                    return {k: convert_decimals(v) for k, v in obj.items()}
                elif isinstance(obj, list):
                    return [convert_decimals(v) for v in obj]
                else:
                    return obj

            items = convert_decimals(items)
            return items

        except Exception as e:
            print(f"Error exporting data sample from {table_name}: {e}")
            return []

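    # Illustrative sketch (not used by the exporter): a full-table scan, by
    # contrast, must follow LastEvaluatedKey until DynamoDB stops returning
    # it, since a single Scan call reads at most 1 MB of data.
    def scan_full_table(self, table_name):
        """Sketch: retrieve every item from a table via paginated Scan."""
        table = self.dynamodb_resource.Table(table_name)
        items = []
        response = table.scan()
        items.extend(response.get('Items', []))
        # Keep scanning while DynamoDB reports another page
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            items.extend(response.get('Items', []))
        return items
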
    def export_table_to_json(self, table_name, table_info, schema, data_sample, export_dir):
        """Export table information to JSON format"""
        try:
            json_file = os.path.join(export_dir, f"{table_name}.json")

            table_data = {
                'export_date': datetime.now().isoformat(),
                'table_name': table_name,
                'table_details': table_info,
                'schema': schema,
                'data_sample': data_sample
            }

            # default=str handles datetime and any remaining Decimal values
            with open(json_file, 'w') as f:
                json.dump(table_data, f, indent=2, default=str)

            print(f"JSON format exported: {json_file}")
            return True
        except Exception as e:
            print(f"Error exporting JSON for {table_name}: {e}")
            return False

    def export_schema_to_csv(self, table_name, schema, export_dir):
        """Export schema information to CSV format"""
        try:
            csv_file = os.path.join(export_dir, f"{table_name}_schema.csv")

            with open(csv_file, 'w', newline='') as f:
                writer = csv.writer(f)

                # Write basic table info
                writer.writerow(['Table Information'])
                writer.writerow(['Table Name', table_name])
                writer.writerow(['Table ARN', schema['table_arn']])
                writer.writerow(['Status', schema['table_status']])
                writer.writerow(['Item Count', schema['item_count']])
                writer.writerow(['Table Size (Bytes)', schema['table_size_bytes']])
                writer.writerow([])

                # Write key schema
                writer.writerow(['Key Schema'])
                writer.writerow(['Key Type', 'Attribute Name'])
                writer.writerow(['HASH', schema['key_schema']['hash_key']])
                if schema['key_schema']['range_key']:
                    writer.writerow(['RANGE', schema['key_schema']['range_key']])
                writer.writerow([])

                # Write attribute definitions
                writer.writerow(['Attribute Definitions'])
                writer.writerow(['Attribute Name', 'Attribute Type'])
                for attr in schema['attribute_definitions']:
                    writer.writerow([attr['AttributeName'], attr['AttributeType']])
                writer.writerow([])

                # Write throughput info
                if 'provisioned_throughput' in schema:
                    writer.writerow(['Provisioned Throughput'])
                    throughput = schema['provisioned_throughput']
                    writer.writerow(['Read Capacity', throughput['ReadCapacityUnits']])
                    writer.writerow(['Write Capacity', throughput['WriteCapacityUnits']])
                    writer.writerow([])

            print(f"CSV schema exported: {csv_file}")
            return True
        except Exception as e:
            print(f"Error exporting CSV schema for {table_name}: {e}")
            return False

    def export_data_sample_to_csv(self, table_name, data_sample, export_dir):
        """Export data sample to CSV format"""
        if not data_sample:
            print(f"No data to export for {table_name}")
            return False

        try:
            csv_file = os.path.join(export_dir, f"{table_name}_data_sample.csv")

            # Collect the union of field names across the sample, since
            # DynamoDB items need not share the same attributes
            fieldnames = set()
            for item in data_sample:
                fieldnames.update(item.keys())
            fieldnames = sorted(fieldnames)

            with open(csv_file, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()

                for item in data_sample:
                    # Convert all values to strings for CSV; nested
                    # structures are serialized as JSON
                    row = {}
                    for key in fieldnames:
                        value = item.get(key, '')
                        if isinstance(value, (dict, list)):
                            value = json.dumps(value)
                        row[key] = str(value)
                    writer.writerow(row)

            print(f"CSV data sample exported: {csv_file}")
            return True

        except Exception as e:
            print(f"Error exporting CSV data sample for {table_name}: {e}")
            return False

    def export_global_secondary_indexes(self, table_name, schema, export_dir):
        """Export Global Secondary Indexes information"""
        try:
            if 'global_secondary_indexes' in schema and schema['global_secondary_indexes']:
                gsi_file = os.path.join(export_dir, f"{table_name}_gsi.json")

                gsi_data = {
                    'table_name': table_name,
                    'global_secondary_indexes': schema['global_secondary_indexes'],
                    'export_date': datetime.now().isoformat()
                }

                with open(gsi_file, 'w') as f:
                    json.dump(gsi_data, f, indent=2, default=str)

                print(f"GSI information exported: {gsi_file}")
                return True
            else:
                return False
        except Exception as e:
            print(f"Error exporting GSI for {table_name}: {e}")
            return False

    def create_export_summary(self, tables, schemas, export_dir):
        """Create a comprehensive export summary"""
        summary = {
            'export_date': datetime.now().isoformat(),
            'total_tables': len(tables),
            'tables': [],
            'total_items': 0,
            'total_size_bytes': 0
        }

        for schema in schemas:
            table_info = {
                'table_name': schema['table_name'],
                'status': schema['table_status'],
                'item_count': schema['item_count'],
                'table_size_bytes': schema['table_size_bytes'],
                'key_schema': schema['key_schema'],
                'has_gsi': bool(schema.get('global_secondary_indexes')),
                'has_lsi': bool(schema.get('local_secondary_indexes'))
            }
            summary['tables'].append(table_info)
            summary['total_items'] += schema['item_count']
            summary['total_size_bytes'] += schema['table_size_bytes']

        summary_file = os.path.join(export_dir, 'export_summary.json')
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"Export summary created: {summary_file}")
        return summary

    def export_all_dynamodb_data(self, base_export_dir='dynamodb_export', include_data_samples=True, sample_size=50):
        """Main method to export all DynamoDB data"""

        # Create export directory with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        export_dir = os.path.join(base_export_dir, f"export_{timestamp}")
        os.makedirs(export_dir, exist_ok=True)

        print(f"Starting DynamoDB export to: {export_dir}")

        # Get all tables
        tables = self.get_all_tables()

        if not tables:
            print("No DynamoDB tables found or error retrieving tables")
            return

        successful_exports = 0
        all_schemas = []

        # Export each table
        for table_name in tables:
            print(f"\nProcessing table: {table_name}")

            # Create table-specific directory
            table_dir = os.path.join(export_dir, 'tables', table_name)
            os.makedirs(table_dir, exist_ok=True)

            # Get table details
            table_info = self.get_table_details(table_name)
            if not table_info:
                print(f"Failed to get details for table: {table_name}")
                continue

            # Extract schema
            schema = self.get_table_schema(table_info)
            all_schemas.append(schema)

            # Get data sample
            data_sample = []
            if include_data_samples:
                data_sample = self.export_table_data_sample(table_name, sample_size)

            # Export in multiple formats
            json_success = self.export_table_to_json(table_name, table_info, schema, data_sample, table_dir)
            csv_schema_success = self.export_schema_to_csv(table_name, schema, table_dir)
            csv_data_success = self.export_data_sample_to_csv(table_name, data_sample, table_dir)
            self.export_global_secondary_indexes(table_name, schema, table_dir)

            if json_success or csv_schema_success or csv_data_success:
                successful_exports += 1
                print(f"Successfully exported table: {table_name}")
            else:
                print(f"Failed to export table: {table_name}")

        # Create export summary
        summary = self.create_export_summary(tables, all_schemas, export_dir)

        # Final report
        print("\nDynamoDB Export Completed!")
        print(f"Export Location: {os.path.abspath(export_dir)}")
        print("Summary:")
        print(f"  Tables Exported: {successful_exports}/{len(tables)}")
        print(f"  Total Items: {summary['total_items']:,}")
        print(f"  Total Size: {summary['total_size_bytes']:,} bytes")
        print("  Export Formats: JSON, CSV")
        if include_data_samples:
            print(f"  Data Samples: up to {sample_size} items per table")

        return export_dir

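# Illustrative sketch: the same full export run under a named AWS profile.
# The profile and region values are placeholders, assuming credentials for
# that profile exist in the local AWS configuration.
def export_with_profile(profile_name, region_name='us-east-1'):
    """Sketch: run the full export using a specific AWS profile."""
    exporter = DynamoDBExporter(profile_name=profile_name, region_name=region_name)
    return exporter.export_all_dynamodb_data(
        base_export_dir='dynamodb_backup',
        include_data_samples=True,
        sample_size=50
    )
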
# Quick export function
def export_dynamodb_simple(export_dir='dynamodb_simple_export'):
    """Simple one-function export for quick use"""
    try:
        exporter = DynamoDBExporter()

        # Create export directory
        os.makedirs(export_dir, exist_ok=True)

        # Get tables
        tables = exporter.get_all_tables()

        all_tables_data = {
            'export_date': datetime.now().isoformat(),
            'tables': []
        }

        for table_name in tables:
            print(f"Exporting: {table_name}")
            table_info = exporter.get_table_details(table_name)

            if table_info:
                schema = exporter.get_table_schema(table_info)
                table_data = {
                    'table_name': table_name,
                    'schema': schema
                }
                all_tables_data['tables'].append(table_data)

        # Save all data to a single file
        output_file = os.path.join(export_dir, 'all_dynamodb_schemas.json')
        with open(output_file, 'w') as f:
            json.dump(all_tables_data, f, indent=2, default=str)

        print(f"Simple export completed: {output_file}")
        return output_file

    except Exception as e:
        print(f"Error in simple export: {e}")
        return None

# Function to export table creation templates
def export_cloudformation_templates(schemas, export_dir):
    """Export CloudFormation templates for tables (not called by main())"""
    try:
        templates_dir = os.path.join(export_dir, 'cloudformation_templates')
        os.makedirs(templates_dir, exist_ok=True)

        for schema in schemas:
            table_name = schema['table_name']

            # Rebuild the CloudFormation KeySchema list from the flattened
            # {hash_key, range_key} form produced by get_table_schema
            key_schema = [{'AttributeName': schema['key_schema']['hash_key'], 'KeyType': 'HASH'}]
            if schema['key_schema']['range_key']:
                key_schema.append({'AttributeName': schema['key_schema']['range_key'], 'KeyType': 'RANGE'})

            # ProvisionedThroughput from describe_table carries extra fields
            # (e.g. NumberOfDecreasesToday) that CloudFormation rejects, so
            # keep only the capacity units
            throughput = schema.get('provisioned_throughput', {})
            provisioned = {
                'ReadCapacityUnits': throughput.get('ReadCapacityUnits', 5),
                'WriteCapacityUnits': throughput.get('WriteCapacityUnits', 5)
            }

            template = {
                'AWSTemplateFormatVersion': '2010-09-09',
                'Description': f'CloudFormation template for DynamoDB table {table_name}',
                'Resources': {
                    # Note: logical resource IDs must be alphanumeric, so
                    # table names containing '-' or '_' would need sanitizing
                    table_name: {
                        'Type': 'AWS::DynamoDB::Table',
                        'Properties': {
                            'TableName': table_name,
                            'AttributeDefinitions': schema['attribute_definitions'],
                            'KeySchema': key_schema,
                            'BillingMode': 'PROVISIONED',
                            'ProvisionedThroughput': provisioned
                        }
                    }
                }
            }

            # Add Global Secondary Indexes if present (copied verbatim from
            # describe_table; they may need similar pruning before deployment)
            if 'global_secondary_indexes' in schema:
                template['Resources'][table_name]['Properties']['GlobalSecondaryIndexes'] = schema['global_secondary_indexes']

            template_file = os.path.join(templates_dir, f"{table_name}_template.json")
            with open(template_file, 'w') as f:
                json.dump(template, f, indent=2, default=str)

        print(f"CloudFormation templates exported to: {templates_dir}")
        return True
    except Exception as e:
        print(f"Error exporting CloudFormation templates: {e}")
        return False

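# Possible wiring (hypothetical; the original flow never calls the template
# exporter): regenerate schemas after a full export and emit templates.
def export_all_with_templates():
    """Sketch: full export followed by CloudFormation template generation."""
    exporter = DynamoDBExporter()
    export_dir = exporter.export_all_dynamodb_data()
    if export_dir:
        # Re-derive the schemas, since export_all_dynamodb_data does not return them
        schemas = []
        for table_name in exporter.get_all_tables():
            table_info = exporter.get_table_details(table_name)
            if table_info:
                schemas.append(exporter.get_table_schema(table_info))
        export_cloudformation_templates(schemas, export_dir)
    return export_dir
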
def main():
    print("Starting DynamoDB Export...")

    try:
        # Full comprehensive export
        exporter = DynamoDBExporter()
        export_path = exporter.export_all_dynamodb_data(
            base_export_dir='dynamodb_backup',
            include_data_samples=True,
            sample_size=100
        )

        print("\nExport completed successfully!")
        print(f"All DynamoDB data has been saved to: {export_path}")

    except Exception as e:
        print(f"Export failed: {e}")
        print("\nTrying simple export method...")

        # Fall back to the schema-only simple export
        simple_export = export_dynamodb_simple()
        if simple_export:
            print(f"Simple export completed: {simple_export}")


if __name__ == "__main__":
    main()