الملفات
Search_Script_-_results/mithal_search_automation.py
2026-06-10 18:44:45 +00:00

719 أسطر
27 KiB
Python
خام اللوم التاريخ

هذا الملف يحتوي على أحرف Unicode غامضة

هذا الملف يحتوي على أحرف Unicode قد تُخلط مع أحرف أخرى. إذا كنت تعتقد أن هذا مقصود، يمكنك تجاهل هذا التحذير بأمان. استخدم زر الهروب للكشف عنها.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Automate 100 Arabic search queries on https://mithal.space/ for research.
Uses Playwright (sync API) to capture screenshots and structured result data.
"""
from __future__ import annotations
# =============================================================================
# STEP 1 Verify the Playwright Python package is importable
# =============================================================================
import sys
try:
    import playwright  # noqa: F401 -- presence check only; the real imports follow below
except ImportError:
    # Emit the whole help text with a single print; joining on newlines
    # reproduces the blank separator lines of the message exactly.
    print(
        "\n".join(
            (
                "=" * 70,
                "ERROR: Playwright Python package is NOT installed for this Python.",
                "=" * 70,
                "",
                "You are running:",
                f" {sys.executable}",
                "",
                "Install Playwright with the SAME interpreter you use to run this script:",
                "",
                f' "{sys.executable}" -m pip install playwright',
                f' "{sys.executable}" -m playwright install',
                "",
                "If `playwright` is not found as a command, always use -m playwright instead.",
                "=" * 70,
            )
        )
    )
    sys.exit(1)
# Safe to import sync API after the package check above
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
import csv
import json
import logging
import os
import re
import time
import traceback
from datetime import datetime
from pathlib import Path
from urllib.parse import quote
# Search endpoint and timing constants (milliseconds unless the name says otherwise).
BASE_URL = "https://mithal.space/search"
# Timeout handed to page.goto() for each search navigation.
NAVIGATION_TIMEOUT_MS = 15_000
# Budget used by wait_for_results_container() while looking for a results indicator.
RESULTS_WAIT_TIMEOUT_MS = 10_000
# Pause (in seconds) that run_automation() sleeps between consecutive queries.
DELAY_BETWEEN_QUERIES_SEC = 2
MAX_RETRIES = 2 # up to 2 retries after the first attempt (3 tries total)
# Timeout handed to chromium.launch() in the preflight checks and the main run.
BROWSER_LAUNCH_TIMEOUT_MS = 10_000
# Timeout for the example.com navigation in run_startup_browser_test().
STARTUP_TEST_TIMEOUT_MS = 15_000
# CSS selectors for mithal.space (with fallbacks).
# wait_for_results_container() tries them in this order and returns the first
# one that appears.
RESULTS_CONTAINER_SELECTORS = [
    "#results-container",
    ".results-container",
    "#visible-results",
    "a.result-link",
    ".result",
    "div[class*='result']",
]
# JavaScript run in the page to extract results (Arabic-safe via browser DOM).
#
# The arrow function returns an object with three keys:
#   result_count      - first number matched in the page body text by one of the
#                       Arabic/English "N results" patterns; otherwise the first
#                       digit run inside `.search-header-stats`; otherwise the
#                       number of `a.result-link, .result` elements (null if none).
#   results           - up to the first 10 `a.result-link` anchors, each reduced to
#                       {title, url, snippet}. The title comes from an <h3> (or the
#                       first line of the anchor text), the url from the `data-url`
#                       of `small.arabic-url` (or the anchor href), the snippet
#                       from a <p>.
#   special_features  - list of {type, text} entries for a spell suggestion,
#                       related searches, a knowledge panel and an AI-mode panel
#                       when matching elements with text are present.
#
# NOTE(review): the last result_count fallback selects both `a.result-link` and
# `.result`; if `.result` is nested inside the anchor (as the per-result lookup
# suggests) each hit would be counted twice -- confirm against the live markup.
EXTRACT_RESULTS_JS = r"""() => {
const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
const links = Array.from(document.querySelectorAll('a.result-link'));
const results = links.slice(0, 10).map((anchor) => {
const box = anchor.querySelector('.result') || anchor;
const titleEl = box.querySelector('h3');
const snippetEl = box.querySelector('p');
const urlEl = box.querySelector('small.arabic-url');
const href = anchor.getAttribute('href') || '';
const dataUrl = urlEl ? urlEl.getAttribute('data-url') : '';
return {
title: norm(titleEl ? titleEl.innerText : anchor.innerText.split('\n')[0]),
url: norm(dataUrl || href),
snippet: norm(snippetEl ? snippetEl.innerText : ''),
};
});
let resultCount = null;
const bodyText = document.body ? document.body.innerText : '';
const countPatterns = [
/(\d[\d,\.]*)\s*نتيجة/iu,
/(\d[\d,\.]*)\s*نتائج/iu,
/نتائج\s*[:]?\s*(\d[\d,\.]*)/iu,
/(\d[\d,\.]*)\s*results?/i,
/about\s+(\d[\d,\.]*)\s+results?/i,
];
for (const re of countPatterns) {
const m = bodyText.match(re);
if (m) {
resultCount = parseInt(m[1].replace(/[,\.]/g, ''), 10);
if (!Number.isNaN(resultCount)) break;
}
}
if (resultCount === null) {
const stats = document.querySelector('.search-header-stats');
if (stats) {
const digits = (stats.innerText || '').match(/\d+/);
if (digits) resultCount = parseInt(digits[0], 10);
}
}
if (resultCount === null) {
const itemCount = document.querySelectorAll('a.result-link, .result').length;
if (itemCount > 0) resultCount = itemCount;
}
const special = [];
const spellPatterns = [/هل تقصد/iu, /did you mean/i, /ربما تقصد/iu];
for (const el of document.querySelectorAll('a, p, div, span, h2, h3, h4')) {
const t = norm(el.innerText);
if (!t || t.length > 300) continue;
if (spellPatterns.some((re) => re.test(t))) {
special.push({ type: 'spell_suggestion', text: t });
break;
}
}
for (const sel of ['.related-searches', '[class*="related"]', '[id*="related"]']) {
const block = document.querySelector(sel);
if (block && norm(block.innerText)) {
special.push({ type: 'related_searches', text: norm(block.innerText).slice(0, 500) });
break;
}
}
for (const sel of ['.knowledge-panel', '[class*="knowledge"]', '[id*="knowledge"]']) {
const block = document.querySelector(sel);
if (block && norm(block.innerText)) {
special.push({ type: 'knowledge_panel', text: norm(block.innerText).slice(0, 500) });
break;
}
}
const aiPanel = document.querySelector('.ai-mode-tab.active, [class*="ai-response"], #aiResponse');
if (aiPanel && norm(aiPanel.innerText)) {
special.push({ type: 'ai_mode', text: norm(aiPanel.innerText).slice(0, 300) });
}
return { result_count: resultCount, results, special_features: special };
}"""
def _is_browser_missing_error(exc: BaseException) -> bool:
"""Detect Playwright errors that mean browser binaries were not installed."""
msg = str(exc).lower()
needles = (
"executable doesn't exist",
"executable does not exist",
"browser not found",
"failed to launch",
"please run the following command to download",
"playwright install",
"browserType.launch",
)
return any(n in msg for n in needles)
def _get_playwright_package_version() -> str:
"""Read installed playwright version from package metadata."""
try:
from importlib.metadata import version
return version("playwright")
except Exception:
return "unknown (could not read package metadata)"
def print_playwright_debug_info(playwright_instance) -> None:
    """Print package version, interpreter details and browser locations.

    Args:
        playwright_instance: Object whose ``chromium.executable_path`` is read
            to report where the Chromium binary is expected. Any error while
            resolving it is printed rather than raised.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("PLAYWRIGHT DEBUG INFO")
    print(rule)
    print(f"Playwright package version: {_get_playwright_package_version()}")
    print(f"Python executable: {sys.executable}")
    print(f"Python version: {sys.version.split()[0]}")

    browsers_override = os.environ.get("PLAYWRIGHT_BROWSERS_PATH")
    if browsers_override:
        print(f"PLAYWRIGHT_BROWSERS_PATH: {browsers_override}")
    else:
        # Folder reported as the default depends on the platform; anything that
        # is neither Windows nor macOS falls back to ~/.cache.
        platform_subdirs = {
            "win32": ("AppData", "Local"),
            "darwin": ("Library", "Caches"),
        }
        subdirs = platform_subdirs.get(sys.platform, (".cache",))
        default_path = Path.home().joinpath(*subdirs, "ms-playwright")
        print(f"Default browsers folder: {default_path}")
        print(" (override with PLAYWRIGHT_BROWSERS_PATH environment variable)")

    try:
        chromium_path = playwright_instance.chromium.executable_path
        print(f"Chromium executable path: {chromium_path}")
        print(f" exists on disk: {Path(chromium_path).exists()}")
    except Exception as exc:
        print(f"Chromium executable path: (could not resolve: {exc})")
    print(rule + "\n")
def verify_browsers_installed(playwright_instance) -> None:
    """
    STEP 2: launch headless Chromium once (short timeout) and close it again.

    On any launch failure the exact error and install instructions are
    printed and the process exits with status 1.
    """
    print("Checking Chromium browser installation...")
    try:
        probe_browser = playwright_instance.chromium.launch(
            headless=True,
            timeout=BROWSER_LAUNCH_TIMEOUT_MS,
        )
        probe_browser.close()
        print("Chromium launched successfully.\n")
    except Exception as exc:
        rule = "=" * 70
        interpreter = sys.executable
        print("\n" + rule)
        print("ERROR: Could not launch Chromium browser.")
        print(rule)
        print(f"\nExact error:\n {type(exc).__name__}: {exc}\n")
        if not _is_browser_missing_error(exc):
            # Unrecognised failure: give a generic hint plus the full traceback.
            print("If this is unexpected, try reinstalling browsers:")
            print(f' "{interpreter}" -m playwright install chromium')
            traceback.print_exc()
        else:
            print("Browser binaries appear to be missing or incomplete.")
            print("\nRun these commands with the SAME Python you use for this script:\n")
            print(f' "{interpreter}" -m playwright install')
            print(f' "{interpreter}" -m playwright install chromium')
            print("\nOptional install all browsers:")
            print(f' "{interpreter}" -m playwright install --with-deps')
        print(rule)
        sys.exit(1)
def run_startup_browser_test(playwright_instance, test_dir: Path) -> None:
    """
    STEP 3: smoke test -- open example.com, take a screenshot, close the browser.

    Args:
        playwright_instance: Object used to launch headless Chromium.
        test_dir: Directory (created if missing) that receives the screenshot.

    Any failure prints the exact error, a traceback and fix suggestions, then
    exits the process with status 1.
    """
    print("Running startup browser test (https://example.com)...")
    test_dir.mkdir(parents=True, exist_ok=True)
    shot_path = test_dir.joinpath("_playwright_startup_test.png")
    try:
        launcher = playwright_instance.chromium
        with launcher.launch(
            headless=True,
            timeout=BROWSER_LAUNCH_TIMEOUT_MS,
        ) as browser, browser.new_context() as context:
            page = context.new_page()
            page.goto(
                "https://example.com",
                wait_until="domcontentloaded",
                timeout=STARTUP_TEST_TIMEOUT_MS,
            )
            page.screenshot(path=str(shot_path))
            page_title = page.title()
        print(f"Startup test PASSED (page title: {page_title!r})")
        print(f"Test screenshot saved: {shot_path.resolve()}\n")
    except Exception as exc:
        rule = "=" * 70
        print("\n" + rule)
        print("ERROR: Startup browser test FAILED.")
        print(rule)
        print(f"\nExact error:\n {type(exc).__name__}: {exc}\n")
        traceback.print_exc()
        print("\nFix suggestions:")
        print(f' 1. "{sys.executable}" -m playwright install chromium')
        print(" 2. Check firewall / proxy if navigation to example.com fails")
        print(" 3. Confirm you run the script with the same Python that has playwright:")
        print(f' "{sys.executable}" mithal_search_automation.py')
        print(rule)
        sys.exit(1)
def generate_arabic_topics() -> list[str]:
    """
    Build exactly 100 diverse Arabic search queries in-code (no external file).

    Twelve thematic buckets hold nine phrases each (108 in total). Rather than
    truncating the flat list -- which would leave the last bucket with a
    single query -- the 100 slots are spread evenly: every bucket contributes
    eight queries and the first four buckets contribute a ninth.

    Each phrase is wrapped in one of five question/qualifier templates, chosen
    by ``(category_index + phrase_index) % 5`` so the templates rotate both
    within and across buckets.

    Returns:
        A list of 100 query strings, grouped by bucket in declaration order.
    """
    target_total = 100
    category_phrases: dict[str, list[str]] = {
        "science": [
            "الفيزياء الكمية",
            "تلسكوب جيمس ويب",
            "نظرية النسبية",
            "الخلايا الجذعية",
            "تغير المناخ والعلوم",
            "الحمض النووي DNA",
            "الطاقة الشمسية",
            "المجرة والثقوب السوداء",
            "التطعيمات والمناعة",
        ],
        "technology": [
            "الذكاء الاصطناعي",
            "تعلم الآلة",
            "الحوسبة السحابية",
            "الأمن السيبراني",
            "بلوك تشين",
            "إنترنت الأشياء",
            "الواقع الافتراضي",
            "5G والاتصالات",
            "روبوتات الخدمة",
        ],
        "history": [
            "الحضارة الإسلامية",
            "الدولة العباسية",
            "الحرب العالمية الأولى",
            "الحرب العالمية الثانية",
            "الحضارة المصرية القديمة",
            "الأندلس",
            "الدولة العثمانية",
            "ثورة التكنولوجيا الصناعية",
            "تاريخ الخليج العربي",
        ],
        "culture": [
            "الأدب العربي",
            "الشعر الجاهلي",
            "المطبخ العربي",
            "التراث الشعبي",
            "الخط العربي",
            "الموسيقى العربية",
            "الأزياء التقليدية",
            "الاحتفالات الوطنية",
            "اللغة العربية الفصحى",
        ],
        "religion": [
            "القرآن الكريم",
            "الحديث النبوي",
            "الصلاة وأحكامها",
            "الزكاة والصدقة",
            "الحج والعمرة",
            "الأخلاق الإسلامية",
            "الفقه الإسلامي",
            "السيرة النبوية",
            "الإيمان والتوحيد",
        ],
        "sports": [
            "كرة القدم العالمية",
            "دوري روشن السعودي",
            "كأس العالم FIFA",
            "الألعاب الأولمبية",
            "كرة السلة NBA",
            "التنس وجائزة ويمبلدون",
            "السباقات والماراثون",
            "الرياضة النسائية",
            "اللياقة البدنية",
        ],
        "business": [
            "ريادة الأعمال",
            "التسويق الرقمي",
            "الشركات الناشئة",
            "الاستثمار في الأسهم",
            "إدارة المشاريع",
            "التجارة الإلكترونية",
            "ريادة الأعمال الاجتماعية",
            "رؤية السعودية 2030",
            "الابتكار في الأعمال",
        ],
        "health": [
            "التغذية الصحية",
            "السكري وطرق الوقاية",
            "الصحة النفسية",
            "اللقاحات والأوبئة",
            "الرياضة والصحة",
            "النوم وجودته",
            "أمراض القلب",
            "الطب عن بعد",
            "الصحة العامة",
        ],
        "arts": [
            "الرسم والفن التشكيلي",
            "السينما العربية",
            "المسرح والدراما",
            "التصوير الفوتوغرافي",
            "العمارة الإسلامية",
            "النحت والتركيب",
            "الفن الرقمي",
            "الأوبرا والباليه",
            "المتاحف والمعارض",
        ],
        "politics": [
            "الأمم المتحدة",
            "حقوق الإنسان",
            "الدبلوماسية الدولية",
            "الانتخابات والديمقراطية",
            "العلاقات العربية",
            "القانون الدولي",
            "السياسة الخارجية",
            "الحكم الرشيد",
            "السلام والنزاعات",
        ],
        "environment": [
            "إعادة التدوير",
            "التنوع البيولوجي",
            "الطاقة المتجددة",
            "تلوث الهواء",
            "حماية المحيطات",
            "الغابات والتصحر",
            "المياه العذبة",
            "الاستدامة البيئية",
            "انبعاثات الكربون",
        ],
        "daily_life": [
            "وصفات طبخ سريعة",
            "تربية الأطفال",
            "إدارة الوقت",
            "السفر والسياحة",
            "التسوق عبر الإنترنت",
            "العمل من المنزل",
            "الادخار والميزانية",
            "الحيوانات الأليفة",
            "الحدائق المنزلية",
        ],
    }
    modifiers = [
        "{phrase}",
        "ما هو {phrase}",
        "أفضل {phrase}",
        "تاريخ {phrase}",
        "دليل {phrase} للمبتدئين",
    ]
    # Even split of the target across buckets; the first `extra` buckets take
    # one additional query so the quotas sum to exactly `target_total`.
    base_quota, extra = divmod(target_total, len(category_phrases))
    topics: list[str] = []
    for category_index, phrases in enumerate(category_phrases.values()):
        quota = base_quota + (1 if category_index < extra else 0)
        for phrase_index, phrase in enumerate(phrases[:quota]):
            template = modifiers[(category_index + phrase_index) % len(modifiers)]
            topics.append(template.format(phrase=phrase))
    return topics[:target_total]
