From 672fa6c9bee722e95b986d013bd46232c75cee06 Mon Sep 17 00:00:00 2001 From: ghaymah_dev Date: Wed, 8 Oct 2025 10:48:39 +0000 Subject: [PATCH] added retries --- main.py | 164 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 104 insertions(+), 60 deletions(-) diff --git a/main.py b/main.py index edba5d3..ddeed07 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,6 @@ +# بسم الله الرحمن الرحيم + + import os import json import redis @@ -28,6 +31,10 @@ class RedisConsumer: self.detail_view_api = os.getenv('DETAIL_VIEW_API', 'http://localhost:8000/api/detail') self.api_timeout = int(os.getenv('API_TIMEOUT', 45)) # Slightly more than 40 seconds + # Retry configuration + self.max_retries = int(os.getenv('API_MAX_RETRIES', 3)) + self.retry_delay = int(os.getenv('API_RETRY_DELAY', 2)) # seconds + # S3 configuration self.s3_access_key = os.getenv('S3_ACCESS_KEY') self.s3_secret_key = os.getenv('S3_SECRET_KEY') @@ -80,27 +87,77 @@ class RedisConsumer: return False def call_detail_view_api(self, url: str) -> Optional[Dict[str, Any]]: - """Call the detail view API with the provided URL""" + """Call the detail view API with the provided URL with retry logic""" payload = {"url": url} - try: - logger.info(f"Calling detail view API for URL: {url}") - response = requests.post( - self.detail_view_api, - json=payload, - timeout=self.api_timeout - ) - response.raise_for_status() - return response.json() - except requests.exceptions.Timeout: - logger.error(f"API request timed out for URL: {url}") - return None - except requests.exceptions.RequestException as e: - logger.error(f"API request failed for URL {url}: {str(e)}") - return None - except json.JSONDecodeError as e: - logger.error(f"Failed to parse API response for URL {url}: {str(e)}") - return None + for attempt in range(self.max_retries): + try: + logger.info(f"Calling detail view API for URL: {url} (Attempt {attempt + 1}/{self.max_retries})") + response = requests.post( + self.detail_view_api, + json=payload, + timeout=self.api_timeout + ) + response.raise_for_status() + return response.json() + + except requests.exceptions.Timeout: + logger.warning(f"API request timed out for URL: {url} (Attempt {attempt + 1}/{self.max_retries})") + if attempt < self.max_retries - 1: + logger.info(f"Retrying in {self.retry_delay} seconds...") + time.sleep(self.retry_delay) + else: + logger.error(f"All {self.max_retries} attempts failed due to timeout for URL: {url}") + return None + + except requests.exceptions.RequestException as e: + logger.warning(f"API request failed for URL {url} (Attempt {attempt + 1}/{self.max_retries}): {str(e)}") + if attempt < self.max_retries - 1: + logger.info(f"Retrying in {self.retry_delay} seconds...") + time.sleep(self.retry_delay) + else: + logger.error(f"All {self.max_retries} attempts failed for URL {url}: {str(e)}") + return None + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse API response for URL {url}: {str(e)}") + # Don't retry on JSON decode errors as they indicate malformed response + return None + + return None + + def store_in_s3(self, data: Dict[str, Any]) -> bool: + """Store the combined data in S3 with retry logic""" + # Create folder path based on today's date + today = datetime.now().strftime('%Y-%m-%d') + object_key = f"{today}/{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json" + + for attempt in range(self.max_retries): + try: + # Convert data to JSON string + json_data = json.dumps(data, ensure_ascii=False) + + # Upload to S3 + self.s3_client.put_object( + Bucket=self.s3_bucket, + Key=object_key, + Body=json_data, + ContentType='application/json' + ) + + logger.info(f"Successfully stored data in S3: {object_key}") + return True + + except Exception as e: + logger.warning(f"Failed to store data in S3 (Attempt {attempt + 1}/{self.max_retries}): {str(e)}") + if attempt < self.max_retries - 1: + logger.info(f"Retrying in {self.retry_delay} seconds...") + time.sleep(self.retry_delay) + else: + logger.error(f"All {self.max_retries} attempts failed to store data in S3: {str(e)}") + return False + + return False def process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Process a single message from Redis""" @@ -109,7 +166,7 @@ class RedisConsumer: logger.error(f"Invalid URL in message: {message.get('link', 'Missing')}") return None - # Call the detail view API + # Call the detail view API with retries api_response = self.call_detail_view_api(message['link']) if not api_response: return None @@ -118,30 +175,6 @@ class RedisConsumer: combined_data = {**message, **api_response} return combined_data - def store_in_s3(self, data: Dict[str, Any]) -> bool: - """Store the combined data in S3""" - # Create folder path based on today's date - today = datetime.now().strftime('%Y-%m-%d') - object_key = f"{today}/{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json" - - try: - # Convert data to JSON string - json_data = json.dumps(data, ensure_ascii=False) - - # Upload to S3 - self.s3_client.put_object( - Bucket=self.s3_bucket, - Key=object_key, - Body=json_data, - ContentType='application/json' - ) - - logger.info(f"Successfully stored data in S3: {object_key}") - return True - except Exception as e: - logger.error(f"Failed to store data in S3: {str(e)}") - return False - def run(self): """Main loop to process messages from Redis""" logger.info(f"Starting consumer for queue: {self.queue_name}") @@ -161,8 +194,10 @@ class RedisConsumer: combined_data = self.process_message(message) if combined_data: - # Store in S3 - self.store_in_s3(combined_data) + # Store in S3 with retries + success = self.store_in_s3(combined_data) + if not success: + logger.error(f"Failed to store message in S3 after {self.max_retries} attempts: {message.get('name', 'Unknown')}") else: logger.warning(f"Failed to process message: {message.get('name', 'Unknown')}") @@ -181,19 +216,28 @@ class RedisConsumer: def _reconnect_redis(self): """Reconnect to Redis in case of connection issues""" - try: - self.redis_client = redis.Redis( - host=self.redis_host, - port=self.redis_port, - db=self.redis_db, - decode_responses=True - ) - # Test the connection - self.redis_client.ping() - logger.info("Redis reconnected successfully") - except Exception as e: - logger.error(f"Failed to reconnect to Redis: {str(e)}") - time.sleep(5) # Wait before retrying + max_reconnect_attempts = 3 + reconnect_delay = 5 + + for attempt in range(max_reconnect_attempts): + try: + self.redis_client = redis.Redis( + host=self.redis_host, + port=self.redis_port, + db=self.redis_db, + password=self.redis_password, + decode_responses=True + ) + # Test the connection + self.redis_client.ping() + logger.info("Redis reconnected successfully") + return + except Exception as e: + logger.error(f"Failed to reconnect to Redis (Attempt {attempt + 1}/{max_reconnect_attempts}): {str(e)}") + if attempt < max_reconnect_attempts - 1: + time.sleep(reconnect_delay) + + logger.error(f"All {max_reconnect_attempts} reconnection attempts failed") if __name__ == "__main__": # Load environment variables from .env file if it exists @@ -208,4 +252,4 @@ if __name__ == "__main__": consumer = RedisConsumer() consumer.run() except Exception as e: - logger.error(f"Failed to initialize consumer: {str(e)}") + logger.error(f"Failed to initialize consumer: {str(e)}") \ No newline at end of file