الملفات
aws-inventory/main.py
2025-11-18 10:55:42 +00:00

485 أسطر
20 KiB
Python

#!/usr/bin/env python3
"""
AWS Resource Inventory Script
Generates a comprehensive report of all resources in an AWS account
Improved version with error handling and additional features
"""
import csv
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import boto3
import pandas as pd
# Configure logging once at import time: INFO threshold and
# "timestamp - level - message" formatted records.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
class AWSResourceInventory:
    """Collect an inventory of resources in one AWS account and write reports.

    Every ``get_*`` collector queries one service and stores a list of plain
    dictionaries in ``self.resources`` under that service's name.
    ``generate_report`` runs all collectors, writes a JSON and a CSV file
    under ``reports/`` and prints a summary to stdout.
    """

    # Days of CloudWatch history searched for the most recent S3 storage
    # datapoint. A fixed look-back avoids the empty (or inverted) window that a
    # "start of the current month" range produces on the 1st of the month.
    S3_METRICS_LOOKBACK_DAYS = 3

    def __init__(self, profile_name: Optional[str] = None, regions: Optional[List[str]] = None):
        """
        Initialize AWS Resource Inventory

        Args:
            profile_name: AWS profile name to use (optional)
            regions: List of regions to scan (optional). When omitted or
                empty, the region list comes from ``get_all_regions('ec2')``.

        Raises:
            Exception: whatever session or region setup raised, re-raised
                after being logged.
        """
        try:
            if profile_name:
                self.session = boto3.Session(profile_name=profile_name)
            else:
                self.session = boto3.Session()
            # Maps a service name ('S3', 'EC2', ...) to its collected records.
            self.resources: Dict[str, List[Dict[str, Any]]] = {}
            self.regions = regions or self.get_all_regions('ec2')
            self.account_id = self.get_account_id()
            logger.info(f"Initialized for account {self.account_id}, regions: {self.regions}")
        except Exception as e:
            logger.error(f"Failed to initialize AWS session: {e}")
            raise

    def get_account_id(self) -> str:
        """Get AWS account ID, or "Unknown" when the STS call fails."""
        try:
            sts = self.session.client('sts')
            return sts.get_caller_identity()['Account']
        except Exception as e:
            logger.error(f"Error getting account ID: {e}")
            return "Unknown"

    def get_all_regions(self, service: str = 'ec2') -> List[str]:
        """Get all available regions for a service.

        The lookup calls ``describe_regions`` on a client for ``service``, so
        a client without that operation ends up in the fallback branch.

        Returns:
            Region names, or a fixed three-region default when the lookup fails.
        """
        try:
            client = self.session.client(service, region_name='us-east-1')
            regions = [region['RegionName'] for region in client.describe_regions()['Regions']]
            logger.info(f"Found {len(regions)} regions for {service}")
            return regions
        except Exception as e:
            logger.warning(f"Error getting regions for {service}: {e}. Using default regions.")
            return ['us-east-1', 'us-west-2', 'eu-west-1']

    def safe_boto_call(self, client_method, **kwargs):
        """Safe wrapper for boto3 calls with error handling.

        Args:
            client_method: bound client method to invoke.
            **kwargs: keyword arguments forwarded to ``client_method``.

        Returns:
            The call's response, or ``None`` if it raised (the error is logged).
        """
        try:
            return client_method(**kwargs)
        except Exception as e:
            logger.error(f"Boto3 call failed: {e}")
            return None

    @staticmethod
    def _normalize_bucket_region(location_constraint: Optional[str]) -> str:
        """Translate a GetBucketLocation ``LocationConstraint`` into a region name."""
        # The key is present with a null value for us-east-1 buckets, so a
        # dict.get() default would never apply; test the value itself.
        if not location_constraint:
            return 'us-east-1'
        # Legacy constraint value that stands for eu-west-1.
        if location_constraint == 'EU':
            return 'eu-west-1'
        return location_constraint

    def _get_bucket_region(self, s3_client, bucket_name: str) -> str:
        """Return the bucket's region name, or 'Unknown' when it cannot be read."""
        location = self.safe_boto_call(s3_client.get_bucket_location, Bucket=bucket_name)
        if location is None:
            # safe_boto_call has already logged the underlying error.
            logger.warning(f"Could not get location for bucket {bucket_name}")
            return 'Unknown'
        return self._normalize_bucket_region(location.get('LocationConstraint'))

    def _get_bucket_size_bytes(self, cloudwatch_clients: Dict[str, Any],
                               bucket_name: str, region: str) -> Optional[float]:
        """Return the newest StandardStorage ``BucketSizeBytes`` average, if any.

        Args:
            cloudwatch_clients: per-region client cache, filled in on demand.
            bucket_name: bucket whose metric is requested.
            region: region the bucket lives in; the metric is queried there.

        Returns:
            The latest datapoint's average, or ``None`` when no datapoint exists
            or the lookup fails (best effort; failures are logged at DEBUG).
        """
        try:
            if region not in cloudwatch_clients:
                cloudwatch_clients[region] = self.session.client('cloudwatch', region_name=region)
            end_time = datetime.now(timezone.utc)
            start_time = end_time - timedelta(days=self.S3_METRICS_LOOKBACK_DAYS)
            size_response = cloudwatch_clients[region].get_metric_statistics(
                Namespace='AWS/S3',
                MetricName='BucketSizeBytes',
                Dimensions=[
                    {'Name': 'BucketName', 'Value': bucket_name},
                    {'Name': 'StorageType', 'Value': 'StandardStorage'},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=86400,
                Statistics=['Average'],
            )
            datapoints = size_response.get('Datapoints', [])
            if not datapoints:
                return None
            # Datapoints are not guaranteed to be ordered; pick the newest one.
            newest = max(datapoints, key=lambda point: point['Timestamp'])
            return newest['Average']
        except Exception as e:
            logger.debug(f"Could not get bucket metrics for {bucket_name}: {e}")
            return None

    def get_s3_buckets(self):
        """Get all S3 buckets with detailed information.

        Stores the result in ``self.resources['S3']``. Each record carries the
        bucket's region and, when CloudWatch has a datapoint, ``SizeBytes``.
        Nothing is stored if the bucket listing itself fails.
        """
        try:
            s3 = self.session.client('s3')
            response = self.safe_boto_call(s3.list_buckets)
            if not response:
                return
            buckets = []
            cloudwatch_clients: Dict[str, Any] = {}
            for bucket in response.get('Buckets', []):
                bucket_info = {
                    'Name': bucket['Name'],
                    'CreationDate': bucket['CreationDate'],
                    'Type': 'S3 Bucket',
                    'Service': 'S3',
                    'AccountId': self.account_id,
                }
                region = self._get_bucket_region(s3, bucket['Name'])
                bucket_info['Region'] = region
                if region != 'Unknown':
                    size_bytes = self._get_bucket_size_bytes(cloudwatch_clients, bucket['Name'], region)
                    if size_bytes is not None:
                        bucket_info['SizeBytes'] = size_bytes
                buckets.append(bucket_info)
            self.resources['S3'] = buckets
            logger.info(f"Found {len(buckets)} S3 buckets")
        except Exception as e:
            logger.error(f"Error getting S3 buckets: {e}")

    def get_lambda_functions(self):
        """Get all Lambda functions across regions with enhanced details.

        A failure in one region is logged and the scan continues with the next;
        ``self.resources['Lambda']`` is always set.
        """
        lambda_functions = []
        for region in self.regions:
            try:
                lambda_client = self.session.client('lambda', region_name=region)
                paginator = lambda_client.get_paginator('list_functions')
                for page in paginator.paginate():
                    for func in page['Functions']:
                        function_info = {
                            'Name': func['FunctionName'],
                            'Runtime': func.get('Runtime', 'N/A'),
                            'MemorySize': func['MemorySize'],
                            'Timeout': func['Timeout'],
                            'LastModified': func['LastModified'],
                            'Region': region,
                            'Type': 'Lambda Function',
                            'Service': 'Lambda',
                            'AccountId': self.account_id,
                            'CodeSize': func.get('CodeSize', 0),
                            'Handler': func.get('Handler', 'N/A'),
                        }
                        # Record only how many environment variables exist,
                        # never their values.
                        env_vars = func.get('Environment', {}).get('Variables')
                        if env_vars is not None:
                            function_info['EnvVarsCount'] = len(env_vars)
                        lambda_functions.append(function_info)
            except Exception as e:
                logger.error(f"Error getting Lambda functions in {region}: {e}")
        self.resources['Lambda'] = lambda_functions
        logger.info(f"Found {len(lambda_functions)} Lambda functions")

    def get_route53_records(self):
        """Get Route53 hosted zones and records.

        Zones and their DNS records are stored together, in discovery order,
        in ``self.resources['Route53']``. A zone whose records cannot be listed
        is still reported; only its records are missing.
        """
        try:
            route53 = self.session.client('route53')
            route53_resources = []
            zone_paginator = route53.get_paginator('list_hosted_zones')
            record_paginator = route53.get_paginator('list_resource_record_sets')
            for page in zone_paginator.paginate():
                for zone in page['HostedZones']:
                    zone_info = {
                        'Name': zone['Name'],
                        'Id': zone['Id'].split('/')[-1],  # Keep only the part after the last '/'
                        'ResourceRecordSetCount': zone['ResourceRecordSetCount'],
                        'Type': 'Route53 Hosted Zone',
                        'Service': 'Route53',
                        'AccountId': self.account_id,
                        'Region': 'global',
                    }
                    route53_resources.append(zone_info)
                    # Get records for each zone with pagination
                    try:
                        for record_page in record_paginator.paginate(HostedZoneId=zone['Id']):
                            for record in record_page['ResourceRecordSets']:
                                record_info = {
                                    'Name': record['Name'],
                                    'Type': record['Type'],
                                    'TTL': record.get('TTL', 'N/A'),
                                    'HostedZone': zone['Name'],
                                    'RecordType': 'DNS Record',
                                    'Service': 'Route53',
                                    'AccountId': self.account_id,
                                    'Region': 'global',
                                }
                                # A record has either literal values or an alias target.
                                if 'ResourceRecords' in record:
                                    record_info['Values'] = [rr['Value'] for rr in record['ResourceRecords']]
                                elif 'AliasTarget' in record:
                                    record_info['AliasTarget'] = record['AliasTarget']['DNSName']
                                route53_resources.append(record_info)
                    except Exception as e:
                        logger.warning(f"Could not get records for zone {zone['Name']}: {e}")
            self.resources['Route53'] = route53_resources
            logger.info(f"Found {len(route53_resources)} Route53 resources")
        except Exception as e:
            logger.error(f"Error getting Route53 records: {e}")

    def get_ec2_instances(self):
        """Get all EC2 instances across regions with enhanced details.

        Every tag is flattened into a ``Tag_<key>`` column; the ``Name`` tag
        also fills ``Name`` ('Unnamed' when absent). Regional failures are
        logged and skipped; ``self.resources['EC2']`` is always set.
        """
        ec2_instances = []
        for region in self.regions:
            try:
                ec2 = self.session.resource('ec2', region_name=region)
                for instance in ec2.instances.all():
                    instance_info = {
                        'InstanceId': instance.id,
                        'InstanceType': instance.instance_type,
                        'State': instance.state['Name'],
                        'LaunchTime': instance.launch_time,
                        'Region': region,
                        'Type': 'EC2 Instance',
                        'Service': 'EC2',
                        'AccountId': self.account_id,
                        'VpcId': instance.vpc_id,
                        'SubnetId': instance.subnet_id,
                        'Platform': instance.platform or 'Linux',
                        'PublicIpAddress': instance.public_ip_address,
                        'PrivateIpAddress': instance.private_ip_address,
                    }
                    name = 'Unnamed'
                    # instance.tags may be None, hence the "or []".
                    for tag in instance.tags or []:
                        if tag['Key'] == 'Name':
                            name = tag['Value']
                        instance_info[f"Tag_{tag['Key']}"] = tag['Value']
                    instance_info['Name'] = name
                    ec2_instances.append(instance_info)
            except Exception as e:
                logger.error(f"Error getting EC2 instances in {region}: {e}")
        self.resources['EC2'] = ec2_instances
        logger.info(f"Found {len(ec2_instances)} EC2 instances")

    def get_rds_instances(self):
        """Get all RDS instances across regions.

        Regional failures are logged and skipped; ``self.resources['RDS']`` is
        always set.
        """
        rds_instances = []
        for region in self.regions:
            try:
                rds = self.session.client('rds', region_name=region)
                paginator = rds.get_paginator('describe_db_instances')
                for page in paginator.paginate():
                    for db in page['DBInstances']:
                        db_info = {
                            'DBInstanceIdentifier': db['DBInstanceIdentifier'],
                            'Engine': db['Engine'],
                            'DBInstanceStatus': db['DBInstanceStatus'],
                            'AllocatedStorage': db['AllocatedStorage'],
                            'Region': region,
                            'Type': 'RDS Instance',
                            'Service': 'RDS',
                            'AccountId': self.account_id,
                            'DBInstanceClass': db['DBInstanceClass'],
                            # 'N/A' when the response carries no endpoint address.
                            'Endpoint': db.get('Endpoint', {}).get('Address', 'N/A'),
                        }
                        rds_instances.append(db_info)
            except Exception as e:
                logger.error(f"Error getting RDS instances in {region}: {e}")
        self.resources['RDS'] = rds_instances
        logger.info(f"Found {len(rds_instances)} RDS instances")

    def get_iam_users(self):
        """Get IAM users with enhanced details.

        Group names and the attached-policy count are best effort: when either
        lookup fails for a user, that field is simply left out of the record.
        Results are stored in ``self.resources['IAM']``.
        """
        try:
            iam = self.session.client('iam')
            users = []
            user_paginator = iam.get_paginator('list_users')
            # Paginate the per-user lookups too, so long lists are not truncated
            # to the first page.
            groups_paginator = iam.get_paginator('list_groups_for_user')
            policies_paginator = iam.get_paginator('list_attached_user_policies')
            for page in user_paginator.paginate():
                for user in page['Users']:
                    user_name = user['UserName']
                    user_info = {
                        'UserName': user_name,
                        'UserId': user['UserId'],
                        'CreateDate': user['CreateDate'],
                        'Type': 'IAM User',
                        'Service': 'IAM',
                        'AccountId': self.account_id,
                        'Region': 'global',
                    }
                    # Get user groups
                    try:
                        group_names = []
                        for group_page in groups_paginator.paginate(UserName=user_name):
                            group_names.extend(group['GroupName'] for group in group_page['Groups'])
                        user_info['Groups'] = group_names
                    except Exception as e:
                        logger.debug(f"Could not get groups for user {user_name}: {e}")
                    # Get attached policies count
                    try:
                        user_info['AttachedPoliciesCount'] = sum(
                            len(policy_page['AttachedPolicies'])
                            for policy_page in policies_paginator.paginate(UserName=user_name)
                        )
                    except Exception as e:
                        logger.debug(f"Could not get policies for user {user_name}: {e}")
                    users.append(user_info)
            self.resources['IAM'] = users
            logger.info(f"Found {len(users)} IAM users")
        except Exception as e:
            logger.error(f"Error getting IAM users: {e}")

    def get_cloudfront_distributions(self):
        """Get CloudFront distributions and store them in ``self.resources['CloudFront']``."""
        try:
            cloudfront = self.session.client('cloudfront')
            distributions = []
            paginator = cloudfront.get_paginator('list_distributions')
            for page in paginator.paginate():
                # A page may carry no 'Items' key at all; treat that as empty.
                for dist in page['DistributionList'].get('Items', []):
                    dist_info = {
                        'Id': dist['Id'],
                        'DomainName': dist['DomainName'],
                        'Status': dist['Status'],
                        'Enabled': dist['Enabled'],
                        'Type': 'CloudFront Distribution',
                        'Service': 'CloudFront',
                        'AccountId': self.account_id,
                        'Region': 'global',
                    }
                    distributions.append(dist_info)
            self.resources['CloudFront'] = distributions
            logger.info(f"Found {len(distributions)} CloudFront distributions")
        except Exception as e:
            logger.error(f"Error getting CloudFront distributions: {e}")

    def generate_report(self):
        """Generate comprehensive report.

        Runs every collector (a failing collector is logged and does not stop
        the others), then writes the JSON and CSV files and prints the summary.
        """
        logger.info("Starting AWS Resource Inventory...")
        # Collect all resources
        resource_methods = [
            self.get_s3_buckets,
            self.get_lambda_functions,
            self.get_route53_records,
            self.get_ec2_instances,
            self.get_rds_instances,
            self.get_iam_users,
            self.get_cloudfront_distributions,
        ]
        for method in resource_methods:
            try:
                method()
            except Exception as e:
                logger.error(f"Error in {method.__name__}: {e}")
        # Generate reports
        self.generate_json_report()
        self.generate_csv_report()
        self.generate_summary_report()
        logger.info("AWS Resource Inventory completed!")

    def generate_json_report(self) -> Optional[str]:
        """Generate JSON report.

        Returns:
            Path of the file written under ``reports/``, or ``None`` on error.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"aws_inventory_{self.account_id}_{timestamp}.json"
            # Create reports directory if it doesn't exist
            os.makedirs('reports', exist_ok=True)
            filepath = os.path.join('reports', filename)
            with open(filepath, 'w', encoding='utf-8') as f:
                # default=str stringifies values json cannot encode (e.g. datetimes).
                json.dump(self.resources, f, indent=2, default=str)
            logger.info(f"JSON report saved as: {filepath}")
            return filepath
        except Exception as e:
            logger.error(f"Error generating JSON report: {e}")
            return None

    def generate_csv_report(self) -> Optional[str]:
        """Generate CSV report.

        All services are flattened into one table whose ``Service`` column
        holds the ``self.resources`` key each row came from. The collected
        records themselves are not modified.

        Returns:
            Path of the file written under ``reports/``, or ``None`` when there
            was nothing to write or an error occurred.
        """
        try:
            # Build copies so the 'Service' override never touches self.resources.
            rows = [
                {**resource, 'Service': service}
                for service, resources in self.resources.items()
                for resource in resources
            ]
            if not rows:
                logger.warning("No resources collected; CSV report not written")
                return None
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"aws_inventory_{self.account_id}_{timestamp}.csv"
            # Create reports directory if it doesn't exist
            os.makedirs('reports', exist_ok=True)
            filepath = os.path.join('reports', filename)
            pd.DataFrame(rows).to_csv(filepath, index=False, encoding='utf-8')
            logger.info(f"CSV report saved as: {filepath}")
            return filepath
        except Exception as e:
            logger.error(f"Error generating CSV report: {e}")
            return None

    def generate_summary_report(self):
        """Generate summary report: print per-service and total counts to stdout."""
        print("\n" + "="*60)
        print("AWS RESOURCE INVENTORY SUMMARY")
        print("="*60)
        print(f"Account ID: {self.account_id}")
        print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Regions Scanned: {len(self.regions)}")
        print("-"*60)
        total_resources = 0
        for service, resources in self.resources.items():
            count = len(resources)
            total_resources += count
            print(f"{service:.<20}: {count:>5} resources")
        print("-"*60)
        print(f"{'TOTAL':.<20}: {total_resources:>5} resources")
        print("="*60)
def main() -> int:
    """Main function: run the inventory with the default AWS configuration.

    Returns:
        0 when the inventory ran, 1 when creating the inventory or generating
        the report raised. The error is logged and printed in that case, so
        callers can use the return value as a process exit status.
    """
    print("AWS Resource Inventory Script - Improved Version")
    print("This script will inventory your AWS resources...")
    try:
        # You can specify profile and regions here
        inventory = AWSResourceInventory(
            # profile_name="your-profile-name",  # Uncomment to use specific profile
            # regions=['us-east-1', 'us-west-2']  # Uncomment to specify regions
        )
        inventory.generate_report()
    except Exception as e:
        logger.error(f"Script execution failed: {e}")
        print(f"\nError: {e}")
        print("Please check your AWS configuration and permissions.")
        return 1
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status; a None return
    # is treated as success (status 0) by SystemExit.
    raise SystemExit(main())