الملفات
ingestion_api/main.py
2026-03-15 17:02:00 +00:00

500 أسطر
17 KiB
Python

# main.py
import os
import re
import psycopg2
import psycopg2.extras
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, status, BackgroundTasks
from pydantic import BaseModel, Field, HttpUrl
from urllib.parse import urlparse
import hashlib
from typing import List, Optional
from datetime import datetime
import logging
# --- Configuration ---
# PostgreSQL DSN read from the environment; the empty-string default makes
# psycopg2.connect fail fast in get_db_connection (which logs and returns None).
DATABASE_URL = os.getenv("DATABASE_URL", "")
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Models ---
class SearchPayload(BaseModel):
    """
    Request body for POST /api/ingest: one search keyword plus the URLs
    returned for it. Each entry in `results` is validated as an HTTP(S)
    URL by pydantic's HttpUrl type before the endpoint runs.
    """
    keyword: str = Field(..., description="The search keyword used")
    results: List[HttpUrl] = Field(..., description="List of URLs to process")
class ArabicContentResponse(BaseModel):
    """
    Shape of a single-URL Arabic-detection result.

    NOTE(review): not referenced by the visible endpoints —
    scrape_and_check_endpoint returns a plain dict; confirm whether this
    model was meant to be its response_model.
    """
    url: str
    has_arabic: bool
    # Optional fields stay None when the page could not be fetched/parsed.
    title: Optional[str] = None
    meta_description: Optional[str] = None
    arabic_content_preview: Optional[str] = None
# --- App Setup ---
# Single FastAPI application instance; all endpoints below register on it.
app = FastAPI(title="Arabic Search Ingestion API")
# --- Database Setup ---
def get_db_connection():
    """Open a new psycopg2 connection using DATABASE_URL.

    Returns:
        A live connection object, or None when connecting fails
        (the failure is logged rather than raised).
    """
    try:
        return psycopg2.connect(DATABASE_URL)
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
        return None
