# utils.py # SuperRecon utils - improved (compat_resources + Wappalyzer DB validation + regex fixes) import os import re import json import socket import logging import ssl import gzip import OpenSSL import dns.resolver import httpx from urllib.parse import urljoin, urlparse, quote_plus from bs4 import BeautifulSoup from datetime import datetime, date, timezone from collections import defaultdict from typing import List, Dict, Any, Optional, Tuple import asyncio import random import ipaddress import ipwhois import time from functools import lru_cache from playwright.async_api import async_playwright import whois from Wappalyzer import Wappalyzer, WebPage import builtwith import subprocess import hashlib # optional import for charset detection (best-effort) try: from charset_normalizer import from_bytes except Exception: from_bytes = None # optional brotli decompress try: import brotli except Exception: brotli = None # -------------------- Logger setup -------------------- logger = logging.getLogger("SuperRecon.utils") if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(os.environ.get("SUPERR_LOG_LEVEL", "INFO")) # Directory to store raw evidence EVIDENCE_DIR = os.environ.get("SUPERR_EVIDENCE_DIR", "./evidence") os.makedirs(EVIDENCE_DIR, exist_ok=True) # -------------------- Compatibility layer (replacement for pkg_resources) -------------------- # Provides: get_version, resource_bytes, resource_text, resource_path (context manager), # iter_entry_points, load_entry_point, parse_requirement, installed_distributions, dist_metadata try: # importlib.metadata (stdlib) with backport fallback from importlib.metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError # type: ignore except Exception: from importlib_metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError # type: ignore # importlib.resources with backport fallback try: from importlib.resources import files, as_file, read_binary, read_text # type: ignore except Exception: from importlib_resources import files, as_file, read_binary, read_text # type: ignore from contextlib import contextmanager from packaging.requirements import Requirement from packaging.version import Version, InvalidVersion def get_version(package_name: str) -> Optional[str]: try: return _version(package_name) except Exception: return None def resource_bytes(package: str, resource: str) -> bytes: return read_binary(package, resource) def resource_text(package: str, resource: str, encoding: str = "utf-8") -> str: return read_text(package, resource, encoding=encoding) @contextmanager def resource_path(package: str, resource: str): """ Yields a filesystem Path for resource if possible. Usage: with resource_path('mypkg', 'data/file.txt') as p: open(p)... """ p = files(package).joinpath(resource) with as_file(p) as fp: yield fp class EP: def __init__(self, ep): self._ep = ep @property def name(self): return self._ep.name @property def value(self): return self._ep.value def load(self): return self._ep.load() def iter_entry_points(group: str): eps = entry_points() try: group_eps = eps.select(group=group) # py3.10+ except Exception: try: group_eps = [e for e in eps if getattr(e, "group", None) == group] except Exception: group_eps = eps.get(group, []) # type: ignore for e in group_eps: yield EP(e) def load_entry_point(group: str, name: str): for ep in iter_entry_points(group): if ep.name == name: return ep.load() raise LookupError(f"entry point {group}:{name} not found") def parse_requirement(req_str: str) -> Requirement: return Requirement(req_str) def installed_distributions(): for dist in distributions(): yield dist def dist_metadata(name: str): try: return distribution(name).metadata except PackageNotFoundError: return None def dist_files(name: str): try: return distribution(name).files except PackageNotFoundError: return None # -------------------- Safe JSON Helpers -------------------- def _make_json_safe(obj): if obj is None or isinstance(obj, (bool, int, float, str)): return obj if isinstance(obj, dict): new = {} for k, v in obj.items(): try: key = str(k) except Exception: key = repr(k) new[key] = _make_json_safe(v) return new if isinstance(obj, (list, tuple, set)): return [_make_json_safe(i) for i in obj] try: if isinstance(obj, (datetime, date)): return obj.isoformat() except Exception: pass try: import httpx as _httpx if isinstance(obj, _httpx.Response): try: text_snippet = obj.text[:1000] except Exception: text_snippet = None return { "status_code": obj.status_code, "url": str(obj.url) if hasattr(obj, "url") else None, "headers": dict(obj.headers) if hasattr(obj, "headers") else None, "text_snippet": text_snippet } except Exception: pass try: return str(obj) except Exception: return repr(obj) def safe_json(obj): try: safe = _make_json_safe(obj) json.dumps(safe, ensure_ascii=False) return safe except Exception as e: logger.exception("safe_json conversion failed") return { "error": "safe_json_conversion_failed", "error_str": str(e), "repr": repr(obj)[:2000] } # -------------------- UUID Generator -------------------- def generate_scan_id(): import uuid return str(uuid.uuid4()) # -------------------- Stealth Mode Enhancements -------------------- def get_random_user_agent(): user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edg/120.0.0.0" ] return random.choice(user_agents) def get_realistic_headers(url: Optional[str] = None): from urllib.parse import urlparse time.sleep(random.uniform(0.02, 0.15)) domain = urlparse(url).netloc if url else "example.com" user_agent = get_random_user_agent() accept_headers = { "Chrome": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Firefox": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Safari": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Edge": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Opera": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" } browser = "Chrome" if "Firefox" in user_agent: browser = "Firefox" elif "Safari" in user_agent and "Chrome" not in user_agent: browser = "Safari" elif "Edg" in user_agent or "Edge" in user_agent: browser = "Edge" languages = ["en-US,en;q=0.9", "en-GB,en;q=0.9", "ar-JO,ar;q=0.9,en;q=0.8", "fr-FR,fr;q=0.9,en;q=0.8"] encodings = ["gzip, deflate, br", "gzip, deflate", "gzip, br", "deflate, br"] headers = { "User-Agent": user_agent, "Accept": accept_headers.get(browser, accept_headers["Chrome"]), "Accept-Language": random.choice(languages), "Accept-Encoding": random.choice(encodings), "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "DNT": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1", "Referer": f"https://www.google.com/search?q={domain.replace('.', '+')}", "Cache-Control": "max-age=0" } return headers # -------------------- Evidence storage -------------------- def store_raw_evidence(content: bytes, prefix: str = "body") -> Dict[str, Any]: sha = hashlib.sha256(content).hexdigest() filename = f"{prefix}_{sha}.bin" path = os.path.join(EVIDENCE_DIR, filename) try: if not os.path.exists(path): with open(path, "wb") as fh: fh.write(content) return {"path": path, "sha256": sha, "timestamp": datetime.utcnow().isoformat() + "Z"} except Exception as e: logger.debug(f"Failed to store evidence: {e}") return {"error": str(e)} # -------------------- Retry/backoff wrapper (async) -------------------- async def async_request_with_retry(method: str, url: str, client: httpx.AsyncClient, max_retries: int = 4, base_delay: float = 0.5, timeout: int = 15, headers: dict = None): attempt = 0 while attempt <= max_retries: try: attempt += 1 resp = await client.request(method, url, timeout=timeout, headers=headers) if resp.status_code == 429 or (500 <= resp.status_code < 600 and resp.status_code != 501): raise httpx.HTTPStatusError("Retryable status", request=resp.request, response=resp) return resp except Exception as e: if attempt > max_retries: logger.debug(f"Request failed (max retries) for {url}: {e}") return None sleep = base_delay * (2 ** (attempt - 1)) jitter = random.uniform(0, sleep) await asyncio.sleep(jitter) return None # -------------------- WHOIS -------------------- def whois_lookup(domain: str) -> dict: try: w = whois.whois(domain) return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "data": safe_json(w)} except Exception as e: logger.debug(f"whois_lookup error: {e}") return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "error": str(e)} # -------------------- DNS -------------------- @lru_cache(maxsize=256) def get_dns_records(domain: str) -> Dict[str, List[str]]: records = defaultdict(list) try: for rtype in ("A", "AAAA", "CNAME", "MX", "NS", "TXT"): try: answers = dns.resolver.resolve(domain, rtype, lifetime=5) for r in answers: records[rtype].append(str(r).strip()) except Exception: continue except Exception as e: logger.debug(f"get_dns_records error: {e}") return dict(records) def resolve_cname_chain(hostname: str, max_depth: int = 6) -> List[str]: chain = [] try: resolver = dns.resolver.Resolver() resolver.lifetime = 5 curr = hostname for _ in range(max_depth): try: answers = resolver.resolve(curr, "CNAME") if not answers: break target = str(answers[0].target).rstrip(".") chain.append(target) curr = target except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.NoNameservers): break except Exception: break except Exception as e: logger.debug(f"resolve_cname_chain error for {hostname}: {e}") return chain # -------------------- SSL/TLS info -------------------- def get_ssl_info(domain: str) -> Dict[str, Any]: res = { "valid": False, "issuer": None, "subject": None, "not_before": None, "not_after": None, "expired": None, "san": [], "raw_pem": None, "error": None } try: ctx = ssl.create_default_context() with socket.create_connection((domain, 443), timeout=5) as sock: with ctx.wrap_socket(sock, server_hostname=domain) as ss: der = ss.getpeercert(binary_form=True) pem = ssl.DER_cert_to_PEM_cert(der) res["raw_pem"] = pem x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem) res["issuer"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v for k, v in x509.get_issuer().get_components()} res["subject"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v for k, v in x509.get_subject().get_components()} not_before = x509.get_notBefore() not_after = x509.get_notAfter() res["not_before"] = not_before.decode() if isinstance(not_before, bytes) else str(not_before) res["not_after"] = not_after.decode() if isinstance(not_after, bytes) else str(not_after) for i in range(x509.get_extension_count()): ext = x509.get_extension(i) if ext.get_short_name() == b'subjectAltName': res["san"] = [s.strip() for s in str(ext).split(',')] res["valid"] = True try: dt = datetime.strptime(res["not_after"][:14], "%Y%m%d%H%M%S") res["expired"] = dt < datetime.utcnow() except Exception: res["expired"] = None except Exception as e: res["error"] = str(e) logger.debug(f"get_ssl_info error for {domain}: {e}") return res # -------------------- Robots (try https then http, handle encodings/charset/compression) -------------------- async def analyze_robots(domain: str) -> Dict[str, Any]: tried = [] async with httpx.AsyncClient(follow_redirects=True) as client: for scheme in ("https://", "http://"): url = f"{scheme}{domain}/robots.txt" tried.append(url) headers = get_realistic_headers(url) r = await async_request_with_retry("GET", url, client, headers=headers, timeout=10) if not r: continue if r.status_code == 200: raw = r.content or b"" ev = store_raw_evidence(raw, prefix="robots") text = None # if content is compressed (gzip) try: if raw.startswith(b'\x1f\x8b'): try: text = gzip.decompress(raw).decode('utf-8', errors='replace') except Exception: try: text = r.text except Exception: text = None elif brotli and (not raw.startswith(b'\x1f\x8b')) and (b'br' in (r.headers.get('content-encoding') or '').lower() or raw[:2] == b'\x8b'): try: text = brotli.decompress(raw).decode('utf-8', errors='replace') except Exception: text = None else: text = None except Exception: text = None # try charset_normalizer if text is None and from_bytes: try: result = from_bytes(raw) best = result.best() if best: text = best.read() except Exception: text = None if text is None: try: text = raw.decode(r.encoding or "utf-8", errors="replace") except Exception: try: text = r.text except Exception: text = raw.decode("utf-8", errors="replace") # sanitize and parse lines rules = [] sitemaps = [] for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue parts = (line.split(":", 1) + [""])[:2] k = parts[0].strip().lower() v = parts[1].strip() if k == "sitemap": sitemaps.append(v) else: rules.append({"directive": k, "value": v}) return {"exists": True, "content_snippet": text[:8000], "rules": rules, "sitemaps": sitemaps, "fetched_from": url, "raw_evidence": ev} return {"exists": False, "tried": tried, "error": "robots not found or unreachable (checked https and http)"} # -------------------- Extract links & resources -------------------- def extract_links_and_scripts(html: str, base_url: str) -> dict: if not html: return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []} try: soup = BeautifulSoup(html, "lxml") results = {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []} base_domain = urlparse(base_url).netloc.split(":")[0] if base_url else "" for s in soup.find_all("script", src=True): src = s["src"].strip() full = urljoin(base_url, src) results["js_links"].append(full) for l in soup.find_all("link", rel=lambda r: r and "stylesheet" in r, href=True): href = l["href"].strip() full = urljoin(base_url, href) results["css_links"].append(full) for m in soup.find_all("meta"): results["meta_tags"].append({k: m.get(k) for k in ("name", "property", "content", "http-equiv") if m.get(k)}) for a in soup.find_all("a", href=True): href = a["href"].strip() if href.startswith(("mailto:", "tel:", "javascript:", "#")): continue full = urljoin(base_url, href) try: netloc = urlparse(full).netloc.split(":")[0] except Exception: netloc = "" if netloc == base_domain: results["internal_links"].append(full) else: results["external_links"].append(full) for img in soup.find_all("img", src=True): src = img["src"].strip() full = urljoin(base_url, src) results["image_links"].append(full) for form in soup.find_all("form", action=True): action = form["action"].strip() full = urljoin(base_url, action) results["form_links"].append(full) if "/api/" in full or "/graphql" in full: results["api_links"].append(full) for k in ("js_links", "css_links", "internal_links", "external_links", "image_links", "form_links", "api_links"): results[k] = list(dict.fromkeys(results[k])) return results except Exception as e: logger.debug(f"extract_links error: {e}") return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []} # -------------------- Playwright render (returns content, headers, final_url) -------------------- async def get_dynamic_html(url: str, timeout: int = 20) -> Tuple[str, Dict[str, str], str]: try: async with async_playwright() as pw: browser = await pw.chromium.launch(args=["--no-sandbox"], headless=True) page = await browser.new_page() await page.set_extra_http_headers(get_realistic_headers(url)) # navigate and capture main response resp = await page.goto(url, wait_until="networkidle", timeout=timeout * 1000) await asyncio.sleep(0.25) content = await page.content() # extract headers from the main response if available headers = {} final_url = url try: if resp: headers = resp.headers or {} final_url = resp.url or page.url else: final_url = page.url except Exception: headers = {} await browser.close() headers = {str(k): str(v) for k, v in (headers or {}).items()} return content or "", headers, final_url or url except Exception as e: logger.debug(f"Playwright error: {e}") return "", {}, url # -------------------- Static fetch -------------------- async def fetch_static(url: str, timeout: int = 15) -> Optional[httpx.Response]: headers = get_realistic_headers(url) async with httpx.AsyncClient(follow_redirects=True) as client: resp = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers) return resp # -------------------- Wappalyzer helpers: DB validation -------------------- def _iter_values_recursively(obj): if isinstance(obj, dict): for k, v in obj.items(): yield from _iter_values_recursively(v) elif isinstance(obj, list): for i in obj: yield from _iter_values_recursively(i) elif isinstance(obj, str): yield obj def validate_wappalyzer_db(path: str) -> List[Tuple[str, str, str]]: """ Validate regex patterns inside a Wappalyzer technologies.json file. Returns list of tuples: (technology_name, pattern_string, error_message) """ bad = [] try: with open(path, "r", encoding="utf-8") as fh: data = json.load(fh) except Exception as e: logger.debug(f"validate_wappalyzer_db: failed to load JSON: {e}") return bad for tech_name, tech_def in data.items(): try: for s in _iter_values_recursively(tech_def): if not isinstance(s, str): continue # quick skip for short tokens unlikely to be regex if len(s) < 4: continue try: re.compile(s) except re.error as rex: bad.append((tech_name, s, str(rex))) except Exception: # ignore other compile-time issues continue except Exception: continue return bad # -------------------- Wappalyzer / BuiltWith / JS/CSS heuristics -------------------- def compute_tech_confidence_from_wappalyzer(data: dict) -> int: confidence = 50 detection = data.get("detection", {}) if isinstance(detection, dict): if "headers" in detection: confidence = max(confidence, 85) if "script" in detection or "js" in detection: confidence = max(confidence, 80) if "meta" in detection: confidence = max(confidence, 75) return confidence async def detect_technologies_wappalyzer(url: str, html: str, headers: dict) -> list: try: webpage = WebPage(url, html or "", headers or {}) # try Wappalyzer.latest() but be resilient try: w = Wappalyzer.latest() except Exception as e: # fallback to local DB if available (with validation) tech_path = os.path.join(os.path.dirname(__file__), "technologies.json") if os.path.exists(tech_path): try: # validate DB first to log problematic regexs bad = validate_wappalyzer_db(tech_path) if bad: logger.warning(f"Wappalyzer DB contains {len(bad)} invalid regex patterns (showing up to 10).") for tname, patt, err in bad[:10]: logger.warning(f"Invalid regex in Wappalyzer DB - {tname}: pattern={patt!r} error={err}") w = Wappalyzer(tech_path) except Exception as e2: logger.debug(f"Fallback Wappalyzer load failed: {e2}") return [] else: logger.debug("Wappalyzer DB not available and no local fallback") return [] # analyze, but guard against regex runtime errors inside w.analyze try: results = w.analyze_with_categories(webpage) or {} except re.error as rex: logger.exception("Wappalyzer analyze raised a regex error — likely a faulty pattern in DB.") return [] except Exception as e: logger.debug(f"Wappalyzer analyze failed: {e}") return [] detected = [] for name, data in results.items(): if not isinstance(data, dict): continue confidence = compute_tech_confidence_from_wappalyzer(data) prov = [] det = data.get("detected", {}) if det: prov.append("wappalyzer-detected") categories = data.get("categories", []) detected.append({ "name": name, "version": data.get("version", "Unknown"), "categories": categories, "confidence": confidence, "source": "Wappalyzer", "provenance": prov }) detected.sort(key=lambda x: x["confidence"], reverse=True) return detected except Exception as e: logger.debug(f"Wappalyzer error: {e}") return [] async def detect_technologies_builtwith(url: str) -> list: try: raw = builtwith.builtwith(url) out = [] for cat, techs in (raw or {}).items(): for t in techs: confidence = 70 if "cdn" in cat.lower(): confidence = 90 if "framework" in cat.lower(): confidence = 90 out.append({ "name": t, "category": cat, "confidence": confidence, "source": "BuiltWith", "provenance": ["builtwith-api"] }) out.sort(key=lambda x: x["confidence"], reverse=True) return out except Exception as e: logger.debug(f"BuiltWith error: {e}") return [] async def fetch_resource_content(url: str, timeout: int = 10) -> str: try: headers = get_realistic_headers(url) async with httpx.AsyncClient(follow_redirects=True) as client: r = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers) if r and r.status_code == 200: return r.text or "" except Exception as e: logger.debug(f"Failed to fetch resource {url}: {e}") return "" return "" async def detect_js_technologies(js_links: List[str], base_url: str, html: str) -> list: detected = [] content = " ".join(js_links or []) + " " + (html or "") content_l = content.lower() indicators = { "jQuery": r"jquery[\w-]*\.js|jquery-ui|\$\.fn\.jquery|window\.jquery", "React": r"react[\w-]*\.js|react-dom|__react_devtools_global_hook__|data-reactroot", "Angular": r"angular[\w-]*\.js|ng-app|angular\.module", "Vue.js": r"vue[\w-]*\.js|__vue_devtools_global_hook__|vue-router" } for tech, pattern in indicators.items(): try: if re.search(pattern, content_l): detected.append({"name": tech, "confidence": 70, "source": "JS Heuristics", "provenance": ["inline", "links"]}) except re.error: # fallback: substring check if pattern.lower() in content_l: detected.append({"name": tech, "confidence": 60, "source": "JS Heuristics (fallback)", "provenance": ["inline", "links"]}) sem = asyncio.Semaphore(10) async def _fetch(url_): async with sem: return await fetch_resource_content(url_) tasks = [] for url_ in (js_links or []): tasks.append(_fetch(url_)) contents = [] if tasks: try: contents = await asyncio.gather(*tasks) except Exception: contents = [] for c in (contents or []): c_l = (c or "").lower() for tech, pattern in indicators.items(): try: if re.search(pattern, c_l): if not any(d["name"] == tech for d in detected): detected.append({"name": tech, "confidence": 85, "source": "JS Heuristics", "provenance": ["resource_content"]}) except re.error: if pattern.lower() in c_l: if not any(d["name"] == tech for d in detected): detected.append({"name": tech, "confidence": 75, "source": "JS Heuristics (fallback)", "provenance": ["resource_content"]}) return detected async def detect_css_technologies(css_links: List[str], html: str) -> list: detected = [] content = " ".join(css_links or []) + " " + (html or "") content_l = content.lower() indicators = { "Bootstrap": r"bootstrap[\w-]*\.css|class=['\"].*col-", # improved Tailwind detection: look for class attributes containing tw- (utility prefix) or grid-cols, flex- etc. "Tailwind CSS": r"tailwind\.min\.css|class=['\"][^'\"]*\btw-|class=['\"].*grid-cols-|class=['\"].*flex-", "Materialize": r"materialize[\w-]*\.css" } for tech, pattern in indicators.items(): try: if re.search(pattern, content_l): detected.append({"name": tech, "confidence": 70, "source": "CSS Heuristics", "provenance": ["links_or_inline"]}) except re.error: if pattern.lower() in content_l: detected.append({"name": tech, "confidence": 60, "source": "CSS Heuristics (fallback)", "provenance": ["links_or_inline"]}) sem = asyncio.Semaphore(8) async def _fetch(url_): async with sem: return await fetch_resource_content(url_) tasks = [] for url_ in (css_links or []): tasks.append(_fetch(url_)) contents = [] if tasks: try: contents = await asyncio.gather(*tasks) except Exception: contents = [] for c in (contents or []): c_l = (c or "").lower() for tech, pattern in indicators.items(): try: if re.search(pattern, c_l): if not any(d["name"] == tech for d in detected): detected.append({"name": tech, "confidence": 85, "source": "CSS Heuristics", "provenance": ["resource_content"]}) except re.error: if pattern.lower() in c_l: if not any(d["name"] == tech for d in detected): detected.append({"name": tech, "confidence": 75, "source": "CSS Heuristics (fallback)", "provenance": ["resource_content"]}) return detected # -------------------- CMS detection -------------------- def compute_confidence_from_evidence(evidence: List[Dict[str, Any]]) -> float: if not evidence: return 0.0 total_possible = sum(float(e.get("weight", 0.0)) for e in evidence) if total_possible <= 0: return 0.0 found = sum(float(e.get("weight", 0.0)) for e in evidence if e.get("found")) return min(1.0, found / total_possible) def detect_cms(html: str, headers: dict, url: str, extracted_data: dict = None) -> list: detected_cms = [] html_lower = (html or "").lower() headers_lower = {k.lower(): v for k, v in (headers or {}).items()} extracted_data = extracted_data or {} js_links = " ".join(extracted_data.get("js_links", [])) form_links = " ".join(extracted_data.get("form_links", [])) image_links = " ".join(extracted_data.get("image_links", [])) cms_signatures = { "WordPress": [ {"type": "path", "pattern": r"wp-content", "weight": 0.23}, {"type": "path", "pattern": r"wp-includes", "weight": 0.22}, {"type": "api", "pattern": r"wp-json", "weight": 0.18}, {"type": "meta", "pattern": r" 0: detected_cms.append({ "name": cms_name, "confidence": round(confidence, 3), "evidence": evidence, "source": "CMS Heuristics", "provenance": [e for e in evidence if e["found"]] }) x_gen = headers_lower.get("x-generator", "") or headers_lower.get("server", "") if x_gen: if "joomla" in x_gen.lower(): if not any(d["name"] == "Joomla" for d in detected_cms): detected_cms.append({"name": "Joomla", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]}) elif "wordpress" in x_gen.lower() or "wp-" in x_gen.lower(): if not any(d["name"] == "WordPress" for d in detected_cms): detected_cms.append({"name": "WordPress", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]}) detected_cms.sort(key=lambda x: x["confidence"], reverse=True) return detected_cms # -------------------- Security Headers -------------------- def analyze_security_headers(headers: dict) -> Dict[str, Any]: headers = {k.lower(): v for k, v in (headers or {}).items()} security = { "x-frame-options": headers.get("x-frame-options"), "x-xss-protection": headers.get("x-xss-protection"), "x-content-type-options": headers.get("x-content-type-options"), "strict-transport-security": headers.get("strict-transport-security"), "content-security-policy": headers.get("content-security-policy"), "referrer-policy": headers.get("referrer-policy") } results = {} for header, value in security.items(): if value: status = "Implemented" if header == "x-frame-options": if value.lower() in ["deny", "sameorigin"]: status = "Secure" else: status = "Weak" results[header] = {"status": status, "value": value} return results # -------------------- Payment Method Detection -------------------- def detect_payment_methods(html: str, extracted_data: dict = None) -> list: detected_methods = [] html_lower = (html or "").lower() extracted_data = extracted_data or {} js_links = " ".join(extracted_data.get("js_links", [])).lower() form_links = " ".join(extracted_data.get("form_links", [])).lower() image_links = " ".join(extracted_data.get("image_links", [])).lower() combined = " ".join([html_lower, js_links, form_links, image_links]) payment_patterns = { "Visa": r"\bvisa\b|visa-logo|/visa\.(svg|png|jpg|gif)", "Mastercard": r"mastercard|/mastercard\.(svg|png|jpg|gif)|master-card|master card", "American Express": r"american[\s-]*express|amex|/amex\.(svg|png|jpg|gif)", "PayPal": r"paypal\.com|paypal-button|www\.paypalobjects\.com|paypalcheckout|paypal\.me", "Stripe": r"js\.stripe\.com|stripe\.com|Stripe\.(setPublishableKey|card)|stripe-v3|stripe-elements", "Apple Pay": r"apple[\s-]*pay|apple-pay", "Google Pay": r"google[\s-]*pay|pay.google.com|google-pay", "Shop Pay": r"shopify\.com\/shop_pay|shopify|shop-pay", "Discover": r"discover|discover-logo|/discover\.(svg|png|jpg|gif)", "UnionPay": r"unionpay|union-pay", "JCB": r"\bjcb\b", "Alipay": r"alipay|alipayjsbridge|alipay\.com", "WeChat Pay": r"wechatpay|weixin\.qq\.com|wechat[\s-]*pay", "Square": r"squareup\.com|square\.(js|cdn)|sq-", "Authorize.Net": r"authorize\.net|secure2.authorize\.net", "Braintree": r"braintree\.gateway|braintree\.js|braintree", "Adyen": r"adyen|checkoutshopper|adyen-checkout", "Worldpay": r"worldpay|secure\.worldpay", "SagePay": r"sagepay|opayo", "Klarna": r"klarna|klarna-checkout", "Amazon Pay": r"amazonpay|static-na\.amzn\.com|amazon-pay", "Payoneer": r"payoneer", "Razorpay": r"razorpay|checkout\.razorpay\.com", "2Checkout": r"2checkout|2co", "Mollie": r"mollie|checkout\.mollie", "PayU": r"payu|payu\.com", "MercadoPago": r"mercadopago|mercadopago\.com", "CyberSource": r"cybersource|ics2wsa", "Afterpay": r"afterpay|clearpay", "Paystack": r"paystack|js\.paystack\.co", "ePDQ": r"epdq|ogone", "Checkout.com": r"checkout\.com|checkoutjs", "GreenPay": r"greenpay" } for method, pattern in payment_patterns.items(): try: if re.search(pattern, combined, re.I): if method not in detected_methods: detected_methods.append(method) except re.error: if pattern.lower() in combined: if method not in detected_methods: detected_methods.append(method) checkout_indicators = [r"/checkout", r"/cart", r"/pay", r"/payment", r"/order", r"/billing"] for pat in checkout_indicators: if re.search(pat, form_links + html_lower): if "E-Commerce/Checkout" not in detected_methods: detected_methods.append("E-Commerce/Checkout") return detected_methods # -------------------- Tracker and Analytics Detection -------------------- def detect_trackers_and_analytics(html: str, js_links: list = None, meta_tags: list = None) -> list: detected_trackers = [] html_lower = (html or "").lower() tracker_patterns = { "Google Analytics": r"google-analytics\.com/|gtag\.js|analytics\.js", "Google Tag Manager": r"googletagmanager\.com", "Facebook Pixel": r"connect\.facebook\.net/en_US/fbevents\.js|fbq\(", "Hotjar": r"hotjar\.com|hjid", "Matomo (Piwik)": r"matomo\.js", "TikTok Pixel": r"ttq\.load" } for tracker, pattern in tracker_patterns.items(): if re.search(pattern, html_lower): detected_trackers.append(tracker) all_js_links = " ".join([link.lower() for link in (js_links or [])]) for tracker, pattern in tracker_patterns.items(): if re.search(pattern, all_js_links): if tracker not in detected_trackers: detected_trackers.append(tracker) meta_content = " ".join([tag.get('content', '').lower() for tag in (meta_tags or [])]) for tracker, pattern in tracker_patterns.items(): if re.search(pattern, meta_content): if tracker not in detected_trackers: detected_trackers.append(tracker) return detected_trackers # -------------------- IP info -------------------- def get_ip_info(ip: str) -> Dict: res = {"source": "ipwhois", "timestamp": datetime.utcnow().isoformat() + "Z"} try: obj = ipwhois.IPWhois(ip).lookup_rdap(depth=1) res["asn"] = obj.get("asn") res["asn_cidr"] = obj.get("asn_cidr") res["asn_country_code"] = obj.get("asn_country_code") res["asn_description"] = obj.get("asn_description") res["network"] = obj.get("network") except Exception as e: logger.debug(f"IPWhois lookup failed for {ip}: {e}") res["error"] = str(e) return res # -------------------- WAF detection -------------------- def detect_waf_subprocess(url: str) -> dict: result = {"detected": False, "provider": None, "confidence": 0.0, "evidence": []} try: proc = subprocess.run(["wafw00f", "-a", url], capture_output=True, text=True, timeout=20) out = (proc.stdout or "") + (proc.stderr or "") if proc.returncode == 0 and out: lines = out.splitlines() for ln in lines: for provider in ["Cloudflare", "Imperva", "Akamai", "Fastly", "Sucuri", "F5", "ModSecurity", "AWS WAF", "Fortinet", "Barracuda", "Incapsula"]: if provider.lower() in ln.lower(): result.update({"detected": True, "provider": provider, "confidence": 0.9, "evidence": ["wafw00f-output"]}) return result except Exception: pass try: parsed = urlparse(url) try: r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url)) headers = {k.lower(): v for k, v in dict(r.headers).items()} body_snippet = (r.text or "")[:3000] cookie_keys = " ".join([c.name for c in getattr(r, "cookies", [])]) if hasattr(r, "cookies") else "" except Exception as e: headers = {} body_snippet = "" cookie_keys = "" header_indicators = { "Cloudflare": ["cf-ray", "server: cloudflare", "cf-cache-status", "cf-request-id"], "Imperva": ["x-iinfo", "incapsula", "visid_incap_"], "Akamai": ["x-akamai-transformed", "akamai", "akamaiedge", "akamaitechnologies"], "Fastly": ["x-served-by", "x-cache", "x-fastly-backend-request-id"], "Sucuri": ["x-sucuri-cache", "x-sucuri-id"], "F5": ["bigipserver", "x-lb"], "ModSecurity": ["mod_security", "mod_sec"], "AWS WAF": ["x-amzn-requestid", "x-amz-cf-id"], "Fortinet": ["fortigate", "f5-"], "Barracuda": ["barracuda"], "Incapsula": ["visid_incap_"] } for provider, sigs in header_indicators.items(): for sig in sigs: try: if ":" in sig: hname, hv = [s.strip() for s in sig.split(":", 1)] hv = hv.lower() if headers.get(hname) and hv in headers.get(hname, "").lower(): result.update({"detected": True, "provider": provider, "confidence": 0.75, "evidence": [f"header:{hname}"]}) return result else: if any(sig in h for h in headers.keys()): result.update({"detected": True, "provider": provider, "confidence": 0.7, "evidence": [f"header_contains:{sig}"]}) return result if sig in body_snippet.lower(): result.update({"detected": True, "provider": provider, "confidence": 0.6, "evidence": ["body_snippet"]}) return result if re.search(re.escape(sig), cookie_keys, re.I): result.update({"detected": True, "provider": provider, "confidence": 0.65, "evidence": ["cookie_name"]}) return result except Exception: continue challenge_patterns = [r"attention required", r"access denied", r"please enable cookies", r"security check", r"verify you are a human", r"challenge.*cloudflare"] for pat in challenge_patterns: if re.search(pat, body_snippet, re.I): result.update({"detected": True, "provider": "Unknown (challenge page)", "confidence": 0.5, "evidence": ["challenge_pattern"]}) return result except Exception as e: logger.debug(f"WAF detection error heuristics: {e}") return result # -------------------- CDN detection -------------------- def detect_cdn_from_headers_and_dns(headers: dict, dns_records: dict, ip: str = None, extracted_data: dict = None) -> dict: detected = {"source": None, "provider": None, "confidence": 0, "reasons": []} headers_lower = {k.lower(): v for k, v in (headers or {}).items()} extracted_data = extracted_data or {} cdn_header_signatures = { "Cloudflare": ["cf-ray", "cf-cache-status", "server: cloudflare", "cf-request-id"], "Akamai": ["x-akamai-transformed", "x-akamai-request-id", "akamai"], "Amazon CloudFront": ["x-amz-cf-id", "via: 1.1 cloudfront", "x-cache"], "Fastly": ["x-served-by", "x-fastly-backend-request-id", "x-cache"], "Sucuri": ["x-sucuri-cache", "x-sucuri-id"], "Google Cloud CDN": ["x-goog-gfe-response-headers", "x-google-gfe"], "Incapsula": ["x-iinfo", "visid_incap_"], "Azure CDN": ["cdn-io", "azureedge", "azurefd", "akadns"], "Netlify": ["netlify"], "Cloudflare Stream": ["cf-stream"], "BunnyCDN": ["bunnycdn"], "StackPath": ["stackpathcdn"], "KeyCDN": ["x-keycdn"], "CDN77": ["cdn77"], "Akamai EdgeKey": ["edgekey.net"] } for provider, sigs in cdn_header_signatures.items(): for sig in sigs: if any(sig in h for h in headers_lower.keys()) or any(sig in v.lower() for v in headers_lower.values()): detected.update({"source": "Headers", "provider": provider, "confidence": 95}) detected["reasons"].append(f"header signature matched {sig}") return detected cname_records = dns_records.get("CNAME", []) if dns_records else [] try: candidate_host = cname_records[0] if cname_records else None cname_chain = resolve_cname_chain(candidate_host) if candidate_host else [] cname_patterns = { "Cloudflare": r"cloudflare|cloudfront|cloudflare.net", "Akamai": r"akamai|akamaiedge|akamaitechnologies|edgekey\.net|akamaiedge\.net", "Amazon CloudFront": r"cloudfront\.net", "Fastly": r"fastly\.net|fastly", "Incapsula": r"incapsula|imperva", "Sucuri": r"sucuri\.net|sucuri", "Azure CDN": r"azureedge|azurefd|z6rungcdn|azure", "Netlify": r"netlify\.app|netlify", "BunnyCDN": r"bunnycdn", "StackPath": r"stackpathdns", "KeyCDN": r"kccdn|kxcdn", "CDN77": r"cdn77", } for provider, pattern in cname_patterns.items(): for cname in (cname_records + cname_chain): if re.search(pattern, cname, re.I): detected.update({"source": "DNS CNAME", "provider": provider, "confidence": 85}) detected["reasons"].append(f"CNAME {cname} matches {provider}") return detected except Exception as e: logger.debug(f"CDN CNAME check error: {e}") try: asset_hosts = set() for linklist in ("js_links", "css_links", "image_links", "form_links"): for a in extracted_data.get(linklist, []): try: p = urlparse(a) if p.hostname: asset_hosts.add(p.hostname.lower()) except Exception: continue asset_hosts_list = list(asset_hosts) asset_host_patterns = { "Cloudflare": ["cloudflare", "cdn-cdn.cloudflare", "cloudflare.net", "cdn-cgi"], "Akamai": ["akamai.net", "akamaiedge", "akamaitechnologies", "edgekey.net"], "Fastly": ["fastly.net", "fastly"], "Amazon CloudFront": ["cloudfront.net", "amazonaws.com"], "Netlify": ["netlify.app", "netlify"], "BunnyCDN": ["b-cdn.net", "bunnycdn"], "Google Cloud CDN": ["googleusercontent.com", "googleapis.com"], "KeyCDN": ["kxcdn", "kccdn"], "CDN77": ["cdn77"], "StackPath": ["stackpathcdn", "stackpathdns"] } for provider, pats in asset_host_patterns.items(): for pat in pats: for ah in asset_hosts_list: if pat in ah: detected.update({"source": "Asset Hosts", "provider": provider, "confidence": 80}) detected["reasons"].append(f"asset host {ah} contains {pat}") return detected except Exception as e: logger.debug(f"Asset host analysis error: {e}") return detected # -------------------- Main async scan (IMPROVED) -------------------- async def main_async_scan(url: str): scan_start = datetime.utcnow().isoformat() + "Z" try: logger.info(f"Starting scan for {url}") # Step 1: Try Playwright render (get content + headers) dynamic_html, dynamic_headers, dynamic_final_url = await get_dynamic_html(url) final_html = dynamic_html or "" final_headers = dynamic_headers or {} final_url = dynamic_final_url or url static_response = None # If no dynamic content, try static fetch (async) if not final_html: logger.info("Dynamic fetch empty; attempting static fetch...") static_response = await fetch_static(url) if static_response and static_response.status_code == 200: final_html = static_response.text or "" final_headers = dict(static_response.headers or {}) final_url = str(static_response.url or url) else: # fallback sync attempt to capture headers/body try: r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url)) if r.status_code == 200: final_html = r.text or "" final_headers = dict(r.headers or {}) final_url = str(r.url or url) else: logger.warning(f"Static fetch returned {r.status_code} for {url}") except Exception as e: logger.debug(f"Sync fallback static fetch failed: {e}") else: # We have dynamic HTML; ensure we also have headers (use static fetch or HEAD if headers missing) if not final_headers: try: head_resp = httpx.head(final_url, follow_redirects=True, timeout=8, headers=get_realistic_headers(final_url)) if head_resp and head_resp.status_code < 400: final_headers = dict(head_resp.headers or {}) else: r2 = httpx.get(final_url, follow_redirects=True, timeout=10, headers=get_realistic_headers(final_url)) if r2: final_headers = dict(r2.headers or {}) except Exception as e: logger.debug(f"Failed to fetch headers fallback: {e}") # store raw evidence: headers + body raw_evidence = {} if final_html: raw_body_bytes = (final_html.encode("utf-8") if isinstance(final_html, str) else (final_html or b"")) raw_evidence["body"] = store_raw_evidence(raw_body_bytes, prefix="body") if final_headers: try: hdr_bytes = json.dumps(dict(final_headers), ensure_ascii=False).encode("utf-8") raw_evidence["headers"] = store_raw_evidence(hdr_bytes, prefix="headers") except Exception: raw_evidence["headers"] = {"error": "failed_to_store_headers"} # Step 2: Extract links and resources (ensure final_url passed) logger.info("Extracting links and resources...") extracted_data = extract_links_and_scripts(final_html or "", final_url) js_links = extracted_data.get("js_links", []) css_links = extracted_data.get("css_links", []) # Step 3: Run detection tasks concurrently logger.info("Detecting technologies (Wappalyzer/BuiltWith/JS/CSS heuristics)...") tasks = [ detect_technologies_wappalyzer(final_url, final_html or "", final_headers), detect_technologies_builtwith(final_url), detect_js_technologies(js_links, final_url, final_html or ""), detect_css_technologies(css_links, final_html or "") ] wappalyzer_res, builtwith_res, js_res, css_res = await asyncio.gather(*tasks) # Step 4: Combine technologies all_tech = (wappalyzer_res or []) + (builtwith_res or []) + (js_res or []) + (css_res or []) tech_map: Dict[str, Any] = {} for tech in all_tech: name = tech.get("name") if not name: continue existing = tech_map.get(name) confidence = float(tech.get("confidence", 50)) if existing: existing_conf = float(existing.get("confidence", 0)) existing["confidence"] = max(existing_conf, confidence) existing_sources = set([s.strip() for s in str(existing.get("source", "")).split(",") if s]) incoming_source = tech.get("source") or "" if incoming_source and incoming_source not in existing_sources: existing_sources.add(incoming_source) existing["source"] = ", ".join(sorted(existing_sources)) existing_prov = set(existing.get("provenance", []) or []) incoming_prov = set(tech.get("provenance", []) or []) existing["provenance"] = list(existing_prov.union(incoming_prov)) if tech.get("version") and existing.get("version") in (None, "Unknown"): existing["version"] = tech.get("version") else: tech_map[name] = { "name": name, "version": tech.get("version", "Unknown"), "confidence": confidence, "source": tech.get("source", ""), "provenance": tech.get("provenance", []) or [] } combined_tech = list(tech_map.values()) combined_tech.sort(key=lambda x: x.get("confidence", 0), reverse=True) # Step 5: DNS and SSL parsed = urlparse(final_url) domain = parsed.netloc.split(":")[0] if parsed.netloc else "" dns_records = get_dns_records(domain) if domain else {} ssl_info = {} if parsed.scheme == "https" and domain: ssl_info = get_ssl_info(domain) # Step 6: IP info ip_info = {} if dns_records.get("A"): ip = dns_records["A"][0] if isinstance(dns_records["A"], list) and dns_records["A"] else dns_records["A"] ip_info = get_ip_info(ip) # Step 7: robots.txt robots_info = await analyze_robots(domain) if domain else {"exists": False, "tried": [], "error": "no domain"} # Step 8: Security headers and CMS detection security_headers = analyze_security_headers(final_headers) cms_info = detect_cms(final_html or "", final_headers or {}, final_url, extracted_data=extracted_data) # Step 9: payments and trackers payment_methods_info = detect_payment_methods(final_html or "", extracted_data=extracted_data) trackers_info = detect_trackers_and_analytics(final_html or "", js_links=extracted_data.get("js_links", []), meta_tags=extracted_data.get("meta_tags", [])) # Step 10: WAF & CDN heuristics waf_info = detect_waf_subprocess(final_url) cdn_info = detect_cdn_from_headers_and_dns(final_headers or {}, dns_records or {}, ip_info.get("asn_cidr") if ip_info else None, extracted_data=extracted_data) # Inference rules for Cloudflare try: if (not cdn_info.get("provider")) and waf_info.get("provider") and "cloudflare" in (waf_info.get("provider") or "").lower(): cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 90, "reasons": ["waf indicates Cloudflare"]}) elif (not cdn_info.get("provider")) and ip_info and ip_info.get("asn_description") and "cloudflare" in str(ip_info.get("asn_description")).lower(): cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 85, "reasons": ["ip whois ASN indicates Cloudflare"]}) else: ns_list = dns_records.get("NS", []) or [] if (not cdn_info.get("provider")): for ns in ns_list: if "cloudflare" in ns.lower(): cdn_info.update({"source": "dns", "provider": "Cloudflare", "confidence": 85, "reasons": [f"NS {ns} indicates Cloudflare"]}) break except Exception: pass # Build final report title = "No Title" try: soup = BeautifulSoup(final_html or "", "lxml") if soup.title and soup.title.string: title = soup.title.string.strip() except Exception: title = "No Title" report = { "scan_id": generate_scan_id(), "scanned_at": scan_start, "url": final_url, "title": title, "raw_evidence": raw_evidence, "technologies": combined_tech, "links_and_resources": extracted_data, "dns_records": dns_records, "ssl_info": ssl_info, "ip_info": ip_info, "robots_info": robots_info, "security_headers": security_headers, "cms_info": cms_info, "payment_methods": payment_methods_info, "trackers_and_analytics": trackers_info, "waf_info": waf_info, "cdn_info": cdn_info, "headers": final_headers, "notes": "Report contains provenance (raw_evidence paths) and normalized confidence scores (0-100 for technologies)." } # Normalize confidence to 0-100 for technologies for t in report["technologies"]: try: t_conf = float(t.get("confidence", 50)) if 0 <= t_conf <= 1: t["confidence"] = int(round(t_conf * 100)) else: t["confidence"] = int(round(min(max(t_conf, 0), 100))) except Exception: t["confidence"] = 50 return safe_json(report) except Exception as e: logger.exception("Main scan failed") return safe_json({"error": "Main scan failed", "details": str(e), "scanned_at": scan_start}) # -------------------- Convenience wrapper used by analyze_site.py -------------------- async def run_scan_for_url(url: str, render_js: bool = False, scan_id: Optional[str] = None) -> Dict[str, Any]: try: report = await main_async_scan(url) if not isinstance(report, dict): report = {"error": "invalid_report", "details": "Scanner returned non-dict result", "raw": str(report)} report.setdefault("scanned_url", report.get("url", url)) if scan_id: report["scan_id"] = scan_id report.setdefault("url", report.get("scanned_url")) report.setdefault("technologies", report.get("technologies", [])) report.setdefault("dns_records", report.get("dns_records", {})) report.setdefault("robots_info", report.get("robots_info", {"exists": False})) report.setdefault("headers", report.get("headers", {})) # compatibility aliases report.setdefault("waf", report.get("waf_info")) report.setdefault("cdn", report.get("cdn_info")) report.setdefault("payments", report.get("payment_methods")) return report except Exception as e: logger.exception("run_scan_for_url wrapper failed") return safe_json({"error": "run_scan_for_url_failed", "details": str(e), "scanned_url": url}) if __name__ == '__main__': # quick smoke test when running standalone test_url = "https://www.google.com" # note: run async via: python -c "import asyncio, utils; asyncio.run(utils.main_async_scan('https://example.com'))" pass