def sanitize_filename(topic: str, max_length: int = 80) -> str:
    """Make a Windows-safe filename stem from an Arabic topic string.

    Whitespace runs collapse to one space; characters that are reserved in
    Windows filenames and ASCII control characters become ``_``; leading and
    trailing spaces/dots are removed (also after truncation, since Windows
    does not accept names ending in a space or dot).

    Args:
        topic: Free-text query to convert.
        max_length: Maximum number of characters kept from the cleaned text.

    Returns:
        The cleaned stem, or ``"topic"`` if nothing usable remains.
    """
    invalid_chars = r'<>:"/\|?*'
    # Collapse whitespace first so tabs/newlines turn into a plain space rather
    # than being replaced as control characters below.
    collapsed = re.sub(r"\s+", " ", topic)
    cleaned = "".join(
        "_" if ch in invalid_chars or ord(ch) < 32 else ch for ch in collapsed
    )
    cleaned = cleaned.strip(" .")
    if len(cleaned) > max_length:
        # Truncation can expose a new trailing space or dot; trim it again.
        cleaned = cleaned[:max_length].rstrip(" .")
    return cleaned or "topic"
def build_search_url(topic: str) -> str:
    """Return the search URL for *topic* with the query fully percent-encoded."""
    # safe="" makes quote() escape every reserved character, "/" included.
    encoded_query = quote(topic, safe="")
    return BASE_URL + "?q=" + encoded_query
def setup_output_dirs() -> tuple[Path, Path, Path]:
    """Create ``output/<timestamp>/screenshots/`` and return the run paths.

    Returns:
        ``(run_dir, screenshots_dir, csv_path)``. The directories exist on
        return; ``csv_path`` (``data.csv`` inside the run directory) is only
        computed, not created.
    """
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_dir = Path("output", stamp)
    screenshots_dir = run_dir.joinpath("screenshots")
    # parents=True creates output/ and the run directory in the same call.
    screenshots_dir.mkdir(parents=True, exist_ok=True)
    return run_dir, screenshots_dir, run_dir.joinpath("data.csv")
def setup_logging(log_path: Path) -> logging.Logger:
    """Configure the ``mithal_search`` logger with a file and a console handler.

    The UTF-8 log file receives every record from DEBUG upward; the console
    (stdout) handler only shows INFO and above. Handlers left over from an
    earlier call are detached and closed first, so calling this again neither
    duplicates output nor leaks the previous log file handle.

    Args:
        log_path: Destination of the plain-text log file.

    Returns:
        The configured logger.
    """
    logger = logging.getLogger("mithal_search")
    logger.setLevel(logging.DEBUG)
    # Iterate over a copy: removeHandler() mutates logger.handlers.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger
def wait_for_results_container(page, logger: logging.Logger) -> str | None:
    """
    Wait until any known results indicator appears, within one shared budget.

    The selectors in RESULTS_CONTAINER_SELECTORS are tried in order. The time
    still left of RESULTS_WAIT_TIMEOUT_MS is split evenly across the selectors
    not yet tried, so the total wait stays within the budget that the warning
    below reports, and every selector gets a turn.

    Args:
        page: Playwright page to query.
        logger: Logger for the debug/warning messages.

    Returns:
        The selector that matched, or None if none appeared in time.
    """
    selectors = RESULTS_CONTAINER_SELECTORS
    deadline = time.monotonic() + RESULTS_WAIT_TIMEOUT_MS / 1000
    for position, selector in enumerate(selectors):
        remaining_ms = (deadline - time.monotonic()) * 1000
        selectors_left = len(selectors) - position
        # Playwright treats timeout=0 as "no timeout", so never go below 1 ms.
        slice_ms = max(1, int(remaining_ms / selectors_left))
        try:
            page.wait_for_selector(selector, timeout=slice_ms)
        except PlaywrightTimeoutError:
            continue
        logger.debug("Results container found via: %s", selector)
        return selector
    logger.warning(
        "No results container matched within %sms; continuing anyway.",
        RESULTS_WAIT_TIMEOUT_MS,
    )
    return None
