#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Automate 100 Arabic search queries on https://mithal.space/ for research. Uses Playwright (sync API) to capture screenshots and structured result data. """ from __future__ import annotations # ============================================================================= # STEP 1 – Verify the Playwright Python package is importable # ============================================================================= import sys try: import playwright # noqa: F401 – package presence check only here except ImportError: print("=" * 70) print("ERROR: Playwright Python package is NOT installed for this Python.") print("=" * 70) print() print("You are running:") print(f" {sys.executable}") print() print("Install Playwright with the SAME interpreter you use to run this script:") print() print(f' "{sys.executable}" -m pip install playwright') print(f' "{sys.executable}" -m playwright install') print() print("If `playwright` is not found as a command, always use -m playwright instead.") print("=" * 70) sys.exit(1) # Safe to import sync API after the package check above from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import sync_playwright import csv import json import logging import os import re import time import traceback from datetime import datetime from pathlib import Path from urllib.parse import quote # Base URL and timing constants (milliseconds unless noted) BASE_URL = "https://mithal.space/search" NAVIGATION_TIMEOUT_MS = 15_000 RESULTS_WAIT_TIMEOUT_MS = 10_000 DELAY_BETWEEN_QUERIES_SEC = 2 MAX_RETRIES = 2 # up to 2 retries after the first attempt (3 tries total) BROWSER_LAUNCH_TIMEOUT_MS = 10_000 STARTUP_TEST_TIMEOUT_MS = 15_000 # CSS selectors for mithal.space (with fallbacks) RESULTS_CONTAINER_SELECTORS = [ "#results-container", ".results-container", "#visible-results", "a.result-link", ".result", "div[class*='result']", ] # JavaScript run in the page to extract results (Arabic-safe via browser DOM) EXTRACT_RESULTS_JS = r"""() => { const norm = (s) => (s || '').replace(/\s+/g, ' ').trim(); const links = Array.from(document.querySelectorAll('a.result-link')); const results = links.slice(0, 10).map((anchor) => { const box = anchor.querySelector('.result') || anchor; const titleEl = box.querySelector('h3'); const snippetEl = box.querySelector('p'); const urlEl = box.querySelector('small.arabic-url'); const href = anchor.getAttribute('href') || ''; const dataUrl = urlEl ? urlEl.getAttribute('data-url') : ''; return { title: norm(titleEl ? titleEl.innerText : anchor.innerText.split('\n')[0]), url: norm(dataUrl || href), snippet: norm(snippetEl ? snippetEl.innerText : ''), }; }); let resultCount = null; const bodyText = document.body ? document.body.innerText : ''; const countPatterns = [ /(\d[\d,\.]*)\s*نتيجة/iu, /(\d[\d,\.]*)\s*نتائج/iu, /نتائج\s*[::]?\s*(\d[\d,\.]*)/iu, /(\d[\d,\.]*)\s*results?/i, /about\s+(\d[\d,\.]*)\s+results?/i, ]; for (const re of countPatterns) { const m = bodyText.match(re); if (m) { resultCount = parseInt(m[1].replace(/[,\.]/g, ''), 10); if (!Number.isNaN(resultCount)) break; } } if (resultCount === null) { const stats = document.querySelector('.search-header-stats'); if (stats) { const digits = (stats.innerText || '').match(/\d+/); if (digits) resultCount = parseInt(digits[0], 10); } } if (resultCount === null) { const itemCount = document.querySelectorAll('a.result-link, .result').length; if (itemCount > 0) resultCount = itemCount; } const special = []; const spellPatterns = [/هل تقصد/iu, /did you mean/i, /ربما تقصد/iu]; for (const el of document.querySelectorAll('a, p, div, span, h2, h3, h4')) { const t = norm(el.innerText); if (!t || t.length > 300) continue; if (spellPatterns.some((re) => re.test(t))) { special.push({ type: 'spell_suggestion', text: t }); break; } } for (const sel of ['.related-searches', '[class*="related"]', '[id*="related"]']) { const block = document.querySelector(sel); if (block && norm(block.innerText)) { special.push({ type: 'related_searches', text: norm(block.innerText).slice(0, 500) }); break; } } for (const sel of ['.knowledge-panel', '[class*="knowledge"]', '[id*="knowledge"]']) { const block = document.querySelector(sel); if (block && norm(block.innerText)) { special.push({ type: 'knowledge_panel', text: norm(block.innerText).slice(0, 500) }); break; } } const aiPanel = document.querySelector('.ai-mode-tab.active, [class*="ai-response"], #aiResponse'); if (aiPanel && norm(aiPanel.innerText)) { special.push({ type: 'ai_mode', text: norm(aiPanel.innerText).slice(0, 300) }); } return { result_count: resultCount, results, special_features: special }; }""" def _is_browser_missing_error(exc: BaseException) -> bool: """Detect Playwright errors that mean browser binaries were not installed.""" msg = str(exc).lower() needles = ( "executable doesn't exist", "executable does not exist", "browser not found", "failed to launch", "please run the following command to download", "playwright install", "browserType.launch", ) return any(n in msg for n in needles) def _get_playwright_package_version() -> str: """Read installed playwright version from package metadata.""" try: from importlib.metadata import version return version("playwright") except Exception: return "unknown (could not read package metadata)" def print_playwright_debug_info(playwright_instance) -> None: """Print version, Python path, and browser install locations for debugging.""" print("\n" + "=" * 70) print("PLAYWRIGHT DEBUG INFO") print("=" * 70) print(f"Playwright package version: {_get_playwright_package_version()}") print(f"Python executable: {sys.executable}") print(f"Python version: {sys.version.split()[0]}") env_browsers = os.environ.get("PLAYWRIGHT_BROWSERS_PATH") if env_browsers: print(f"PLAYWRIGHT_BROWSERS_PATH: {env_browsers}") else: if sys.platform == "win32": default_path = Path.home() / "AppData" / "Local" / "ms-playwright" elif sys.platform == "darwin": default_path = Path.home() / "Library" / "Caches" / "ms-playwright" else: default_path = Path.home() / ".cache" / "ms-playwright" print(f"Default browsers folder: {default_path}") print(" (override with PLAYWRIGHT_BROWSERS_PATH environment variable)") try: chromium_path = playwright_instance.chromium.executable_path print(f"Chromium executable path: {chromium_path}") print(f" exists on disk: {Path(chromium_path).exists()}") except Exception as exc: print(f"Chromium executable path: (could not resolve: {exc})") print("=" * 70 + "\n") def verify_browsers_installed(playwright_instance) -> None: """ STEP 2 – Try launching Chromium with a short timeout. Exit with install instructions if browser binaries are missing. """ print("Checking Chromium browser installation...") try: browser = playwright_instance.chromium.launch( headless=True, timeout=BROWSER_LAUNCH_TIMEOUT_MS, ) browser.close() print("Chromium launched successfully.\n") except Exception as exc: print("\n" + "=" * 70) print("ERROR: Could not launch Chromium browser.") print("=" * 70) print(f"\nExact error:\n {type(exc).__name__}: {exc}\n") if _is_browser_missing_error(exc): print("Browser binaries appear to be missing or incomplete.") print("\nRun these commands with the SAME Python you use for this script:\n") print(f' "{sys.executable}" -m playwright install') print(f' "{sys.executable}" -m playwright install chromium') print("\nOptional – install all browsers:") print(f' "{sys.executable}" -m playwright install --with-deps') else: print("If this is unexpected, try reinstalling browsers:") print(f' "{sys.executable}" -m playwright install chromium') traceback.print_exc() print("=" * 70) sys.exit(1) def run_startup_browser_test(playwright_instance, test_dir: Path) -> None: """ STEP 3 – Quick smoke test: open example.com, screenshot, close. Exits with the exact error if anything fails. """ print("Running startup browser test (https://example.com)...") test_dir.mkdir(parents=True, exist_ok=True) test_screenshot = test_dir / "_playwright_startup_test.png" try: with playwright_instance.chromium.launch( headless=True, timeout=BROWSER_LAUNCH_TIMEOUT_MS, ) as browser: with browser.new_context() as context: page = context.new_page() page.goto( "https://example.com", wait_until="domcontentloaded", timeout=STARTUP_TEST_TIMEOUT_MS, ) page.screenshot(path=str(test_screenshot)) title = page.title() print(f"Startup test PASSED (page title: {title!r})") print(f"Test screenshot saved: {test_screenshot.resolve()}\n") except Exception as exc: print("\n" + "=" * 70) print("ERROR: Startup browser test FAILED.") print("=" * 70) print(f"\nExact error:\n {type(exc).__name__}: {exc}\n") traceback.print_exc() print("\nFix suggestions:") print(f' 1. "{sys.executable}" -m playwright install chromium') print(" 2. Check firewall / proxy if navigation to example.com fails") print(" 3. Confirm you run the script with the same Python that has playwright:") print(f' "{sys.executable}" mithal_search_automation.py') print("=" * 70) sys.exit(1) def generate_arabic_topics() -> list[str]: """ Build exactly 100 diverse Arabic search queries in-code (no external file). Twelve thematic buckets; each bucket contributes several phrases. """ category_phrases: dict[str, list[str]] = { "science": [ "الفيزياء الكمية", "تلسكوب جيمس ويب", "نظرية النسبية", "الخلايا الجذعية", "تغير المناخ والعلوم", "الحمض النووي DNA", "الطاقة الشمسية", "المجرة والثقوب السوداء", "التطعيمات والمناعة", ], "technology": [ "الذكاء الاصطناعي", "تعلم الآلة", "الحوسبة السحابية", "الأمن السيبراني", "بلوك تشين", "إنترنت الأشياء", "الواقع الافتراضي", "5G والاتصالات", "روبوتات الخدمة", ], "history": [ "الحضارة الإسلامية", "الدولة العباسية", "الحرب العالمية الأولى", "الحرب العالمية الثانية", "الحضارة المصرية القديمة", "الأندلس", "الدولة العثمانية", "ثورة التكنولوجيا الصناعية", "تاريخ الخليج العربي", ], "culture": [ "الأدب العربي", "الشعر الجاهلي", "المطبخ العربي", "التراث الشعبي", "الخط العربي", "الموسيقى العربية", "الأزياء التقليدية", "الاحتفالات الوطنية", "اللغة العربية الفصحى", ], "religion": [ "القرآن الكريم", "الحديث النبوي", "الصلاة وأحكامها", "الزكاة والصدقة", "الحج والعمرة", "الأخلاق الإسلامية", "الفقه الإسلامي", "السيرة النبوية", "الإيمان والتوحيد", ], "sports": [ "كرة القدم العالمية", "دوري روشن السعودي", "كأس العالم FIFA", "الألعاب الأولمبية", "كرة السلة NBA", "التنس وجائزة ويمبلدون", "السباقات والماراثون", "الرياضة النسائية", "اللياقة البدنية", ], "business": [ "ريادة الأعمال", "التسويق الرقمي", "الشركات الناشئة", "الاستثمار في الأسهم", "إدارة المشاريع", "التجارة الإلكترونية", "ريادة الأعمال الاجتماعية", "رؤية السعودية 2030", "الابتكار في الأعمال", ], "health": [ "التغذية الصحية", "السكري وطرق الوقاية", "الصحة النفسية", "اللقاحات والأوبئة", "الرياضة والصحة", "النوم وجودته", "أمراض القلب", "الطب عن بعد", "الصحة العامة", ], "arts": [ "الرسم والفن التشكيلي", "السينما العربية", "المسرح والدراما", "التصوير الفوتوغرافي", "العمارة الإسلامية", "النحت والتركيب", "الفن الرقمي", "الأوبرا والباليه", "المتاحف والمعارض", ], "politics": [ "الأمم المتحدة", "حقوق الإنسان", "الدبلوماسية الدولية", "الانتخابات والديمقراطية", "العلاقات العربية", "القانون الدولي", "السياسة الخارجية", "الحكم الرشيد", "السلام والنزاعات", ], "environment": [ "إعادة التدوير", "التنوع البيولوجي", "الطاقة المتجددة", "تلوث الهواء", "حماية المحيطات", "الغابات والتصحر", "المياه العذبة", "الاستدامة البيئية", "انبعاثات الكربون", ], "daily_life": [ "وصفات طبخ سريعة", "تربية الأطفال", "إدارة الوقت", "السفر والسياحة", "التسوق عبر الإنترنت", "العمل من المنزل", "الادخار والميزانية", "الحيوانات الأليفة", "الحدائق المنزلية", ], } modifiers = [ "{phrase}", "ما هو {phrase}", "أفضل {phrase}", "تاريخ {phrase}", "دليل {phrase} للمبتدئين", ] topics: list[str] = [] for category_index, (_category, phrases) in enumerate(category_phrases.items()): for phrase_index, phrase in enumerate(phrases): template = modifiers[(category_index + phrase_index) % len(modifiers)] topics.append(template.format(phrase=phrase)) return topics[:100] def sanitize_filename(topic: str, max_length: int = 80) -> str: """Make a Windows-safe filename stem from an Arabic topic string.""" invalid_chars = r'<>:"/\|?*' cleaned = "".join("_" if ch in invalid_chars else ch for ch in topic) cleaned = re.sub(r"\s+", " ", cleaned).strip().strip(".") if not cleaned: cleaned = "topic" if len(cleaned) > max_length: cleaned = cleaned[:max_length].rstrip() return cleaned def build_search_url(topic: str) -> str: """URL-encode the Arabic query for the search endpoint.""" return f"{BASE_URL}?q={quote(topic, safe='')}" def setup_output_dirs() -> tuple[Path, Path, Path]: """Create output/YYYY-MM-DD_HH-MM-SS/ with screenshots/ subfolder.""" run_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") run_dir = Path("output") / run_timestamp screenshots_dir = run_dir / "screenshots" screenshots_dir.mkdir(parents=True, exist_ok=True) csv_path = run_dir / "data.csv" return run_dir, screenshots_dir, csv_path def setup_logging(log_path: Path) -> logging.Logger: """Plain-text log file plus console output for actions and errors.""" logger = logging.getLogger("mithal_search") logger.setLevel(logging.DEBUG) logger.handlers.clear() formatter = logging.Formatter( "%(asctime)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) file_handler = logging.FileHandler(log_path, encoding="utf-8") file_handler.setFormatter(formatter) logger.addHandler(file_handler) console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) console_handler.setFormatter(formatter) logger.addHandler(console_handler) return logger def wait_for_results_container(page, logger: logging.Logger) -> str | None: """ Wait until any known results indicator appears, or fall back after ~10 seconds. Returns the selector that matched, or None on timeout. """ per_selector_ms = max(2000, RESULTS_WAIT_TIMEOUT_MS // len(RESULTS_CONTAINER_SELECTORS)) for selector in RESULTS_CONTAINER_SELECTORS: try: page.wait_for_selector(selector, timeout=per_selector_ms) logger.debug("Results container found via: %s", selector) return selector except PlaywrightTimeoutError: continue logger.warning( "No results container matched within %sms; continuing anyway.", RESULTS_WAIT_TIMEOUT_MS, ) return None def extract_page_data(page) -> dict: """Pull result count, top 10 hits, and special UI blocks from the live DOM.""" return page.evaluate(EXTRACT_RESULTS_JS) def process_single_query( page, topic: str, screenshots_dir: Path, logger: logging.Logger, used_filenames: set[str], ) -> dict: """ Navigate, wait, screenshot, and extract data for one topic. Retries up to MAX_RETRIES times on failure. """ search_url = build_search_url(topic) last_error = "" for attempt in range(1, MAX_RETRIES + 2): try: logger.info("Attempt %s | %s | %s", attempt, topic[:60], search_url) page.goto( search_url, wait_until="domcontentloaded", timeout=NAVIGATION_TIMEOUT_MS, ) matched = wait_for_results_container(page, logger) if matched is None: logger.warning("Proceeding without confirmed results container for: %s", topic) page.wait_for_timeout(500) stem = sanitize_filename(topic) if stem in used_filenames: stem = f"{stem}_{attempt}" used_filenames.add(stem) screenshot_path = screenshots_dir / f"{stem}.png" page.screenshot(path=str(screenshot_path), full_page=True) logger.info("Screenshot saved: %s", screenshot_path) extracted = extract_page_data(page) return { "success": True, "error_message": "", "search_url": search_url, "result_count": extracted.get("result_count"), "results": extracted.get("results", []), "special_features": extracted.get("special_features", []), "screenshot": str(screenshot_path), } except PlaywrightTimeoutError as exc: last_error = f"Timeout: {exc}" logger.warning("Attempt %s timed out for '%s': %s", attempt, topic, exc) except Exception as exc: last_error = f"{type(exc).__name__}: {exc}" logger.warning("Attempt %s failed for '%s': %s", attempt, topic, exc) if attempt <= MAX_RETRIES: logger.info("Retrying (%s/%s)...", attempt, MAX_RETRIES) time.sleep(1) return { "success": False, "error_message": last_error, "search_url": search_url, "result_count": None, "results": [], "special_features": [], "screenshot": "", } def write_csv_row( writer: csv.DictWriter, row_timestamp: str, topic: str, payload: dict, ) -> None: """Append one row to data.csv with UTF-8-safe JSON in results_json.""" special_text = json.dumps(payload.get("special_features", []), ensure_ascii=False) results_json = json.dumps(payload.get("results", []), ensure_ascii=False) writer.writerow( { "timestamp": row_timestamp, "topic": topic, "search_url": payload.get("search_url", ""), "result_count": payload.get("result_count") if payload.get("result_count") is not None else "", "success": payload.get("success", False), "error_message": payload.get("error_message", ""), "special_features": special_text, "results_json": results_json, } ) def run_automation() -> None: """Main entry: preflight checks, then 100 queries with sync Playwright.""" topics = generate_arabic_topics() if len(topics) != 100: print(f"Warning: expected 100 topics, got {len(topics)}.") run_dir, screenshots_dir, csv_path = setup_output_dirs() log_path = run_dir / "log.txt" logger = setup_logging(log_path) logger.info("Starting mithal.space search automation") logger.info("Output directory: %s", run_dir.resolve()) logger.info("Total topics: %s", len(topics)) csv_columns = [ "timestamp", "topic", "search_url", "result_count", "success", "error_message", "special_features", "results_json", ] success_count = 0 fail_count = 0 used_filenames: set[str] = set() # sync_playwright() is a context manager – ensures driver cleanup on exit with sync_playwright() as playwright_instance: print_playwright_debug_info(playwright_instance) verify_browsers_installed(playwright_instance) run_startup_browser_test(playwright_instance, run_dir) logger.info("Preflight checks passed; starting 100 search queries") # Nested context managers: browser and context always close cleanly with playwright_instance.chromium.launch( headless=True, timeout=BROWSER_LAUNCH_TIMEOUT_MS, ) as browser: with browser.new_context( locale="ar-SA", viewport={"width": 1280, "height": 900}, ) as context: page = context.new_page() with csv_path.open("w", newline="", encoding="utf-8-sig") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=csv_columns) writer.writeheader() for index, topic in enumerate(topics, start=1): row_timestamp = datetime.now().isoformat(timespec="seconds") logger.info("--- [%s/100] %s ---", index, topic) payload = process_single_query( page, topic, screenshots_dir, logger, used_filenames ) if payload["success"]: success_count += 1 else: fail_count += 1 try: stem = sanitize_filename(topic) + "_failed" fail_path = screenshots_dir / f"{stem}.png" page.screenshot(path=str(fail_path), full_page=True) logger.info("Failure screenshot: %s", fail_path) except Exception as shot_exc: logger.debug("Could not save failure screenshot: %s", shot_exc) write_csv_row(writer, row_timestamp, topic, payload) csv_file.flush() time.sleep(DELAY_BETWEEN_QUERIES_SEC) print("\n" + "=" * 60) print("MITHAL.SPACE SEARCH AUTOMATION – SUMMARY") print("=" * 60) print(f"Total topics: {len(topics)}") print(f"Successful queries: {success_count}") print(f"Failed queries: {fail_count}") print(f"Output directory: {run_dir.resolve()}") print(f" - CSV: {csv_path.name}") print(f" - Log: {log_path.name}") print(f" - Screenshots: {screenshots_dir.name}/") print("=" * 60) logger.info( "Finished. success=%s failed=%s dir=%s", success_count, fail_count, run_dir.resolve(), ) def main() -> None: """Run preflight + automation (sync – no asyncio event loop).""" run_automation() if __name__ == "__main__": main()