Upload files to "/"

هذا الالتزام موجود في:
2026-06-10 18:44:45 +00:00
الأصل 9835ad866b
التزام b5a855ed30
2 ملفات معدلة مع 719 إضافات و0 حذوفات

718
mithal_search_automation.py Normal file
عرض الملف

@@ -0,0 +1,718 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Automate 100 Arabic search queries on https://mithal.space/ for research.
Uses Playwright (sync API) to capture screenshots and structured result data.
"""
from __future__ import annotations
# =============================================================================
# STEP 1 Verify the Playwright Python package is importable
# =============================================================================
import sys
try:
import playwright # noqa: F401 package presence check only here
except ImportError:
print("=" * 70)
print("ERROR: Playwright Python package is NOT installed for this Python.")
print("=" * 70)
print()
print("You are running:")
print(f" {sys.executable}")
print()
print("Install Playwright with the SAME interpreter you use to run this script:")
print()
print(f' "{sys.executable}" -m pip install playwright')
print(f' "{sys.executable}" -m playwright install')
print()
print("If `playwright` is not found as a command, always use -m playwright instead.")
print("=" * 70)
sys.exit(1)
# Safe to import sync API after the package check above
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
import csv
import json
import logging
import os
import re
import time
import traceback
from datetime import datetime
from pathlib import Path
from urllib.parse import quote
# Base URL and timing constants (milliseconds unless noted)
BASE_URL = "https://mithal.space/search"
NAVIGATION_TIMEOUT_MS = 15_000
RESULTS_WAIT_TIMEOUT_MS = 10_000
DELAY_BETWEEN_QUERIES_SEC = 2
MAX_RETRIES = 2 # up to 2 retries after the first attempt (3 tries total)
BROWSER_LAUNCH_TIMEOUT_MS = 10_000
STARTUP_TEST_TIMEOUT_MS = 15_000
# CSS selectors for mithal.space (with fallbacks)
RESULTS_CONTAINER_SELECTORS = [
"#results-container",
".results-container",
"#visible-results",
"a.result-link",
".result",
"div[class*='result']",
]
# JavaScript run in the page to extract results (Arabic-safe via browser DOM)
EXTRACT_RESULTS_JS = r"""() => {
const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
const links = Array.from(document.querySelectorAll('a.result-link'));
const results = links.slice(0, 10).map((anchor) => {
const box = anchor.querySelector('.result') || anchor;
const titleEl = box.querySelector('h3');
const snippetEl = box.querySelector('p');
const urlEl = box.querySelector('small.arabic-url');
const href = anchor.getAttribute('href') || '';
const dataUrl = urlEl ? urlEl.getAttribute('data-url') : '';
return {
title: norm(titleEl ? titleEl.innerText : anchor.innerText.split('\n')[0]),
url: norm(dataUrl || href),
snippet: norm(snippetEl ? snippetEl.innerText : ''),
};
});
let resultCount = null;
const bodyText = document.body ? document.body.innerText : '';
const countPatterns = [
/(\d[\d,\.]*)\s*نتيجة/iu,
/(\d[\d,\.]*)\s*نتائج/iu,
/نتائج\s*[:]?\s*(\d[\d,\.]*)/iu,
/(\d[\d,\.]*)\s*results?/i,
/about\s+(\d[\d,\.]*)\s+results?/i,
];
for (const re of countPatterns) {
const m = bodyText.match(re);
if (m) {
resultCount = parseInt(m[1].replace(/[,\.]/g, ''), 10);
if (!Number.isNaN(resultCount)) break;
}
}
if (resultCount === null) {
const stats = document.querySelector('.search-header-stats');
if (stats) {
const digits = (stats.innerText || '').match(/\d+/);
if (digits) resultCount = parseInt(digits[0], 10);
}
}
if (resultCount === null) {
const itemCount = document.querySelectorAll('a.result-link, .result').length;
if (itemCount > 0) resultCount = itemCount;
}
const special = [];
const spellPatterns = [/هل تقصد/iu, /did you mean/i, /ربما تقصد/iu];
for (const el of document.querySelectorAll('a, p, div, span, h2, h3, h4')) {
const t = norm(el.innerText);
if (!t || t.length > 300) continue;
if (spellPatterns.some((re) => re.test(t))) {
special.push({ type: 'spell_suggestion', text: t });
break;
}
}
for (const sel of ['.related-searches', '[class*="related"]', '[id*="related"]']) {
const block = document.querySelector(sel);
if (block && norm(block.innerText)) {
special.push({ type: 'related_searches', text: norm(block.innerText).slice(0, 500) });
break;
}
}
for (const sel of ['.knowledge-panel', '[class*="knowledge"]', '[id*="knowledge"]']) {
const block = document.querySelector(sel);
if (block && norm(block.innerText)) {
special.push({ type: 'knowledge_panel', text: norm(block.innerText).slice(0, 500) });
break;
}
}
const aiPanel = document.querySelector('.ai-mode-tab.active, [class*="ai-response"], #aiResponse');
if (aiPanel && norm(aiPanel.innerText)) {
special.push({ type: 'ai_mode', text: norm(aiPanel.innerText).slice(0, 300) });
}
return { result_count: resultCount, results, special_features: special };
}"""
def _is_browser_missing_error(exc: BaseException) -> bool:
"""Detect Playwright errors that mean browser binaries were not installed."""
msg = str(exc).lower()
needles = (
"executable doesn't exist",
"executable does not exist",
"browser not found",
"failed to launch",
"please run the following command to download",
"playwright install",
"browserType.launch",
)
return any(n in msg for n in needles)
def _get_playwright_package_version() -> str:
"""Read installed playwright version from package metadata."""
try:
from importlib.metadata import version
return version("playwright")
except Exception:
return "unknown (could not read package metadata)"
def print_playwright_debug_info(playwright_instance) -> None:
"""Print version, Python path, and browser install locations for debugging."""
print("\n" + "=" * 70)
print("PLAYWRIGHT DEBUG INFO")
print("=" * 70)
print(f"Playwright package version: {_get_playwright_package_version()}")
print(f"Python executable: {sys.executable}")
print(f"Python version: {sys.version.split()[0]}")
env_browsers = os.environ.get("PLAYWRIGHT_BROWSERS_PATH")
if env_browsers:
print(f"PLAYWRIGHT_BROWSERS_PATH: {env_browsers}")
else:
if sys.platform == "win32":
default_path = Path.home() / "AppData" / "Local" / "ms-playwright"
elif sys.platform == "darwin":
default_path = Path.home() / "Library" / "Caches" / "ms-playwright"
else:
default_path = Path.home() / ".cache" / "ms-playwright"
print(f"Default browsers folder: {default_path}")
print(" (override with PLAYWRIGHT_BROWSERS_PATH environment variable)")
try:
chromium_path = playwright_instance.chromium.executable_path
print(f"Chromium executable path: {chromium_path}")
print(f" exists on disk: {Path(chromium_path).exists()}")
except Exception as exc:
print(f"Chromium executable path: (could not resolve: {exc})")
print("=" * 70 + "\n")
def verify_browsers_installed(playwright_instance) -> None:
"""
STEP 2 Try launching Chromium with a short timeout.
Exit with install instructions if browser binaries are missing.
"""
print("Checking Chromium browser installation...")
try:
browser = playwright_instance.chromium.launch(
headless=True,
timeout=BROWSER_LAUNCH_TIMEOUT_MS,
)
browser.close()
print("Chromium launched successfully.\n")
except Exception as exc:
print("\n" + "=" * 70)
print("ERROR: Could not launch Chromium browser.")
print("=" * 70)
print(f"\nExact error:\n {type(exc).__name__}: {exc}\n")
if _is_browser_missing_error(exc):
print("Browser binaries appear to be missing or incomplete.")
print("\nRun these commands with the SAME Python you use for this script:\n")
print(f' "{sys.executable}" -m playwright install')
print(f' "{sys.executable}" -m playwright install chromium')
print("\nOptional install all browsers:")
print(f' "{sys.executable}" -m playwright install --with-deps')
else:
print("If this is unexpected, try reinstalling browsers:")
print(f' "{sys.executable}" -m playwright install chromium')
traceback.print_exc()
print("=" * 70)
sys.exit(1)
def run_startup_browser_test(playwright_instance, test_dir: Path) -> None:
"""
STEP 3 Quick smoke test: open example.com, screenshot, close.
Exits with the exact error if anything fails.
"""
print("Running startup browser test (https://example.com)...")
test_dir.mkdir(parents=True, exist_ok=True)
test_screenshot = test_dir / "_playwright_startup_test.png"
try:
with playwright_instance.chromium.launch(
headless=True,
timeout=BROWSER_LAUNCH_TIMEOUT_MS,
) as browser:
with browser.new_context() as context:
page = context.new_page()
page.goto(
"https://example.com",
wait_until="domcontentloaded",
timeout=STARTUP_TEST_TIMEOUT_MS,
)
page.screenshot(path=str(test_screenshot))
title = page.title()
print(f"Startup test PASSED (page title: {title!r})")
print(f"Test screenshot saved: {test_screenshot.resolve()}\n")
except Exception as exc:
print("\n" + "=" * 70)
print("ERROR: Startup browser test FAILED.")
print("=" * 70)
print(f"\nExact error:\n {type(exc).__name__}: {exc}\n")
traceback.print_exc()
print("\nFix suggestions:")
print(f' 1. "{sys.executable}" -m playwright install chromium')
print(" 2. Check firewall / proxy if navigation to example.com fails")
print(" 3. Confirm you run the script with the same Python that has playwright:")
print(f' "{sys.executable}" mithal_search_automation.py')
print("=" * 70)
sys.exit(1)
def generate_arabic_topics() -> list[str]:
"""
Build exactly 100 diverse Arabic search queries in-code (no external file).
Twelve thematic buckets; each bucket contributes several phrases.
"""
category_phrases: dict[str, list[str]] = {
"science": [
"الفيزياء الكمية",
"تلسكوب جيمس ويب",
"نظرية النسبية",
"الخلايا الجذعية",
"تغير المناخ والعلوم",
"الحمض النووي DNA",
"الطاقة الشمسية",
"المجرة والثقوب السوداء",
"التطعيمات والمناعة",
],
"technology": [
"الذكاء الاصطناعي",
"تعلم الآلة",
"الحوسبة السحابية",
"الأمن السيبراني",
"بلوك تشين",
"إنترنت الأشياء",
"الواقع الافتراضي",
"5G والاتصالات",
"روبوتات الخدمة",
],
"history": [
"الحضارة الإسلامية",
"الدولة العباسية",
"الحرب العالمية الأولى",
"الحرب العالمية الثانية",
"الحضارة المصرية القديمة",
"الأندلس",
"الدولة العثمانية",
"ثورة التكنولوجيا الصناعية",
"تاريخ الخليج العربي",
],
"culture": [
"الأدب العربي",
"الشعر الجاهلي",
"المطبخ العربي",
"التراث الشعبي",
"الخط العربي",
"الموسيقى العربية",
"الأزياء التقليدية",
"الاحتفالات الوطنية",
"اللغة العربية الفصحى",
],
"religion": [
"القرآن الكريم",
"الحديث النبوي",
"الصلاة وأحكامها",
"الزكاة والصدقة",
"الحج والعمرة",
"الأخلاق الإسلامية",
"الفقه الإسلامي",
"السيرة النبوية",
"الإيمان والتوحيد",
],
"sports": [
"كرة القدم العالمية",
"دوري روشن السعودي",
"كأس العالم FIFA",
"الألعاب الأولمبية",
"كرة السلة NBA",
"التنس وجائزة ويمبلدون",
"السباقات والماراثون",
"الرياضة النسائية",
"اللياقة البدنية",
],
"business": [
"ريادة الأعمال",
"التسويق الرقمي",
"الشركات الناشئة",
"الاستثمار في الأسهم",
"إدارة المشاريع",
"التجارة الإلكترونية",
"ريادة الأعمال الاجتماعية",
"رؤية السعودية 2030",
"الابتكار في الأعمال",
],
"health": [
"التغذية الصحية",
"السكري وطرق الوقاية",
"الصحة النفسية",
"اللقاحات والأوبئة",
"الرياضة والصحة",
"النوم وجودته",
"أمراض القلب",
"الطب عن بعد",
"الصحة العامة",
],
"arts": [
"الرسم والفن التشكيلي",
"السينما العربية",
"المسرح والدراما",
"التصوير الفوتوغرافي",
"العمارة الإسلامية",
"النحت والتركيب",
"الفن الرقمي",
"الأوبرا والباليه",
"المتاحف والمعارض",
],
"politics": [
"الأمم المتحدة",
"حقوق الإنسان",
"الدبلوماسية الدولية",
"الانتخابات والديمقراطية",
"العلاقات العربية",
"القانون الدولي",
"السياسة الخارجية",
"الحكم الرشيد",
"السلام والنزاعات",
],
"environment": [
"إعادة التدوير",
"التنوع البيولوجي",
"الطاقة المتجددة",
"تلوث الهواء",
"حماية المحيطات",
"الغابات والتصحر",
"المياه العذبة",
"الاستدامة البيئية",
"انبعاثات الكربون",
],
"daily_life": [
"وصفات طبخ سريعة",
"تربية الأطفال",
"إدارة الوقت",
"السفر والسياحة",
"التسوق عبر الإنترنت",
"العمل من المنزل",
"الادخار والميزانية",
"الحيوانات الأليفة",
"الحدائق المنزلية",
],
}
modifiers = [
"{phrase}",
"ما هو {phrase}",
"أفضل {phrase}",
"تاريخ {phrase}",
"دليل {phrase} للمبتدئين",
]
topics: list[str] = []
for category_index, (_category, phrases) in enumerate(category_phrases.items()):
for phrase_index, phrase in enumerate(phrases):
template = modifiers[(category_index + phrase_index) % len(modifiers)]
topics.append(template.format(phrase=phrase))
return topics[:100]
def sanitize_filename(topic: str, max_length: int = 80) -> str:
"""Make a Windows-safe filename stem from an Arabic topic string."""
invalid_chars = r'<>:"/\|?*'
cleaned = "".join("_" if ch in invalid_chars else ch for ch in topic)
cleaned = re.sub(r"\s+", " ", cleaned).strip().strip(".")
if not cleaned:
cleaned = "topic"
if len(cleaned) > max_length:
cleaned = cleaned[:max_length].rstrip()
return cleaned
def build_search_url(topic: str) -> str:
"""URL-encode the Arabic query for the search endpoint."""
return f"{BASE_URL}?q={quote(topic, safe='')}"
def setup_output_dirs() -> tuple[Path, Path, Path]:
"""Create output/YYYY-MM-DD_HH-MM-SS/ with screenshots/ subfolder."""
run_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_dir = Path("output") / run_timestamp
screenshots_dir = run_dir / "screenshots"
screenshots_dir.mkdir(parents=True, exist_ok=True)
csv_path = run_dir / "data.csv"
return run_dir, screenshots_dir, csv_path
def setup_logging(log_path: Path) -> logging.Logger:
"""Plain-text log file plus console output for actions and errors."""
logger = logging.getLogger("mithal_search")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler = logging.FileHandler(log_path, encoding="utf-8")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
def wait_for_results_container(page, logger: logging.Logger) -> str | None:
"""
Wait until any known results indicator appears, or fall back after ~10 seconds.
Returns the selector that matched, or None on timeout.
"""
per_selector_ms = max(2000, RESULTS_WAIT_TIMEOUT_MS // len(RESULTS_CONTAINER_SELECTORS))
for selector in RESULTS_CONTAINER_SELECTORS:
try:
page.wait_for_selector(selector, timeout=per_selector_ms)
logger.debug("Results container found via: %s", selector)
return selector
except PlaywrightTimeoutError:
continue
logger.warning(
"No results container matched within %sms; continuing anyway.",
RESULTS_WAIT_TIMEOUT_MS,
)
return None
def extract_page_data(page) -> dict:
"""Pull result count, top 10 hits, and special UI blocks from the live DOM."""
return page.evaluate(EXTRACT_RESULTS_JS)
def process_single_query(
page,
topic: str,
screenshots_dir: Path,
logger: logging.Logger,
used_filenames: set[str],
) -> dict:
"""
Navigate, wait, screenshot, and extract data for one topic.
Retries up to MAX_RETRIES times on failure.
"""
search_url = build_search_url(topic)
last_error = ""
for attempt in range(1, MAX_RETRIES + 2):
try:
logger.info("Attempt %s | %s | %s", attempt, topic[:60], search_url)
page.goto(
search_url,
wait_until="domcontentloaded",
timeout=NAVIGATION_TIMEOUT_MS,
)
matched = wait_for_results_container(page, logger)
if matched is None:
logger.warning("Proceeding without confirmed results container for: %s", topic)
page.wait_for_timeout(500)
stem = sanitize_filename(topic)
if stem in used_filenames:
stem = f"{stem}_{attempt}"
used_filenames.add(stem)
screenshot_path = screenshots_dir / f"{stem}.png"
page.screenshot(path=str(screenshot_path), full_page=True)
logger.info("Screenshot saved: %s", screenshot_path)
extracted = extract_page_data(page)
return {
"success": True,
"error_message": "",
"search_url": search_url,
"result_count": extracted.get("result_count"),
"results": extracted.get("results", []),
"special_features": extracted.get("special_features", []),
"screenshot": str(screenshot_path),
}
except PlaywrightTimeoutError as exc:
last_error = f"Timeout: {exc}"
logger.warning("Attempt %s timed out for '%s': %s", attempt, topic, exc)
except Exception as exc:
last_error = f"{type(exc).__name__}: {exc}"
logger.warning("Attempt %s failed for '%s': %s", attempt, topic, exc)
if attempt <= MAX_RETRIES:
logger.info("Retrying (%s/%s)...", attempt, MAX_RETRIES)
time.sleep(1)
return {
"success": False,
"error_message": last_error,
"search_url": search_url,
"result_count": None,
"results": [],
"special_features": [],
"screenshot": "",
}
def write_csv_row(
writer: csv.DictWriter,
row_timestamp: str,
topic: str,
payload: dict,
) -> None:
"""Append one row to data.csv with UTF-8-safe JSON in results_json."""
special_text = json.dumps(payload.get("special_features", []), ensure_ascii=False)
results_json = json.dumps(payload.get("results", []), ensure_ascii=False)
writer.writerow(
{
"timestamp": row_timestamp,
"topic": topic,
"search_url": payload.get("search_url", ""),
"result_count": payload.get("result_count") if payload.get("result_count") is not None else "",
"success": payload.get("success", False),
"error_message": payload.get("error_message", ""),
"special_features": special_text,
"results_json": results_json,
}
)
def run_automation() -> None:
"""Main entry: preflight checks, then 100 queries with sync Playwright."""
topics = generate_arabic_topics()
if len(topics) != 100:
print(f"Warning: expected 100 topics, got {len(topics)}.")
run_dir, screenshots_dir, csv_path = setup_output_dirs()
log_path = run_dir / "log.txt"
logger = setup_logging(log_path)
logger.info("Starting mithal.space search automation")
logger.info("Output directory: %s", run_dir.resolve())
logger.info("Total topics: %s", len(topics))
csv_columns = [
"timestamp",
"topic",
"search_url",
"result_count",
"success",
"error_message",
"special_features",
"results_json",
]
success_count = 0
fail_count = 0
used_filenames: set[str] = set()
# sync_playwright() is a context manager ensures driver cleanup on exit
with sync_playwright() as playwright_instance:
print_playwright_debug_info(playwright_instance)
verify_browsers_installed(playwright_instance)
run_startup_browser_test(playwright_instance, run_dir)
logger.info("Preflight checks passed; starting 100 search queries")
# Nested context managers: browser and context always close cleanly
with playwright_instance.chromium.launch(
headless=True,
timeout=BROWSER_LAUNCH_TIMEOUT_MS,
) as browser:
with browser.new_context(
locale="ar-SA",
viewport={"width": 1280, "height": 900},
) as context:
page = context.new_page()
with csv_path.open("w", newline="", encoding="utf-8-sig") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=csv_columns)
writer.writeheader()
for index, topic in enumerate(topics, start=1):
row_timestamp = datetime.now().isoformat(timespec="seconds")
logger.info("--- [%s/100] %s ---", index, topic)
payload = process_single_query(
page, topic, screenshots_dir, logger, used_filenames
)
if payload["success"]:
success_count += 1
else:
fail_count += 1
try:
stem = sanitize_filename(topic) + "_failed"
fail_path = screenshots_dir / f"{stem}.png"
page.screenshot(path=str(fail_path), full_page=True)
logger.info("Failure screenshot: %s", fail_path)
except Exception as shot_exc:
logger.debug("Could not save failure screenshot: %s", shot_exc)
write_csv_row(writer, row_timestamp, topic, payload)
csv_file.flush()
time.sleep(DELAY_BETWEEN_QUERIES_SEC)
print("\n" + "=" * 60)
print("MITHAL.SPACE SEARCH AUTOMATION SUMMARY")
print("=" * 60)
print(f"Total topics: {len(topics)}")
print(f"Successful queries: {success_count}")
print(f"Failed queries: {fail_count}")
print(f"Output directory: {run_dir.resolve()}")
print(f" - CSV: {csv_path.name}")
print(f" - Log: {log_path.name}")
print(f" - Screenshots: {screenshots_dir.name}/")
print("=" * 60)
logger.info(
"Finished. success=%s failed=%s dir=%s",
success_count,
fail_count,
run_dir.resolve(),
)
def main() -> None:
"""Run preflight + automation (sync no asyncio event loop)."""
run_automation()
if __name__ == "__main__":
main()

1
requirements.txt Normal file
عرض الملف

@@ -0,0 +1 @@
playwright>=1.40.0