# main.py
#
# Arabic Search Ingestion API.
#
# Reconstructed post-patch source (commit a7edb063, "Update main.py"): the
# service accepts keyword + URL batches, persists them to PostgreSQL, scrapes
# each URL in the background, and indexes pages that contain Arabic text.

import os
import re
import psycopg2
import psycopg2.extras
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, status, BackgroundTasks
from pydantic import BaseModel, Field, HttpUrl
from urllib.parse import urlparse
import hashlib
from typing import List, Optional
from datetime import datetime
import logging

# --- Configuration ---
DATABASE_URL = os.getenv("DATABASE_URL", "")

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Arabic Unicode coverage: base block, Supplement, Extended-A, and the
# Presentation Forms A/B blocks (shaped/ligature glyphs). Compiled once at
# module level instead of on every call to the detection helpers.
_ARABIC_CHAR = re.compile(
    r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]'
)
_ARABIC_RUN = re.compile(
    r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]+'
)

# --- Models ---
class SearchPayload(BaseModel):
    """Ingestion payload: one search keyword plus the URLs it produced."""
    keyword: str = Field(..., description="The search keyword used")
    results: List[HttpUrl] = Field(..., description="List of URLs to process")

class ArabicContentResponse(BaseModel):
    """Response shape for a single-URL Arabic-content check."""
    url: str
    has_arabic: bool
    title: Optional[str] = None
    meta_description: Optional[str] = None
    arabic_content_preview: Optional[str] = None

# --- App Setup ---
app = FastAPI(title="Arabic Search Ingestion API")

# --- Database Setup ---
def get_db_connection():
    """Open a new PostgreSQL connection, or return None on failure."""
    try:
        conn = psycopg2.connect(DATABASE_URL)
        return conn
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
        return None