def extract_page_data(page) -> dict:
    """Run EXTRACT_RESULTS_JS in *page* and return whatever the script yields.

    The script reports the result count, the top 10 hits and special UI blocks
    read from the live DOM.
    """
    extracted = page.evaluate(EXTRACT_RESULTS_JS)
    return extracted
def _reserve_unique_stem(base_stem: str, used_filenames: set[str]) -> str:
    """Return *base_stem* or the first free ``<base_stem>_N`` (N >= 2) and record it."""
    candidate = base_stem
    suffix = 2
    while candidate in used_filenames:
        candidate = f"{base_stem}_{suffix}"
        suffix += 1
    used_filenames.add(candidate)
    return candidate


def process_single_query(
    page,
    topic: str,
    screenshots_dir: Path,
    logger: logging.Logger,
    used_filenames: set[str],
) -> dict:
    """
    Navigate, wait, screenshot, and extract data for one topic.

    The attempt is repeated up to MAX_RETRIES additional times when it raises.

    Args:
        page: Playwright page reused for every query.
        topic: Search query text.
        screenshots_dir: Directory that receives ``<stem>.png``.
        logger: Logger for progress and failure messages.
        used_filenames: Stems already taken in this run; the stem chosen for
            this topic is added to it.

    Returns:
        A dict with the keys ``success``, ``error_message``, ``search_url``,
        ``result_count``, ``results``, ``special_features`` and ``screenshot``.
        On failure ``success`` is False, ``error_message`` holds the last
        error and the data fields are empty.
    """
    search_url = build_search_url(topic)
    # Reserve one name per query up front: a retry then overwrites its own
    # partial screenshot, and a repeated stem can never clobber another
    # topic's file (the suffix keeps counting until the name is free).
    stem = _reserve_unique_stem(sanitize_filename(topic), used_filenames)
    screenshot_path = screenshots_dir / f"{stem}.png"
    last_error = ""
    for attempt in range(1, MAX_RETRIES + 2):
        try:
            logger.info("Attempt %s | %s | %s", attempt, topic[:60], search_url)
            page.goto(
                search_url,
                wait_until="domcontentloaded",
                timeout=NAVIGATION_TIMEOUT_MS,
            )
            matched = wait_for_results_container(page, logger)
            if matched is None:
                logger.warning("Proceeding without confirmed results container for: %s", topic)
            # Short settle delay before capturing the page.
            page.wait_for_timeout(500)
            page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("Screenshot saved: %s", screenshot_path)
            extracted = extract_page_data(page)
            return {
                "success": True,
                "error_message": "",
                "search_url": search_url,
                "result_count": extracted.get("result_count"),
                "results": extracted.get("results", []),
                "special_features": extracted.get("special_features", []),
                "screenshot": str(screenshot_path),
            }
        except PlaywrightTimeoutError as exc:
            last_error = f"Timeout: {exc}"
            logger.warning("Attempt %s timed out for '%s': %s", attempt, topic, exc)
        except Exception as exc:
            last_error = f"{type(exc).__name__}: {exc}"
            logger.warning("Attempt %s failed for '%s': %s", attempt, topic, exc)
        if attempt <= MAX_RETRIES:
            logger.info("Retrying (%s/%s)...", attempt, MAX_RETRIES)
            time.sleep(1)
    return {
        "success": False,
        "error_message": last_error,
        "search_url": search_url,
        "result_count": None,
        "results": [],
        "special_features": [],
        "screenshot": "",
    }
def write_csv_row(
    writer: csv.DictWriter,
    row_timestamp: str,
    topic: str,
    payload: dict,
) -> None:
    """Write one query outcome as a row through *writer*.

    ``special_features`` and ``results`` are stored as JSON text with
    ``ensure_ascii=False`` so Arabic stays readable; a missing (None) result
    count is written as an empty cell.
    """
    count = payload.get("result_count")
    row = {
        "timestamp": row_timestamp,
        "topic": topic,
        "search_url": payload.get("search_url", ""),
        "result_count": "" if count is None else count,
        "success": payload.get("success", False),
        "error_message": payload.get("error_message", ""),
        "special_features": json.dumps(
            payload.get("special_features", []), ensure_ascii=False
        ),
        "results_json": json.dumps(payload.get("results", []), ensure_ascii=False),
    }
    writer.writerow(row)
