From 8fefaaf713338423388cc8d9d4b37a8e9339a077 Mon Sep 17 00:00:00 2001
From: ghaymah_dev
Date: Tue, 18 Nov 2025 11:00:28 +0000
Subject: [PATCH] added 8 additional key services
---
main.py | 399 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 397 insertions(+), 2 deletions(-)
diff --git a/main.py b/main.py
index a1b5b42..9dc69cb 100644
--- a/main.py
+++ b/main.py
@@ -372,7 +372,394 @@ class AWSResourceInventory:
except Exception as e:
logger.error(f"Error getting CloudFront distributions: {e}")
-
+
+ def get_dynamodb_tables(self):
+ """Get DynamoDB tables (standard and global) with key metrics"""
+ tables = []
+
+ for region in self.regions:
+ try:
+ dynamodb = self.session.client('dynamodb', region_name=region)
+ paginator = dynamodb.get_paginator('list_tables')
+
+ for page in paginator.paginate():
+ for table_name in page['TableNames']:
+ try:
+ desc = dynamodb.describe_table(TableName=table_name)['Table']
+ table_info = {
+ 'Name': table_name,
+ 'TableStatus': desc['TableStatus'],
+ 'CreationDateTime': desc['CreationDateTime'],
+ 'ItemCount': desc.get('ItemCount', 0),
+ 'TableSizeBytes': desc.get('TableSizeBytes', 0),
+ 'BillingMode': desc.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED'),
+ 'Region': region,
+ 'Type': 'DynamoDB Table',
+ 'Service': 'DynamoDB',
+ 'AccountId': self.account_id
+ }
+
+ # Add key schema
+ if 'KeySchema' in desc:
+ table_info['PartitionKey'] = next(
+ (k['AttributeName'] for k in desc['KeySchema'] if k['KeyType'] == 'HASH'), 'N/A'
+ )
+ sort_key = next(
+ (k['AttributeName'] for k in desc['KeySchema'] if k['KeyType'] == 'RANGE'), None
+ )
+ table_info['SortKey'] = sort_key if sort_key else 'N/A'
+
+ tables.append(table_info)
+ except Exception as e:
+ logger.warning(f"Could not describe DynamoDB table {table_name} in {region}: {e}")
+ continue
+
+ # Check for Global Tables (v2019.11.21+)
+ try:
+ global_tables = dynamodb.list_global_tables()['GlobalTables']
+ for gt in global_tables:
+ tables.append({
+ 'Name': gt['GlobalTableName'],
+ 'ReplicationRegions': ', '.join([r['RegionName'] for r in gt['ReplicationGroup']]),
+ 'TableStatus': 'Global Table',
+ 'Region': region,
+ 'Type': 'DynamoDB Global Table',
+ 'Service': 'DynamoDB',
+ 'AccountId': self.account_id
+ })
+ except Exception as e:
+ if 'AccessDenied' not in str(e):
+ logger.debug(f"Global tables not supported or error in {region}: {e}")
+
+ except Exception as e:
+ logger.error(f"Error getting DynamoDB tables in {region}: {e}")
+
+ self.resources['DynamoDB'] = tables
+ logger.info(f"Found {len(tables)} DynamoDB resources")
+
+ def get_ecs_resources(self):
+ """Get ECS clusters, services, and task definitions"""
+ ecs_resources = []
+
+ for region in self.regions:
+ try:
+ ecs = self.session.client('ecs', region_name=region)
+
+ # List clusters
+ cluster_arns = []
+ paginator = ecs.get_paginator('list_clusters')
+ for page in paginator.paginate():
+ cluster_arns.extend(page['clusterArns'])
+
+ for arn in cluster_arns:
+ try:
+ cluster = ecs.describe_clusters(clusters=[arn])['clusters'][0]
+ cluster_info = {
+ 'ClusterName': cluster['clusterName'],
+ 'ClusterArn': arn,
+ 'Status': cluster['status'],
+ 'RunningTasksCount': cluster.get('runningTasksCount', 0),
+ 'PendingTasksCount': cluster.get('pendingTasksCount', 0),
+ 'ActiveServicesCount': cluster.get('activeServicesCount', 0),
+ 'RegisteredContainerInstancesCount': cluster.get('registeredContainerInstancesCount', 0),
+ 'Region': region,
+ 'Type': 'ECS Cluster',
+ 'Service': 'ECS',
+ 'AccountId': self.account_id
+ }
+ ecs_resources.append(cluster_info)
+
+ # List services in cluster
+ svc_paginator = ecs.get_paginator('list_services')
+ for svc_page in svc_paginator.paginate(cluster=arn):
+ for svc_arn in svc_page['serviceArns']:
+ try:
+ svc = ecs.describe_services(cluster=arn, services=[svc_arn])['services'][0]
+ svc_info = {
+ 'ServiceName': svc['serviceName'],
+ 'Cluster': cluster['clusterName'],
+ 'DesiredCount': svc['desiredCount'],
+ 'RunningCount': svc['runningCount'],
+ 'Status': svc['status'],
+ 'LaunchType': svc.get('launchType', 'EC2'),
+ 'Region': region,
+ 'Type': 'ECS Service',
+ 'Service': 'ECS',
+ 'AccountId': self.account_id
+ }
+ ecs_resources.append(svc_info)
+ except Exception as e:
+ logger.warning(f"Could not describe ECS service {svc_arn}: {e}")
+ except Exception as e:
+ logger.warning(f"Could not describe ECS cluster {arn}: {e}")
+
+ except Exception as e:
+ logger.error(f"Error getting ECS resources in {region}: {e}")
+
+ self.resources['ECS'] = ecs_resources
+ logger.info(f"Found {len(ecs_resources)} ECS resources")
+
+ def get_eks_clusters(self):
+ """Get EKS clusters with Kubernetes version and status"""
+ clusters = []
+
+ for region in self.regions:
+ try:
+ eks = self.session.client('eks', region_name=region)
+ cluster_names = eks.list_clusters().get('clusters', [])
+
+ for name in cluster_names:
+ try:
+ desc = eks.describe_cluster(name=name)['cluster']
+ cluster_info = {
+ 'Name': name,
+ 'Arn': desc['arn'],
+ 'Status': desc['status'],
+ 'Version': desc['version'],
+ 'Endpoint': desc['endpoint'],
+ 'CreatedAt': desc['createdAt'],
+ 'PlatformVersion': desc.get('platformVersion', 'N/A'),
+ 'Region': region,
+ 'Type': 'EKS Cluster',
+ 'Service': 'EKS',
+ 'AccountId': self.account_id
+ }
+ clusters.append(cluster_info)
+ except Exception as e:
+ logger.warning(f"Could not describe EKS cluster {name} in {region}: {e}")
+ except Exception as e:
+ # EKS not available in all regions; suppress noise for unsupported ones
+ if 'Unsupported' not in str(e):
+ logger.error(f"Error getting EKS clusters in {region}: {e}")
+
+ self.resources['EKS'] = clusters
+ logger.info(f"Found {len(clusters)} EKS clusters")
+
+ def get_sqs_queues(self):
+ """Get SQS queues with attributes"""
+ queues = []
+
+ for region in self.regions:
+ try:
+ sqs = self.session.client('sqs', region_name=region)
+ paginator = sqs.get_paginator('list_queues')
+
+ for page in paginator.paginate():
+ for url in page.get('QueueUrls', []):
+ try:
+ attrs = sqs.get_queue_attributes(
+ QueueUrl=url,
+ AttributeNames=['All']
+ )['Attributes']
+
+ queue_info = {
+ 'QueueUrl': url,
+ 'QueueName': url.split('/')[-1],
+ 'VisibilityTimeout': attrs.get('VisibilityTimeout', 'N/A'),
+ 'MessageRetentionPeriod': attrs.get('MessageRetentionPeriod', 'N/A'),
+ 'DelaySeconds': attrs.get('DelaySeconds', 'N/A'),
+ 'ApproximateNumberOfMessages': attrs.get('ApproximateNumberOfMessages', '0'),
+ 'Region': region,
+ 'Type': 'SQS Queue',
+ 'Service': 'SQS',
+ 'AccountId': self.account_id
+ }
+
+ if 'FifoQueue' in attrs and attrs['FifoQueue'] == 'true':
+ queue_info['Type'] = 'SQS FIFO Queue'
+
+ queues.append(queue_info)
+ except Exception as e:
+ logger.warning(f"Could not get attributes for SQS queue {url}: {e}")
+ except Exception as e:
+ logger.error(f"Error getting SQS queues in {region}: {e}")
+
+ self.resources['SQS'] = queues
+ logger.info(f"Found {len(queues)} SQS queues")
+
+ def get_sns_topics_and_subscriptions(self):
+ """Get SNS topics and subscriptions"""
+ sns_resources = []
+
+ for region in self.regions:
+ try:
+ sns = self.session.client('sns', region_name=region)
+
+ # Topics
+ topic_paginator = sns.get_paginator('list_topics')
+ for page in topic_paginator.paginate():
+ for topic in page.get('Topics', []):
+ arn = topic['TopicArn']
+ topic_info = {
+ 'TopicArn': arn,
+ 'TopicName': arn.split(':')[-1],
+ 'Type': 'SNS Topic',
+ 'Service': 'SNS',
+ 'Region': region,
+ 'AccountId': self.account_id
+ }
+ sns_resources.append(topic_info)
+
+ # Subscriptions
+ sub_paginator = sns.get_paginator('list_subscriptions')
+ for page in sub_paginator.paginate():
+ for sub in page.get('Subscriptions', []):
+ sub_info = {
+ 'SubscriptionArn': sub['SubscriptionArn'],
+ 'TopicArn': sub['TopicArn'],
+ 'Protocol': sub['Protocol'],
+ 'Endpoint': sub['Endpoint'][:100], # Truncate long endpoints (e.g., HTTPS URLs)
+ 'Status': sub.get('SubscriptionArn', '').split(':')[-1] if ':' in sub.get('SubscriptionArn', '') else 'Unknown',
+ 'Type': 'SNS Subscription',
+ 'Service': 'SNS',
+ 'Region': region,
+ 'AccountId': self.account_id
+ }
+ sns_resources.append(sub_info)
+
+ except Exception as e:
+ logger.error(f"Error getting SNS resources in {region}: {e}")
+
+ self.resources['SNS'] = sns_resources
+ logger.info(f"Found {len(sns_resources)} SNS resources")
+
+ def get_secrets_manager_secrets(self):
+ """Get Secrets Manager secrets (metadata only — no secret values)"""
+ secrets = []
+
+ for region in self.regions:
+ try:
+ secrets_client = self.session.client('secretsmanager', region_name=region)
+ paginator = secrets_client.get_paginator('list_secrets')
+
+ for page in paginator.paginate():
+ for secret in page.get('SecretList', []):
+ secret_info = {
+ 'Name': secret['Name'],
+ 'ARN': secret['ARN'],
+ 'Description': secret.get('Description', 'N/A')[:100],
+ 'LastChangedDate': secret.get('LastChangedDate'),
+ 'RotationEnabled': secret.get('RotationEnabled', False),
+ 'CreatedDate': secret.get('CreatedDate'),
+ 'Region': region,
+ 'Type': 'Secrets Manager Secret',
+ 'Service': 'Secrets Manager',
+ 'AccountId': self.account_id
+ }
+ secrets.append(secret_info)
+ except Exception as e:
+ logger.error(f"Error getting Secrets Manager secrets in {region}: {e}")
+
+ self.resources['SecretsManager'] = secrets
+ logger.info(f"Found {len(secrets)} Secrets Manager secrets")
+
+ def get_api_gateway_resources(self):
+ """Get API Gateway REST and HTTP APIs"""
+ api_resources = []
+
+ for region in self.regions:
+ try:
+ # REST APIs
+ apigw = self.session.client('apigateway', region_name=region)
+ rest_paginator = apigw.get_paginator('get_rest_apis')
+ for page in rest_paginator.paginate():
+ for api in page.get('items', []):
+ api_info = {
+ 'Name': api.get('name', 'N/A'),
+ 'Id': api['id'],
+ 'Description': api.get('description', 'N/A')[:100],
+ 'CreatedDate': api.get('createdDate'),
+ 'Version': api.get('version', 'N/A'),
+ 'Protocol': 'REST',
+ 'Region': region,
+ 'Type': 'API Gateway REST API',
+ 'Service': 'API Gateway',
+ 'AccountId': self.account_id
+ }
+ api_resources.append(api_info)
+
+ # HTTP APIs (v2)
+ apigwv2 = self.session.client('apigatewayv2', region_name=region)
+ http_paginator = apigwv2.get_paginator('get_apis')
+ for page in http_paginator.paginate():
+ for api in page.get('Items', []):
+ api_info = {
+ 'Name': api.get('Name', 'N/A'),
+ 'Id': api['ApiId'],
+ 'Description': api.get('Description', 'N/A')[:100],
+ 'CreatedDate': api.get('CreatedDate'),
+ 'Protocol': api.get('ProtocolType', 'HTTP'),
+ 'Region': region,
+ 'Type': 'API Gateway HTTP API',
+ 'Service': 'API Gateway',
+ 'AccountId': self.account_id
+ }
+ api_resources.append(api_info)
+
+ except Exception as e:
+ # Some regions lack API Gateway support
+ if 'Unsupported' not in str(e):
+ logger.error(f"Error getting API Gateway resources in {region}: {e}")
+
+ self.resources['APIGateway'] = api_resources
+ logger.info(f"Found {len(api_resources)} API Gateway resources")
+
+ def get_elastic_load_balancers(self):
+ """Get Application, Network, and Classic Load Balancers"""
+ lbs = []
+
+ for region in self.regions:
+ try:
+ # ALB & NLB (v2)
+ elbv2 = self.session.client('elbv2', region_name=region)
+ lb_paginator = elbv2.get_paginator('describe_load_balancers')
+ for page in lb_paginator.paginate():
+ for lb in page.get('LoadBalancers', []):
+ lb_info = {
+ 'Name': lb['LoadBalancerName'],
+ 'DNSName': lb['DNSName'],
+ 'Type': lb['Type'],
+ 'State': lb['State']['Code'],
+ 'Scheme': lb['Scheme'],
+ 'VpcId': lb.get('VpcId', 'N/A'),
+ 'CreatedTime': lb['CreatedTime'],
+ 'Region': region,
+ 'ResourceType': 'Load Balancer',
+ 'Service': 'ELBv2',
+ 'AccountId': self.account_id
+ }
+ lbs.append(lb_info)
+
+ # Classic Load Balancers
+ elb = self.session.client('elb', region_name=region)
+ classic_paginator = elb.get_paginator('describe_load_balancers')
+ for page in classic_paginator.paginate():
+ for lb in page.get('LoadBalancerDescriptions', []):
+ lb_info = {
+ 'Name': lb['LoadBalancerName'],
+ 'DNSName': lb['DNSName'],
+ 'Type': 'classic',
+ 'State': 'active',
+ 'Scheme': lb.get('Scheme', 'internet-facing'),
+ 'VpcId': lb.get('VPCId', 'N/A'),
+ 'CreatedTime': lb.get('CreatedTime', 'N/A'),
+ 'Region': region,
+ 'ResourceType': 'Load Balancer',
+ 'Service': 'ELB',
+ 'AccountId': self.account_id
+ }
+ lbs.append(lb_info)
+
+ except Exception as e:
+ # Classic LB may be deprecated in newer regions
+ if 'Unsupported' not in str(e) and 'not available' not in str(e).lower():
+ logger.error(f"Error getting ELB resources in {region}: {e}")
+
+ self.resources['LoadBalancers'] = lbs
+ logger.info(f"Found {len(lbs)} Load Balancers")
+
+
+
def generate_report(self):
"""Generate comprehensive report"""
logger.info("Starting AWS Resource Inventory...")
@@ -385,7 +772,15 @@ class AWSResourceInventory:
self.get_ec2_instances,
self.get_rds_instances,
self.get_iam_users,
- self.get_cloudfront_distributions
+ self.get_cloudfront_distributions,
+ self.get_dynamodb_tables,
+ self.get_ecs_resources,
+ self.get_eks_clusters,
+ self.get_sqs_queues,
+ self.get_sns_topics_and_subscriptions,
+ self.get_secrets_manager_secrets,
+ self.get_api_gateway_resources,
+ self.get_elastic_load_balancers,
]
for method in resource_methods: