added 8 additional key services

هذا الالتزام موجود في:
2025-11-18 11:00:28 +00:00
الأصل 336765bbed
التزام 8fefaaf713

397
main.py
عرض الملف

@@ -373,6 +373,393 @@ class AWSResourceInventory:
except Exception as e:
logger.error(f"Error getting CloudFront distributions: {e}")
def get_dynamodb_tables(self):
"""Get DynamoDB tables (standard and global) with key metrics"""
tables = []
for region in self.regions:
try:
dynamodb = self.session.client('dynamodb', region_name=region)
paginator = dynamodb.get_paginator('list_tables')
for page in paginator.paginate():
for table_name in page['TableNames']:
try:
desc = dynamodb.describe_table(TableName=table_name)['Table']
table_info = {
'Name': table_name,
'TableStatus': desc['TableStatus'],
'CreationDateTime': desc['CreationDateTime'],
'ItemCount': desc.get('ItemCount', 0),
'TableSizeBytes': desc.get('TableSizeBytes', 0),
'BillingMode': desc.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED'),
'Region': region,
'Type': 'DynamoDB Table',
'Service': 'DynamoDB',
'AccountId': self.account_id
}
# Add key schema
if 'KeySchema' in desc:
table_info['PartitionKey'] = next(
(k['AttributeName'] for k in desc['KeySchema'] if k['KeyType'] == 'HASH'), 'N/A'
)
sort_key = next(
(k['AttributeName'] for k in desc['KeySchema'] if k['KeyType'] == 'RANGE'), None
)
table_info['SortKey'] = sort_key if sort_key else 'N/A'
tables.append(table_info)
except Exception as e:
logger.warning(f"Could not describe DynamoDB table {table_name} in {region}: {e}")
continue
# Check for Global Tables (v2019.11.21+)
try:
global_tables = dynamodb.list_global_tables()['GlobalTables']
for gt in global_tables:
tables.append({
'Name': gt['GlobalTableName'],
'ReplicationRegions': ', '.join([r['RegionName'] for r in gt['ReplicationGroup']]),
'TableStatus': 'Global Table',
'Region': region,
'Type': 'DynamoDB Global Table',
'Service': 'DynamoDB',
'AccountId': self.account_id
})
except Exception as e:
if 'AccessDenied' not in str(e):
logger.debug(f"Global tables not supported or error in {region}: {e}")
except Exception as e:
logger.error(f"Error getting DynamoDB tables in {region}: {e}")
self.resources['DynamoDB'] = tables
logger.info(f"Found {len(tables)} DynamoDB resources")
def get_ecs_resources(self):
"""Get ECS clusters, services, and task definitions"""
ecs_resources = []
for region in self.regions:
try:
ecs = self.session.client('ecs', region_name=region)
# List clusters
cluster_arns = []
paginator = ecs.get_paginator('list_clusters')
for page in paginator.paginate():
cluster_arns.extend(page['clusterArns'])
for arn in cluster_arns:
try:
cluster = ecs.describe_clusters(clusters=[arn])['clusters'][0]
cluster_info = {
'ClusterName': cluster['clusterName'],
'ClusterArn': arn,
'Status': cluster['status'],
'RunningTasksCount': cluster.get('runningTasksCount', 0),
'PendingTasksCount': cluster.get('pendingTasksCount', 0),
'ActiveServicesCount': cluster.get('activeServicesCount', 0),
'RegisteredContainerInstancesCount': cluster.get('registeredContainerInstancesCount', 0),
'Region': region,
'Type': 'ECS Cluster',
'Service': 'ECS',
'AccountId': self.account_id
}
ecs_resources.append(cluster_info)
# List services in cluster
svc_paginator = ecs.get_paginator('list_services')
for svc_page in svc_paginator.paginate(cluster=arn):
for svc_arn in svc_page['serviceArns']:
try:
svc = ecs.describe_services(cluster=arn, services=[svc_arn])['services'][0]
svc_info = {
'ServiceName': svc['serviceName'],
'Cluster': cluster['clusterName'],
'DesiredCount': svc['desiredCount'],
'RunningCount': svc['runningCount'],
'Status': svc['status'],
'LaunchType': svc.get('launchType', 'EC2'),
'Region': region,
'Type': 'ECS Service',
'Service': 'ECS',
'AccountId': self.account_id
}
ecs_resources.append(svc_info)
except Exception as e:
logger.warning(f"Could not describe ECS service {svc_arn}: {e}")
except Exception as e:
logger.warning(f"Could not describe ECS cluster {arn}: {e}")
except Exception as e:
logger.error(f"Error getting ECS resources in {region}: {e}")
self.resources['ECS'] = ecs_resources
logger.info(f"Found {len(ecs_resources)} ECS resources")
def get_eks_clusters(self):
"""Get EKS clusters with Kubernetes version and status"""
clusters = []
for region in self.regions:
try:
eks = self.session.client('eks', region_name=region)
cluster_names = eks.list_clusters().get('clusters', [])
for name in cluster_names:
try:
desc = eks.describe_cluster(name=name)['cluster']
cluster_info = {
'Name': name,
'Arn': desc['arn'],
'Status': desc['status'],
'Version': desc['version'],
'Endpoint': desc['endpoint'],
'CreatedAt': desc['createdAt'],
'PlatformVersion': desc.get('platformVersion', 'N/A'),
'Region': region,
'Type': 'EKS Cluster',
'Service': 'EKS',
'AccountId': self.account_id
}
clusters.append(cluster_info)
except Exception as e:
logger.warning(f"Could not describe EKS cluster {name} in {region}: {e}")
except Exception as e:
# EKS not available in all regions; suppress noise for unsupported ones
if 'Unsupported' not in str(e):
logger.error(f"Error getting EKS clusters in {region}: {e}")
self.resources['EKS'] = clusters
logger.info(f"Found {len(clusters)} EKS clusters")
def get_sqs_queues(self):
"""Get SQS queues with attributes"""
queues = []
for region in self.regions:
try:
sqs = self.session.client('sqs', region_name=region)
paginator = sqs.get_paginator('list_queues')
for page in paginator.paginate():
for url in page.get('QueueUrls', []):
try:
attrs = sqs.get_queue_attributes(
QueueUrl=url,
AttributeNames=['All']
)['Attributes']
queue_info = {
'QueueUrl': url,
'QueueName': url.split('/')[-1],
'VisibilityTimeout': attrs.get('VisibilityTimeout', 'N/A'),
'MessageRetentionPeriod': attrs.get('MessageRetentionPeriod', 'N/A'),
'DelaySeconds': attrs.get('DelaySeconds', 'N/A'),
'ApproximateNumberOfMessages': attrs.get('ApproximateNumberOfMessages', '0'),
'Region': region,
'Type': 'SQS Queue',
'Service': 'SQS',
'AccountId': self.account_id
}
if 'FifoQueue' in attrs and attrs['FifoQueue'] == 'true':
queue_info['Type'] = 'SQS FIFO Queue'
queues.append(queue_info)
except Exception as e:
logger.warning(f"Could not get attributes for SQS queue {url}: {e}")
except Exception as e:
logger.error(f"Error getting SQS queues in {region}: {e}")
self.resources['SQS'] = queues
logger.info(f"Found {len(queues)} SQS queues")
def get_sns_topics_and_subscriptions(self):
"""Get SNS topics and subscriptions"""
sns_resources = []
for region in self.regions:
try:
sns = self.session.client('sns', region_name=region)
# Topics
topic_paginator = sns.get_paginator('list_topics')
for page in topic_paginator.paginate():
for topic in page.get('Topics', []):
arn = topic['TopicArn']
topic_info = {
'TopicArn': arn,
'TopicName': arn.split(':')[-1],
'Type': 'SNS Topic',
'Service': 'SNS',
'Region': region,
'AccountId': self.account_id
}
sns_resources.append(topic_info)
# Subscriptions
sub_paginator = sns.get_paginator('list_subscriptions')
for page in sub_paginator.paginate():
for sub in page.get('Subscriptions', []):
sub_info = {
'SubscriptionArn': sub['SubscriptionArn'],
'TopicArn': sub['TopicArn'],
'Protocol': sub['Protocol'],
'Endpoint': sub['Endpoint'][:100], # Truncate long endpoints (e.g., HTTPS URLs)
'Status': sub.get('SubscriptionArn', '').split(':')[-1] if ':' in sub.get('SubscriptionArn', '') else 'Unknown',
'Type': 'SNS Subscription',
'Service': 'SNS',
'Region': region,
'AccountId': self.account_id
}
sns_resources.append(sub_info)
except Exception as e:
logger.error(f"Error getting SNS resources in {region}: {e}")
self.resources['SNS'] = sns_resources
logger.info(f"Found {len(sns_resources)} SNS resources")
def get_secrets_manager_secrets(self):
"""Get Secrets Manager secrets (metadata only — no secret values)"""
secrets = []
for region in self.regions:
try:
secrets_client = self.session.client('secretsmanager', region_name=region)
paginator = secrets_client.get_paginator('list_secrets')
for page in paginator.paginate():
for secret in page.get('SecretList', []):
secret_info = {
'Name': secret['Name'],
'ARN': secret['ARN'],
'Description': secret.get('Description', 'N/A')[:100],
'LastChangedDate': secret.get('LastChangedDate'),
'RotationEnabled': secret.get('RotationEnabled', False),
'CreatedDate': secret.get('CreatedDate'),
'Region': region,
'Type': 'Secrets Manager Secret',
'Service': 'Secrets Manager',
'AccountId': self.account_id
}
secrets.append(secret_info)
except Exception as e:
logger.error(f"Error getting Secrets Manager secrets in {region}: {e}")
self.resources['SecretsManager'] = secrets
logger.info(f"Found {len(secrets)} Secrets Manager secrets")
def get_api_gateway_resources(self):
"""Get API Gateway REST and HTTP APIs"""
api_resources = []
for region in self.regions:
try:
# REST APIs
apigw = self.session.client('apigateway', region_name=region)
rest_paginator = apigw.get_paginator('get_rest_apis')
for page in rest_paginator.paginate():
for api in page.get('items', []):
api_info = {
'Name': api.get('name', 'N/A'),
'Id': api['id'],
'Description': api.get('description', 'N/A')[:100],
'CreatedDate': api.get('createdDate'),
'Version': api.get('version', 'N/A'),
'Protocol': 'REST',
'Region': region,
'Type': 'API Gateway REST API',
'Service': 'API Gateway',
'AccountId': self.account_id
}
api_resources.append(api_info)
# HTTP APIs (v2)
apigwv2 = self.session.client('apigatewayv2', region_name=region)
http_paginator = apigwv2.get_paginator('get_apis')
for page in http_paginator.paginate():
for api in page.get('Items', []):
api_info = {
'Name': api.get('Name', 'N/A'),
'Id': api['ApiId'],
'Description': api.get('Description', 'N/A')[:100],
'CreatedDate': api.get('CreatedDate'),
'Protocol': api.get('ProtocolType', 'HTTP'),
'Region': region,
'Type': 'API Gateway HTTP API',
'Service': 'API Gateway',
'AccountId': self.account_id
}
api_resources.append(api_info)
except Exception as e:
# Some regions lack API Gateway support
if 'Unsupported' not in str(e):
logger.error(f"Error getting API Gateway resources in {region}: {e}")
self.resources['APIGateway'] = api_resources
logger.info(f"Found {len(api_resources)} API Gateway resources")
def get_elastic_load_balancers(self):
"""Get Application, Network, and Classic Load Balancers"""
lbs = []
for region in self.regions:
try:
# ALB & NLB (v2)
elbv2 = self.session.client('elbv2', region_name=region)
lb_paginator = elbv2.get_paginator('describe_load_balancers')
for page in lb_paginator.paginate():
for lb in page.get('LoadBalancers', []):
lb_info = {
'Name': lb['LoadBalancerName'],
'DNSName': lb['DNSName'],
'Type': lb['Type'],
'State': lb['State']['Code'],
'Scheme': lb['Scheme'],
'VpcId': lb.get('VpcId', 'N/A'),
'CreatedTime': lb['CreatedTime'],
'Region': region,
'ResourceType': 'Load Balancer',
'Service': 'ELBv2',
'AccountId': self.account_id
}
lbs.append(lb_info)
# Classic Load Balancers
elb = self.session.client('elb', region_name=region)
classic_paginator = elb.get_paginator('describe_load_balancers')
for page in classic_paginator.paginate():
for lb in page.get('LoadBalancerDescriptions', []):
lb_info = {
'Name': lb['LoadBalancerName'],
'DNSName': lb['DNSName'],
'Type': 'classic',
'State': 'active',
'Scheme': lb.get('Scheme', 'internet-facing'),
'VpcId': lb.get('VPCId', 'N/A'),
'CreatedTime': lb.get('CreatedTime', 'N/A'),
'Region': region,
'ResourceType': 'Load Balancer',
'Service': 'ELB',
'AccountId': self.account_id
}
lbs.append(lb_info)
except Exception as e:
# Classic LB may be deprecated in newer regions
if 'Unsupported' not in str(e) and 'not available' not in str(e).lower():
logger.error(f"Error getting ELB resources in {region}: {e}")
self.resources['LoadBalancers'] = lbs
logger.info(f"Found {len(lbs)} Load Balancers")
def generate_report(self):
"""Generate comprehensive report"""
logger.info("Starting AWS Resource Inventory...")
@@ -385,7 +772,15 @@ class AWSResourceInventory:
self.get_ec2_instances,
self.get_rds_instances,
self.get_iam_users,
self.get_cloudfront_distributions
self.get_cloudfront_distributions,
self.get_dynamodb_tables,
self.get_ecs_resources,
self.get_eks_clusters,
self.get_sqs_queues,
self.get_sns_topics_and_subscriptions,
self.get_secrets_manager_secrets,
self.get_api_gateway_resources,
self.get_elastic_load_balancers,
]
for method in resource_methods: