# main.py
import os
import re
import psycopg2
import psycopg2.extras
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, status, BackgroundTasks
from pydantic import BaseModel, Field, HttpUrl
from urllib.parse import urlparse
import hashlib
from typing import List, Optional
from datetime import datetime
import logging

# --- Configuration ---
DATABASE_URL = os.getenv("DATABASE_URL")

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Arabic script detection (compiled once; shared by the helpers below) ---
# Ranges: base Arabic block, Arabic Supplement, Arabic Extended-A, and the
# two Presentation Forms blocks that shaped/ligature text uses.
_ARABIC_RANGES = r'\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF'
ARABIC_CHAR_RE = re.compile('[' + _ARABIC_RANGES + ']')
ARABIC_RUN_RE = re.compile('[' + _ARABIC_RANGES + ']+')


# --- Models ---
class SearchPayload(BaseModel):
    """Ingestion payload: one search keyword plus the URLs it produced."""
    keyword: str = Field(..., description="The search keyword used")
    results: List[HttpUrl] = Field(..., description="List of URLs to process")


class ArabicContentResponse(BaseModel):
    """Response shape for a single scraped-and-checked URL."""
    url: str
    has_arabic: bool
    title: Optional[str] = None
    meta_description: Optional[str] = None
    arabic_content_preview: Optional[str] = None


# --- App Setup ---
app = FastAPI(title="Arabic Search Ingestion API")


# --- Database Setup ---
def get_db_connection():
    """Open a new psycopg2 connection, or return None if it fails.

    Callers are responsible for closing the returned connection.
    """
    try:
        conn = psycopg2.connect(DATABASE_URL)
        return conn
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
        return None


