commit 960e46bd6d68dfa46d2b09559c92cff01e8ccd00 Author: Abdulaziz04 Date: Sun Sep 7 00:04:26 2025 +0000 رفع الملفات إلى "app" diff --git a/app/Dockerfile b/app/Dockerfile new file mode 100644 index 0000000..f098f9f --- /dev/null +++ b/app/Dockerfile @@ -0,0 +1,123 @@ +# Official Playwright image (includes browsers) +FROM mcr.microsoft.com/playwright/python:v1.44.0 + +# Build-time arg: اجعلها 1 لفشل البناء إذا وُجدت أي استخدامات pkg_resources في site-packages +ARG FAIL_ON_PKG_RESOURCES=0 +ENV FAIL_ON_PKG_RESOURCES=${FAIL_ON_PKG_RESOURCES} + +# Non-interactive apt +ENV DEBIAN_FRONTEND=noninteractive +WORKDIR /app + +# Copy requirements first to leverage Docker cache +COPY requirements.txt . + +# Install useful system packages +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + git \ + build-essential \ + libxml2-dev \ + libxslt1-dev \ + libssl-dev \ + libffi-dev \ + ca-certificates \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Upgrade pip/setuptools/wheel to latest (we aim to support setuptools >= 81 after code migration) +RUN python -m pip install --upgrade pip setuptools wheel + +# Install runtime Python deps from requirements +RUN pip install --no-cache-dir -r requirements.txt + +# Install auxiliary packages / backports & tooling we rely on +# - packaging: requirement parsing & version handling +# - importlib_metadata / importlib_resources: backports if running on older Python +# - wafw00f: WAF detection tool used by the project +RUN pip install --no-cache-dir \ + packaging \ + importlib_metadata \ + importlib_resources \ + wafw00f + +# Copy the rest of the project files +COPY . . + +# Create evidence directory in tmpfs location and set permissions (used by utils.store_raw_evidence) +ENV SUPERR_EVIDENCE_DIR=/dev/shm/superrecon_evidence +RUN mkdir -p ${SUPERR_EVIDENCE_DIR} \ + && chown -R pwuser:pwuser ${SUPERR_EVIDENCE_DIR} \ + && chmod 750 ${SUPERR_EVIDENCE_DIR} || true + +# Optional build-time check: look for any remaining 'import pkg_resources' usages +# If FAIL_ON_PKG_RESOURCES=1 the build will fail when any occurrences are found. +# This check scans site-packages for python files mentioning pkg_resources. +RUN python - <<'PY' || (test "$FAIL_ON_PKG_RESOURCES" = "0" && exit 0) +import os, sys, site +from pathlib import Path + +def scan_paths(paths): + hits = [] + for root in paths: + rootp = Path(root) + if not rootp.exists(): + continue + for p in rootp.rglob("*.py"): + try: + txt = p.read_text(encoding="utf-8", errors="ignore") + except Exception: + continue + if "import pkg_resources" in txt or "pkg_resources." 
in txt: + hits.append(str(p)) + return hits + +paths = [] +try: + sp = site.getsitepackages() + for p in sp: + paths.append(p) +except Exception: + # fallback common locations + paths += [ + "/usr/local/lib/python3.10/site-packages", + "/usr/lib/python3/dist-packages", + "/usr/local/lib/python3.9/site-packages", + ] + +hits = scan_paths(paths) +if hits: + print("==========================================") + print("WARNING: Detected uses of pkg_resources in installed packages (first 200 shown):") + for h in hits[:200]: + print(" -", h) + print("==========================================") + # If FAIL_ON_PKG_RESOURCES is set, fail the build + if os.environ.get("FAIL_ON_PKG_RESOURCES", "0") == "1": + print("FAIL_ON_PKG_RESOURCES=1 -> Failing build due to pkg_resources usages.") + sys.exit(1) +else: + print("No pkg_resources usages found in scanned site-packages paths.") +PY + +# Ensure non-root runtime (pwuser exists in Playwright base image) +USER pwuser + +# Expose application port (configurable via APP_PORT env) +ENV APP_PORT=8000 +EXPOSE ${APP_PORT} + +# Healthcheck +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD curl -f http://127.0.0.1:${APP_PORT}/health || exit 1 + +# Default environment variables (can be overridden at runtime) +ENV PYTHONUNBUFFERED=1 +ENV MAX_CONCURRENT_SCANS=8 +ENV SCAN_TIMEOUT=180 +ENV RATE_LIMIT="15/minute" +ENV LOG_LEVEL=INFO +ENV UVICORN_WORKERS=1 + +# Default command: run Uvicorn (assumes app package path app.main:app) +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] diff --git a/app/analyze_site.py b/app/analyze_site.py new file mode 100644 index 0000000..1d71d0d --- /dev/null +++ b/app/analyze_site.py @@ -0,0 +1,274 @@ +# analyze_site.py +# Updated to match the improved utils.py (compat_resources, run_scan_for_url, etc.) +import logging +from typing import Dict, Any, Optional +import asyncio +import sys + +# Try flexible imports so this file works whether utils.py is at project root or inside `app` package. +try: + # Preferred when utils is inside the `app` package (app/utils.py) + from app.utils import safe_json, run_scan_for_url, generate_scan_id +except Exception: + try: + # Fallback to top-level utils.py + from utils import safe_json, run_scan_for_url, generate_scan_id # type: ignore + except Exception as e: + raise ImportError("Could not import required utilities (safe_json, run_scan_for_url, generate_scan_id).") from e + + +logger = logging.getLogger("SuperRecon") +if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) +logger.setLevel("INFO") + + +async def run_scan(target_url: str, render_js: bool = False) -> Dict[str, Any]: + """ + Orchestrates a full site scan for a single URL using run_scan_for_url from utils. + Returns the raw report (dict) or a safe_json-wrapped error dict. 
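+ Illustrative usage (a sketch; the report's contents come from run_scan_for_url in utils.py,
+ while scan_id and scanned_url are added before the dict is returned):
+ report = asyncio.run(run_scan("https://example.com", render_js=True))
+ print(report.get("scan_id"), report.get("scanned_url"))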
+ """ + scan_id = generate_scan_id() + logger.info(f"Starting scan {scan_id} for URL: {target_url} (render_js={render_js})") + + try: + # run_scan_for_url already accepts scan_id and render_js and returns a dict + report = await run_scan_for_url(target_url, render_js=render_js, scan_id=scan_id) + logger.info(f"Scan {scan_id} completed successfully for {target_url}.") + # Ensure report is a dict and include scan_id + if not isinstance(report, dict): + report = {"error": "invalid_report", "details": "Scanner returned non-dict result", "raw": str(report)} + report.setdefault("scan_id", scan_id) + report.setdefault("scanned_url", report.get("url", target_url)) + return report + except Exception as e: + logger.error(f"Scan {scan_id} failed with error: {e}", exc_info=True) + return safe_json({"error": "Scan failed", "details": str(e), "scan_id": scan_id, "scanned_url": target_url}) + + +def _fmt_confidence(conf: Optional[Any]) -> str: + try: + if conf is None: + return "0%" + # If float in [0,1], convert to percent + if isinstance(conf, float) and 0.0 <= conf <= 1.0: + return f"{int(round(conf * 100))}%" + # else try numeric + val = int(float(conf)) + if 0 <= val <= 100: + return f"{val}%" + return f"{max(0, min(val, 100))}%" + except Exception: + try: + return f"{int(conf)}%" + except Exception: + return str(conf) + + +def format_final_report(report_data: Dict[str, Any]) -> str: + """ + Formats the raw scan report data into a human-readable, well-structured string (Arabic). + Tolerant to different shapes of report_data (single report or wrapper). + """ + if "error" in report_data: + return f"❌ تقرير الفحص: حدث خطأ\n\n{report_data.get('details', 'لا يوجد تفاصيل')}" + + # Accept either {"full_report": [...]} or a single report dict + full_reports = report_data.get("full_report") + if not full_reports: + # If the provided dict already looks like a single scan report, wrap it + if "scanned_url" in report_data or "url" in report_data: + full_reports = [report_data] + else: + # If a summary with list of reports is provided, try extracting + if isinstance(report_data.get("reports"), list): + full_reports = report_data.get("reports") + else: + return "⚠️ لم يتم العثور على تقارير فحص.\nقد يكون الموقع غير متاح أو لم يتم تنفيذ الفحص." 
+ + output_str = "✨ **تقرير فحص شامل للموقع** ✨\n\n" + output_str += "---\n\n" + + # Summary of scanned URLs (if available) + scanned_urls_summary = report_data.get("summary", {}).get("scanned_urls", []) + output_str += "**✅ الصفحات التي تم فحصها:**\n" + if scanned_urls_summary: + output_str += "\n".join([f"• {url}" for url in scanned_urls_summary]) + "\n\n" + else: + collected = [r.get("scanned_url") or r.get("url") for r in full_reports if r.get("scanned_url") or r.get("url")] + if collected: + output_str += "\n".join([f"• {url}" for url in collected]) + "\n\n" + else: + output_str += "• لم يتم توفير ملخص للروابط المفحوصة.\n\n" + + for report in full_reports: + url = report.get("scanned_url", report.get("url", "URL غير معروف")) + scan_id = report.get("scan_id", "") + scanned_at = report.get("scanned_at", report.get("scanned_at", "غير معروف")) + + output_str += "---\n\n" + output_str += f"### **🌐 تقرير الفحص لصفحة: {url}**\n" + if scan_id: + output_str += f"- **معرّف الفحص:** `{scan_id}`\n" + if scanned_at: + output_str += f"- **وقت الفحص:** {scanned_at}\n" + output_str += "\n" + + # Security Headers + output_str += "**🛡️ رؤوس الأمان (Security Headers):**\n" + sec_headers = report.get("security_headers", {}) + if sec_headers: + for h, d in sec_headers.items(): + try: + # d may be dict with status/value + if isinstance(d, dict): + status = d.get("status", "") + value = d.get("value", "") + output_str += f" - **{h}**: {status} — `{value}`\n" + else: + output_str += f" - **{h}**: {d}\n" + except Exception: + output_str += f" - **{h}**: {d}\n" + else: + output_str += " - لم يتم العثور على رؤوس أمان أساسية.\n" + output_str += "\n" + + # DNS Records + output_str += "**📡 معلومات DNS:**\n" + dns_records = report.get("dns_records", {}) + if dns_records: + for rtype, records in dns_records.items(): + try: + recs_display = ", ".join(records) if isinstance(records, (list, tuple)) and records else str(records) + except Exception: + recs_display = str(records) + output_str += f" - **{rtype}**: {recs_display}\n" + else: + output_str += " - لا توجد سجلات DNS أو لم يتم استردادها.\n" + output_str += "\n" + + # SSL Info + output_str += "**🔒 شهادة SSL:**\n" + ssl_info = report.get("ssl_info", {}) or {} + if ssl_info.get("valid"): + not_after = ssl_info.get("not_after", "غير معروف") + issuer = ssl_info.get("issuer") or {} + issuer_cn = issuer.get("CN") if isinstance(issuer, dict) else issuer + output_str += f" - ✅ صالحة حتى: {not_after}\n" + output_str += f" - جهة الإصدار: {issuer_cn if issuer_cn else issuer}\n" + elif ssl_info.get("error"): + output_str += f" - ❌ خطأ في فحص الشهادة: {ssl_info.get('error')}\n" + else: + output_str += " - ❌ غير مفعلة أو غير متاحة.\n" + output_str += "\n" + + # Technologies + output_str += "**🛠️ التقنيات المكتشفة:**\n" + teks = report.get("technologies", []) or [] + if teks: + # Sort by confidence desc and show all (or limit if you want) + for t in sorted(teks, key=lambda x: x.get('confidence', 0), reverse=True): + name = t.get("name", "غير معروف") + confidence = _fmt_confidence(t.get("confidence", 0)) + category = t.get("categories") or t.get("category") or [] + if isinstance(category, (list, tuple)): + cat_display = ", ".join(category) if category else "غير محدد" + else: + cat_display = str(category) + source = t.get("source", "غير معروف") + version = t.get("version", "") or "" + emoji = "⭐" if int(confidence.strip("%")) > 90 else "👍" if int(confidence.strip("%")) > 70 else "🧐" + output_str += f" - {emoji} **{name}**" + if version: + output_str += f" (الإصدار: {version})" + 
output_str += f"\n" + output_str += f" - **الفئة**: {cat_display}\n" + output_str += f" - **الثقة**: {confidence}\n" + output_str += f" - **المصدر**: {source}\n" + else: + output_str += " - لم يتم العثور على تقنيات.\n" + output_str += "\n" + + # Robots.txt + output_str += "**🤖 ملف Robots.txt:**\n" + robots_info = report.get("robots_info", {}) or {} + if robots_info.get("exists"): + output_str += f" - ✅ **موجود** في: {robots_info.get('fetched_from')}\n" + if robots_info.get("sitemaps"): + s = robots_info.get("sitemaps") + output_str += f" - **Sitemaps**: {', '.join(s)}\n" + if robots_info.get("rules"): + output_str += " - **قواعد**: يحتوي على قواعد Allow/Disallow.\n" + else: + tried = robots_info.get("tried") or [] + if tried: + output_str += f" - ❌ غير موجود بعد محاولة الوصول إلى: {', '.join(tried)}\n" + else: + output_str += " - ❌ غير موجود أو لم يتم فحصه.\n" + output_str += "\n" + + # Payment Methods + output_str += "**💳 طرق الدفع:**\n" + payment_methods = report.get("payment_methods", []) or [] + if payment_methods: + names = [] + for method in payment_methods: + if isinstance(method, dict): + names.append(method.get("name") or str(method)) + else: + names.append(str(method)) + output_str += f" - تم العثور على: {', '.join(names)}\n" + else: + output_str += " - لم يتم العثور على طرق دفع معروفة.\n" + output_str += "\n" + + # Trackers & Analytics + output_str += "**📈 المتتبعات (Trackers & Analytics):**\n" + trackers_info = report.get("trackers_and_analytics", []) or [] + if trackers_info: + output_str += " - " + ", ".join(trackers_info) + "\n" + else: + output_str += " - لا توجد متتبعات معروفة.\n" + output_str += "\n" + + # WAF & CDN + output_str += "**🛡️ WAF و CDN (استدلالي):**\n" + waf = report.get("waf_info") or report.get("waf") or {} + if waf and waf.get("detected"): + output_str += f" - WAF مكتشف: {waf.get('provider')} (ثقة: {_fmt_confidence(waf.get('confidence'))})\n" + else: + output_str += " - لا يوجد WAF واضح أو لم يتم اكتشافه.\n" + cdn = report.get("cdn_info") or report.get("cdn") or {} + if cdn and cdn.get("provider"): + output_str += f" - CDN مفترض/مكتشف: {cdn.get('provider')} (ثقة: {_fmt_confidence(cdn.get('confidence'))})\n" + else: + output_str += " - لا يوجد CDN واضح.\n" + output_str += "\n" + + # Final notes + output_str += f"**📝 ملاحظات:**\n" + output_str += f"- مسار الأدلة الخام محفوظ في: {report.get('raw_evidence', {}).get('body', {}).get('path', 'غير متوفر')} (إن وُجد)\n" + output_str += "\n\n" + + output_str += "---\n\n✨ تم الفحص بنجاح.\n" + return output_str + + +if __name__ == "__main__": + # CLI usage: python analyze_site.py + if len(sys.argv) > 1: + test_url = sys.argv[1] + render_js_flag = False + if len(sys.argv) > 2 and sys.argv[2].lower() in ("true", "1", "yes", "y"): + render_js_flag = True + try: + res = asyncio.run(run_scan(test_url, render_js=render_js_flag)) + formatted = format_final_report({"full_report": [res], "summary": {"scanned_urls": [test_url]}}) + print(formatted) + except Exception as e: + print("فشل تشغيل الفحص:", e) + else: + print("Usage: python analyze_site.py [render_js: true|false]") diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..313837c --- /dev/null +++ b/app/main.py @@ -0,0 +1,220 @@ +import os +import logging +import asyncio +from fastapi import FastAPI, HTTPException, Query, Request +from fastapi.responses import JSONResponse, HTMLResponse +from fastapi.middleware.cors import CORSMiddleware +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from 
slowapi.middleware import SlowAPIMiddleware +from pydantic import BaseModel, HttpUrl +from typing import Optional, List +from dotenv import load_dotenv +from datetime import datetime, timezone +from urllib.parse import urlparse +import ipaddress + +# تحميل المتغيرات من ملف .env +load_dotenv() + +# Logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger("SuperReconAPI") +logger.setLevel(os.getenv("LOG_LEVEL", "INFO")) + +# إنشاء تطبيق FastAPI +app = FastAPI( + title="SuperRecon API", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc" +) + +# تكوين CORS — تنظيف القيم وفحص القوائم الفارغة +raw_origins = os.getenv("CORS_ALLOW_ORIGINS", "") +if raw_origins.strip() == "": + allow_origins: List[str] = ["*"] +else: + allow_origins = [o.strip() for o in raw_origins.split(",") if o.strip()] + +app.add_middleware( + CORSMiddleware, + allow_origins=allow_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# تهيئة Rate Limiter +limiter = Limiter(key_func=get_remote_address) +app.state.limiter = limiter +app.add_exception_handler(429, _rate_limit_exceeded_handler) +app.add_middleware(SlowAPIMiddleware) + +# المتغيرات البيئية +MAX_CONCURRENT_SCANS = int(os.getenv("MAX_CONCURRENT_SCANS", "8")) +SCAN_TIMEOUT = int(os.getenv("SCAN_TIMEOUT", "180")) # seconds +rate_limit = os.getenv("RATE_LIMIT", "15/minute") + +scan_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SCANS) + +# محاولة استيراد run_scan بمرونة (app.analyze_site أو analyze_site) +try: + from app.analyze_site import run_scan # type: ignore +except Exception: + try: + from analyze_site import run_scan # type: ignore + except Exception as e: + # لنفشل مبكرًا مع رسالة واضحة لو لم نتمكن من إيجاد دالة الفحص + logger.exception("Cannot import run_scan from app.analyze_site or analyze_site.") + raise ImportError("Could not import run_scan from app.analyze_site or analyze_site.") from e + + +class ReconRequest(BaseModel): + url: HttpUrl + render_js: Optional[bool] = True + + +@app.get("/", response_class=HTMLResponse) +async def index(): + html = """ + SuperRecon API + + ✅ SuperRecon API جاهز
+ استخدم الرابط التالي لفحص موقع:
+ /recon?url=https://example.com

+ عرض وثائق API + """ + return HTMLResponse(content=html) + + +@app.get("/health") +async def health(): + return { + "status": "healthy", + "service": "SuperRecon API", + "version": "1.0.0", + "timestamp": datetime.now(timezone.utc).isoformat() + } + + +def _is_ip_private(ip_str: str) -> bool: + """Returns True if ip_str is private/reserved/loopback/link-local/multicast.""" + try: + ip_obj = ipaddress.ip_address(ip_str) + return ( + ip_obj.is_private + or ip_obj.is_loopback + or ip_obj.is_link_local + or ip_obj.is_reserved + or ip_obj.is_multicast + or ip_obj.is_unspecified + ) + except Exception: + return False + + +async def _ensure_not_local_target(parsed_url): + """If the target resolves to private/loopback IPs, raise HTTPException (for safety).""" + host = parsed_url.hostname + if not host: + raise HTTPException(status_code=400, detail="Invalid host in URL.") + # if host is an IP literal + try: + ipaddress.ip_address(host) + if _is_ip_private(host): + raise HTTPException(status_code=400, detail="Scanning private/loopback addresses is not allowed.") + return + except ValueError: + # hostname - resolve asynchronously using event loop resolver + try: + loop = asyncio.get_running_loop() + # getaddrinfo returns list of tuples; we'll extract the sockaddr[0] + infos = await loop.getaddrinfo(host, None) + ips = set(sockaddr[0] for _, _, _, _, sockaddr in infos if sockaddr) + if not ips: + raise HTTPException(status_code=400, detail="Target hostname could not be resolved to any IP.") + for ip in ips: + if _is_ip_private(ip): + raise HTTPException(status_code=400, detail="Target resolves to private/loopback addresses; scanning is blocked.") + return + except HTTPException: + raise + except Exception as e: + logger.debug(f"DNS resolution error for host {host}: {e}") + raise HTTPException(status_code=400, detail="Target hostname could not be resolved.") from e + + +@app.get("/recon") +@limiter.limit(rate_limit) +async def recon_get( + request: Request, + url: str = Query(..., description="Target URL to analyze (e.g., https://example.com)"), + render_js: bool = Query(True, description="Render page with JavaScript before analysis") +): + # validate via pydantic model then dispatch to the POST handler + payload = ReconRequest(url=url, render_js=render_js) + return await recon_post(request, payload) + + +@app.post("/recon") +@limiter.limit(rate_limit) +async def recon_post(request: Request, payload: ReconRequest): + url_str = str(payload.url) + render_js = payload.render_js + + if not url_str: + raise HTTPException(status_code=400, detail="Missing 'url' in payload") + + # basic sanity: avoid extremely long URLs (simple DoS protection) + if len(url_str) > 4096: + raise HTTPException(status_code=400, detail="URL too long.") + + parsed = urlparse(url_str) + if parsed.scheme.lower() not in ("http", "https"): + raise HTTPException(status_code=400, detail="Only http and https schemes are allowed.") + + # Ensure the target is not local/private + await _ensure_not_local_target(parsed) + + # get remote address (with fallback) + try: + remote_addr = get_remote_address(request) + except Exception: + try: + remote_addr = request.client.host # type: ignore + except Exception: + remote_addr = "unknown" + + logger.info(f"Scan requested by {remote_addr} for {url_str} (render_js={render_js})") + + async with scan_semaphore: + try: + logger.info(f"Starting scan for {url_str}") + # run_scan is expected to return a serializable dict (or safe_json already) + result = await asyncio.wait_for( + run_scan(url_str, 
render_js=render_js), + timeout=SCAN_TIMEOUT + ) + logger.info(f"Scan completed for {url_str}") + # ensure result is JSON serializable; if not, wrap minimally + if not isinstance(result, dict): + logger.warning("run_scan returned non-dict result; coercing to dict.") + result = {"result": str(result)} + return JSONResponse(content=result) + except asyncio.TimeoutError: + logger.warning(f"Scan timed out for {url_str}") + return JSONResponse( + status_code=504, + content={ + "success": False, + "error": "timeout", + "message": f"Scan timed out after {SCAN_TIMEOUT} seconds" + } + ) + except HTTPException: + # re-raise HTTPException as-is (e.g., blocked target) + raise + except Exception as e: + logger.exception("Scan failed") + raise HTTPException(status_code=500, detail="Internal server error during scan.") from e diff --git a/app/requirements.txt b/app/requirements.txt new file mode 100644 index 0000000..a35987a --- /dev/null +++ b/app/requirements.txt @@ -0,0 +1,30 @@ +aiohttp +apify-client +beautifulsoup4 +builtwith +certifi +charset-normalizer +cryptography +dnspython +fake-useragent +fastapi[all] +httpx[http2] +importlib_metadata; python_version < "3.10" +importlib_resources; python_version < "3.9" +ipwhois +js2py +lxml +packaging +playwright +pyOpenSSL +python-Wappalyzer +python-dotenv +python-socks +python-whois +redis +requests +slowapi +tenacity +tldextract +uvicorn[standard] +wafw00f diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..178fb5f --- /dev/null +++ b/app/utils.py @@ -0,0 +1,1423 @@ +# utils.py +# SuperRecon utils - improved (compat_resources + Wappalyzer DB validation + regex fixes) +import os +import re +import json +import socket +import logging +import ssl +import gzip +import OpenSSL +import dns.resolver +import httpx +from urllib.parse import urljoin, urlparse, quote_plus +from bs4 import BeautifulSoup +from datetime import datetime, date, timezone +from collections import defaultdict +from typing import List, Dict, Any, Optional, Tuple +import asyncio +import random +import ipaddress +import ipwhois +import time +from functools import lru_cache +from playwright.async_api import async_playwright +import whois +from Wappalyzer import Wappalyzer, WebPage +import builtwith +import subprocess +import hashlib + +# optional import for charset detection (best-effort) +try: + from charset_normalizer import from_bytes +except Exception: + from_bytes = None + +# optional brotli decompress +try: + import brotli +except Exception: + brotli = None + +# -------------------- Logger setup -------------------- +logger = logging.getLogger("SuperRecon.utils") +if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) +logger.setLevel(os.environ.get("SUPERR_LOG_LEVEL", "INFO")) + +# Directory to store raw evidence +EVIDENCE_DIR = os.environ.get("SUPERR_EVIDENCE_DIR", "./evidence") +os.makedirs(EVIDENCE_DIR, exist_ok=True) + + +# -------------------- Compatibility layer (replacement for pkg_resources) -------------------- +# Provides: get_version, resource_bytes, resource_text, resource_path (context manager), +# iter_entry_points, load_entry_point, parse_requirement, installed_distributions, dist_metadata +try: + # importlib.metadata (stdlib) with backport fallback + from importlib.metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError # type: ignore +except 
Exception: + from importlib_metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError # type: ignore + +# importlib.resources with backport fallback +try: + from importlib.resources import files, as_file, read_binary, read_text # type: ignore +except Exception: + from importlib_resources import files, as_file, read_binary, read_text # type: ignore + +from contextlib import contextmanager +from packaging.requirements import Requirement +from packaging.version import Version, InvalidVersion + +def get_version(package_name: str) -> Optional[str]: + try: + return _version(package_name) + except Exception: + return None + +def resource_bytes(package: str, resource: str) -> bytes: + return read_binary(package, resource) + +def resource_text(package: str, resource: str, encoding: str = "utf-8") -> str: + return read_text(package, resource, encoding=encoding) + +@contextmanager +def resource_path(package: str, resource: str): + """ + Yields a filesystem Path for resource if possible. + Usage: + with resource_path('mypkg', 'data/file.txt') as p: + open(p)... + """ + p = files(package).joinpath(resource) + with as_file(p) as fp: + yield fp + +class EP: + def __init__(self, ep): + self._ep = ep + + @property + def name(self): + return self._ep.name + + @property + def value(self): + return self._ep.value + + def load(self): + return self._ep.load() + +def iter_entry_points(group: str): + eps = entry_points() + try: + group_eps = eps.select(group=group) # py3.10+ + except Exception: + try: + group_eps = [e for e in eps if getattr(e, "group", None) == group] + except Exception: + group_eps = eps.get(group, []) # type: ignore + for e in group_eps: + yield EP(e) + +def load_entry_point(group: str, name: str): + for ep in iter_entry_points(group): + if ep.name == name: + return ep.load() + raise LookupError(f"entry point {group}:{name} not found") + +def parse_requirement(req_str: str) -> Requirement: + return Requirement(req_str) + +def installed_distributions(): + for dist in distributions(): + yield dist + +def dist_metadata(name: str): + try: + return distribution(name).metadata + except PackageNotFoundError: + return None + +def dist_files(name: str): + try: + return distribution(name).files + except PackageNotFoundError: + return None + + +# -------------------- Safe JSON Helpers -------------------- +def _make_json_safe(obj): + if obj is None or isinstance(obj, (bool, int, float, str)): + return obj + if isinstance(obj, dict): + new = {} + for k, v in obj.items(): + try: + key = str(k) + except Exception: + key = repr(k) + new[key] = _make_json_safe(v) + return new + if isinstance(obj, (list, tuple, set)): + return [_make_json_safe(i) for i in obj] + try: + if isinstance(obj, (datetime, date)): + return obj.isoformat() + except Exception: + pass + try: + import httpx as _httpx + if isinstance(obj, _httpx.Response): + try: + text_snippet = obj.text[:1000] + except Exception: + text_snippet = None + return { + "status_code": obj.status_code, + "url": str(obj.url) if hasattr(obj, "url") else None, + "headers": dict(obj.headers) if hasattr(obj, "headers") else None, + "text_snippet": text_snippet + } + except Exception: + pass + try: + return str(obj) + except Exception: + return repr(obj) + +def safe_json(obj): + try: + safe = _make_json_safe(obj) + json.dumps(safe, ensure_ascii=False) + return safe + except Exception as e: + logger.exception("safe_json conversion failed") + return { + "error": "safe_json_conversion_failed", + "error_str": str(e), + "repr": 
repr(obj)[:2000] + } + + +# -------------------- UUID Generator -------------------- +def generate_scan_id(): + import uuid + return str(uuid.uuid4()) + + +# -------------------- Stealth Mode Enhancements -------------------- +def get_random_user_agent(): + user_agents = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edg/120.0.0.0" + ] + return random.choice(user_agents) + +def get_realistic_headers(url: Optional[str] = None): + from urllib.parse import urlparse + time.sleep(random.uniform(0.02, 0.15)) + domain = urlparse(url).netloc if url else "example.com" + user_agent = get_random_user_agent() + accept_headers = { + "Chrome": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Firefox": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", + "Safari": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Edge": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", + "Opera": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" + } + browser = "Chrome" + if "Firefox" in user_agent: + browser = "Firefox" + elif "Safari" in user_agent and "Chrome" not in user_agent: + browser = "Safari" + elif "Edg" in user_agent or "Edge" in user_agent: + browser = "Edge" + languages = ["en-US,en;q=0.9", "en-GB,en;q=0.9", "ar-JO,ar;q=0.9,en;q=0.8", "fr-FR,fr;q=0.9,en;q=0.8"] + encodings = ["gzip, deflate, br", "gzip, deflate", "gzip, br", "deflate, br"] + headers = { + "User-Agent": user_agent, + "Accept": accept_headers.get(browser, accept_headers["Chrome"]), + "Accept-Language": random.choice(languages), + "Accept-Encoding": random.choice(encodings), + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + "DNT": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "none", + "Sec-Fetch-User": "?1", + "Referer": f"https://www.google.com/search?q={domain.replace('.', '+')}", + "Cache-Control": "max-age=0" + } + return headers + + +# -------------------- Evidence storage -------------------- +def store_raw_evidence(content: bytes, prefix: str = "body") -> Dict[str, Any]: + sha = hashlib.sha256(content).hexdigest() + filename = f"{prefix}_{sha}.bin" + path = os.path.join(EVIDENCE_DIR, filename) + try: + if not os.path.exists(path): + with open(path, "wb") as fh: + fh.write(content) + return {"path": path, "sha256": sha, "timestamp": datetime.utcnow().isoformat() + "Z"} + except Exception as e: + logger.debug(f"Failed to store evidence: {e}") + return {"error": str(e)} + + +# -------------------- Retry/backoff wrapper (async) -------------------- +async def async_request_with_retry(method: str, url: str, client: httpx.AsyncClient, max_retries: int = 4, + base_delay: float = 0.5, timeout: int = 15, headers: dict = None): + attempt = 0 + while attempt <= max_retries: + try: + attempt += 1 + resp = await client.request(method, url, timeout=timeout, headers=headers) + if resp.status_code == 429 or (500 <= 
resp.status_code < 600 and resp.status_code != 501): + raise httpx.HTTPStatusError("Retryable status", request=resp.request, response=resp) + return resp + except Exception as e: + if attempt > max_retries: + logger.debug(f"Request failed (max retries) for {url}: {e}") + return None + sleep = base_delay * (2 ** (attempt - 1)) + jitter = random.uniform(0, sleep) + await asyncio.sleep(jitter) + return None + + +# -------------------- WHOIS -------------------- +def whois_lookup(domain: str) -> dict: + try: + w = whois.whois(domain) + return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "data": safe_json(w)} + except Exception as e: + logger.debug(f"whois_lookup error: {e}") + return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "error": str(e)} + + +# -------------------- DNS -------------------- +@lru_cache(maxsize=256) +def get_dns_records(domain: str) -> Dict[str, List[str]]: + records = defaultdict(list) + try: + for rtype in ("A", "AAAA", "CNAME", "MX", "NS", "TXT"): + try: + answers = dns.resolver.resolve(domain, rtype, lifetime=5) + for r in answers: + records[rtype].append(str(r).strip()) + except Exception: + continue + except Exception as e: + logger.debug(f"get_dns_records error: {e}") + return dict(records) + +def resolve_cname_chain(hostname: str, max_depth: int = 6) -> List[str]: + chain = [] + try: + resolver = dns.resolver.Resolver() + resolver.lifetime = 5 + curr = hostname + for _ in range(max_depth): + try: + answers = resolver.resolve(curr, "CNAME") + if not answers: + break + target = str(answers[0].target).rstrip(".") + chain.append(target) + curr = target + except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.NoNameservers): + break + except Exception: + break + except Exception as e: + logger.debug(f"resolve_cname_chain error for {hostname}: {e}") + return chain + + +# -------------------- SSL/TLS info -------------------- +def get_ssl_info(domain: str) -> Dict[str, Any]: + res = { + "valid": False, + "issuer": None, + "subject": None, + "not_before": None, + "not_after": None, + "expired": None, + "san": [], + "raw_pem": None, + "error": None + } + try: + ctx = ssl.create_default_context() + with socket.create_connection((domain, 443), timeout=5) as sock: + with ctx.wrap_socket(sock, server_hostname=domain) as ss: + der = ss.getpeercert(binary_form=True) + pem = ssl.DER_cert_to_PEM_cert(der) + res["raw_pem"] = pem + x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem) + res["issuer"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v + for k, v in x509.get_issuer().get_components()} + res["subject"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v + for k, v in x509.get_subject().get_components()} + not_before = x509.get_notBefore() + not_after = x509.get_notAfter() + res["not_before"] = not_before.decode() if isinstance(not_before, bytes) else str(not_before) + res["not_after"] = not_after.decode() if isinstance(not_after, bytes) else str(not_after) + for i in range(x509.get_extension_count()): + ext = x509.get_extension(i) + if ext.get_short_name() == b'subjectAltName': + res["san"] = [s.strip() for s in str(ext).split(',')] + res["valid"] = True + try: + dt = datetime.strptime(res["not_after"][:14], "%Y%m%d%H%M%S") + res["expired"] = dt < datetime.utcnow() + except Exception: + res["expired"] = None + except Exception as e: + res["error"] = str(e) + logger.debug(f"get_ssl_info error for 
{domain}: {e}") + return res + + +# -------------------- Robots (try https then http, handle encodings/charset/compression) -------------------- +async def analyze_robots(domain: str) -> Dict[str, Any]: + tried = [] + async with httpx.AsyncClient(follow_redirects=True) as client: + for scheme in ("https://", "http://"): + url = f"{scheme}{domain}/robots.txt" + tried.append(url) + headers = get_realistic_headers(url) + r = await async_request_with_retry("GET", url, client, headers=headers, timeout=10) + if not r: + continue + if r.status_code == 200: + raw = r.content or b"" + ev = store_raw_evidence(raw, prefix="robots") + text = None + # if content is compressed (gzip) + try: + if raw.startswith(b'\x1f\x8b'): + try: + text = gzip.decompress(raw).decode('utf-8', errors='replace') + except Exception: + try: + text = r.text + except Exception: + text = None + elif brotli and (not raw.startswith(b'\x1f\x8b')) and (b'br' in (r.headers.get('content-encoding') or '').lower() or raw[:2] == b'\x8b'): + try: + text = brotli.decompress(raw).decode('utf-8', errors='replace') + except Exception: + text = None + else: + text = None + except Exception: + text = None + + # try charset_normalizer + if text is None and from_bytes: + try: + result = from_bytes(raw) + best = result.best() + if best: + text = best.read() + except Exception: + text = None + if text is None: + try: + text = raw.decode(r.encoding or "utf-8", errors="replace") + except Exception: + try: + text = r.text + except Exception: + text = raw.decode("utf-8", errors="replace") + + # sanitize and parse lines + rules = [] + sitemaps = [] + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + parts = (line.split(":", 1) + [""])[:2] + k = parts[0].strip().lower() + v = parts[1].strip() + if k == "sitemap": + sitemaps.append(v) + else: + rules.append({"directive": k, "value": v}) + return {"exists": True, "content_snippet": text[:8000], "rules": rules, "sitemaps": sitemaps, "fetched_from": url, "raw_evidence": ev} + return {"exists": False, "tried": tried, "error": "robots not found or unreachable (checked https and http)"} + + +# -------------------- Extract links & resources -------------------- +def extract_links_and_scripts(html: str, base_url: str) -> dict: + if not html: + return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []} + try: + soup = BeautifulSoup(html, "lxml") + results = {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []} + base_domain = urlparse(base_url).netloc.split(":")[0] if base_url else "" + for s in soup.find_all("script", src=True): + src = s["src"].strip() + full = urljoin(base_url, src) + results["js_links"].append(full) + for l in soup.find_all("link", rel=lambda r: r and "stylesheet" in r, href=True): + href = l["href"].strip() + full = urljoin(base_url, href) + results["css_links"].append(full) + for m in soup.find_all("meta"): + results["meta_tags"].append({k: m.get(k) for k in ("name", "property", "content", "http-equiv") if m.get(k)}) + for a in soup.find_all("a", href=True): + href = a["href"].strip() + if href.startswith(("mailto:", "tel:", "javascript:", "#")): + continue + full = urljoin(base_url, href) + try: + netloc = urlparse(full).netloc.split(":")[0] + except Exception: + netloc = "" + if netloc == base_domain: + results["internal_links"].append(full) + 
else: + results["external_links"].append(full) + for img in soup.find_all("img", src=True): + src = img["src"].strip() + full = urljoin(base_url, src) + results["image_links"].append(full) + for form in soup.find_all("form", action=True): + action = form["action"].strip() + full = urljoin(base_url, action) + results["form_links"].append(full) + if "/api/" in full or "/graphql" in full: + results["api_links"].append(full) + for k in ("js_links", "css_links", "internal_links", "external_links", "image_links", "form_links", "api_links"): + results[k] = list(dict.fromkeys(results[k])) + return results + except Exception as e: + logger.debug(f"extract_links error: {e}") + return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []} + + +# -------------------- Playwright render (returns content, headers, final_url) -------------------- +async def get_dynamic_html(url: str, timeout: int = 20) -> Tuple[str, Dict[str, str], str]: + try: + async with async_playwright() as pw: + browser = await pw.chromium.launch(args=["--no-sandbox"], headless=True) + page = await browser.new_page() + await page.set_extra_http_headers(get_realistic_headers(url)) + # navigate and capture main response + resp = await page.goto(url, wait_until="networkidle", timeout=timeout * 1000) + await asyncio.sleep(0.25) + content = await page.content() + # extract headers from the main response if available + headers = {} + final_url = url + try: + if resp: + headers = resp.headers or {} + final_url = resp.url or page.url + else: + final_url = page.url + except Exception: + headers = {} + await browser.close() + headers = {str(k): str(v) for k, v in (headers or {}).items()} + return content or "", headers, final_url or url + except Exception as e: + logger.debug(f"Playwright error: {e}") + return "", {}, url + + +# -------------------- Static fetch -------------------- +async def fetch_static(url: str, timeout: int = 15) -> Optional[httpx.Response]: + headers = get_realistic_headers(url) + async with httpx.AsyncClient(follow_redirects=True) as client: + resp = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers) + return resp + + +# -------------------- Wappalyzer helpers: DB validation -------------------- +def _iter_values_recursively(obj): + if isinstance(obj, dict): + for k, v in obj.items(): + yield from _iter_values_recursively(v) + elif isinstance(obj, list): + for i in obj: + yield from _iter_values_recursively(i) + elif isinstance(obj, str): + yield obj + +def validate_wappalyzer_db(path: str) -> List[Tuple[str, str, str]]: + """ + Validate regex patterns inside a Wappalyzer technologies.json file. 
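+ Intended as a pre-flight check before handing a local DB to Wappalyzer (used by the
+ fallback path in detect_technologies_wappalyzer below), e.g.:
+ bad = validate_wappalyzer_db("technologies.json")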
+ Returns list of tuples: (technology_name, pattern_string, error_message) + """ + bad = [] + try: + with open(path, "r", encoding="utf-8") as fh: + data = json.load(fh) + except Exception as e: + logger.debug(f"validate_wappalyzer_db: failed to load JSON: {e}") + return bad + for tech_name, tech_def in data.items(): + try: + for s in _iter_values_recursively(tech_def): + if not isinstance(s, str): + continue + # quick skip for short tokens unlikely to be regex + if len(s) < 4: + continue + try: + re.compile(s) + except re.error as rex: + bad.append((tech_name, s, str(rex))) + except Exception: + # ignore other compile-time issues + continue + except Exception: + continue + return bad + + +# -------------------- Wappalyzer / BuiltWith / JS/CSS heuristics -------------------- +def compute_tech_confidence_from_wappalyzer(data: dict) -> int: + confidence = 50 + detection = data.get("detection", {}) + if isinstance(detection, dict): + if "headers" in detection: + confidence = max(confidence, 85) + if "script" in detection or "js" in detection: + confidence = max(confidence, 80) + if "meta" in detection: + confidence = max(confidence, 75) + return confidence + +async def detect_technologies_wappalyzer(url: str, html: str, headers: dict) -> list: + try: + webpage = WebPage(url, html or "", headers or {}) + # try Wappalyzer.latest() but be resilient + try: + w = Wappalyzer.latest() + except Exception as e: + # fallback to local DB if available (with validation) + tech_path = os.path.join(os.path.dirname(__file__), "technologies.json") + if os.path.exists(tech_path): + try: + # validate DB first to log problematic regexs + bad = validate_wappalyzer_db(tech_path) + if bad: + logger.warning(f"Wappalyzer DB contains {len(bad)} invalid regex patterns (showing up to 10).") + for tname, patt, err in bad[:10]: + logger.warning(f"Invalid regex in Wappalyzer DB - {tname}: pattern={patt!r} error={err}") + w = Wappalyzer(tech_path) + except Exception as e2: + logger.debug(f"Fallback Wappalyzer load failed: {e2}") + return [] + else: + logger.debug("Wappalyzer DB not available and no local fallback") + return [] + # analyze, but guard against regex runtime errors inside w.analyze + try: + results = w.analyze_with_categories(webpage) or {} + except re.error as rex: + logger.exception("Wappalyzer analyze raised a regex error — likely a faulty pattern in DB.") + return [] + except Exception as e: + logger.debug(f"Wappalyzer analyze failed: {e}") + return [] + + detected = [] + for name, data in results.items(): + if not isinstance(data, dict): + continue + confidence = compute_tech_confidence_from_wappalyzer(data) + prov = [] + det = data.get("detected", {}) + if det: + prov.append("wappalyzer-detected") + categories = data.get("categories", []) + detected.append({ + "name": name, + "version": data.get("version", "Unknown"), + "categories": categories, + "confidence": confidence, + "source": "Wappalyzer", + "provenance": prov + }) + detected.sort(key=lambda x: x["confidence"], reverse=True) + return detected + except Exception as e: + logger.debug(f"Wappalyzer error: {e}") + return [] + + +async def detect_technologies_builtwith(url: str) -> list: + try: + raw = builtwith.builtwith(url) + out = [] + for cat, techs in (raw or {}).items(): + for t in techs: + confidence = 70 + if "cdn" in cat.lower(): + confidence = 90 + if "framework" in cat.lower(): + confidence = 90 + out.append({ + "name": t, + "category": cat, + "confidence": confidence, + "source": "BuiltWith", + "provenance": ["builtwith-api"] + }) + 
out.sort(key=lambda x: x["confidence"], reverse=True) + return out + except Exception as e: + logger.debug(f"BuiltWith error: {e}") + return [] + + +async def fetch_resource_content(url: str, timeout: int = 10) -> str: + try: + headers = get_realistic_headers(url) + async with httpx.AsyncClient(follow_redirects=True) as client: + r = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers) + if r and r.status_code == 200: + return r.text or "" + except Exception as e: + logger.debug(f"Failed to fetch resource {url}: {e}") + return "" + return "" + + +async def detect_js_technologies(js_links: List[str], base_url: str, html: str) -> list: + detected = [] + content = " ".join(js_links or []) + " " + (html or "") + content_l = content.lower() + indicators = { + "jQuery": r"jquery[\w-]*\.js|jquery-ui|\$\.fn\.jquery|window\.jquery", + "React": r"react[\w-]*\.js|react-dom|__react_devtools_global_hook__|data-reactroot", + "Angular": r"angular[\w-]*\.js|ng-app|angular\.module", + "Vue.js": r"vue[\w-]*\.js|__vue_devtools_global_hook__|vue-router" + } + for tech, pattern in indicators.items(): + try: + if re.search(pattern, content_l): + detected.append({"name": tech, "confidence": 70, "source": "JS Heuristics", "provenance": ["inline", "links"]}) + except re.error: + # fallback: substring check + if pattern.lower() in content_l: + detected.append({"name": tech, "confidence": 60, "source": "JS Heuristics (fallback)", "provenance": ["inline", "links"]}) + sem = asyncio.Semaphore(10) + + async def _fetch(url_): + async with sem: + return await fetch_resource_content(url_) + + tasks = [] + for url_ in (js_links or []): + tasks.append(_fetch(url_)) + + contents = [] + if tasks: + try: + contents = await asyncio.gather(*tasks) + except Exception: + contents = [] + + for c in (contents or []): + c_l = (c or "").lower() + for tech, pattern in indicators.items(): + try: + if re.search(pattern, c_l): + if not any(d["name"] == tech for d in detected): + detected.append({"name": tech, "confidence": 85, "source": "JS Heuristics", "provenance": ["resource_content"]}) + except re.error: + if pattern.lower() in c_l: + if not any(d["name"] == tech for d in detected): + detected.append({"name": tech, "confidence": 75, "source": "JS Heuristics (fallback)", "provenance": ["resource_content"]}) + return detected + + +async def detect_css_technologies(css_links: List[str], html: str) -> list: + detected = [] + content = " ".join(css_links or []) + " " + (html or "") + content_l = content.lower() + indicators = { + "Bootstrap": r"bootstrap[\w-]*\.css|class=['\"].*col-", + # improved Tailwind detection: look for class attributes containing tw- (utility prefix) or grid-cols, flex- etc. 
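+ # (illustrative: markup such as class="tw-flex tw-mt-4" or class="grid-cols-3 flex-wrap" would match the pattern below)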
+ "Tailwind CSS": r"tailwind\.min\.css|class=['\"][^'\"]*\btw-|class=['\"].*grid-cols-|class=['\"].*flex-", + "Materialize": r"materialize[\w-]*\.css" + } + for tech, pattern in indicators.items(): + try: + if re.search(pattern, content_l): + detected.append({"name": tech, "confidence": 70, "source": "CSS Heuristics", "provenance": ["links_or_inline"]}) + except re.error: + if pattern.lower() in content_l: + detected.append({"name": tech, "confidence": 60, "source": "CSS Heuristics (fallback)", "provenance": ["links_or_inline"]}) + sem = asyncio.Semaphore(8) + + async def _fetch(url_): + async with sem: + return await fetch_resource_content(url_) + + tasks = [] + for url_ in (css_links or []): + tasks.append(_fetch(url_)) + + contents = [] + if tasks: + try: + contents = await asyncio.gather(*tasks) + except Exception: + contents = [] + + for c in (contents or []): + c_l = (c or "").lower() + for tech, pattern in indicators.items(): + try: + if re.search(pattern, c_l): + if not any(d["name"] == tech for d in detected): + detected.append({"name": tech, "confidence": 85, "source": "CSS Heuristics", "provenance": ["resource_content"]}) + except re.error: + if pattern.lower() in c_l: + if not any(d["name"] == tech for d in detected): + detected.append({"name": tech, "confidence": 75, "source": "CSS Heuristics (fallback)", "provenance": ["resource_content"]}) + return detected + + +# -------------------- CMS detection -------------------- +def compute_confidence_from_evidence(evidence: List[Dict[str, Any]]) -> float: + if not evidence: + return 0.0 + total_possible = sum(float(e.get("weight", 0.0)) for e in evidence) + if total_possible <= 0: + return 0.0 + found = sum(float(e.get("weight", 0.0)) for e in evidence if e.get("found")) + return min(1.0, found / total_possible) + +def detect_cms(html: str, headers: dict, url: str, extracted_data: dict = None) -> list: + detected_cms = [] + html_lower = (html or "").lower() + headers_lower = {k.lower(): v for k, v in (headers or {}).items()} + extracted_data = extracted_data or {} + js_links = " ".join(extracted_data.get("js_links", [])) + form_links = " ".join(extracted_data.get("form_links", [])) + image_links = " ".join(extracted_data.get("image_links", [])) + cms_signatures = { + "WordPress": [ + {"type": "path", "pattern": r"wp-content", "weight": 0.23}, + {"type": "path", "pattern": r"wp-includes", "weight": 0.22}, + {"type": "api", "pattern": r"wp-json", "weight": 0.18}, + {"type": "meta", "pattern": r" 0: + detected_cms.append({ + "name": cms_name, + "confidence": round(confidence, 3), + "evidence": evidence, + "source": "CMS Heuristics", + "provenance": [e for e in evidence if e["found"]] + }) + x_gen = headers_lower.get("x-generator", "") or headers_lower.get("server", "") + if x_gen: + if "joomla" in x_gen.lower(): + if not any(d["name"] == "Joomla" for d in detected_cms): + detected_cms.append({"name": "Joomla", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]}) + elif "wordpress" in x_gen.lower() or "wp-" in x_gen.lower(): + if not any(d["name"] == "WordPress" for d in detected_cms): + detected_cms.append({"name": "WordPress", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]}) + detected_cms.sort(key=lambda x: x["confidence"], reverse=True) + return detected_cms + + +# -------------------- Security Headers -------------------- +def 
analyze_security_headers(headers: dict) -> Dict[str, Any]: + headers = {k.lower(): v for k, v in (headers or {}).items()} + security = { + "x-frame-options": headers.get("x-frame-options"), + "x-xss-protection": headers.get("x-xss-protection"), + "x-content-type-options": headers.get("x-content-type-options"), + "strict-transport-security": headers.get("strict-transport-security"), + "content-security-policy": headers.get("content-security-policy"), + "referrer-policy": headers.get("referrer-policy") + } + results = {} + for header, value in security.items(): + if value: + status = "Implemented" + if header == "x-frame-options": + if value.lower() in ["deny", "sameorigin"]: + status = "Secure" + else: + status = "Weak" + results[header] = {"status": status, "value": value} + return results + + +# -------------------- Payment Method Detection -------------------- +def detect_payment_methods(html: str, extracted_data: dict = None) -> list: + detected_methods = [] + html_lower = (html or "").lower() + extracted_data = extracted_data or {} + js_links = " ".join(extracted_data.get("js_links", [])).lower() + form_links = " ".join(extracted_data.get("form_links", [])).lower() + image_links = " ".join(extracted_data.get("image_links", [])).lower() + combined = " ".join([html_lower, js_links, form_links, image_links]) + payment_patterns = { + "Visa": r"\bvisa\b|visa-logo|/visa\.(svg|png|jpg|gif)", + "Mastercard": r"mastercard|/mastercard\.(svg|png|jpg|gif)|master-card|master card", + "American Express": r"american[\s-]*express|amex|/amex\.(svg|png|jpg|gif)", + "PayPal": r"paypal\.com|paypal-button|www\.paypalobjects\.com|paypalcheckout|paypal\.me", + "Stripe": r"js\.stripe\.com|stripe\.com|Stripe\.(setPublishableKey|card)|stripe-v3|stripe-elements", + "Apple Pay": r"apple[\s-]*pay|apple-pay", + "Google Pay": r"google[\s-]*pay|pay.google.com|google-pay", + "Shop Pay": r"shopify\.com\/shop_pay|shopify|shop-pay", + "Discover": r"discover|discover-logo|/discover\.(svg|png|jpg|gif)", + "UnionPay": r"unionpay|union-pay", + "JCB": r"\bjcb\b", + "Alipay": r"alipay|alipayjsbridge|alipay\.com", + "WeChat Pay": r"wechatpay|weixin\.qq\.com|wechat[\s-]*pay", + "Square": r"squareup\.com|square\.(js|cdn)|sq-", + "Authorize.Net": r"authorize\.net|secure2.authorize\.net", + "Braintree": r"braintree\.gateway|braintree\.js|braintree", + "Adyen": r"adyen|checkoutshopper|adyen-checkout", + "Worldpay": r"worldpay|secure\.worldpay", + "SagePay": r"sagepay|opayo", + "Klarna": r"klarna|klarna-checkout", + "Amazon Pay": r"amazonpay|static-na\.amzn\.com|amazon-pay", + "Payoneer": r"payoneer", + "Razorpay": r"razorpay|checkout\.razorpay\.com", + "2Checkout": r"2checkout|2co", + "Mollie": r"mollie|checkout\.mollie", + "PayU": r"payu|payu\.com", + "MercadoPago": r"mercadopago|mercadopago\.com", + "CyberSource": r"cybersource|ics2wsa", + "Afterpay": r"afterpay|clearpay", + "Paystack": r"paystack|js\.paystack\.co", + "ePDQ": r"epdq|ogone", + "Checkout.com": r"checkout\.com|checkoutjs", + "GreenPay": r"greenpay" + } + for method, pattern in payment_patterns.items(): + try: + if re.search(pattern, combined, re.I): + if method not in detected_methods: + detected_methods.append(method) + except re.error: + if pattern.lower() in combined: + if method not in detected_methods: + detected_methods.append(method) + checkout_indicators = [r"/checkout", r"/cart", r"/pay", r"/payment", r"/order", r"/billing"] + for pat in checkout_indicators: + if re.search(pat, form_links + html_lower): + if "E-Commerce/Checkout" not in detected_methods: + 
detected_methods.append("E-Commerce/Checkout") + return detected_methods + + +# -------------------- Tracker and Analytics Detection -------------------- +def detect_trackers_and_analytics(html: str, js_links: list = None, meta_tags: list = None) -> list: + detected_trackers = [] + html_lower = (html or "").lower() + tracker_patterns = { + "Google Analytics": r"google-analytics\.com/|gtag\.js|analytics\.js", + "Google Tag Manager": r"googletagmanager\.com", + "Facebook Pixel": r"connect\.facebook\.net/en_US/fbevents\.js|fbq\(", + "Hotjar": r"hotjar\.com|hjid", + "Matomo (Piwik)": r"matomo\.js", + "TikTok Pixel": r"ttq\.load" + } + for tracker, pattern in tracker_patterns.items(): + if re.search(pattern, html_lower): + detected_trackers.append(tracker) + all_js_links = " ".join([link.lower() for link in (js_links or [])]) + for tracker, pattern in tracker_patterns.items(): + if re.search(pattern, all_js_links): + if tracker not in detected_trackers: + detected_trackers.append(tracker) + meta_content = " ".join([tag.get('content', '').lower() for tag in (meta_tags or [])]) + for tracker, pattern in tracker_patterns.items(): + if re.search(pattern, meta_content): + if tracker not in detected_trackers: + detected_trackers.append(tracker) + return detected_trackers + + +# -------------------- IP info -------------------- +def get_ip_info(ip: str) -> Dict: + res = {"source": "ipwhois", "timestamp": datetime.utcnow().isoformat() + "Z"} + try: + obj = ipwhois.IPWhois(ip).lookup_rdap(depth=1) + res["asn"] = obj.get("asn") + res["asn_cidr"] = obj.get("asn_cidr") + res["asn_country_code"] = obj.get("asn_country_code") + res["asn_description"] = obj.get("asn_description") + res["network"] = obj.get("network") + except Exception as e: + logger.debug(f"IPWhois lookup failed for {ip}: {e}") + res["error"] = str(e) + return res + + +# -------------------- WAF detection -------------------- +def detect_waf_subprocess(url: str) -> dict: + result = {"detected": False, "provider": None, "confidence": 0.0, "evidence": []} + try: + proc = subprocess.run(["wafw00f", "-a", url], capture_output=True, text=True, timeout=20) + out = (proc.stdout or "") + (proc.stderr or "") + if proc.returncode == 0 and out: + lines = out.splitlines() + for ln in lines: + for provider in ["Cloudflare", "Imperva", "Akamai", "Fastly", "Sucuri", "F5", "ModSecurity", "AWS WAF", "Fortinet", "Barracuda", "Incapsula"]: + if provider.lower() in ln.lower(): + result.update({"detected": True, "provider": provider, "confidence": 0.9, "evidence": ["wafw00f-output"]}) + return result + except Exception: + pass + try: + parsed = urlparse(url) + try: + r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url)) + headers = {k.lower(): v for k, v in dict(r.headers).items()} + body_snippet = (r.text or "")[:3000] + cookie_keys = " ".join([c.name for c in getattr(r, "cookies", [])]) if hasattr(r, "cookies") else "" + except Exception as e: + headers = {} + body_snippet = "" + cookie_keys = "" + header_indicators = { + "Cloudflare": ["cf-ray", "server: cloudflare", "cf-cache-status", "cf-request-id"], + "Imperva": ["x-iinfo", "incapsula", "visid_incap_"], + "Akamai": ["x-akamai-transformed", "akamai", "akamaiedge", "akamaitechnologies"], + "Fastly": ["x-served-by", "x-cache", "x-fastly-backend-request-id"], + "Sucuri": ["x-sucuri-cache", "x-sucuri-id"], + "F5": ["bigipserver", "x-lb"], + "ModSecurity": ["mod_security", "mod_sec"], + "AWS WAF": ["x-amzn-requestid", "x-amz-cf-id"], + "Fortinet": ["fortigate", "f5-"], + 
"Barracuda": ["barracuda"], + "Incapsula": ["visid_incap_"] + } + for provider, sigs in header_indicators.items(): + for sig in sigs: + try: + if ":" in sig: + hname, hv = [s.strip() for s in sig.split(":", 1)] + hv = hv.lower() + if headers.get(hname) and hv in headers.get(hname, "").lower(): + result.update({"detected": True, "provider": provider, "confidence": 0.75, "evidence": [f"header:{hname}"]}) + return result + else: + if any(sig in h for h in headers.keys()): + result.update({"detected": True, "provider": provider, "confidence": 0.7, "evidence": [f"header_contains:{sig}"]}) + return result + if sig in body_snippet.lower(): + result.update({"detected": True, "provider": provider, "confidence": 0.6, "evidence": ["body_snippet"]}) + return result + if re.search(re.escape(sig), cookie_keys, re.I): + result.update({"detected": True, "provider": provider, "confidence": 0.65, "evidence": ["cookie_name"]}) + return result + except Exception: + continue + challenge_patterns = [r"attention required", r"access denied", r"please enable cookies", r"security check", r"verify you are a human", r"challenge.*cloudflare"] + for pat in challenge_patterns: + if re.search(pat, body_snippet, re.I): + result.update({"detected": True, "provider": "Unknown (challenge page)", "confidence": 0.5, "evidence": ["challenge_pattern"]}) + return result + except Exception as e: + logger.debug(f"WAF detection error heuristics: {e}") + return result + + +# -------------------- CDN detection -------------------- +def detect_cdn_from_headers_and_dns(headers: dict, dns_records: dict, ip: str = None, extracted_data: dict = None) -> dict: + detected = {"source": None, "provider": None, "confidence": 0, "reasons": []} + headers_lower = {k.lower(): v for k, v in (headers or {}).items()} + extracted_data = extracted_data or {} + cdn_header_signatures = { + "Cloudflare": ["cf-ray", "cf-cache-status", "server: cloudflare", "cf-request-id"], + "Akamai": ["x-akamai-transformed", "x-akamai-request-id", "akamai"], + "Amazon CloudFront": ["x-amz-cf-id", "via: 1.1 cloudfront", "x-cache"], + "Fastly": ["x-served-by", "x-fastly-backend-request-id", "x-cache"], + "Sucuri": ["x-sucuri-cache", "x-sucuri-id"], + "Google Cloud CDN": ["x-goog-gfe-response-headers", "x-google-gfe"], + "Incapsula": ["x-iinfo", "visid_incap_"], + "Azure CDN": ["cdn-io", "azureedge", "azurefd", "akadns"], + "Netlify": ["netlify"], + "Cloudflare Stream": ["cf-stream"], + "BunnyCDN": ["bunnycdn"], + "StackPath": ["stackpathcdn"], + "KeyCDN": ["x-keycdn"], + "CDN77": ["cdn77"], + "Akamai EdgeKey": ["edgekey.net"] + } + for provider, sigs in cdn_header_signatures.items(): + for sig in sigs: + if any(sig in h for h in headers_lower.keys()) or any(sig in v.lower() for v in headers_lower.values()): + detected.update({"source": "Headers", "provider": provider, "confidence": 95}) + detected["reasons"].append(f"header signature matched {sig}") + return detected + cname_records = dns_records.get("CNAME", []) if dns_records else [] + try: + candidate_host = cname_records[0] if cname_records else None + cname_chain = resolve_cname_chain(candidate_host) if candidate_host else [] + cname_patterns = { + "Cloudflare": r"cloudflare|cloudfront|cloudflare.net", + "Akamai": r"akamai|akamaiedge|akamaitechnologies|edgekey\.net|akamaiedge\.net", + "Amazon CloudFront": r"cloudfront\.net", + "Fastly": r"fastly\.net|fastly", + "Incapsula": r"incapsula|imperva", + "Sucuri": r"sucuri\.net|sucuri", + "Azure CDN": r"azureedge|azurefd|z6rungcdn|azure", + "Netlify": r"netlify\.app|netlify", 
+ "BunnyCDN": r"bunnycdn", + "StackPath": r"stackpathdns", + "KeyCDN": r"kccdn|kxcdn", + "CDN77": r"cdn77", + } + for provider, pattern in cname_patterns.items(): + for cname in (cname_records + cname_chain): + if re.search(pattern, cname, re.I): + detected.update({"source": "DNS CNAME", "provider": provider, "confidence": 85}) + detected["reasons"].append(f"CNAME {cname} matches {provider}") + return detected + except Exception as e: + logger.debug(f"CDN CNAME check error: {e}") + try: + asset_hosts = set() + for linklist in ("js_links", "css_links", "image_links", "form_links"): + for a in extracted_data.get(linklist, []): + try: + p = urlparse(a) + if p.hostname: + asset_hosts.add(p.hostname.lower()) + except Exception: + continue + asset_hosts_list = list(asset_hosts) + asset_host_patterns = { + "Cloudflare": ["cloudflare", "cdn-cdn.cloudflare", "cloudflare.net", "cdn-cgi"], + "Akamai": ["akamai.net", "akamaiedge", "akamaitechnologies", "edgekey.net"], + "Fastly": ["fastly.net", "fastly"], + "Amazon CloudFront": ["cloudfront.net", "amazonaws.com"], + "Netlify": ["netlify.app", "netlify"], + "BunnyCDN": ["b-cdn.net", "bunnycdn"], + "Google Cloud CDN": ["googleusercontent.com", "googleapis.com"], + "KeyCDN": ["kxcdn", "kccdn"], + "CDN77": ["cdn77"], + "StackPath": ["stackpathcdn", "stackpathdns"] + } + for provider, pats in asset_host_patterns.items(): + for pat in pats: + for ah in asset_hosts_list: + if pat in ah: + detected.update({"source": "Asset Hosts", "provider": provider, "confidence": 80}) + detected["reasons"].append(f"asset host {ah} contains {pat}") + return detected + except Exception as e: + logger.debug(f"Asset host analysis error: {e}") + return detected + + +# -------------------- Main async scan (IMPROVED) -------------------- +async def main_async_scan(url: str): + scan_start = datetime.utcnow().isoformat() + "Z" + try: + logger.info(f"Starting scan for {url}") + # Step 1: Try Playwright render (get content + headers) + dynamic_html, dynamic_headers, dynamic_final_url = await get_dynamic_html(url) + final_html = dynamic_html or "" + final_headers = dynamic_headers or {} + final_url = dynamic_final_url or url + static_response = None + + # If no dynamic content, try static fetch (async) + if not final_html: + logger.info("Dynamic fetch empty; attempting static fetch...") + static_response = await fetch_static(url) + if static_response and static_response.status_code == 200: + final_html = static_response.text or "" + final_headers = dict(static_response.headers or {}) + final_url = str(static_response.url or url) + else: + # fallback sync attempt to capture headers/body + try: + r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url)) + if r.status_code == 200: + final_html = r.text or "" + final_headers = dict(r.headers or {}) + final_url = str(r.url or url) + else: + logger.warning(f"Static fetch returned {r.status_code} for {url}") + except Exception as e: + logger.debug(f"Sync fallback static fetch failed: {e}") + else: + # We have dynamic HTML; ensure we also have headers (use static fetch or HEAD if headers missing) + if not final_headers: + try: + head_resp = httpx.head(final_url, follow_redirects=True, timeout=8, headers=get_realistic_headers(final_url)) + if head_resp and head_resp.status_code < 400: + final_headers = dict(head_resp.headers or {}) + else: + r2 = httpx.get(final_url, follow_redirects=True, timeout=10, headers=get_realistic_headers(final_url)) + if r2: + final_headers = dict(r2.headers or {}) + except Exception 
as e: + logger.debug(f"Failed to fetch headers fallback: {e}") + + # store raw evidence: headers + body + raw_evidence = {} + if final_html: + raw_body_bytes = (final_html.encode("utf-8") if isinstance(final_html, str) else (final_html or b"")) + raw_evidence["body"] = store_raw_evidence(raw_body_bytes, prefix="body") + if final_headers: + try: + hdr_bytes = json.dumps(dict(final_headers), ensure_ascii=False).encode("utf-8") + raw_evidence["headers"] = store_raw_evidence(hdr_bytes, prefix="headers") + except Exception: + raw_evidence["headers"] = {"error": "failed_to_store_headers"} + + # Step 2: Extract links and resources (ensure final_url passed) + logger.info("Extracting links and resources...") + extracted_data = extract_links_and_scripts(final_html or "", final_url) + js_links = extracted_data.get("js_links", []) + css_links = extracted_data.get("css_links", []) + + # Step 3: Run detection tasks concurrently + logger.info("Detecting technologies (Wappalyzer/BuiltWith/JS/CSS heuristics)...") + tasks = [ + detect_technologies_wappalyzer(final_url, final_html or "", final_headers), + detect_technologies_builtwith(final_url), + detect_js_technologies(js_links, final_url, final_html or ""), + detect_css_technologies(css_links, final_html or "") + ] + wappalyzer_res, builtwith_res, js_res, css_res = await asyncio.gather(*tasks) + + # Step 4: Combine technologies + all_tech = (wappalyzer_res or []) + (builtwith_res or []) + (js_res or []) + (css_res or []) + tech_map: Dict[str, Any] = {} + for tech in all_tech: + name = tech.get("name") + if not name: + continue + existing = tech_map.get(name) + confidence = float(tech.get("confidence", 50)) + if existing: + existing_conf = float(existing.get("confidence", 0)) + existing["confidence"] = max(existing_conf, confidence) + existing_sources = set([s.strip() for s in str(existing.get("source", "")).split(",") if s]) + incoming_source = tech.get("source") or "" + if incoming_source and incoming_source not in existing_sources: + existing_sources.add(incoming_source) + existing["source"] = ", ".join(sorted(existing_sources)) + existing_prov = set(existing.get("provenance", []) or []) + incoming_prov = set(tech.get("provenance", []) or []) + existing["provenance"] = list(existing_prov.union(incoming_prov)) + if tech.get("version") and existing.get("version") in (None, "Unknown"): + existing["version"] = tech.get("version") + else: + tech_map[name] = { + "name": name, + "version": tech.get("version", "Unknown"), + "confidence": confidence, + "source": tech.get("source", ""), + "provenance": tech.get("provenance", []) or [] + } + combined_tech = list(tech_map.values()) + combined_tech.sort(key=lambda x: x.get("confidence", 0), reverse=True) + + # Step 5: DNS and SSL + parsed = urlparse(final_url) + domain = parsed.netloc.split(":")[0] if parsed.netloc else "" + dns_records = get_dns_records(domain) if domain else {} + ssl_info = {} + if parsed.scheme == "https" and domain: + ssl_info = get_ssl_info(domain) + + # Step 6: IP info + ip_info = {} + if dns_records.get("A"): + ip = dns_records["A"][0] if isinstance(dns_records["A"], list) and dns_records["A"] else dns_records["A"] + ip_info = get_ip_info(ip) + + # Step 7: robots.txt + robots_info = await analyze_robots(domain) if domain else {"exists": False, "tried": [], "error": "no domain"} + + # Step 8: Security headers and CMS detection + security_headers = analyze_security_headers(final_headers) + cms_info = detect_cms(final_html or "", final_headers or {}, final_url, extracted_data=extracted_data) + + 
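+        # Steps 9-10 mostly reuse the HTML/headers/links collected above; the WAF check may spawn wafw00f or issue an extra GET, and CDN detection may resolve CNAME chains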
# Step 9: payments and trackers + payment_methods_info = detect_payment_methods(final_html or "", extracted_data=extracted_data) + trackers_info = detect_trackers_and_analytics(final_html or "", js_links=extracted_data.get("js_links", []), meta_tags=extracted_data.get("meta_tags", [])) + + # Step 10: WAF & CDN heuristics + waf_info = detect_waf_subprocess(final_url) + cdn_info = detect_cdn_from_headers_and_dns(final_headers or {}, dns_records or {}, ip_info.get("asn_cidr") if ip_info else None, extracted_data=extracted_data) + + # Inference rules for Cloudflare + try: + if (not cdn_info.get("provider")) and waf_info.get("provider") and "cloudflare" in (waf_info.get("provider") or "").lower(): + cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 90, "reasons": ["waf indicates Cloudflare"]}) + elif (not cdn_info.get("provider")) and ip_info and ip_info.get("asn_description") and "cloudflare" in str(ip_info.get("asn_description")).lower(): + cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 85, "reasons": ["ip whois ASN indicates Cloudflare"]}) + else: + ns_list = dns_records.get("NS", []) or [] + if (not cdn_info.get("provider")): + for ns in ns_list: + if "cloudflare" in ns.lower(): + cdn_info.update({"source": "dns", "provider": "Cloudflare", "confidence": 85, "reasons": [f"NS {ns} indicates Cloudflare"]}) + break + except Exception: + pass + + # Build final report + title = "No Title" + try: + soup = BeautifulSoup(final_html or "", "lxml") + if soup.title and soup.title.string: + title = soup.title.string.strip() + except Exception: + title = "No Title" + + report = { + "scan_id": generate_scan_id(), + "scanned_at": scan_start, + "url": final_url, + "title": title, + "raw_evidence": raw_evidence, + "technologies": combined_tech, + "links_and_resources": extracted_data, + "dns_records": dns_records, + "ssl_info": ssl_info, + "ip_info": ip_info, + "robots_info": robots_info, + "security_headers": security_headers, + "cms_info": cms_info, + "payment_methods": payment_methods_info, + "trackers_and_analytics": trackers_info, + "waf_info": waf_info, + "cdn_info": cdn_info, + "headers": final_headers, + "notes": "Report contains provenance (raw_evidence paths) and normalized confidence scores (0-100 for technologies)." 
+        }
+
+        # Normalize confidence to 0-100 for technologies
+        for t in report["technologies"]:
+            try:
+                t_conf = float(t.get("confidence", 50))
+                if 0 <= t_conf <= 1:
+                    t["confidence"] = int(round(t_conf * 100))
+                else:
+                    t["confidence"] = int(round(min(max(t_conf, 0), 100)))
+            except Exception:
+                t["confidence"] = 50
+
+        return safe_json(report)
+
+    except Exception as e:
+        logger.exception("Main scan failed")
+        return safe_json({"error": "Main scan failed", "details": str(e), "scanned_at": scan_start})
+
+
+# -------------------- Convenience wrapper used by analyze_site.py --------------------
+async def run_scan_for_url(url: str, render_js: bool = False, scan_id: Optional[str] = None) -> Dict[str, Any]:
+    # NOTE: render_js is accepted for API compatibility; main_async_scan always
+    # attempts a Playwright render first and falls back to a static fetch.
+    try:
+        report = await main_async_scan(url)
+        if not isinstance(report, dict):
+            report = {"error": "invalid_report", "details": "Scanner returned non-dict result", "raw": str(report)}
+        report.setdefault("scanned_url", report.get("url", url))
+        if scan_id:
+            report["scan_id"] = scan_id
+        report.setdefault("url", report.get("scanned_url"))
+        report.setdefault("technologies", report.get("technologies", []))
+        report.setdefault("dns_records", report.get("dns_records", {}))
+        report.setdefault("robots_info", report.get("robots_info", {"exists": False}))
+        report.setdefault("headers", report.get("headers", {}))
+        # compatibility aliases
+        report.setdefault("waf", report.get("waf_info"))
+        report.setdefault("cdn", report.get("cdn_info"))
+        report.setdefault("payments", report.get("payment_methods"))
+        return report
+    except Exception as e:
+        logger.exception("run_scan_for_url wrapper failed")
+        return safe_json({"error": "run_scan_for_url_failed", "details": str(e), "scanned_url": url})
+
+
+if __name__ == '__main__':
+    # Quick smoke test when running this module standalone.
+    test_url = "https://www.google.com"
+    result = asyncio.run(main_async_scan(test_url))
+    print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
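+
+
+# Illustrative offline sketch (editor's addition, not wired into the app flow):
+# the pure-Python detectors above can be exercised with canned data and no
+# network access. It assumes only the functions defined in this module.
+def _example_offline_detectors() -> None:
+    sample_headers = {
+        "X-Frame-Options": "SAMEORIGIN",
+        "Strict-Transport-Security": "max-age=63072000",
+    }
+    # x-frame-options is reported as "Secure", HSTS as "Implemented"
+    print(analyze_security_headers(sample_headers))
+    # -> ["Stripe"]
+    print(detect_payment_methods('<script src="https://js.stripe.com/v3/"></script>'))
+    # -> ["Google Analytics"]
+    print(detect_trackers_and_analytics('<script src="https://www.google-analytics.com/analytics.js"></script>'))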