372 أسطر
14 KiB
Python
372 أسطر
14 KiB
Python
import boto3
|
|
import json
|
|
import os
|
|
import csv
|
|
from datetime import datetime
|
|
|
|
class Route53Exporter:
|
|
def __init__(self, profile_name=None):
|
|
"""
|
|
Initialize Route53 Exporter
|
|
|
|
Args:
|
|
profile_name (str): AWS profile name (optional)
|
|
"""
|
|
try:
|
|
if profile_name:
|
|
session = boto3.Session(profile_name=profile_name)
|
|
self.route53 = session.client('route53')
|
|
else:
|
|
self.route53 = boto3.client('route53')
|
|
|
|
print("Route53 client initialized successfully")
|
|
except Exception as e:
|
|
print(f"Error initializing Route53 client: {e}")
|
|
raise
|
|
|
|
def get_all_hosted_zones(self):
|
|
"""Retrieve all hosted zones from Route53"""
|
|
try:
|
|
hosted_zones = []
|
|
paginator = self.route53.get_paginator('list_hosted_zones')
|
|
|
|
for page in paginator.paginate():
|
|
hosted_zones.extend(page['HostedZones'])
|
|
|
|
print(f"Found {len(hosted_zones)} hosted zones")
|
|
return hosted_zones
|
|
except Exception as e:
|
|
print(f"Error fetching hosted zones: {e}")
|
|
return []
|
|
|
|
def get_zone_records(self, zone_id, zone_name):
|
|
"""Get all records for a specific hosted zone"""
|
|
try:
|
|
# Remove the leading /hostedzone/ from zone_id if present
|
|
zone_id = zone_id.replace('/hostedzone/', '')
|
|
|
|
records = []
|
|
paginator = self.route53.get_paginator('list_resource_record_sets')
|
|
|
|
for page in paginator.paginate(HostedZoneId=zone_id):
|
|
records.extend(page['ResourceRecordSets'])
|
|
|
|
print(f"Found {len(records)} records in zone: {zone_name}")
|
|
return records
|
|
except Exception as e:
|
|
print(f"Error fetching records for zone {zone_name}: {e}")
|
|
return []
|
|
|
|
def export_zone_to_bind_format(self, zone_name, records, export_dir):
|
|
"""Export zone records to BIND format"""
|
|
try:
|
|
bind_file = os.path.join(export_dir, f"{zone_name}.zone")
|
|
|
|
with open(bind_file, 'w') as f:
|
|
f.write(f"; BIND format export for {zone_name}\n")
|
|
f.write(f"; Exported on: {datetime.now().isoformat()}\n")
|
|
f.write(f"$ORIGIN {zone_name}.\n\n")
|
|
|
|
for record in records:
|
|
name = record['Name'].rstrip('.')
|
|
record_type = record['Type']
|
|
ttl = record.get('TTL', 300)
|
|
|
|
if record_type in ['A', 'AAAA', 'CNAME', 'TXT', 'MX', 'NS', 'PTR', 'SRV']:
|
|
f.write(f"{name}.\t{ttl}\tIN\t{record_type}\t")
|
|
|
|
if 'ResourceRecords' in record:
|
|
values = [rr['Value'] for rr in record['ResourceRecords']]
|
|
if record_type == 'TXT':
|
|
values = [f'"{v}"' for v in values]
|
|
f.write(' '.join(values))
|
|
elif 'AliasTarget' in record:
|
|
alias = record['AliasTarget']
|
|
f.write(f"{alias['DNSName'].rstrip('.')} ; ALIAS")
|
|
|
|
f.write("\n")
|
|
|
|
print(f"BIND format exported: {bind_file}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error exporting BIND format for {zone_name}: {e}")
|
|
return False
|
|
|
|
def export_zone_to_json(self, zone_name, records, export_dir):
|
|
"""Export zone records to JSON format"""
|
|
try:
|
|
json_file = os.path.join(export_dir, f"{zone_name}.json")
|
|
|
|
zone_data = {
|
|
'zone_name': zone_name,
|
|
'export_date': datetime.now().isoformat(),
|
|
'total_records': len(records),
|
|
'records': records
|
|
}
|
|
|
|
with open(json_file, 'w') as f:
|
|
json.dump(zone_data, f, indent=2, default=str)
|
|
|
|
print(f"JSON format exported: {json_file}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error exporting JSON for {zone_name}: {e}")
|
|
return False
|
|
|
|
def export_zone_to_csv(self, zone_name, records, export_dir):
|
|
"""Export zone records to CSV format"""
|
|
try:
|
|
csv_file = os.path.join(export_dir, f"{zone_name}.csv")
|
|
|
|
with open(csv_file, 'w', newline='') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(['Name', 'Type', 'TTL', 'Value', 'AliasTarget'])
|
|
|
|
for record in records:
|
|
name = record['Name']
|
|
record_type = record['Type']
|
|
ttl = record.get('TTL', '')
|
|
|
|
if 'ResourceRecords' in record:
|
|
values = ' | '.join([rr['Value'] for rr in record['ResourceRecords']])
|
|
alias = ''
|
|
elif 'AliasTarget' in record:
|
|
values = ''
|
|
alias = record['AliasTarget']['DNSName']
|
|
else:
|
|
values = ''
|
|
alias = ''
|
|
|
|
writer.writerow([name, record_type, ttl, values, alias])
|
|
|
|
print(f"CSV format exported: {csv_file}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error exporting CSV for {zone_name}: {e}")
|
|
return False
|
|
|
|
def export_health_checks(self, export_dir):
|
|
"""Export Route53 health checks"""
|
|
try:
|
|
health_checks = []
|
|
paginator = self.route53.get_paginator('list_health_checks')
|
|
|
|
for page in paginator.paginate():
|
|
health_checks.extend(page['HealthChecks'])
|
|
|
|
if health_checks:
|
|
health_checks_file = os.path.join(export_dir, 'health_checks.json')
|
|
with open(health_checks_file, 'w') as f:
|
|
json.dump(health_checks, f, indent=2, default=str)
|
|
|
|
print(f"Health checks exported: {len(health_checks)} found")
|
|
else:
|
|
print("No health checks found")
|
|
|
|
return health_checks
|
|
except Exception as e:
|
|
print(f"Error exporting health checks: {e}")
|
|
return []
|
|
|
|
def export_traffic_policies(self, export_dir):
|
|
"""Export Route53 traffic policies"""
|
|
try:
|
|
policies = []
|
|
paginator = self.route53.get_paginator('list_traffic_policies')
|
|
|
|
for page in paginator.paginate():
|
|
policies.extend(page['TrafficPolicySummaries'])
|
|
|
|
# Get detailed policy information
|
|
detailed_policies = []
|
|
for policy in policies:
|
|
try:
|
|
policy_detail = self.route53.get_traffic_policy(
|
|
Id=policy['Id'],
|
|
Version=policy['LatestVersion']
|
|
)
|
|
detailed_policies.append(policy_detail)
|
|
except Exception as e:
|
|
print(f"Error fetching details for policy {policy['Id']}: {e}")
|
|
|
|
if detailed_policies:
|
|
policies_file = os.path.join(export_dir, 'traffic_policies.json')
|
|
with open(policies_file, 'w') as f:
|
|
json.dump(detailed_policies, f, indent=2, default=str)
|
|
|
|
print(f"Traffic policies exported: {len(detailed_policies)} found")
|
|
else:
|
|
print("No traffic policies found")
|
|
|
|
return detailed_policies
|
|
except Exception as e:
|
|
print(f"Error exporting traffic policies: {e}")
|
|
return []
|
|
|
|
def create_export_summary(self, hosted_zones, export_dir):
|
|
"""Create a comprehensive export summary"""
|
|
summary = {
|
|
'export_date': datetime.now().isoformat(),
|
|
'total_hosted_zones': len(hosted_zones),
|
|
'hosted_zones': [],
|
|
'formats_exported': ['JSON', 'CSV', 'BIND']
|
|
}
|
|
|
|
for zone in hosted_zones:
|
|
zone_info = {
|
|
'name': zone['Name'],
|
|
'id': zone['Id'],
|
|
'record_count': zone.get('ResourceRecordSetCount', 0),
|
|
'private_zone': zone.get('Config', {}).get('PrivateZone', False)
|
|
}
|
|
summary['hosted_zones'].append(zone_info)
|
|
|
|
summary_file = os.path.join(export_dir, 'export_summary.json')
|
|
with open(summary_file, 'w') as f:
|
|
json.dump(summary, f, indent=2)
|
|
|
|
print(f"Export summary created: {summary_file}")
|
|
return summary
|
|
|
|
def export_all_route53_data(self, base_export_dir='route53_export'):
|
|
"""Main method to export all Route53 data"""
|
|
|
|
# Create export directory with timestamp
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
export_dir = os.path.join(base_export_dir, f"export_{timestamp}")
|
|
os.makedirs(export_dir, exist_ok=True)
|
|
|
|
print(f"Starting Route53 export to: {export_dir}")
|
|
|
|
# Get all hosted zones
|
|
hosted_zones = self.get_all_hosted_zones()
|
|
|
|
if not hosted_zones:
|
|
print("No hosted zones found or error retrieving zones")
|
|
return
|
|
|
|
total_records_exported = 0
|
|
successful_zones = 0
|
|
|
|
# Export each hosted zone
|
|
for zone in hosted_zones:
|
|
zone_name = zone['Name'].rstrip('.')
|
|
zone_id = zone['Id']
|
|
|
|
print(f"\nProcessing zone: {zone_name}")
|
|
|
|
# Create zone-specific directory
|
|
zone_dir = os.path.join(export_dir, 'zones', zone_name)
|
|
os.makedirs(zone_dir, exist_ok=True)
|
|
|
|
# Get all records for this zone
|
|
records = self.get_zone_records(zone_id, zone_name)
|
|
|
|
if records:
|
|
# Export in multiple formats
|
|
json_success = self.export_zone_to_json(zone_name, records, zone_dir)
|
|
csv_success = self.export_zone_to_csv(zone_name, records, zone_dir)
|
|
bind_success = self.export_zone_to_bind_format(zone_name, records, zone_dir)
|
|
|
|
if json_success or csv_success or bind_success:
|
|
successful_zones += 1
|
|
total_records_exported += len(records)
|
|
print(f"Successfully exported zone: {zone_name}")
|
|
else:
|
|
print(f"Failed to export zone: {zone_name}")
|
|
else:
|
|
print(f"No records found for zone: {zone_name}")
|
|
|
|
# Export additional Route53 components
|
|
print("\nExporting additional Route53 components...")
|
|
|
|
# Export health checks
|
|
health_checks_dir = os.path.join(export_dir, 'health_checks')
|
|
os.makedirs(health_checks_dir, exist_ok=True)
|
|
self.export_health_checks(health_checks_dir)
|
|
|
|
# Export traffic policies
|
|
policies_dir = os.path.join(export_dir, 'traffic_policies')
|
|
os.makedirs(policies_dir, exist_ok=True)
|
|
self.export_traffic_policies(policies_dir)
|
|
|
|
# Create export summary
|
|
summary = self.create_export_summary(hosted_zones, export_dir)
|
|
|
|
# Final report
|
|
print(f"\nRoute53 Export Completed Successfully!")
|
|
print(f"Export Location: {os.path.abspath(export_dir)}")
|
|
print(f"Summary:")
|
|
print(f" Hosted Zones: {successful_zones}/{len(hosted_zones)}")
|
|
print(f" Total Records: {total_records_exported}")
|
|
print(f" Export Formats: JSON, CSV, BIND")
|
|
print(f" Additional Data: Health Checks, Traffic Policies")
|
|
|
|
return export_dir
|
|
|
|
# Quick export function
|
|
def export_route53_simple(export_dir='route53_simple_export'):
|
|
"""Simple one-function export for quick use"""
|
|
try:
|
|
exporter = Route53Exporter()
|
|
|
|
# Create export directory
|
|
os.makedirs(export_dir, exist_ok=True)
|
|
|
|
# Get hosted zones
|
|
hosted_zones = exporter.get_all_hosted_zones()
|
|
|
|
all_zones_data = {
|
|
'export_date': datetime.now().isoformat(),
|
|
'zones': []
|
|
}
|
|
|
|
for zone in hosted_zones:
|
|
zone_name = zone['Name'].rstrip('.')
|
|
zone_id = zone['Id']
|
|
|
|
print(f"Exporting: {zone_name}")
|
|
records = exporter.get_zone_records(zone_id, zone_name)
|
|
|
|
zone_data = {
|
|
'zone_info': zone,
|
|
'records': records
|
|
}
|
|
|
|
all_zones_data['zones'].append(zone_data)
|
|
|
|
# Save all data to a single file
|
|
output_file = os.path.join(export_dir, 'all_route53_data.json')
|
|
with open(output_file, 'w') as f:
|
|
json.dump(all_zones_data, f, indent=2, default=str)
|
|
|
|
print(f"Simple export completed: {output_file}")
|
|
return output_file
|
|
|
|
except Exception as e:
|
|
print(f"Error in simple export: {e}")
|
|
return None
|
|
|
|
# Simple CLI version
|
|
def main():
|
|
print("Starting Route53 DNS Export...")
|
|
|
|
try:
|
|
# Full comprehensive export
|
|
exporter = Route53Exporter()
|
|
export_path = exporter.export_all_route53_data()
|
|
|
|
print(f"\nExport completed successfully!")
|
|
print(f"All Route53 data has been saved to: {export_path}")
|
|
|
|
except Exception as e:
|
|
print(f"Export failed: {e}")
|
|
print("\nTrying simple export method...")
|
|
|
|
# Fallback to simple export
|
|
simple_export = export_route53_simple()
|
|
if simple_export:
|
|
print(f"Simple export completed: {simple_export}")
|
|
|
|
if __name__ == "__main__":
|
|
main() |