diff --git a/main.py b/main.py
index 85ce26a..5f58afe 100644
--- a/main.py
+++ b/main.py
@@ -14,7 +14,7 @@ from datetime import datetime
 import logging
 
 # --- Configuration ---
-DATABASE_URL = os.getenv("DATABASE_URL", "")
+DATABASE_URL = os.getenv("DATABASE_URL")
 
 # --- Logging Setup ---
 logging.basicConfig(level=logging.INFO)
@@ -63,14 +63,13 @@ def init_database():
                 id SERIAL PRIMARY KEY,
                 keyword VARCHAR(500) NOT NULL,
                 url TEXT NOT NULL,
-                url_hash VARCHAR(64) NOT NULL,
+                url_hash VARCHAR(64) NOT NULL UNIQUE,
                 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                processed BOOLEAN DEFAULT FALSE,
-                UNIQUE(url_hash)
+                processed BOOLEAN DEFAULT FALSE
             )
         """)
 
-        # Create index for faster lookups
+        # Create indexes for faster lookups
         cur.execute("""
             CREATE INDEX IF NOT EXISTS idx_search_ingest_url_hash
             ON search_ingest(url_hash)
@@ -94,27 +93,47 @@ def init_database():
                 scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                 last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                 source_keyword VARCHAR(500),
-                http_status INTEGER,
-                INDEX(url_hash)
+                http_status INTEGER
             )
         """)
 
+        # Create indexes for arabic_index
+        cur.execute("""
+            CREATE INDEX IF NOT EXISTS idx_arabic_index_url_hash
+            ON arabic_index(url_hash)
+        """)
+
+        cur.execute("""
+            CREATE INDEX IF NOT EXISTS idx_arabic_index_scraped_at
+            ON arabic_index(scraped_at)
+        """)
+
         # Create table for processing queue
        cur.execute("""
             CREATE TABLE IF NOT EXISTS processing_queue (
                 id SERIAL PRIMARY KEY,
-                search_ingest_id INTEGER REFERENCES search_ingest(id),
+                search_ingest_id INTEGER REFERENCES search_ingest(id) ON DELETE CASCADE,
                 url TEXT NOT NULL,
-                url_hash VARCHAR(64) NOT NULL,
+                url_hash VARCHAR(64) NOT NULL UNIQUE,
                 status VARCHAR(50) DEFAULT 'pending',
                 attempts INTEGER DEFAULT 0,
                 last_attempt TIMESTAMP,
                 error_message TEXT,
-                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                UNIQUE(url_hash)
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
             )
         """)
 
+        # Create indexes for processing_queue
+        cur.execute("""
+            CREATE INDEX IF NOT EXISTS idx_processing_queue_status
+            ON processing_queue(status)
+        """)
+
+        cur.execute("""
+            CREATE INDEX IF NOT EXISTS idx_processing_queue_url_hash
+            ON processing_queue(url_hash)
+        """)
+
         conn.commit()
         logger.info("Database initialized successfully")
     except Exception as e:
@@ -134,7 +153,7 @@ def contains_arabic(text: str) -> bool:
     Check if text contains Arabic characters.
     Arabic Unicode block: U+0600 to U+06FF
     """
-    if not text:
+    if not text or not isinstance(text, str):
         return False
 
     # Arabic Unicode range pattern
@@ -145,7 +164,7 @@ def extract_arabic_content(text: str, max_length: int = 500) -> str:
     """
     Extract Arabic content preview from text
     """
-    if not text:
+    if not text or not isinstance(text, str):
         return ""
 
     arabic_pattern = re.compile(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]+')
@@ -162,7 +181,7 @@ def calculate_arabic_score(text: str) -> float:
     """
     Calculate the percentage of Arabic characters in the text
     """
