From e158ecccb0677c8b6a5369352ad6c6ca43662881 Mon Sep 17 00:00:00 2001 From: ghaymah_dev Date: Tue, 18 Nov 2025 12:17:16 +0000 Subject: [PATCH] Add main.py --- main.py | 372 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 372 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..654a975 --- /dev/null +++ b/main.py @@ -0,0 +1,372 @@ +import boto3 +import json +import os +import csv +from datetime import datetime + +class Route53Exporter: + def __init__(self, profile_name=None): + """ + Initialize Route53 Exporter + + Args: + profile_name (str): AWS profile name (optional) + """ + try: + if profile_name: + session = boto3.Session(profile_name=profile_name) + self.route53 = session.client('route53') + else: + self.route53 = boto3.client('route53') + + print("Route53 client initialized successfully") + except Exception as e: + print(f"Error initializing Route53 client: {e}") + raise + + def get_all_hosted_zones(self): + """Retrieve all hosted zones from Route53""" + try: + hosted_zones = [] + paginator = self.route53.get_paginator('list_hosted_zones') + + for page in paginator.paginate(): + hosted_zones.extend(page['HostedZones']) + + print(f"Found {len(hosted_zones)} hosted zones") + return hosted_zones + except Exception as e: + print(f"Error fetching hosted zones: {e}") + return [] + + def get_zone_records(self, zone_id, zone_name): + """Get all records for a specific hosted zone""" + try: + # Remove the leading /hostedzone/ from zone_id if present + zone_id = zone_id.replace('/hostedzone/', '') + + records = [] + paginator = self.route53.get_paginator('list_resource_record_sets') + + for page in paginator.paginate(HostedZoneId=zone_id): + records.extend(page['ResourceRecordSets']) + + print(f"Found {len(records)} records in zone: {zone_name}") + return records + except Exception as e: + print(f"Error fetching records for zone {zone_name}: {e}") + return [] + + def export_zone_to_bind_format(self, zone_name, records, export_dir): + """Export zone records to BIND format""" + try: + bind_file = os.path.join(export_dir, f"{zone_name}.zone") + + with open(bind_file, 'w') as f: + f.write(f"; BIND format export for {zone_name}\n") + f.write(f"; Exported on: {datetime.now().isoformat()}\n") + f.write(f"$ORIGIN {zone_name}.\n\n") + + for record in records: + name = record['Name'].rstrip('.') + record_type = record['Type'] + ttl = record.get('TTL', 300) + + if record_type in ['A', 'AAAA', 'CNAME', 'TXT', 'MX', 'NS', 'PTR', 'SRV']: + f.write(f"{name}.\t{ttl}\tIN\t{record_type}\t") + + if 'ResourceRecords' in record: + values = [rr['Value'] for rr in record['ResourceRecords']] + if record_type == 'TXT': + values = [f'"{v}"' for v in values] + f.write(' '.join(values)) + elif 'AliasTarget' in record: + alias = record['AliasTarget'] + f.write(f"{alias['DNSName'].rstrip('.')} ; ALIAS") + + f.write("\n") + + print(f"BIND format exported: {bind_file}") + return True + except Exception as e: + print(f"Error exporting BIND format for {zone_name}: {e}") + return False + + def export_zone_to_json(self, zone_name, records, export_dir): + """Export zone records to JSON format""" + try: + json_file = os.path.join(export_dir, f"{zone_name}.json") + + zone_data = { + 'zone_name': zone_name, + 'export_date': datetime.now().isoformat(), + 'total_records': len(records), + 'records': records + } + + with open(json_file, 'w') as f: + json.dump(zone_data, f, indent=2, default=str) + + print(f"JSON format exported: {json_file}") + return True + except Exception as e: + print(f"Error exporting JSON for {zone_name}: {e}") + return False + + def export_zone_to_csv(self, zone_name, records, export_dir): + """Export zone records to CSV format""" + try: + csv_file = os.path.join(export_dir, f"{zone_name}.csv") + + with open(csv_file, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['Name', 'Type', 'TTL', 'Value', 'AliasTarget']) + + for record in records: + name = record['Name'] + record_type = record['Type'] + ttl = record.get('TTL', '') + + if 'ResourceRecords' in record: + values = ' | '.join([rr['Value'] for rr in record['ResourceRecords']]) + alias = '' + elif 'AliasTarget' in record: + values = '' + alias = record['AliasTarget']['DNSName'] + else: + values = '' + alias = '' + + writer.writerow([name, record_type, ttl, values, alias]) + + print(f"CSV format exported: {csv_file}") + return True + except Exception as e: + print(f"Error exporting CSV for {zone_name}: {e}") + return False + + def export_health_checks(self, export_dir): + """Export Route53 health checks""" + try: + health_checks = [] + paginator = self.route53.get_paginator('list_health_checks') + + for page in paginator.paginate(): + health_checks.extend(page['HealthChecks']) + + if health_checks: + health_checks_file = os.path.join(export_dir, 'health_checks.json') + with open(health_checks_file, 'w') as f: + json.dump(health_checks, f, indent=2, default=str) + + print(f"Health checks exported: {len(health_checks)} found") + else: + print("No health checks found") + + return health_checks + except Exception as e: + print(f"Error exporting health checks: {e}") + return [] + + def export_traffic_policies(self, export_dir): + """Export Route53 traffic policies""" + try: + policies = [] + paginator = self.route53.get_paginator('list_traffic_policies') + + for page in paginator.paginate(): + policies.extend(page['TrafficPolicySummaries']) + + # Get detailed policy information + detailed_policies = [] + for policy in policies: + try: + policy_detail = self.route53.get_traffic_policy( + Id=policy['Id'], + Version=policy['LatestVersion'] + ) + detailed_policies.append(policy_detail) + except Exception as e: + print(f"Error fetching details for policy {policy['Id']}: {e}") + + if detailed_policies: + policies_file = os.path.join(export_dir, 'traffic_policies.json') + with open(policies_file, 'w') as f: + json.dump(detailed_policies, f, indent=2, default=str) + + print(f"Traffic policies exported: {len(detailed_policies)} found") + else: + print("No traffic policies found") + + return detailed_policies + except Exception as e: + print(f"Error exporting traffic policies: {e}") + return [] + + def create_export_summary(self, hosted_zones, export_dir): + """Create a comprehensive export summary""" + summary = { + 'export_date': datetime.now().isoformat(), + 'total_hosted_zones': len(hosted_zones), + 'hosted_zones': [], + 'formats_exported': ['JSON', 'CSV', 'BIND'] + } + + for zone in hosted_zones: + zone_info = { + 'name': zone['Name'], + 'id': zone['Id'], + 'record_count': zone.get('ResourceRecordSetCount', 0), + 'private_zone': zone.get('Config', {}).get('PrivateZone', False) + } + summary['hosted_zones'].append(zone_info) + + summary_file = os.path.join(export_dir, 'export_summary.json') + with open(summary_file, 'w') as f: + json.dump(summary, f, indent=2) + + print(f"Export summary created: {summary_file}") + return summary + + def export_all_route53_data(self, base_export_dir='route53_export'): + """Main method to export all Route53 data""" + + # Create export directory with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + export_dir = os.path.join(base_export_dir, f"export_{timestamp}") + os.makedirs(export_dir, exist_ok=True) + + print(f"Starting Route53 export to: {export_dir}") + + # Get all hosted zones + hosted_zones = self.get_all_hosted_zones() + + if not hosted_zones: + print("No hosted zones found or error retrieving zones") + return + + total_records_exported = 0 + successful_zones = 0 + + # Export each hosted zone + for zone in hosted_zones: + zone_name = zone['Name'].rstrip('.') + zone_id = zone['Id'] + + print(f"\nProcessing zone: {zone_name}") + + # Create zone-specific directory + zone_dir = os.path.join(export_dir, 'zones', zone_name) + os.makedirs(zone_dir, exist_ok=True) + + # Get all records for this zone + records = self.get_zone_records(zone_id, zone_name) + + if records: + # Export in multiple formats + json_success = self.export_zone_to_json(zone_name, records, zone_dir) + csv_success = self.export_zone_to_csv(zone_name, records, zone_dir) + bind_success = self.export_zone_to_bind_format(zone_name, records, zone_dir) + + if json_success or csv_success or bind_success: + successful_zones += 1 + total_records_exported += len(records) + print(f"Successfully exported zone: {zone_name}") + else: + print(f"Failed to export zone: {zone_name}") + else: + print(f"No records found for zone: {zone_name}") + + # Export additional Route53 components + print("\nExporting additional Route53 components...") + + # Export health checks + health_checks_dir = os.path.join(export_dir, 'health_checks') + os.makedirs(health_checks_dir, exist_ok=True) + self.export_health_checks(health_checks_dir) + + # Export traffic policies + policies_dir = os.path.join(export_dir, 'traffic_policies') + os.makedirs(policies_dir, exist_ok=True) + self.export_traffic_policies(policies_dir) + + # Create export summary + summary = self.create_export_summary(hosted_zones, export_dir) + + # Final report + print(f"\nRoute53 Export Completed Successfully!") + print(f"Export Location: {os.path.abspath(export_dir)}") + print(f"Summary:") + print(f" Hosted Zones: {successful_zones}/{len(hosted_zones)}") + print(f" Total Records: {total_records_exported}") + print(f" Export Formats: JSON, CSV, BIND") + print(f" Additional Data: Health Checks, Traffic Policies") + + return export_dir + +# Quick export function +def export_route53_simple(export_dir='route53_simple_export'): + """Simple one-function export for quick use""" + try: + exporter = Route53Exporter() + + # Create export directory + os.makedirs(export_dir, exist_ok=True) + + # Get hosted zones + hosted_zones = exporter.get_all_hosted_zones() + + all_zones_data = { + 'export_date': datetime.now().isoformat(), + 'zones': [] + } + + for zone in hosted_zones: + zone_name = zone['Name'].rstrip('.') + zone_id = zone['Id'] + + print(f"Exporting: {zone_name}") + records = exporter.get_zone_records(zone_id, zone_name) + + zone_data = { + 'zone_info': zone, + 'records': records + } + + all_zones_data['zones'].append(zone_data) + + # Save all data to a single file + output_file = os.path.join(export_dir, 'all_route53_data.json') + with open(output_file, 'w') as f: + json.dump(all_zones_data, f, indent=2, default=str) + + print(f"Simple export completed: {output_file}") + return output_file + + except Exception as e: + print(f"Error in simple export: {e}") + return None + +# Simple CLI version +def main(): + print("Starting Route53 DNS Export...") + + try: + # Full comprehensive export + exporter = Route53Exporter() + export_path = exporter.export_all_route53_data() + + print(f"\nExport completed successfully!") + print(f"All Route53 data has been saved to: {export_path}") + + except Exception as e: + print(f"Export failed: {e}") + print("\nTrying simple export method...") + + # Fallback to simple export + simple_export = export_route53_simple() + if simple_export: + print(f"Simple export completed: {simple_export}") + +if __name__ == "__main__": + main() \ No newline at end of file