def init_database():
    """Create tables and indexes if they don't exist (idempotent)."""
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot initialize database - no connection")
        return

    try:
        with conn.cursor() as cur:
            # Raw search data as received from the ingest endpoint.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS search_ingest (
                    id SERIAL PRIMARY KEY,
                    keyword VARCHAR(500) NOT NULL,
                    url TEXT NOT NULL,
                    url_hash VARCHAR(64) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    processed BOOLEAN DEFAULT FALSE,
                    UNIQUE(url_hash)
                )
            """)

            # Indexes for faster lookups.
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_search_ingest_url_hash
                ON search_ingest(url_hash)
            """)

            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_search_ingest_processed
                ON search_ingest(processed)
            """)

            # Index of pages confirmed to contain Arabic content.
            # FIX: the original DDL ended with an inline "INDEX(url_hash)"
            # clause, which is MySQL syntax and a hard syntax error in
            # PostgreSQL — the statement failed and, because this function
            # commits only once at the end, the rollback undid EVERY table.
            # The UNIQUE constraint on url_hash already creates a backing
            # index, so the clause is simply dropped.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS arabic_index (
                    id SERIAL PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    url_hash VARCHAR(64) NOT NULL UNIQUE,
                    title TEXT,
                    meta_description TEXT,
                    arabic_content TEXT,
                    detection_score FLOAT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    source_keyword VARCHAR(500),
                    http_status INTEGER
                )
            """)

            # Work queue driving the background scraper.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS processing_queue (
                    id SERIAL PRIMARY KEY,
                    search_ingest_id INTEGER REFERENCES search_ingest(id),
                    url TEXT NOT NULL,
                    url_hash VARCHAR(64) NOT NULL,
                    status VARCHAR(50) DEFAULT 'pending',
                    attempts INTEGER DEFAULT 0,
                    last_attempt TIMESTAMP,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(url_hash)
                )
            """)

        conn.commit()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        conn.rollback()
    finally:
        conn.close()

# Initialize database on startup.
# NOTE(review): @app.on_event is deprecated in recent FastAPI in favour of
# lifespan handlers; kept here for compatibility with the pinned version.
@app.on_event("startup")
def startup_event():
    """Ensure the schema exists before serving requests."""
    init_database()

# --- Arabic Language Detection ---
def contains_arabic(text: str) -> bool:
    """Return True if *text* contains at least one Arabic character.

    None and empty strings are treated as "no Arabic".
    """
    if not text:
        return False
    return bool(_ARABIC_CHAR.search(text))

def extract_arabic_content(text: str, max_length: int = 500) -> str:
    """Return up to *max_length* characters of the Arabic runs in *text*.

    Non-Arabic characters are stripped; runs are re-joined with single
    spaces. A truncated preview gets a trailing "...".
    """
    if not text:
        return ""

    matches = _ARABIC_RUN.findall(text)
    if not matches:
        return ""

    arabic_text = " ".join(matches)
    if len(arabic_text) > max_length:
        return arabic_text[:max_length] + "..."
    return arabic_text

def calculate_arabic_score(text: str) -> float:
    """Return the percentage (0-100) of Arabic characters in *text*."""
    if not text:
        return 0.0

    arabic_chars = len(_ARABIC_CHAR.findall(text))
    total_chars = len(text)

    return (arabic_chars / total_chars) * 100 if total_chars > 0 else 0.0

def get_url_hash(url: str) -> str:
    """Create a stable SHA-256 hex digest used to deduplicate URLs."""
    return hashlib.sha256(url.encode('utf-8')).hexdigest()

# --- Scraping Function ---
def scrape_and_check_arabic(url: str, keyword: str = None) -> dict:
    """Fetch *url*, detect Arabic content, and persist hits to arabic_index.

    Returns a result dict; "error" is None on success and holds a message
    when the request or parse failed. Pages with Arabic content are saved
    via save_to_arabic_index() as a side effect.
    """
    result = {
        "url": url,
        "has_arabic": False,
        "title": None,
        "meta_description": None,
        "arabic_content_preview": None,
        "arabic_score": 0.0,
        "http_status": None,
        "error": None
    }

    try:
        # Identify ourselves; many sites block the default requests UA.
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; ArabicIndexBot/1.0; +http://example.com/bot)'
        }

        response = requests.get(url, headers=headers, timeout=10, allow_redirects=True)
        result["http_status"] = response.status_code

        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Title and meta description, if present.
            title_tag = soup.find('title')
            if title_tag:
                result["title"] = title_tag.get_text().strip()

            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and meta_desc.get('content'):
                result["meta_description"] = meta_desc['content'].strip()

            # Drop non-content elements before extracting body text.
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()

            text_content = soup.get_text(separator=' ', strip=True)

            # Arabic anywhere (title, meta, or body) counts as a hit;
            # contains_arabic() tolerates the None title/meta values.
            result["has_arabic"] = (
                contains_arabic(result["title"])
                or contains_arabic(result["meta_description"])
                or contains_arabic(text_content)
            )

            if result["has_arabic"]:
                result["arabic_score"] = calculate_arabic_score(text_content)
                result["arabic_content_preview"] = extract_arabic_content(text_content)
                save_to_arabic_index(result, keyword)

    except requests.Timeout:
        result["error"] = "Request timeout"
    except requests.RequestException as e:
        result["error"] = f"Request failed: {str(e)}"
    except Exception as e:
        result["error"] = f"Scraping failed: {str(e)}"

    return result

def save_to_arabic_index(data: dict, keyword: str = None):
    """Upsert one scrape result into arabic_index (keyed on url_hash)."""
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot save to arabic_index - no database connection")
        return

    try:
        url_hash = get_url_hash(data["url"])

        with conn.cursor() as cur:
            # INSERT ... ON CONFLICT so re-scrapes refresh the existing row.
            cur.execute("""
                INSERT INTO arabic_index
                (url, url_hash, title, meta_description, arabic_content, detection_score, source_keyword, http_status, last_updated)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                ON CONFLICT (url_hash)
                DO UPDATE SET
                    last_updated = CURRENT_TIMESTAMP,
                    title = EXCLUDED.title,
                    meta_description = EXCLUDED.meta_description,
                    arabic_content = EXCLUDED.arabic_content,
                    detection_score = EXCLUDED.detection_score,
                    http_status = EXCLUDED.http_status,
                    source_keyword = EXCLUDED.source_keyword
            """, (
                data["url"],
                url_hash,
                data["title"],
                data["meta_description"],
                data["arabic_content_preview"],
                data["arabic_score"],
                keyword,
                data["http_status"]
            ))
        conn.commit()
        logger.info(f"Saved/Updated Arabic content for URL: {data['url']}")
    except Exception as e:
        logger.error(f"Failed to save to arabic_index: {e}")
        conn.rollback()
    finally:
        conn.close()

# --- Background Processing ---
def process_urls_from_queue():
    """Scrape up to 10 pending queue items and record their outcome."""
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot process queue - no database connection")
        return

    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            # Claim a batch of pending items; SKIP LOCKED lets concurrent
            # workers grab disjoint batches.
            # NOTE(review): the per-item commit() below ends the transaction
            # and releases the FOR UPDATE locks, so the claim only protects
            # the first item — confirm whether concurrent workers are used.
            cur.execute("""
                SELECT pq.id, pq.url, pq.url_hash, si.keyword
                FROM processing_queue pq
                JOIN search_ingest si ON pq.search_ingest_id = si.id
                WHERE pq.status = 'pending' AND pq.attempts < 3
                ORDER BY pq.created_at ASC
                LIMIT 10
                FOR UPDATE SKIP LOCKED
            """)

            pending_items = cur.fetchall()

            for item in pending_items:
                # Record the attempt before scraping so a crash still counts
                # against the 3-attempt limit.
                cur.execute("""
                    UPDATE processing_queue
                    SET attempts = attempts + 1, last_attempt = CURRENT_TIMESTAMP
                    WHERE id = %s
                """, (item['id'],))
                conn.commit()

                result = scrape_and_check_arabic(item['url'], item['keyword'])

                if result['has_arabic']:
                    # Successfully processed and saved.
                    cur.execute("""
                        UPDATE processing_queue
                        SET status = 'completed'
                        WHERE id = %s
                    """, (item['id'],))

                    cur.execute("""
                        UPDATE search_ingest
                        SET processed = TRUE
                        WHERE url_hash = %s
                    """, (item['url_hash'],))
                else:
                    # Failed request or no Arabic content.
                    status = 'failed' if result['error'] else 'no_arabic'
                    # FIX: "error" is always present in the result dict (None
                    # on success), so dict.get()'s default never fired and
                    # error_message was stored as NULL for no_arabic rows.
                    # `or` substitutes the fallback when the value is falsy.
                    error_msg = result['error'] or 'No Arabic content found'

                    cur.execute("""
                        UPDATE processing_queue
                        SET status = %s, error_message = %s
                        WHERE id = %s
                    """, (status, error_msg, item['id']))

                conn.commit()

    except Exception as e:
        logger.error(f"Queue processing failed: {e}")
        conn.rollback()
    finally:
        conn.close()

# --- API Endpoints ---
@app.get("/")
def health_check():
    """Simple health check: reports whether the database is reachable."""
    conn = get_db_connection()
    db_status = conn is not None
    if conn:
        conn.close()

    return {
        "status": "running",
        "database_connected": db_status,
        "service": "Arabic Search Indexer"
    }

@app.post("/api/ingest", status_code=status.HTTP_202_ACCEPTED)
def ingest_search_data(payload: SearchPayload, background_tasks: BackgroundTasks):
    """
    Receives keywords + URLs, validates them,
    and stores them in PostgreSQL for processing.
    """
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")

    try:
        inserted_count = 0

        with conn.cursor() as cur:
            for url in payload.results:
                url_str = str(url)
                url_hash = get_url_hash(url_str)

                try:
                    # Insert into search_ingest; ON CONFLICT DO NOTHING makes
                    # duplicate URLs a silent no-op (RETURNING yields no row).
                    cur.execute("""
                        INSERT INTO search_ingest (keyword, url, url_hash, created_at, processed)
                        VALUES (%s, %s, %s, CURRENT_TIMESTAMP, FALSE)
                        ON CONFLICT (url_hash) DO NOTHING
                        RETURNING id
                    """, (payload.keyword, url_str, url_hash))

                    result = cur.fetchone()

                    if result:
                        # New URL: enqueue it for background scraping.
                        cur.execute("""
                            INSERT INTO processing_queue (search_ingest_id, url, url_hash, status)
                            VALUES (%s, %s, %s, 'pending')
                            ON CONFLICT (url_hash) DO NOTHING
                        """, (result[0], url_str, url_hash))

                        inserted_count += 1

                except Exception as e:
                    logger.error(f"Failed to insert URL {url_str}: {e}")
                    continue

        conn.commit()

        # Kick off scraping only when there is new work.
        if inserted_count > 0:
            background_tasks.add_task(process_urls_from_queue)

        return {
            "message": f"Successfully queued {inserted_count} URLs for processing",
            "keyword": payload.keyword,
            "total_received": len(payload.results),
            "new_urls": inserted_count,
            "duplicates_skipped": len(payload.results) - inserted_count
        }

    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to ingest data: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to store data: {str(e)}")
    finally:
        conn.close()

@app.post("/api/scrape-and-check")
def scrape_and_check_endpoint(url: HttpUrl):
    """
    Endpoint to scrape a single URL and check for Arabic content
    """
    url_str = str(url)
    result = scrape_and_check_arabic(url_str)

    if result["error"]:
        raise HTTPException(status_code=400, detail=result["error"])

    return result

@app.get("/api/arabic-index")
def get_arabic_index(limit: int = 100, offset: int = 0):
    """
    Retrieve entries from the Arabic index
    """
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")

    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("""
                SELECT id, url, title, meta_description,
                       detection_score, scraped_at, source_keyword
                FROM arabic_index
                ORDER BY scraped_at DESC
                LIMIT %s OFFSET %s
            """, (limit, offset))

            results = [dict(row) for row in cur.fetchall()]

            # Get total count for pagination metadata.
            cur.execute("SELECT COUNT(*) FROM arabic_index")
            total = cur.fetchone()[0]

            return {
                "total": total,
                "offset": offset,
                "limit": limit,
                "results": results
            }
    except Exception as e:
        logger.error(f"Failed to fetch arabic index: {e}")
        raise HTTPException(status_code=500, detail="Failed to fetch data")
    finally:
        conn.close()

@app.post("/api/process-queue")
def trigger_queue_processing(background_tasks: BackgroundTasks):
    """
    Manually trigger queue processing
    """
    background_tasks.add_task(process_urls_from_queue)
    return {"message": "Queue processing triggered"}

# --- Run with: uvicorn main:app --reload