Add main.py
هذا الالتزام موجود في:
76
main.py
Normal file
76
main.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import os
|
||||
import json
|
||||
import redis
|
||||
from fastapi import FastAPI, HTTPException, status
|
||||
from pydantic import BaseModel, Field, UrlConstraints
|
||||
|
||||
# --- Configuration ---
|
||||
# We connect to Redis. In Docker, the hostname is usually the service name 'redis'.
|
||||
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
|
||||
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
|
||||
QUEUE_NAME = "arabic_crawling_queue"
|
||||
|
||||
# --- Models ---
|
||||
class SearchPayload(BaseModel):
|
||||
"""
|
||||
Input model matching your description.
|
||||
"""
|
||||
keyword: str = Field(..., description="The search keyword used")
|
||||
results: list[str] = Field(..., description="List of URLs to process")
|
||||
|
||||
# --- App & Redis Setup ---
|
||||
app = FastAPI(title="Arabic Search Ingestion API")
|
||||
|
||||
# Initialize Redis connection
|
||||
try:
|
||||
redis_client = redis.Redis(
|
||||
host=REDIS_HOST,
|
||||
port=REDIS_PORT,
|
||||
db=0,
|
||||
decode_responses=True # Automatically decode bytes to str
|
||||
)
|
||||
# Test connection on startup
|
||||
redis_client.ping()
|
||||
print(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
|
||||
except Exception as e:
|
||||
print(f"Could not connect to Redis: {e}")
|
||||
redis_client = None
|
||||
|
||||
# --- Endpoints ---
|
||||
|
||||
@app.get("/")
|
||||
def health_check():
|
||||
"""Simple health check"""
|
||||
return {"status": "running", "redis_connected": redis_client is not None}
|
||||
|
||||
@app.post("/api/ingest", status_code=status.HTTP_202_ACCEPTED)
|
||||
def ingest_search_data(payload: SearchPayload):
|
||||
"""
|
||||
Receives keywords + 10 URLs, validates them,
|
||||
and pushes them to the background queue.
|
||||
"""
|
||||
if not redis_client:
|
||||
raise HTTPException(status_code=503, detail="Queue service unavailable")
|
||||
|
||||
# 1. Validate (Pydantic handles basic validation)
|
||||
|
||||
# 2. Prepare the job payload for the worker
|
||||
# We structure it as a dictionary
|
||||
job_data = {
|
||||
"keyword": payload.keyword,
|
||||
"urls": payload.results,
|
||||
"count": len(payload.results)
|
||||
}
|
||||
|
||||
# 3. Push to Redis Queue (LPUSH pushes to the left, so workers RPOP from right - FIFO logic)
|
||||
try:
|
||||
# We convert dict to JSON string
|
||||
redis_client.lpush(QUEUE_NAME, json.dumps(job_data))
|
||||
|
||||
return {
|
||||
"message": "Success. Job queued for processing.",
|
||||
"keyword": payload.keyword,
|
||||
"queued_urls": len(payload.results)
|
||||
}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to queue job: {str(e)}")
|
||||
المرجع في مشكلة جديدة
حظر مستخدم