-    if not text or len(text) == 0:
+    if not text or not isinstance(text, str) or len(text) == 0:
         return 0.0
 
     arabic_chars = len(re.findall(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]', text))
@@ -387,6 +406,7 @@ def ingest_search_data(payload: SearchPayload, background_tasks: BackgroundTasks
 
     try:
         inserted_count = 0
+        duplicate_count = 0
 
         with conn.cursor() as cur:
             for url in payload.results:
@@ -394,28 +414,37 @@ def ingest_search_data(payload: SearchPayload, background_tasks: BackgroundTasks
                 url_hash = get_url_hash(url_str)
 
                 try:
-                    # Insert into search_ingest table (ignore duplicates)
+                    # First check if URL already exists in search_ingest
                     cur.execute("""
-                        INSERT INTO search_ingest (keyword, url, url_hash, created_at, processed)
-                        VALUES (%s, %s, %s, CURRENT_TIMESTAMP, FALSE)
-                        ON CONFLICT (url_hash) DO NOTHING
-                        RETURNING id
-                    """, (payload.keyword, url_str, url_hash))
+                        SELECT id FROM search_ingest WHERE url_hash = %s
+                    """, (url_hash,))
 
-                    result = cur.fetchone()
+                    existing = cur.fetchone()
 
-                    if result:
-                        # Also add to processing queue
+                    if not existing:
+                        # Insert into search_ingest table
                         cur.execute("""
-                            INSERT INTO processing_queue (search_ingest_id, url, url_hash, status)
-                            VALUES (%s, %s, %s, 'pending')
-                            ON CONFLICT (url_hash) DO NOTHING
-                        """, (result[0], url_str, url_hash))
+                            INSERT INTO search_ingest (keyword, url, url_hash, created_at, processed)
+                            VALUES (%s, %s, %s, CURRENT_TIMESTAMP, FALSE)
+                            RETURNING id
+                        """, (payload.keyword, url_str, url_hash))
 
-                    inserted_count += 1
+                        result = cur.fetchone()
+
+                        if result:
+                            # Also add to processing queue
+                            cur.execute("""
+                                INSERT INTO processing_queue (search_ingest_id, url, url_hash, status)
+                                VALUES (%s, %s, %s, 'pending')
+                            """, (result[0], url_str, url_hash))
+
+                            inserted_count += 1
+                        else:
+                            duplicate_count += 1
                 except Exception as e:
                     logger.error(f"Failed to insert URL {url_str}: {e}")
+                    conn.rollback()  # Rollback the failed transaction
                     continue
 
 
         conn.commit()
@@ -429,7 +458,7 @@ def ingest_search_data(payload: SearchPayload, background_tasks: BackgroundTasks
             "keyword": payload.keyword,
             "total_received": len(payload.results),
             "new_urls": inserted_count,
-            "duplicates_skipped": len(payload.results) - inserted_count
+            "duplicates_skipped": duplicate_count
         }
 
     except Exception as e:
@@ -471,7 +500,17 @@ def get_arabic_index(limit: int = 100, offset: int = 0):
                 LIMIT %s OFFSET %s
             """, (limit, offset))
 
-            results = [dict(row) for row in cur.fetchall()]
+            results = []
+            for row in cur.fetchall():
+                results.append({
+                    'id': row[0],
+                    'url': row[1],
+                    'title': row[2],
+                    'meta_description': row[3],
+                    'detection_score': row[4],
+                    'scraped_at': row[5].isoformat() if row[5] else None,
+                    'source_keyword': row[6]
+                })
 
             # Get total count
             cur.execute("SELECT COUNT(*) FROM arabic_index")
@@ -497,4 +536,48 @@ def trigger_queue_processing(background_tasks: BackgroundTasks):
     background_tasks.add_task(process_urls_from_queue)
     return {"message": "Queue processing triggered"}
 
-# --- Run with: uvicorn main:app --reload
\ No newline at end of file
+@app.get("/api/stats")
+def get_stats():
+    """
+    Get statistics about the index
+    """
+    conn = get_db_connection()
+    if not conn:
+        raise HTTPException(status_code=503, detail="Database service unavailable")
+
+    try:
+        with conn.cursor() as cur:
+            # Get total URLs ingested
+            cur.execute("SELECT COUNT(*) FROM search_ingest")
+            total_ingested = cur.fetchone()[0]
+
+            # Get processed URLs
+            cur.execute("SELECT COUNT(*) FROM search_ingest WHERE processed = TRUE")
+            total_processed = cur.fetchone()[0]
+
+            # Get Arabic index count
+            cur.execute("SELECT COUNT(*) FROM arabic_index")
+            total_arabic = cur.fetchone()[0]
+
+            # Get queue stats
+            cur.execute("""
+                SELECT status, COUNT(*)
+                FROM processing_queue
+                GROUP BY status
+            """)
+            queue_stats = {row[0]: row[1] for row in cur.fetchall()}
+
+            return {
+                "total_urls_ingested": total_ingested,
+                "total_urls_processed": total_processed,
+                "total_arabic_pages": total_arabic,
+                "queue_status": queue_stats,
+                "processing_rate": f"{(total_processed/total_ingested*100):.1f}%" if total_ingested > 0 else "0%"
+            }
+    except Exception as e:
+        logger.error(f"Failed to get stats: {e}")
+        raise HTTPException(status_code=500, detail="Failed to get statistics")
+    finally:
+        conn.close()
+
+# --- Run with: uvicorn main:app --reload --host 0.0.0.0 --port 8000
\ No newline at end of file