def run_automation() -> None:
    """Main entry: preflight checks, then every generated query with sync Playwright.

    Creates the run directory, log file and CSV, runs the browser preflight
    checks, processes each topic in turn (one CSV row per topic, flushed
    immediately so partial runs keep their data) and prints a summary.
    Progress messages use the real number of topics rather than a fixed 100.
    """
    topics = generate_arabic_topics()
    total_topics = len(topics)
    if total_topics != 100:
        print(f"Warning: expected 100 topics, got {total_topics}.")
    run_dir, screenshots_dir, csv_path = setup_output_dirs()
    log_path = run_dir / "log.txt"
    logger = setup_logging(log_path)
    logger.info("Starting mithal.space search automation")
    logger.info("Output directory: %s", run_dir.resolve())
    logger.info("Total topics: %s", total_topics)
    csv_columns = [
        "timestamp",
        "topic",
        "search_url",
        "result_count",
        "success",
        "error_message",
        "special_features",
        "results_json",
    ]
    success_count = 0
    fail_count = 0
    used_filenames: set[str] = set()
    # sync_playwright() is a context manager: it ensures driver cleanup on exit.
    with sync_playwright() as playwright_instance:
        print_playwright_debug_info(playwright_instance)
        verify_browsers_installed(playwright_instance)
        run_startup_browser_test(playwright_instance, run_dir)
        logger.info("Preflight checks passed; starting %s search queries", total_topics)
        # Nested context managers: browser and context always close cleanly.
        with playwright_instance.chromium.launch(
            headless=True,
            timeout=BROWSER_LAUNCH_TIMEOUT_MS,
        ) as browser:
            with browser.new_context(
                locale="ar-SA",
                viewport={"width": 1280, "height": 900},
            ) as context:
                page = context.new_page()
                # utf-8-sig writes a BOM at the start of the CSV file.
                with csv_path.open("w", newline="", encoding="utf-8-sig") as csv_file:
                    writer = csv.DictWriter(csv_file, fieldnames=csv_columns)
                    writer.writeheader()
                    for index, topic in enumerate(topics, start=1):
                        row_timestamp = datetime.now().isoformat(timespec="seconds")
                        logger.info("--- [%s/%s] %s ---", index, total_topics, topic)
                        payload = process_single_query(
                            page, topic, screenshots_dir, logger, used_filenames
                        )
                        if payload["success"]:
                            success_count += 1
                        else:
                            fail_count += 1
                            # Best effort: capture whatever the page shows after
                            # the final failed attempt; never fail the run on it.
                            try:
                                stem = sanitize_filename(topic) + "_failed"
                                fail_path = screenshots_dir / f"{stem}.png"
                                page.screenshot(path=str(fail_path), full_page=True)
                                logger.info("Failure screenshot: %s", fail_path)
                            except Exception as shot_exc:
                                logger.debug("Could not save failure screenshot: %s", shot_exc)
                        write_csv_row(writer, row_timestamp, topic, payload)
                        csv_file.flush()
                        # The delay only spaces out consecutive requests, so it
                        # is pointless after the final query.
                        if index < total_topics:
                            time.sleep(DELAY_BETWEEN_QUERIES_SEC)
    print("\n" + "=" * 60)
    print("MITHAL.SPACE SEARCH AUTOMATION SUMMARY")
    print("=" * 60)
    print(f"Total topics: {total_topics}")
    print(f"Successful queries: {success_count}")
    print(f"Failed queries: {fail_count}")
    print(f"Output directory: {run_dir.resolve()}")
    print(f" - CSV: {csv_path.name}")
    print(f" - Log: {log_path.name}")
    print(f" - Screenshots: {screenshots_dir.name}/")
    print("=" * 60)
    logger.info(
        "Finished. success=%s failed=%s dir=%s",
        success_count,
        fail_count,
        run_dir.resolve(),
    )
def main() -> None:
    """Script entry point: delegate to run_automation() (sync API, no asyncio loop)."""
    run_automation()


# Only start the automation when executed as a script, not when imported.
if __name__ == "__main__":
    main()