# queue-processing/main.py
# In the name of Allah, the Most Gracious, the Most Merciful
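
"""Redis queue consumer.

Blocks on a Redis list, validates each message's 'link' URL, enriches it via
the detail-view API (with retries), and stores the merged JSON record in S3
under a per-day prefix.
"""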
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict, Optional
from urllib.parse import urlparse

import boto3
import redis
import requests
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RedisConsumer:
    def __init__(self):
        # Load configuration from environment variables
        self.redis_host = os.getenv('REDIS_HOST', 'localhost')
        self.redis_port = int(os.getenv('REDIS_PORT', 6379))
        self.redis_db = int(os.getenv('REDIS_DB', 0))
        self.redis_password = os.getenv('REDIS_PASSWORD', None)
        self.queue_name = os.getenv('QUEUE_NAME', 'processing_queue')
        self.detail_view_api = os.getenv('DETAIL_VIEW_API', 'http://localhost:8000/api/detail')
        self.api_timeout = int(os.getenv('API_TIMEOUT', 45))  # Slightly more than 40 seconds

        # Retry configuration
        self.max_retries = int(os.getenv('API_MAX_RETRIES', 3))
        self.retry_delay = int(os.getenv('API_RETRY_DELAY', 2))  # seconds

        # S3 configuration
        self.s3_access_key = os.getenv('S3_ACCESS_KEY')
        self.s3_secret_key = os.getenv('S3_SECRET_KEY')
        self.s3_endpoint = os.getenv('S3_ENDPOINT_URL')
        self.s3_bucket = os.getenv('S3_BUCKET_NAME')
        self.s3_region = os.getenv('S3_REGION', 'us-east-1')

        # Validate required environment variables
        self._validate_env_vars()

        # Initialize Redis connection
        self.redis_client = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
            password=self.redis_password,
            decode_responses=True
        )

        # Initialize S3 client
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=self.s3_access_key,
            aws_secret_access_key=self.s3_secret_key,
            endpoint_url=self.s3_endpoint,
            region_name=self.s3_region
        )

        logger.info("Redis consumer initialized successfully")
    def _validate_env_vars(self):
        """Validate that all required environment variables are set"""
        required_vars = {
            'S3_ACCESS_KEY': self.s3_access_key,
            'S3_SECRET_KEY': self.s3_secret_key,
            'S3_ENDPOINT_URL': self.s3_endpoint,
            'S3_BUCKET_NAME': self.s3_bucket
        }
        missing_vars = [var for var, value in required_vars.items() if not value]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
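
    # Illustrative configuration (values are placeholders; names match the
    # os.getenv() calls in __init__):
    #   REDIS_HOST=localhost      REDIS_PORT=6379        QUEUE_NAME=processing_queue
    #   S3_ACCESS_KEY=<key>       S3_SECRET_KEY=<secret>
    #   S3_ENDPOINT_URL=<url>     S3_BUCKET_NAME=<bucket>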
    def is_valid_url(self, url: str) -> bool:
        """Validate if the provided string is a valid URL"""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except (ValueError, AttributeError):
            # urlparse raises ValueError for malformed URLs; AttributeError
            # covers non-string input. A bare except would also swallow
            # KeyboardInterrupt, so catch only what parsing can raise.
            return False
    def call_detail_view_api(self, url: str) -> Optional[Dict[str, Any]]:
        """Call the detail view API with the provided URL, with retry logic"""
        payload = {"url": url}
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Calling detail view API for URL: {url} (Attempt {attempt + 1}/{self.max_retries})")
                response = requests.post(
                    self.detail_view_api,
                    json=payload,
                    timeout=self.api_timeout
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.Timeout:
                logger.warning(f"API request timed out for URL: {url} (Attempt {attempt + 1}/{self.max_retries})")
                if attempt < self.max_retries - 1:
                    logger.info(f"Retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                else:
                    logger.error(f"All {self.max_retries} attempts failed due to timeout for URL: {url}")
                    return None
            except json.JSONDecodeError as e:
                # Checked before RequestException: in requests >= 2.27 the
                # response.json() error subclasses both, so a RequestException
                # handler listed first would shadow this branch. Don't retry
                # on JSON decode errors; they indicate a malformed response.
                logger.error(f"Failed to parse API response for URL {url}: {str(e)}")
                return None
            except requests.exceptions.RequestException as e:
                logger.warning(f"API request failed for URL {url} (Attempt {attempt + 1}/{self.max_retries}): {str(e)}")
                if attempt < self.max_retries - 1:
                    logger.info(f"Retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                else:
                    logger.error(f"All {self.max_retries} attempts failed for URL {url}: {str(e)}")
                    return None
        return None
    def store_in_s3(self, data: Dict[str, Any]) -> bool:
        """Store the combined data in S3 with retry logic"""
        # Create folder path based on today's date
        today = datetime.now().strftime('%Y-%m-%d')
        object_key = f"{today}/{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json"
        for attempt in range(self.max_retries):
            try:
                # Convert data to JSON; encode explicitly so non-ASCII
                # content (ensure_ascii=False) is stored as UTF-8
                json_data = json.dumps(data, ensure_ascii=False).encode('utf-8')
                # Upload to S3
                self.s3_client.put_object(
                    Bucket=self.s3_bucket,
                    Key=object_key,
                    Body=json_data,
                    ContentType='application/json'
                )
                logger.info(f"Successfully stored data in S3: {object_key}")
                return True
            except Exception as e:
                logger.warning(f"Failed to store data in S3 (Attempt {attempt + 1}/{self.max_retries}): {str(e)}")
                if attempt < self.max_retries - 1:
                    logger.info(f"Retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                else:
                    logger.error(f"All {self.max_retries} attempts failed to store data in S3: {str(e)}")
                    return False
        return False
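
    # Keys produced above look like "2025-10-08/20251008_104839_123456.json":
    # one prefix per day, one microsecond-timestamped object per message.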
    def process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a single message from Redis"""
        # Validate the URL
        if 'link' not in message or not self.is_valid_url(message['link']):
            logger.error(f"Invalid URL in message: {message.get('link', 'Missing')}")
            return None
        # Call the detail view API with retries
        api_response = self.call_detail_view_api(message['link'])
        if not api_response:
            return None
        # Combine the original message with the API response
        combined_data = {**message, **api_response}
        return combined_data
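
    # Messages are expected to be JSON objects: 'link' is required and
    # validated above; 'name' is optional (used only for logging); every
    # original field is merged into the stored record, with API response
    # fields taking precedence on key collisions.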
    def run(self):
        """Main loop to process messages from Redis"""
        logger.info(f"Starting consumer for queue: {self.queue_name}")
        while True:
            try:
                # Use BLPOP to block until a message is available.
                # This allows multiple instances to work together.
                # BLPOP returns None when the timeout expires, so guard
                # against unpacking it directly.
                result = self.redis_client.blpop(self.queue_name, timeout=30)
                if result:
                    _, message_json = result
                    try:
                        message = json.loads(message_json)
                        logger.info(f"Processing message: {message.get('name', 'Unknown')}")
                        # Process the message
                        combined_data = self.process_message(message)
                        if combined_data:
                            # Store in S3 with retries
                            success = self.store_in_s3(combined_data)
                            if not success:
                                logger.error(f"Failed to store message in S3 after {self.max_retries} attempts: {message.get('name', 'Unknown')}")
                        else:
                            logger.warning(f"Failed to process message: {message.get('name', 'Unknown')}")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse message as JSON: {str(e)}")
                    except Exception as e:
                        logger.error(f"Unexpected error processing message: {str(e)}")
            except redis.exceptions.ConnectionError:
                logger.error("Redis connection error. Attempting to reconnect...")
                self._reconnect_redis()
            except Exception as e:
                logger.error(f"Unexpected error in main loop: {str(e)}")
                # Add a small delay to prevent tight loops on errors
                time.sleep(1)
    def _reconnect_redis(self):
        """Reconnect to Redis in case of connection issues"""
        max_reconnect_attempts = 3
        reconnect_delay = 5
        for attempt in range(max_reconnect_attempts):
            try:
                self.redis_client = redis.Redis(
                    host=self.redis_host,
                    port=self.redis_port,
                    db=self.redis_db,
                    password=self.redis_password,
                    decode_responses=True
                )
                # Test the connection
                self.redis_client.ping()
                logger.info("Redis reconnected successfully")
                return
            except Exception as e:
                logger.error(f"Failed to reconnect to Redis (Attempt {attempt + 1}/{max_reconnect_attempts}): {str(e)}")
                if attempt < max_reconnect_attempts - 1:
                    time.sleep(reconnect_delay)
        logger.error(f"All {max_reconnect_attempts} reconnection attempts failed")
if __name__ == "__main__":
    # Load environment variables from .env file if it exists
    try:
        from dotenv import load_dotenv
        load_dotenv()
        logger.info("Loaded environment variables from .env file")
    except ImportError:
        pass  # dotenv not installed

    try:
        consumer = RedisConsumer()
        consumer.run()
    except Exception as e:
        logger.error(f"Failed to initialize consumer: {str(e)}")