هذا الالتزام موجود في:
2025-10-08 10:48:39 +00:00
الأصل 9b1538d0cf
التزام 672fa6c9be

164
main.py
عرض الملف

@@ -1,3 +1,6 @@
# بسم الله الرحمن الرحيم
import os
import json
import time

import redis
@@ -28,6 +31,10 @@ class RedisConsumer:
self.detail_view_api = os.getenv('DETAIL_VIEW_API', 'http://localhost:8000/api/detail') self.detail_view_api = os.getenv('DETAIL_VIEW_API', 'http://localhost:8000/api/detail')
self.api_timeout = int(os.getenv('API_TIMEOUT', 45)) # Slightly more than 40 seconds self.api_timeout = int(os.getenv('API_TIMEOUT', 45)) # Slightly more than 40 seconds
# Retry configuration
self.max_retries = int(os.getenv('API_MAX_RETRIES', 3))
self.retry_delay = int(os.getenv('API_RETRY_DELAY', 2)) # seconds
# S3 configuration # S3 configuration
self.s3_access_key = os.getenv('S3_ACCESS_KEY') self.s3_access_key = os.getenv('S3_ACCESS_KEY')
self.s3_secret_key = os.getenv('S3_SECRET_KEY') self.s3_secret_key = os.getenv('S3_SECRET_KEY')
@@ -80,27 +87,77 @@ class RedisConsumer:
return False return False
def call_detail_view_api(self, url: str) -> Optional[Dict[str, Any]]:
    """Call the detail view API for *url*, retrying transient failures.

    Makes up to ``self.max_retries`` POST attempts, sleeping
    ``self.retry_delay`` seconds between attempts.

    Args:
        url: The link to submit to the detail-view endpoint.

    Returns:
        The decoded JSON payload on success, or ``None`` when every
        attempt failed or the response body was not valid JSON.
    """
    payload = {"url": url}
    for attempt in range(1, self.max_retries + 1):
        try:
            logger.info(
                f"Calling detail view API for URL: {url} "
                f"(Attempt {attempt}/{self.max_retries})"
            )
            response = requests.post(
                self.detail_view_api,
                json=payload,
                timeout=self.api_timeout,
            )
            response.raise_for_status()
            return response.json()
        except json.JSONDecodeError as e:
            # Must be caught BEFORE RequestException: modern requests raises
            # requests.exceptions.JSONDecodeError, which subclasses both
            # json.JSONDecodeError and RequestException.  A malformed body
            # indicates a server-side bug, so retrying is pointless.
            logger.error(f"Failed to parse API response for URL {url}: {str(e)}")
            return None
        except requests.exceptions.RequestException as e:
            # Timeout is a RequestException subclass, so this one handler
            # covers both the timeout path and the generic-failure path
            # that were previously duplicated verbatim.
            logger.warning(
                f"API request failed for URL {url} "
                f"(Attempt {attempt}/{self.max_retries}): {str(e)}"
            )
            if attempt < self.max_retries:
                logger.info(f"Retrying in {self.retry_delay} seconds...")
                time.sleep(self.retry_delay)
            else:
                logger.error(f"All {self.max_retries} attempts failed for URL {url}: {str(e)}")
    return None
def store_in_s3(self, data: Dict[str, Any]) -> bool:
    """Store *data* as a JSON object in S3, retrying transient failures.

    Objects are keyed ``YYYY-MM-DD/YYYYmmdd_HHMMSS_ffffff.json`` so one
    day's uploads share a single date folder.

    Args:
        data: The combined message/API payload to persist.

    Returns:
        True on a successful upload, False when serialization fails or
        every upload attempt fails.
    """
    # Take a single timestamp: calling datetime.now() twice (as before)
    # could straddle midnight and file the object under the wrong date.
    now = datetime.now()
    object_key = f"{now.strftime('%Y-%m-%d')}/{now.strftime('%Y%m%d_%H%M%S_%f')}.json"
    # Serialize once, outside the retry loop: the payload never changes
    # between attempts, and a serialization error (e.g. a value that is
    # not JSON-safe) is deterministic, so retrying it is pointless.
    try:
        json_data = json.dumps(data, ensure_ascii=False)
    except (TypeError, ValueError) as e:
        logger.error(f"Failed to store data in S3: {str(e)}")
        return False
    for attempt in range(1, self.max_retries + 1):
        try:
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=object_key,
                Body=json_data,
                ContentType='application/json'
            )
            logger.info(f"Successfully stored data in S3: {object_key}")
            return True
        except Exception as e:
            # boto3 raises many exception types (client, connection,
            # credential errors), so the broad catch is deliberate.
            logger.warning(f"Failed to store data in S3 (Attempt {attempt}/{self.max_retries}): {str(e)}")
            if attempt < self.max_retries:
                logger.info(f"Retrying in {self.retry_delay} seconds...")
                time.sleep(self.retry_delay)
            else:
                logger.error(f"All {self.max_retries} attempts failed to store data in S3: {str(e)}")
    return False