def init_database():
    """Create the ingestion tables and indexes if they don't already exist.

    Idempotent (all DDL uses IF NOT EXISTS) so it is safe to run on every
    startup. Failures are logged and rolled back rather than raised so a
    transient DB problem never prevents the app from starting.
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot initialize database - no connection")
        return
    try:
        with conn.cursor() as cur:
            # Raw search results exactly as received from /api/ingest.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS search_ingest (
                    id SERIAL PRIMARY KEY,
                    keyword VARCHAR(500) NOT NULL,
                    url TEXT NOT NULL,
                    url_hash VARCHAR(64) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    processed BOOLEAN DEFAULT FALSE,
                    UNIQUE(url_hash)
                )
            """)
            # Secondary indexes for dedup lookups and queue scans.
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_search_ingest_url_hash
                ON search_ingest(url_hash)
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_search_ingest_processed
                ON search_ingest(processed)
            """)
            # Pages confirmed to contain Arabic content.
            # BUG FIX: the previous definition ended with an inline
            # "INDEX(url_hash)" clause, which is MySQL syntax and a syntax
            # error in PostgreSQL — this CREATE TABLE always failed, so
            # arabic_index never existed and every save into it failed.
            # The UNIQUE constraint on url_hash already creates a backing
            # index, so no extra index is required.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS arabic_index (
                    id SERIAL PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    url_hash VARCHAR(64) NOT NULL UNIQUE,
                    title TEXT,
                    meta_description TEXT,
                    arabic_content TEXT,
                    detection_score FLOAT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    source_keyword VARCHAR(500),
                    http_status INTEGER
                )
            """)
            # Work queue feeding process_urls_from_queue.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS processing_queue (
                    id SERIAL PRIMARY KEY,
                    search_ingest_id INTEGER REFERENCES search_ingest(id),
                    url TEXT NOT NULL,
                    url_hash VARCHAR(64) NOT NULL,
                    status VARCHAR(50) DEFAULT 'pending',
                    attempts INTEGER DEFAULT 0,
                    last_attempt TIMESTAMP,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(url_hash)
                )
            """)
        conn.commit()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        conn.rollback()
    finally:
        conn.close()
# Initialize database on startup
@app.on_event("startup")
def startup_event():
    """Run schema initialization once when the application starts.

    NOTE(review): @app.on_event is deprecated in recent FastAPI versions
    in favor of lifespan handlers; kept as-is to avoid behavior changes.
    """
    init_database()
# --- Arabic Language Detection ---
def contains_arabic(text: str) -> bool:
    """Return True if *text* contains at least one Arabic-script character.

    Covers the main Arabic block (U+0600-U+06FF) plus the supplement,
    extended, and presentation-form ranges. Falsy input (None, "") is
    treated as containing no Arabic.
    """
    if not text:
        return False
    detector = re.compile(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]')
    return detector.search(text) is not None
def extract_arabic_content(text: str, max_length: int = 500) -> str:
    """Return a preview of the Arabic runs found in *text*.

    Every maximal run of Arabic-script characters is collected and the
    runs are joined with single spaces. When the joined text exceeds
    *max_length* it is truncated and suffixed with "...". Returns "" for
    falsy input or when no Arabic is present.
    """
    if not text:
        return ""
    runs = re.findall(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]+', text)
    if not runs:
        return ""
    joined = " ".join(runs)
    if len(joined) > max_length:
        return joined[:max_length] + "..."
    return joined
def calculate_arabic_score(text: str) -> float:
    """Return the percentage (0.0-100.0) of characters in *text* that are
    Arabic-script, counting every character (spaces and punctuation
    included) in the denominator. Falsy input scores 0.0.
    """
    if not text:
        return 0.0
    total_chars = len(text)
    arabic_chars = sum(
        1 for _ in re.finditer(
            r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]', text
        )
    )
    # Same expression shape as before to keep float results bit-identical.
    return (arabic_chars / total_chars) * 100 if total_chars > 0 else 0.0
def get_url_hash(url: str) -> str:
    """Return the hex SHA-256 digest of *url*, used as a stable dedup key."""
    digest = hashlib.sha256(url.encode('utf-8'))
    return digest.hexdigest()
# --- Scraping Function ---
def scrape_and_check_arabic(url: str, keyword: str = None) -> dict:
    """
    Fetch *url* and check it for Arabic content.

    The page is requested with a 10-second timeout (redirects followed),
    then the title, meta description, and visible body text are extracted
    and each is tested for Arabic characters. When Arabic is found, the
    result is also persisted via save_to_arabic_index (side effect).

    Returns a dict with keys: url, has_arabic, title, meta_description,
    arabic_content_preview, arabic_score, http_status, error. Failures
    are reported through the 'error' key; this function never raises.
    """
    result = {
        "url": url,
        "has_arabic": False,
        "title": None,
        "meta_description": None,
        "arabic_content_preview": None,
        "arabic_score": 0.0,
        "http_status": None,
        "error": None
    }
    try:
        # Identify ourselves as a bot; some sites block the default UA.
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; ArabicIndexBot/1.0; +http://example.com/bot)'
        }
        response = requests.get(url, headers=headers, timeout=10, allow_redirects=True)
        result["http_status"] = response.status_code
        # Non-200 responses fall through: has_arabic stays False, error stays None.
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            title_tag = soup.find('title')
            if title_tag:
                result["title"] = title_tag.get_text().strip()
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and meta_desc.get('content'):
                result["meta_description"] = meta_desc['content'].strip()
            # Drop non-content elements before pulling the visible text.
            for script in soup(["script", "style", "nav", "footer", "header"]):
                script.decompose()
            text_content = soup.get_text(separator=' ', strip=True)
            # Arabic anywhere — title, meta description, or body — counts.
            has_arabic_title = contains_arabic(result["title"])
            has_arabic_meta = contains_arabic(result["meta_description"])
            has_arabic_content = contains_arabic(text_content)
            result["has_arabic"] = has_arabic_title or has_arabic_meta or has_arabic_content
            if result["has_arabic"]:
                # Score and preview are computed from the body text only.
                result["arabic_score"] = calculate_arabic_score(text_content)
                arabic_preview = extract_arabic_content(text_content)
                result["arabic_content_preview"] = arabic_preview
                # Persist immediately so every caller gets indexing for free.
                save_to_arabic_index(result, keyword)
    except requests.Timeout:
        result["error"] = "Request timeout"
    except requests.RequestException as e:
        result["error"] = f"Request failed: {str(e)}"
    except Exception as e:
        # Catch-all so a single bad page never kills queue processing.
        result["error"] = f"Scraping failed: {str(e)}"
    return result
def save_to_arabic_index(data: dict, keyword: str = None):
    """Upsert one scrape result into arabic_index, keyed by SHA-256 of the URL.

    Expects *data* shaped like the dict built by scrape_and_check_arabic
    (keys read here: url, title, meta_description, arabic_content_preview,
    arabic_score, http_status). Failures are logged and rolled back; this
    function never raises.
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot save to arabic_index - no database connection")
        return
    try:
        url_hash = get_url_hash(data["url"])
        with conn.cursor() as cur:
            # Upsert: on re-scrape of a known URL, refresh every scraped
            # field and bump last_updated (scraped_at keeps its first value).
            cur.execute("""
                INSERT INTO arabic_index
                (url, url_hash, title, meta_description, arabic_content, detection_score, source_keyword, http_status, last_updated)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                ON CONFLICT (url_hash)
                DO UPDATE SET
                    last_updated = CURRENT_TIMESTAMP,
                    title = EXCLUDED.title,
                    meta_description = EXCLUDED.meta_description,
                    arabic_content = EXCLUDED.arabic_content,
                    detection_score = EXCLUDED.detection_score,
                    http_status = EXCLUDED.http_status,
                    source_keyword = EXCLUDED.source_keyword
            """, (
                data["url"],
                url_hash,
                data["title"],
                data["meta_description"],
                data["arabic_content_preview"],
                data["arabic_score"],
                keyword,
                data["http_status"]
            ))
            conn.commit()
            logger.info(f"Saved/Updated Arabic content for URL: {data['url']}")
    except Exception as e:
        logger.error(f"Failed to save to arabic_index: {e}")
        conn.rollback()
    finally:
        conn.close()
