"""Ingestion API: accepts a search keyword plus result URLs and queues a crawl job in Redis."""
import os
import json

import redis
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field, UrlConstraints

# --- Configuration ---
# We connect to Redis. In Docker, the hostname is usually the service name 'redis'.
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
# os.getenv defaults should be strings (a str is what the env would supply);
# int() then normalizes either source to an integer port.
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# Name of the Redis list used as the FIFO work queue consumed by the workers.
QUEUE_NAME = "arabic_crawling_queue"


# --- Models ---
class SearchPayload(BaseModel):
    """
    Request body for the /api/ingest endpoint: one search keyword plus the
    list of result URLs to be crawled for it.

    Note: `results` items are plain strings — they are not validated as URLs
    here (the imported UrlConstraints is currently unused).
    """
    # Required; the keyword the URLs were obtained for.
    keyword: str = Field(..., description="The search keyword used")
    # Required; may be any length — the worker receives the full list.
    results: list[str] = Field(..., description="List of URLs to process")


# --- App & Redis Setup ---
app = FastAPI(title="Arabic Search Ingestion API")

# Initialize Redis connection eagerly at import time so the health check can
# report connectivity immediately.
try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=0,
        decode_responses=True,  # Automatically decode bytes to str
    )
    # Test connection on startup; redis.Redis() itself is lazy and does not
    # touch the network until the first command.
    redis_client.ping()
    print(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
except Exception as e:
    # Broad catch is deliberate: the API should still start when Redis is
    # down; endpoints check the None sentinel and answer 503 instead.
    print(f"Could not connect to Redis: {e}")
    redis_client = None

# --- Endpoints ---

@app.get("/")
def health_check():
    """Report liveness and whether the startup Redis connection succeeded."""
    connected = redis_client is not None
    return {"status": "running", "redis_connected": connected}


@app.post("/api/ingest", status_code=status.HTTP_202_ACCEPTED)
def ingest_search_data(payload: SearchPayload):
    """
    Receives keywords + 10 URLs, validates them,
    and pushes them to the background queue.

    Returns a 202 summary once the job is queued.
    Raises:
        HTTPException 503: Redis was never connected at startup.
        HTTPException 500: the push to the Redis queue failed.
    """
    # Explicit None check against the startup sentinel (clearer than
    # truthiness on a client object).
    if redis_client is None:
        raise HTTPException(status_code=503, detail="Queue service unavailable")

    # 1. Validate (Pydantic handles basic validation)

    # 2. Prepare the job payload for the worker, structured as a dictionary.
    job_data = {
        "keyword": payload.keyword,
        "urls": payload.results,
        "count": len(payload.results),
    }

    # 3. Push to Redis Queue (LPUSH pushes to the left, so workers RPOP from
    #    the right - FIFO logic). Only the push is inside the try, and only
    #    Redis errors are mapped to 500 (narrowed from a bare Exception);
    #    the cause is chained for easier debugging.
    try:
        redis_client.lpush(QUEUE_NAME, json.dumps(job_data))
    except redis.RedisError as e:
        raise HTTPException(status_code=500, detail=f"Failed to queue job: {str(e)}") from e

    return {
        "message": "Success. Job queued for processing.",
        "keyword": payload.keyword,
        "queued_urls": len(payload.results),
    }