def init_database():
    """Initialize database tables and indexes if they don't exist.

    Creates three tables:
      * search_ingest    - raw (keyword, url) pairs, deduped by url_hash
      * arabic_index     - pages confirmed to contain Arabic content
      * processing_queue - work items linking back to search_ingest
    Safe to call repeatedly (all DDL is IF NOT EXISTS).
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot initialize database - no connection")
        return
    try:
        with conn.cursor() as cur:
            # Raw search data, deduplicated by a SHA-256 hash of the URL.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS search_ingest (
                    id SERIAL PRIMARY KEY,
                    keyword VARCHAR(500) NOT NULL,
                    url TEXT NOT NULL,
                    url_hash VARCHAR(64) NOT NULL UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    processed BOOLEAN DEFAULT FALSE
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_search_ingest_url_hash
                ON search_ingest(url_hash)
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_search_ingest_processed
                ON search_ingest(processed)
            """)

            # Index of pages confirmed to contain Arabic content.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS arabic_index (
                    id SERIAL PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    url_hash VARCHAR(64) NOT NULL UNIQUE,
                    title TEXT,
                    meta_description TEXT,
                    arabic_content TEXT,
                    detection_score FLOAT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    source_keyword VARCHAR(500),
                    http_status INTEGER
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_arabic_index_url_hash
                ON arabic_index(url_hash)
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_arabic_index_scraped_at
                ON arabic_index(scraped_at)
            """)

            # Work queue; rows are removed automatically when the parent
            # search_ingest row is deleted (ON DELETE CASCADE).
            cur.execute("""
                CREATE TABLE IF NOT EXISTS processing_queue (
                    id SERIAL PRIMARY KEY,
                    search_ingest_id INTEGER REFERENCES search_ingest(id) ON DELETE CASCADE,
                    url TEXT NOT NULL,
                    url_hash VARCHAR(64) NOT NULL UNIQUE,
                    status VARCHAR(50) DEFAULT 'pending',
                    attempts INTEGER DEFAULT 0,
                    last_attempt TIMESTAMP,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_processing_queue_status
                ON processing_queue(status)
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_processing_queue_url_hash
                ON processing_queue(url_hash)
            """)

            conn.commit()
            logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        conn.rollback()
    finally:
        conn.close()


# Initialize database on startup
@app.on_event("startup")
def startup_event():
    init_database()


# --- Arabic Language Detection ---
def contains_arabic(text: str) -> bool:
    """Return True if *text* contains at least one Arabic-script character.

    Non-string or empty input returns False.
    """
    if not text or not isinstance(text, str):
        return False
    return bool(ARABIC_CHAR_RE.search(text))


def extract_arabic_content(text: str, max_length: int = 500) -> str:
    """Extract a preview of the Arabic runs found in *text*.

    Joins all consecutive runs of Arabic characters with spaces and
    truncates to *max_length* (appending "..." when truncated).
    Returns "" for non-string, empty, or Arabic-free input.
    """
    if not text or not isinstance(text, str):
        return ""
    matches = ARABIC_RUN_RE.findall(text)
    if not matches:
        return ""
    arabic_text = " ".join(matches)
    if len(arabic_text) > max_length:
        return arabic_text[:max_length] + "..."
    return arabic_text


def calculate_arabic_score(text: str) -> float:
    """Return the percentage (0.0-100.0) of Arabic characters in *text*.

    Non-string or empty input scores 0.0.
    """
    if not text or not isinstance(text, str) or len(text) == 0:
        return 0.0
    arabic_chars = len(ARABIC_CHAR_RE.findall(text))
    total_chars = len(text)
    return (arabic_chars / total_chars) * 100 if total_chars > 0 else 0.0


def get_url_hash(url: str) -> str:
    """Create a unique SHA-256 hex digest for a URL (dedup key)."""
    return hashlib.sha256(url.encode('utf-8')).hexdigest()


# --- Scraping Function ---
def scrape_and_check_arabic(url: str, keyword: str = None) -> dict:
    """Fetch *url*, detect Arabic content, and persist hits to arabic_index.

    Returns a dict with keys: url, has_arabic, title, meta_description,
    arabic_content_preview, arabic_score, http_status, error.  `error` is
    None on success; network/parse failures are captured there instead of
    raising.  Only pages with Arabic content are saved to the database.
    """
    result = {
        "url": url,
        "has_arabic": False,
        "title": None,
        "meta_description": None,
        "arabic_content_preview": None,
        "arabic_score": 0.0,
        "http_status": None,
        "error": None
    }
    try:
        # Identify ourselves; some sites block requests with no UA.
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; ArabicIndexBot/1.0; +http://example.com/bot)'
        }
        response = requests.get(url, headers=headers, timeout=10, allow_redirects=True)
        result["http_status"] = response.status_code

        if response.status_code == 200:
            # Parse raw bytes, not response.text: requests falls back to
            # ISO-8859-1 when the server omits a charset, which garbles
            # Arabic pages.  BeautifulSoup detects the encoding itself
            # from the byte content / <meta> tags.
            soup = BeautifulSoup(response.content, 'html.parser')

            title_tag = soup.find('title')
            if title_tag:
                result["title"] = title_tag.get_text().strip()

            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and meta_desc.get('content'):
                result["meta_description"] = meta_desc['content'].strip()

            # Drop boilerplate/noise elements before extracting text.
            for script in soup(["script", "style", "nav", "footer", "header"]):
                script.decompose()
            text_content = soup.get_text(separator=' ', strip=True)

            # Arabic in any of title / meta description / body counts.
            has_arabic_title = contains_arabic(result["title"])
            has_arabic_meta = contains_arabic(result["meta_description"])
            has_arabic_content = contains_arabic(text_content)
            result["has_arabic"] = has_arabic_title or has_arabic_meta or has_arabic_content

            if result["has_arabic"]:
                result["arabic_score"] = calculate_arabic_score(text_content)
                result["arabic_content_preview"] = extract_arabic_content(text_content)
                save_to_arabic_index(result, keyword)
    except requests.Timeout:
        result["error"] = "Request timeout"
    except requests.RequestException as e:
        result["error"] = f"Request failed: {str(e)}"
    except Exception as e:
        result["error"] = f"Scraping failed: {str(e)}"
    return result


def save_to_arabic_index(data: dict, keyword: str = None):
    """Upsert one scrape result into arabic_index.

    Duplicate URLs (by url_hash) have their content and last_updated
    refreshed rather than inserting a second row.
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot save to arabic_index - no database connection")
        return
    try:
        url_hash = get_url_hash(data["url"])
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO arabic_index
                    (url, url_hash, title, meta_description, arabic_content,
                     detection_score, source_keyword, http_status, last_updated)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                ON CONFLICT (url_hash) DO UPDATE SET
                    last_updated = CURRENT_TIMESTAMP,
                    title = EXCLUDED.title,
                    meta_description = EXCLUDED.meta_description,
                    arabic_content = EXCLUDED.arabic_content,
                    detection_score = EXCLUDED.detection_score,
                    http_status = EXCLUDED.http_status,
                    source_keyword = EXCLUDED.source_keyword
            """, (
                data["url"], url_hash, data["title"], data["meta_description"],
                data["arabic_content_preview"], data["arabic_score"],
                keyword, data["http_status"]
            ))
            conn.commit()
            logger.info(f"Saved/Updated Arabic content for URL: {data['url']}")
    except Exception as e:
        logger.error(f"Failed to save to arabic_index: {e}")
        conn.rollback()
    finally:
        conn.close()


# --- Background Processing ---
def process_urls_from_queue():
    """Process up to 10 pending URLs from the queue.

    Rows are selected with FOR UPDATE SKIP LOCKED so concurrent workers
    never grab the same batch; the whole batch's attempt counters are
    bumped in one statement BEFORE the first commit, because a commit
    releases the row locks (the original per-row commit inside the loop
    dropped the locks after the first item).
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot process queue - no database connection")
        return
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("""
                SELECT pq.id, pq.url, pq.url_hash, si.keyword
                FROM processing_queue pq
                JOIN search_ingest si ON pq.search_ingest_id = si.id
                WHERE pq.status = 'pending' AND pq.attempts < 3
                ORDER BY pq.created_at ASC
                LIMIT 10
                FOR UPDATE SKIP LOCKED
            """)
            pending_items = cur.fetchall()
            if not pending_items:
                conn.commit()
                return

            # Claim the whole batch in one shot while we still hold the locks.
            item_ids = [item['id'] for item in pending_items]
            cur.execute("""
                UPDATE processing_queue
                SET attempts = attempts + 1, last_attempt = CURRENT_TIMESTAMP
                WHERE id = ANY(%s)
            """, (item_ids,))
            conn.commit()

            for item in pending_items:
                result = scrape_and_check_arabic(item['url'], item['keyword'])

                if result['has_arabic']:
                    # Successfully processed and saved.
                    cur.execute("""
                        UPDATE processing_queue SET status = 'completed'
                        WHERE id = %s
                    """, (item['id'],))
                    cur.execute("""
                        UPDATE search_ingest SET processed = TRUE
                        WHERE url_hash = %s
                    """, (item['url_hash'],))
                else:
                    # Failed, or fetched fine but no Arabic content.
                    # NOTE: result['error'] is always present but may be
                    # None, so `or` (not dict.get's default) supplies the
                    # fallback message.
                    new_status = 'failed' if result['error'] else 'no_arabic'
                    error_msg = result['error'] or 'No Arabic content found'
                    cur.execute("""
                        UPDATE processing_queue
                        SET status = %s, error_message = %s
                        WHERE id = %s
                    """, (new_status, error_msg, item['id']))
                conn.commit()
    except Exception as e:
        logger.error(f"Queue processing failed: {e}")
        conn.rollback()
    finally:
        conn.close()


# --- API Endpoints ---
@app.get("/")
def health_check():
    """Simple health check reporting database reachability."""
    conn = get_db_connection()
    db_status = conn is not None
    if conn:
        conn.close()
    return {
        "status": "running",
        "database_connected": db_status,
        "service": "Arabic Search Indexer"
    }


@app.post("/api/ingest", status_code=status.HTTP_202_ACCEPTED)
def ingest_search_data(payload: SearchPayload, background_tasks: BackgroundTasks):
    """Receive keyword + URLs, store new ones, and queue them for processing.

    Deduplication is done atomically with ON CONFLICT DO NOTHING (the
    original SELECT-then-INSERT raced under concurrent ingests).  Each URL
    is wrapped in a SAVEPOINT so one bad row cannot roll back the rest of
    the batch.  Returns counts of new vs duplicate URLs; processing runs
    in the background after the response (202).
    """
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")
    try:
        inserted_count = 0
        duplicate_count = 0
        with conn.cursor() as cur:
            for url in payload.results:
                url_str = str(url)
                url_hash = get_url_hash(url_str)
                try:
                    cur.execute("SAVEPOINT ingest_url")
                    # Atomic insert-if-absent; RETURNING yields a row only
                    # when the INSERT actually happened.
                    cur.execute("""
                        INSERT INTO search_ingest (keyword, url, url_hash, created_at, processed)
                        VALUES (%s, %s, %s, CURRENT_TIMESTAMP, FALSE)
                        ON CONFLICT (url_hash) DO NOTHING
                        RETURNING id
                    """, (payload.keyword, url_str, url_hash))
                    result = cur.fetchone()
                    if result:
                        cur.execute("""
                            INSERT INTO processing_queue (search_ingest_id, url, url_hash, status)
                            VALUES (%s, %s, %s, 'pending')
                            ON CONFLICT (url_hash) DO NOTHING
                        """, (result[0], url_str, url_hash))
                        inserted_count += 1
                    else:
                        duplicate_count += 1
                    cur.execute("RELEASE SAVEPOINT ingest_url")
                except Exception as e:
                    logger.error(f"Failed to insert URL {url_str}: {e}")
                    # Undo only this URL's work; earlier rows survive.
                    cur.execute("ROLLBACK TO SAVEPOINT ingest_url")
                    continue
            conn.commit()

        # Kick off scraping only when there is new work.
        if inserted_count > 0:
            background_tasks.add_task(process_urls_from_queue)

        return {
            "message": f"Successfully queued {inserted_count} URLs for processing",
            "keyword": payload.keyword,
            "total_received": len(payload.results),
            "new_urls": inserted_count,
            "duplicates_skipped": duplicate_count
        }
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to ingest data: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to store data: {str(e)}")
    finally:
        conn.close()


@app.post("/api/scrape-and-check")
def scrape_and_check_endpoint(url: HttpUrl):
    """Scrape a single URL synchronously and report Arabic-content findings."""
    url_str = str(url)
    result = scrape_and_check_arabic(url_str)
    if result["error"]:
        raise HTTPException(status_code=400, detail=result["error"])
    return result


@app.get("/api/arabic-index")
def get_arabic_index(limit: int = 100, offset: int = 0):
    """Retrieve entries from the Arabic index, newest first, paginated."""
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("""
                SELECT id, url, title, meta_description, detection_score,
                       scraped_at, source_keyword
                FROM arabic_index
                ORDER BY scraped_at DESC
                LIMIT %s OFFSET %s
            """, (limit, offset))
            # DictCursor rows: use column names, not positional indexes.
            results = [
                {
                    'id': row['id'],
                    'url': row['url'],
                    'title': row['title'],
                    'meta_description': row['meta_description'],
                    'detection_score': row['detection_score'],
                    'scraped_at': row['scraped_at'].isoformat() if row['scraped_at'] else None,
                    'source_keyword': row['source_keyword']
                }
                for row in cur.fetchall()
            ]

            cur.execute("SELECT COUNT(*) FROM arabic_index")
            total = cur.fetchone()[0]

            return {
                "total": total,
                "offset": offset,
                "limit": limit,
                "results": results
            }
    except Exception as e:
        logger.error(f"Failed to fetch arabic index: {e}")
        raise HTTPException(status_code=500, detail="Failed to fetch data")
    finally:
        conn.close()


@app.post("/api/process-queue")
def trigger_queue_processing(background_tasks: BackgroundTasks):
    """Manually trigger queue processing in the background."""
    background_tasks.add_task(process_urls_from_queue)
    return {"message": "Queue processing triggered"}


@app.get("/api/stats")
def get_stats():
    """Return ingest/processing/index counts and a processing-rate summary."""
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM search_ingest")
            total_ingested = cur.fetchone()[0]

            cur.execute("SELECT COUNT(*) FROM search_ingest WHERE processed = TRUE")
            total_processed = cur.fetchone()[0]

            cur.execute("SELECT COUNT(*) FROM arabic_index")
            total_arabic = cur.fetchone()[0]

            cur.execute("""
                SELECT status, COUNT(*) FROM processing_queue GROUP BY status
            """)
            queue_stats = {row[0]: row[1] for row in cur.fetchall()}

            return {
                "total_urls_ingested": total_ingested,
                "total_urls_processed": total_processed,
                "total_arabic_pages": total_arabic,
                "queue_status": queue_stats,
                "processing_rate": f"{(total_processed/total_ingested*100):.1f}%" if total_ingested > 0 else "0%"
            }
    except Exception as e:
        logger.error(f"Failed to get stats: {e}")
        raise HTTPException(status_code=500, detail="Failed to get statistics")
    finally:
        conn.close()

# --- Run with: uvicorn main:app --reload --host 0.0.0.0 --port 8000