# --- Background Processing ---
def process_urls_from_queue():
    """Process up to 10 pending URLs from processing_queue.

    For each claimed item: bump its attempt counter, scrape the URL, then
    mark the queue row 'completed' (and the matching search_ingest row
    processed) when Arabic content was found, or 'failed'/'no_arabic'
    otherwise. Items are retried at most 3 times. All errors are logged,
    never raised — this runs as a FastAPI background task.
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Cannot process queue - no database connection")
        return
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            # Claim a small batch; SKIP LOCKED lets concurrent workers skip
            # rows another worker holds. NOTE(review): the per-item commit
            # below releases these row locks early — confirm single-worker
            # deployment or restructure if multiple workers run this.
            cur.execute("""
                SELECT pq.id, pq.url, pq.url_hash, si.keyword
                FROM processing_queue pq
                JOIN search_ingest si ON pq.search_ingest_id = si.id
                WHERE pq.status = 'pending' AND pq.attempts < 3
                ORDER BY pq.created_at ASC
                LIMIT 10
                FOR UPDATE SKIP LOCKED
            """)
            pending_items = cur.fetchall()
            for item in pending_items:
                # Record the attempt before scraping so a crash mid-scrape
                # still counts toward the 3-attempt cap.
                cur.execute("""
                    UPDATE processing_queue
                    SET attempts = attempts + 1, last_attempt = CURRENT_TIMESTAMP
                    WHERE id = %s
                """, (item['id'],))
                conn.commit()
                # Scrape + detect; also writes to arabic_index on success.
                result = scrape_and_check_arabic(item['url'], item['keyword'])
                if result['has_arabic']:
                    cur.execute("""
                        UPDATE processing_queue
                        SET status = 'completed'
                        WHERE id = %s
                    """, (item['id'],))
                    cur.execute("""
                        UPDATE search_ingest
                        SET processed = TRUE
                        WHERE url_hash = %s
                    """, (item['url_hash'],))
                else:
                    status = 'failed' if result['error'] else 'no_arabic'
                    # BUG FIX: result always contains an 'error' key (value
                    # None when the page simply had no Arabic), so the old
                    # dict.get(..., default) never fell back to the default
                    # and 'no_arabic' rows were stored with a NULL
                    # error_message. Use `or` so None maps to the message.
                    error_msg = result['error'] or 'No Arabic content found'
                    cur.execute("""
                        UPDATE processing_queue
                        SET status = %s, error_message = %s
                        WHERE id = %s
                    """, (status, error_msg, item['id']))
                conn.commit()
    except Exception as e:
        logger.error(f"Queue processing failed: {e}")
        conn.rollback()
    finally:
        conn.close()
# --- API Endpoints ---
@app.get("/")
def health_check():
    """Liveness probe: reports service status plus DB reachability.

    Opens (and immediately closes) a throwaway connection to verify the
    database is reachable; never raises.
    """
    conn = get_db_connection()
    connected = conn is not None
    if connected:
        conn.close()
    return {
        "status": "running",
        "database_connected": connected,
        "service": "Arabic Search Indexer"
    }
@app.post("/api/ingest", status_code=status.HTTP_202_ACCEPTED)
def ingest_search_data(payload: SearchPayload, background_tasks: BackgroundTasks):
    """
    Receives keywords + URLs, validates them,
    and stores them in PostgreSQL for processing.

    New URLs (by SHA-256 hash) are inserted into search_ingest and
    enqueued in processing_queue; duplicates are skipped. Background
    queue processing is scheduled when at least one new URL was stored.

    Raises:
        HTTPException 503 when the database is unreachable.
        HTTPException 500 when the batch cannot be committed.
    """
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")
    try:
        inserted_count = 0
        with conn.cursor() as cur:
            for url in payload.results:
                url_str = str(url)
                url_hash = get_url_hash(url_str)
                try:
                    # BUG FIX: wrap each URL in a savepoint. Previously a
                    # failed statement aborted the whole transaction, so
                    # after the first error every later insert in the batch
                    # failed with "current transaction is aborted" even
                    # though the loop continued.
                    cur.execute("SAVEPOINT ingest_url")
                    # Insert into search_ingest; RETURNING id is non-empty
                    # only when the row is actually new (no conflict).
                    cur.execute("""
                        INSERT INTO search_ingest (keyword, url, url_hash, created_at, processed)
                        VALUES (%s, %s, %s, CURRENT_TIMESTAMP, FALSE)
                        ON CONFLICT (url_hash) DO NOTHING
                        RETURNING id
                    """, (payload.keyword, url_str, url_hash))
                    result = cur.fetchone()
                    if result:
                        # New URL: also enqueue it for background scraping.
                        cur.execute("""
                            INSERT INTO processing_queue (search_ingest_id, url, url_hash, status)
                            VALUES (%s, %s, %s, 'pending')
                            ON CONFLICT (url_hash) DO NOTHING
                        """, (result[0], url_str, url_hash))
                        inserted_count += 1
                    cur.execute("RELEASE SAVEPOINT ingest_url")
                except Exception as e:
                    logger.error(f"Failed to insert URL {url_str}: {e}")
                    # Undo only this URL's work; keep the rest of the batch.
                    cur.execute("ROLLBACK TO SAVEPOINT ingest_url")
                    continue
        conn.commit()
        # Kick off scraping only when there is new work.
        if inserted_count > 0:
            background_tasks.add_task(process_urls_from_queue)
        return {
            "message": f"Successfully queued {inserted_count} URLs for processing",
            "keyword": payload.keyword,
            "total_received": len(payload.results),
            "new_urls": inserted_count,
            "duplicates_skipped": len(payload.results) - inserted_count
        }
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to ingest data: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to store data: {str(e)}")
    finally:
        conn.close()
@app.post("/api/scrape-and-check")
def scrape_and_check_endpoint(url: HttpUrl):
    """
    Synchronously scrape a single URL and report whether it contains
    Arabic content.

    Raises HTTPException 400 when the fetch or parse failed.
    """
    outcome = scrape_and_check_arabic(str(url))
    if outcome["error"]:
        raise HTTPException(status_code=400, detail=outcome["error"])
    return outcome
@app.get("/api/arabic-index")
def get_arabic_index(limit: int = 100, offset: int = 0):
    """
    Retrieve a page of entries from the Arabic index, newest first.

    Returns the total row count, the paging parameters, and the page of
    results (without the full arabic_content text).

    Raises HTTPException 503 when the DB is unreachable, 500 on query
    failure.
    """
    conn = get_db_connection()
    if not conn:
        raise HTTPException(status_code=503, detail="Database service unavailable")
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("""
                SELECT id, url, title, meta_description,
                       detection_score, scraped_at, source_keyword
                FROM arabic_index
                ORDER BY scraped_at DESC
                LIMIT %s OFFSET %s
            """, (limit, offset))
            page = [dict(row) for row in cur.fetchall()]
            cur.execute("SELECT COUNT(*) FROM arabic_index")
            total_rows = cur.fetchone()[0]
        return {
            "total": total_rows,
            "offset": offset,
            "limit": limit,
            "results": page
        }
    except Exception as e:
        logger.error(f"Failed to fetch arabic index: {e}")
        raise HTTPException(status_code=500, detail="Failed to fetch data")
    finally:
        conn.close()
@app.post("/api/process-queue")
def trigger_queue_processing(background_tasks: BackgroundTasks):
    """
    Manually kick off queue processing.

    Schedules process_urls_from_queue to run after the response is sent.
    """
    response = {"message": "Queue processing triggered"}
    background_tasks.add_task(process_urls_from_queue)
    return response
# --- Run with: uvicorn main:app --reload