def process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: def process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Process a single message from Redis""" """Process a single message from Redis"""
@@ -109,7 +166,7 @@ class RedisConsumer:
logger.error(f"Invalid URL in message: {message.get('link', 'Missing')}") logger.error(f"Invalid URL in message: {message.get('link', 'Missing')}")
return None return None
# Call the detail view API # Call the detail view API with retries
api_response = self.call_detail_view_api(message['link']) api_response = self.call_detail_view_api(message['link'])
if not api_response: if not api_response:
return None return None
@@ -118,30 +175,6 @@ class RedisConsumer:
combined_data = {**message, **api_response} combined_data = {**message, **api_response}
return combined_data return combined_data
def store_in_s3(self, data: Dict[str, Any]) -> bool:
    """Persist *data* to S3 as a dated JSON object.

    Args:
        data: The combined payload to upload.

    Returns:
        True when the upload succeeds, False on any failure.
    """
    # Group objects by upload date: <YYYY-MM-DD>/<timestamp>.json
    today = datetime.now().strftime('%Y-%m-%d')
    object_key = f"{today}/{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json"
    try:
        # Serialize with ensure_ascii=False so non-Latin text is stored
        # as UTF-8 rather than \u escapes.
        json_data = json.dumps(data, ensure_ascii=False)
        self.s3_client.put_object(
            Bucket=self.s3_bucket,
            Key=object_key,
            Body=json_data,
            ContentType='application/json'
        )
        logger.info(f"Successfully stored data in S3: {object_key}")
        return True
    except Exception as e:
        logger.error(f"Failed to store data in S3: {str(e)}")
        return False
def run(self): def run(self):
"""Main loop to process messages from Redis""" """Main loop to process messages from Redis"""
logger.info(f"Starting consumer for queue: {self.queue_name}") logger.info(f"Starting consumer for queue: {self.queue_name}")
@@ -161,8 +194,10 @@ class RedisConsumer:
combined_data = self.process_message(message) combined_data = self.process_message(message)
if combined_data: if combined_data:
# Store in S3 # Store in S3 with retries
self.store_in_s3(combined_data) success = self.store_in_s3(combined_data)
if not success:
logger.error(f"Failed to store message in S3 after {self.max_retries} attempts: {message.get('name', 'Unknown')}")
else: else:
logger.warning(f"Failed to process message: {message.get('name', 'Unknown')}") logger.warning(f"Failed to process message: {message.get('name', 'Unknown')}")
@@ -181,19 +216,28 @@ class RedisConsumer:
def _reconnect_redis(self):
    """Rebuild the Redis client after a connection failure.

    Tries up to ``max_reconnect_attempts`` times, pinging after each
    connect to verify the link, and gives up (leaving the stale client
    in place) once all attempts fail.  Uses its own attempt/delay
    constants rather than the API retry settings so Redis recovery can
    be tuned independently.
    """
    max_reconnect_attempts = 3
    reconnect_delay = 5  # seconds to wait between reconnection attempts
    for attempt in range(1, max_reconnect_attempts + 1):
        try:
            self.redis_client = redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                db=self.redis_db,
                password=self.redis_password,
                decode_responses=True
            )
            # ping() forces a round trip; merely constructing the client
            # does not prove the server is reachable.
            self.redis_client.ping()
            logger.info("Redis reconnected successfully")
            return
        except Exception as e:
            logger.error(f"Failed to reconnect to Redis (Attempt {attempt}/{max_reconnect_attempts}): {str(e)}")
            if attempt < max_reconnect_attempts:
                time.sleep(reconnect_delay)
    logger.error(f"All {max_reconnect_attempts} reconnection attempts failed")
if __name__ == "__main__": if __name__ == "__main__":
# Load environment variables from .env file if it exists # Load environment variables from .env file if it exists
@@ -208,4 +252,4 @@ if __name__ == "__main__":
consumer = RedisConsumer() consumer = RedisConsumer()
consumer.run() consumer.run()
except Exception as e: except Exception as e:
logger.error(f"Failed to initialize consumer: {str(e)}") logger.error(f"Failed to initialize consumer: {str(e)}")