#!/usr/bin/env python3 """ AWS Resource Inventory Script Generates a comprehensive report of all resources in an AWS account Improved version with error handling and additional features """ import boto3 import json import pandas as pd from datetime import datetime import csv import logging from typing import List, Dict, Any import os # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class AWSResourceInventory: def __init__(self, profile_name: str = None, regions: List[str] = None): """ Initialize AWS Resource Inventory Args: profile_name: AWS profile name to use (optional) regions: List of regions to scan (optional) """ try: if profile_name: self.session = boto3.Session(profile_name=profile_name) else: self.session = boto3.Session() self.resources = {} self.regions = regions or self.get_all_regions('ec2') self.account_id = self.get_account_id() logger.info(f"Initialized for account {self.account_id}, regions: {self.regions}") except Exception as e: logger.error(f"Failed to initialize AWS session: {e}") raise def get_account_id(self) -> str: """Get AWS account ID""" try: sts = self.session.client('sts') return sts.get_caller_identity()['Account'] except Exception as e: logger.error(f"Error getting account ID: {e}") return "Unknown" def get_all_regions(self, service: str = 'ec2') -> List[str]: """Get all available regions for a service""" try: client = self.session.client(service, region_name='us-east-1') regions = [region['RegionName'] for region in client.describe_regions()['Regions']] logger.info(f"Found {len(regions)} regions for {service}") return regions except Exception as e: logger.warning(f"Error getting regions for {service}: {e}. Using default regions.") return ['us-east-1', 'us-west-2', 'eu-west-1'] def safe_boto_call(self, client_method, **kwargs): """Safe wrapper for boto3 calls with error handling""" try: return client_method(**kwargs) except Exception as e: logger.error(f"Boto3 call failed: {e}") return None def get_s3_buckets(self): """Get all S3 buckets with detailed information""" try: s3 = self.session.client('s3') response = self.safe_boto_call(s3.list_buckets) if not response: return buckets = [] for bucket in response['Buckets']: bucket_info = { 'Name': bucket['Name'], 'CreationDate': bucket['CreationDate'], 'Type': 'S3 Bucket', 'Service': 'S3', 'AccountId': self.account_id } # Get bucket location and other details try: location = self.safe_boto_call( s3.get_bucket_location, Bucket=bucket['Name'] ) bucket_info['Region'] = location.get('LocationConstraint', 'us-east-1') # Get bucket size and object count (requires ListBucket permission) try: cloudwatch = self.session.client('cloudwatch') end_time = datetime.utcnow() start_time = datetime.utcnow().replace(day=1) # Start of current month # Get bucket size size_response = cloudwatch.get_metric_statistics( Namespace='AWS/S3', MetricName='BucketSizeBytes', Dimensions=[ {'Name': 'BucketName', 'Value': bucket['Name']}, {'Name': 'StorageType', 'Value': 'StandardStorage'} ], StartTime=start_time, EndTime=end_time, Period=86400, Statistics=['Average'] ) if size_response['Datapoints']: bucket_info['SizeBytes'] = size_response['Datapoints'][0]['Average'] except Exception as e: logger.debug(f"Could not get bucket metrics for {bucket['Name']}: {e}") except Exception as e: logger.warning(f"Could not get location for bucket {bucket['Name']}: {e}") bucket_info['Region'] = 'Unknown' buckets.append(bucket_info) self.resources['S3'] = buckets logger.info(f"Found {len(buckets)} S3 buckets") except Exception as e: logger.error(f"Error getting S3 buckets: {e}") def get_lambda_functions(self): """Get all Lambda functions across regions with enhanced details""" lambda_functions = [] for region in self.regions: try: lambda_client = self.session.client('lambda', region_name=region) paginator = lambda_client.get_paginator('list_functions') for page in paginator.paginate(): for func in page['Functions']: function_info = { 'Name': func['FunctionName'], 'Runtime': func.get('Runtime', 'N/A'), 'MemorySize': func['MemorySize'], 'Timeout': func['Timeout'], 'LastModified': func['LastModified'], 'Region': region, 'Type': 'Lambda Function', 'Service': 'Lambda', 'AccountId': self.account_id, 'CodeSize': func.get('CodeSize', 0), 'Handler': func.get('Handler', 'N/A') } # Get environment variables count if 'Environment' in func and 'Variables' in func['Environment']: function_info['EnvVarsCount'] = len(func['Environment']['Variables']) lambda_functions.append(function_info) except Exception as e: logger.error(f"Error getting Lambda functions in {region}: {e}") self.resources['Lambda'] = lambda_functions logger.info(f"Found {len(lambda_functions)} Lambda functions") def get_route53_records(self): """Get Route53 hosted zones and records""" try: route53 = self.session.client('route53') hosted_zones = [] paginator = route53.get_paginator('list_hosted_zones') for page in paginator.paginate(): for zone in page['HostedZones']: zone_info = { 'Name': zone['Name'], 'Id': zone['Id'].split('/')[-1], # Clean up ID 'ResourceRecordSetCount': zone['ResourceRecordSetCount'], 'Type': 'Route53 Hosted Zone', 'Service': 'Route53', 'AccountId': self.account_id, 'Region': 'global' } hosted_zones.append(zone_info) # Get records for each zone with pagination try: record_paginator = route53.get_paginator('list_resource_record_sets') for record_page in record_paginator.paginate(HostedZoneId=zone['Id']): for record in record_page['ResourceRecordSets']: record_info = { 'Name': record['Name'], 'Type': record['Type'], 'TTL': record.get('TTL', 'N/A'), 'HostedZone': zone['Name'], 'RecordType': 'DNS Record', 'Service': 'Route53', 'AccountId': self.account_id, 'Region': 'global' } # Add record values if 'ResourceRecords' in record: record_info['Values'] = [rr['Value'] for rr in record['ResourceRecords']] elif 'AliasTarget' in record: record_info['AliasTarget'] = record['AliasTarget']['DNSName'] hosted_zones.append(record_info) except Exception as e: logger.warning(f"Could not get records for zone {zone['Name']}: {e}") self.resources['Route53'] = hosted_zones logger.info(f"Found {len(hosted_zones)} Route53 resources") except Exception as e: logger.error(f"Error getting Route53 records: {e}") def get_ec2_instances(self): """Get all EC2 instances across regions with enhanced details""" ec2_instances = [] for region in self.regions: try: ec2 = self.session.resource('ec2', region_name=region) for instance in ec2.instances.all(): instance_info = { 'InstanceId': instance.id, 'InstanceType': instance.instance_type, 'State': instance.state['Name'], 'LaunchTime': instance.launch_time, 'Region': region, 'Type': 'EC2 Instance', 'Service': 'EC2', 'AccountId': self.account_id, 'VpcId': instance.vpc_id, 'SubnetId': instance.subnet_id, 'Platform': instance.platform or 'Linux', 'PublicIpAddress': instance.public_ip_address, 'PrivateIpAddress': instance.private_ip_address } # Get tags name = 'Unnamed' if instance.tags: for tag in instance.tags: if tag['Key'] == 'Name': name = tag['Value'] instance_info[f"Tag_{tag['Key']}"] = tag['Value'] instance_info['Name'] = name ec2_instances.append(instance_info) except Exception as e: logger.error(f"Error getting EC2 instances in {region}: {e}") self.resources['EC2'] = ec2_instances logger.info(f"Found {len(ec2_instances)} EC2 instances") def get_rds_instances(self): """Get all RDS instances across regions""" rds_instances = [] for region in self.regions: try: rds = self.session.client('rds', region_name=region) paginator = rds.get_paginator('describe_db_instances') for page in paginator.paginate(): for db in page['DBInstances']: db_info = { 'DBInstanceIdentifier': db['DBInstanceIdentifier'], 'Engine': db['Engine'], 'DBInstanceStatus': db['DBInstanceStatus'], 'AllocatedStorage': db['AllocatedStorage'], 'Region': region, 'Type': 'RDS Instance', 'Service': 'RDS', 'AccountId': self.account_id, 'DBInstanceClass': db['DBInstanceClass'], 'Endpoint': db.get('Endpoint', {}).get('Address', 'N/A') } rds_instances.append(db_info) except Exception as e: logger.error(f"Error getting RDS instances in {region}: {e}") self.resources['RDS'] = rds_instances logger.info(f"Found {len(rds_instances)} RDS instances") def get_iam_users(self): """Get IAM users with enhanced details""" try: iam = self.session.client('iam') users = [] paginator = iam.get_paginator('list_users') for page in paginator.paginate(): for user in page['Users']: user_info = { 'UserName': user['UserName'], 'UserId': user['UserId'], 'CreateDate': user['CreateDate'], 'Type': 'IAM User', 'Service': 'IAM', 'AccountId': self.account_id, 'Region': 'global' } # Get user groups try: groups = iam.list_groups_for_user(UserName=user['UserName']) user_info['Groups'] = [group['GroupName'] for group in groups['Groups']] except Exception as e: logger.debug(f"Could not get groups for user {user['UserName']}: {e}") # Get attached policies count try: attached_policies = iam.list_attached_user_policies(UserName=user['UserName']) user_info['AttachedPoliciesCount'] = len(attached_policies['AttachedPolicies']) except Exception as e: logger.debug(f"Could not get policies for user {user['UserName']}: {e}") users.append(user_info) self.resources['IAM'] = users logger.info(f"Found {len(users)} IAM users") except Exception as e: logger.error(f"Error getting IAM users: {e}") def get_cloudfront_distributions(self): """Get CloudFront distributions""" try: cloudfront = self.session.client('cloudfront') distributions = [] paginator = cloudfront.get_paginator('list_distributions') for page in paginator.paginate(): if 'Items' in page['DistributionList']: for dist in page['DistributionList']['Items']: dist_info = { 'Id': dist['Id'], 'DomainName': dist['DomainName'], 'Status': dist['Status'], 'Enabled': dist['Enabled'], 'Type': 'CloudFront Distribution', 'Service': 'CloudFront', 'AccountId': self.account_id, 'Region': 'global' } distributions.append(dist_info) self.resources['CloudFront'] = distributions logger.info(f"Found {len(distributions)} CloudFront distributions") except Exception as e: logger.error(f"Error getting CloudFront distributions: {e}") def get_dynamodb_tables(self): """Get DynamoDB tables (standard and global) with key metrics""" tables = [] for region in self.regions: try: dynamodb = self.session.client('dynamodb', region_name=region) paginator = dynamodb.get_paginator('list_tables') for page in paginator.paginate(): for table_name in page['TableNames']: try: desc = dynamodb.describe_table(TableName=table_name)['Table'] table_info = { 'Name': table_name, 'TableStatus': desc['TableStatus'], 'CreationDateTime': desc['CreationDateTime'], 'ItemCount': desc.get('ItemCount', 0), 'TableSizeBytes': desc.get('TableSizeBytes', 0), 'BillingMode': desc.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED'), 'Region': region, 'Type': 'DynamoDB Table', 'Service': 'DynamoDB', 'AccountId': self.account_id } # Add key schema if 'KeySchema' in desc: table_info['PartitionKey'] = next( (k['AttributeName'] for k in desc['KeySchema'] if k['KeyType'] == 'HASH'), 'N/A' ) sort_key = next( (k['AttributeName'] for k in desc['KeySchema'] if k['KeyType'] == 'RANGE'), None ) table_info['SortKey'] = sort_key if sort_key else 'N/A' tables.append(table_info) except Exception as e: logger.warning(f"Could not describe DynamoDB table {table_name} in {region}: {e}") continue # Check for Global Tables (v2019.11.21+) try: global_tables = dynamodb.list_global_tables()['GlobalTables'] for gt in global_tables: tables.append({ 'Name': gt['GlobalTableName'], 'ReplicationRegions': ', '.join([r['RegionName'] for r in gt['ReplicationGroup']]), 'TableStatus': 'Global Table', 'Region': region, 'Type': 'DynamoDB Global Table', 'Service': 'DynamoDB', 'AccountId': self.account_id }) except Exception as e: if 'AccessDenied' not in str(e): logger.debug(f"Global tables not supported or error in {region}: {e}") except Exception as e: logger.error(f"Error getting DynamoDB tables in {region}: {e}") self.resources['DynamoDB'] = tables logger.info(f"Found {len(tables)} DynamoDB resources") def get_ecs_resources(self): """Get ECS clusters, services, and task definitions""" ecs_resources = [] for region in self.regions: try: ecs = self.session.client('ecs', region_name=region) # List clusters cluster_arns = [] paginator = ecs.get_paginator('list_clusters') for page in paginator.paginate(): cluster_arns.extend(page['clusterArns']) for arn in cluster_arns: try: cluster = ecs.describe_clusters(clusters=[arn])['clusters'][0] cluster_info = { 'ClusterName': cluster['clusterName'], 'ClusterArn': arn, 'Status': cluster['status'], 'RunningTasksCount': cluster.get('runningTasksCount', 0), 'PendingTasksCount': cluster.get('pendingTasksCount', 0), 'ActiveServicesCount': cluster.get('activeServicesCount', 0), 'RegisteredContainerInstancesCount': cluster.get('registeredContainerInstancesCount', 0), 'Region': region, 'Type': 'ECS Cluster', 'Service': 'ECS', 'AccountId': self.account_id } ecs_resources.append(cluster_info) # List services in cluster svc_paginator = ecs.get_paginator('list_services') for svc_page in svc_paginator.paginate(cluster=arn): for svc_arn in svc_page['serviceArns']: try: svc = ecs.describe_services(cluster=arn, services=[svc_arn])['services'][0] svc_info = { 'ServiceName': svc['serviceName'], 'Cluster': cluster['clusterName'], 'DesiredCount': svc['desiredCount'], 'RunningCount': svc['runningCount'], 'Status': svc['status'], 'LaunchType': svc.get('launchType', 'EC2'), 'Region': region, 'Type': 'ECS Service', 'Service': 'ECS', 'AccountId': self.account_id } ecs_resources.append(svc_info) except Exception as e: logger.warning(f"Could not describe ECS service {svc_arn}: {e}") except Exception as e: logger.warning(f"Could not describe ECS cluster {arn}: {e}") except Exception as e: logger.error(f"Error getting ECS resources in {region}: {e}") self.resources['ECS'] = ecs_resources logger.info(f"Found {len(ecs_resources)} ECS resources") def get_eks_clusters(self): """Get EKS clusters with Kubernetes version and status""" clusters = [] for region in self.regions: try: eks = self.session.client('eks', region_name=region) cluster_names = eks.list_clusters().get('clusters', []) for name in cluster_names: try: desc = eks.describe_cluster(name=name)['cluster'] cluster_info = { 'Name': name, 'Arn': desc['arn'], 'Status': desc['status'], 'Version': desc['version'], 'Endpoint': desc['endpoint'], 'CreatedAt': desc['createdAt'], 'PlatformVersion': desc.get('platformVersion', 'N/A'), 'Region': region, 'Type': 'EKS Cluster', 'Service': 'EKS', 'AccountId': self.account_id } clusters.append(cluster_info) except Exception as e: logger.warning(f"Could not describe EKS cluster {name} in {region}: {e}") except Exception as e: # EKS not available in all regions; suppress noise for unsupported ones if 'Unsupported' not in str(e): logger.error(f"Error getting EKS clusters in {region}: {e}") self.resources['EKS'] = clusters logger.info(f"Found {len(clusters)} EKS clusters") def get_sqs_queues(self): """Get SQS queues with attributes""" queues = [] for region in self.regions: try: sqs = self.session.client('sqs', region_name=region) paginator = sqs.get_paginator('list_queues') for page in paginator.paginate(): for url in page.get('QueueUrls', []): try: attrs = sqs.get_queue_attributes( QueueUrl=url, AttributeNames=['All'] )['Attributes'] queue_info = { 'QueueUrl': url, 'QueueName': url.split('/')[-1], 'VisibilityTimeout': attrs.get('VisibilityTimeout', 'N/A'), 'MessageRetentionPeriod': attrs.get('MessageRetentionPeriod', 'N/A'), 'DelaySeconds': attrs.get('DelaySeconds', 'N/A'), 'ApproximateNumberOfMessages': attrs.get('ApproximateNumberOfMessages', '0'), 'Region': region, 'Type': 'SQS Queue', 'Service': 'SQS', 'AccountId': self.account_id } if 'FifoQueue' in attrs and attrs['FifoQueue'] == 'true': queue_info['Type'] = 'SQS FIFO Queue' queues.append(queue_info) except Exception as e: logger.warning(f"Could not get attributes for SQS queue {url}: {e}") except Exception as e: logger.error(f"Error getting SQS queues in {region}: {e}") self.resources['SQS'] = queues logger.info(f"Found {len(queues)} SQS queues") def get_sns_topics_and_subscriptions(self): """Get SNS topics and subscriptions""" sns_resources = [] for region in self.regions: try: sns = self.session.client('sns', region_name=region) # Topics topic_paginator = sns.get_paginator('list_topics') for page in topic_paginator.paginate(): for topic in page.get('Topics', []): arn = topic['TopicArn'] topic_info = { 'TopicArn': arn, 'TopicName': arn.split(':')[-1], 'Type': 'SNS Topic', 'Service': 'SNS', 'Region': region, 'AccountId': self.account_id } sns_resources.append(topic_info) # Subscriptions sub_paginator = sns.get_paginator('list_subscriptions') for page in sub_paginator.paginate(): for sub in page.get('Subscriptions', []): sub_info = { 'SubscriptionArn': sub['SubscriptionArn'], 'TopicArn': sub['TopicArn'], 'Protocol': sub['Protocol'], 'Endpoint': sub['Endpoint'][:100], # Truncate long endpoints (e.g., HTTPS URLs) 'Status': sub.get('SubscriptionArn', '').split(':')[-1] if ':' in sub.get('SubscriptionArn', '') else 'Unknown', 'Type': 'SNS Subscription', 'Service': 'SNS', 'Region': region, 'AccountId': self.account_id } sns_resources.append(sub_info) except Exception as e: logger.error(f"Error getting SNS resources in {region}: {e}") self.resources['SNS'] = sns_resources logger.info(f"Found {len(sns_resources)} SNS resources") def get_secrets_manager_secrets(self): """Get Secrets Manager secrets (metadata only — no secret values)""" secrets = [] for region in self.regions: try: secrets_client = self.session.client('secretsmanager', region_name=region) paginator = secrets_client.get_paginator('list_secrets') for page in paginator.paginate(): for secret in page.get('SecretList', []): secret_info = { 'Name': secret['Name'], 'ARN': secret['ARN'], 'Description': secret.get('Description', 'N/A')[:100], 'LastChangedDate': secret.get('LastChangedDate'), 'RotationEnabled': secret.get('RotationEnabled', False), 'CreatedDate': secret.get('CreatedDate'), 'Region': region, 'Type': 'Secrets Manager Secret', 'Service': 'Secrets Manager', 'AccountId': self.account_id } secrets.append(secret_info) except Exception as e: logger.error(f"Error getting Secrets Manager secrets in {region}: {e}") self.resources['SecretsManager'] = secrets logger.info(f"Found {len(secrets)} Secrets Manager secrets") def get_api_gateway_resources(self): """Get API Gateway REST and HTTP APIs""" api_resources = [] for region in self.regions: try: # REST APIs apigw = self.session.client('apigateway', region_name=region) rest_paginator = apigw.get_paginator('get_rest_apis') for page in rest_paginator.paginate(): for api in page.get('items', []): api_info = { 'Name': api.get('name', 'N/A'), 'Id': api['id'], 'Description': api.get('description', 'N/A')[:100], 'CreatedDate': api.get('createdDate'), 'Version': api.get('version', 'N/A'), 'Protocol': 'REST', 'Region': region, 'Type': 'API Gateway REST API', 'Service': 'API Gateway', 'AccountId': self.account_id } api_resources.append(api_info) # HTTP APIs (v2) apigwv2 = self.session.client('apigatewayv2', region_name=region) http_paginator = apigwv2.get_paginator('get_apis') for page in http_paginator.paginate(): for api in page.get('Items', []): api_info = { 'Name': api.get('Name', 'N/A'), 'Id': api['ApiId'], 'Description': api.get('Description', 'N/A')[:100], 'CreatedDate': api.get('CreatedDate'), 'Protocol': api.get('ProtocolType', 'HTTP'), 'Region': region, 'Type': 'API Gateway HTTP API', 'Service': 'API Gateway', 'AccountId': self.account_id } api_resources.append(api_info) except Exception as e: # Some regions lack API Gateway support if 'Unsupported' not in str(e): logger.error(f"Error getting API Gateway resources in {region}: {e}") self.resources['APIGateway'] = api_resources logger.info(f"Found {len(api_resources)} API Gateway resources") def get_elastic_load_balancers(self): """Get Application, Network, and Classic Load Balancers""" lbs = [] for region in self.regions: try: # ALB & NLB (v2) elbv2 = self.session.client('elbv2', region_name=region) lb_paginator = elbv2.get_paginator('describe_load_balancers') for page in lb_paginator.paginate(): for lb in page.get('LoadBalancers', []): lb_info = { 'Name': lb['LoadBalancerName'], 'DNSName': lb['DNSName'], 'Type': lb['Type'], 'State': lb['State']['Code'], 'Scheme': lb['Scheme'], 'VpcId': lb.get('VpcId', 'N/A'), 'CreatedTime': lb['CreatedTime'], 'Region': region, 'ResourceType': 'Load Balancer', 'Service': 'ELBv2', 'AccountId': self.account_id } lbs.append(lb_info) # Classic Load Balancers elb = self.session.client('elb', region_name=region) classic_paginator = elb.get_paginator('describe_load_balancers') for page in classic_paginator.paginate(): for lb in page.get('LoadBalancerDescriptions', []): lb_info = { 'Name': lb['LoadBalancerName'], 'DNSName': lb['DNSName'], 'Type': 'classic', 'State': 'active', 'Scheme': lb.get('Scheme', 'internet-facing'), 'VpcId': lb.get('VPCId', 'N/A'), 'CreatedTime': lb.get('CreatedTime', 'N/A'), 'Region': region, 'ResourceType': 'Load Balancer', 'Service': 'ELB', 'AccountId': self.account_id } lbs.append(lb_info) except Exception as e: # Classic LB may be deprecated in newer regions if 'Unsupported' not in str(e) and 'not available' not in str(e).lower(): logger.error(f"Error getting ELB resources in {region}: {e}") self.resources['LoadBalancers'] = lbs logger.info(f"Found {len(lbs)} Load Balancers") def generate_report(self): """Generate comprehensive report""" logger.info("Starting AWS Resource Inventory...") # Collect all resources resource_methods = [ self.get_s3_buckets, self.get_lambda_functions, self.get_route53_records, self.get_ec2_instances, self.get_rds_instances, self.get_iam_users, self.get_cloudfront_distributions, self.get_dynamodb_tables, self.get_ecs_resources, self.get_eks_clusters, self.get_sqs_queues, self.get_sns_topics_and_subscriptions, self.get_secrets_manager_secrets, self.get_api_gateway_resources, self.get_elastic_load_balancers, ] for method in resource_methods: try: method() except Exception as e: logger.error(f"Error in {method.__name__}: {e}") # Generate reports self.generate_json_report() self.generate_csv_report() self.generate_summary_report() logger.info("AWS Resource Inventory completed!") def generate_json_report(self): """Generate JSON report""" try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"aws_inventory_{self.account_id}_{timestamp}.json" # Create reports directory if it doesn't exist os.makedirs('reports', exist_ok=True) filepath = os.path.join('reports', filename) with open(filepath, 'w') as f: json.dump(self.resources, f, indent=2, default=str) logger.info(f"JSON report saved as: {filepath}") return filepath except Exception as e: logger.error(f"Error generating JSON report: {e}") def generate_csv_report(self): """Generate CSV report""" try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"aws_inventory_{self.account_id}_{timestamp}.csv" # Create reports directory if it doesn't exist os.makedirs('reports', exist_ok=True) filepath = os.path.join('reports', filename) all_resources = [] for service, resources in self.resources.items(): for resource in resources: resource['Service'] = service all_resources.append(resource) if all_resources: df = pd.DataFrame(all_resources) df.to_csv(filepath, index=False) logger.info(f"CSV report saved as: {filepath}") return filepath except Exception as e: logger.error(f"Error generating CSV report: {e}") def generate_summary_report(self): """Generate summary report""" print("\n" + "="*60) print("AWS RESOURCE INVENTORY SUMMARY") print("="*60) print(f"Account ID: {self.account_id}") print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"Regions Scanned: {len(self.regions)}") print("-"*60) total_resources = 0 for service, resources in self.resources.items(): count = len(resources) total_resources += count print(f"{service:.<20}: {count:>5} resources") print("-"*60) print(f"{'TOTAL':.<20}: {total_resources:>5} resources") print("="*60) def main(): """Main function""" print("AWS Resource Inventory Script - Improved Version") print("This script will inventory your AWS resources...") try: # You can specify profile and regions here inventory = AWSResourceInventory( # profile_name="your-profile-name", # Uncomment to use specific profile # regions=['us-east-1', 'us-west-2'] # Uncomment to specify regions ) inventory.generate_report() except Exception as e: logger.error(f"Script execution failed: {e}") print(f"\nError: {e}") print("Please check your AWS configuration and permissions.") if __name__ == "__main__": main()