# utils.py
# SuperRecon utils - improved (compat_resources + Wappalyzer DB validation + regex fixes)

import os
import re
import json
import socket
import logging
import ssl
import gzip
import OpenSSL
import dns.resolver
import httpx
from urllib.parse import urljoin, urlparse, quote_plus
from bs4 import BeautifulSoup
from datetime import datetime, date, timezone
from collections import defaultdict
from typing import List, Dict, Any, Optional, Tuple
import asyncio
import random
import ipaddress
import ipwhois
import time
from functools import lru_cache
from playwright.async_api import async_playwright
import whois
from Wappalyzer import Wappalyzer, WebPage
import builtwith
import subprocess
import hashlib

# optional import for charset detection (best-effort)
try:
    from charset_normalizer import from_bytes
except Exception:
    from_bytes = None

# optional brotli decompress
try:
    import brotli
except Exception:
    brotli = None

# -------------------- Logger setup --------------------
logger = logging.getLogger("SuperRecon.utils")
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(os.environ.get("SUPERR_LOG_LEVEL", "INFO"))

# Directory to store raw evidence
EVIDENCE_DIR = os.environ.get("SUPERR_EVIDENCE_DIR", "./evidence")
os.makedirs(EVIDENCE_DIR, exist_ok=True)


# -------------------- Compatibility layer (replacement for pkg_resources) --------------------
# Provides: get_version, resource_bytes, resource_text, resource_path (context manager),
# iter_entry_points, load_entry_point, parse_requirement, installed_distributions, dist_metadata
try:
    # importlib.metadata (stdlib) with backport fallback
    from importlib.metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError  # type: ignore
except Exception:
    from importlib_metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError  # type: ignore

# importlib.resources with backport fallback
try:
    from importlib.resources import files, as_file, read_binary, read_text  # type: ignore
except Exception:
    from importlib_resources import files, as_file, read_binary, read_text  # type: ignore

from contextlib import contextmanager
from packaging.requirements import Requirement
from packaging.version import Version, InvalidVersion


def get_version(package_name: str) -> Optional[str]:
    try:
        return _version(package_name)
    except Exception:
        return None


def resource_bytes(package: str, resource: str) -> bytes:
    return read_binary(package, resource)


def resource_text(package: str, resource: str, encoding: str = "utf-8") -> str:
    return read_text(package, resource, encoding=encoding)


@contextmanager
def resource_path(package: str, resource: str):
    """
    Yields a filesystem Path for resource if possible.
    Usage:
        with resource_path('mypkg', 'data/file.txt') as p:
            open(p)...
    """
    p = files(package).joinpath(resource)
    with as_file(p) as fp:
        yield fp


class EP:
    def __init__(self, ep):
        self._ep = ep

    @property
    def name(self):
        return self._ep.name

    @property
    def value(self):
        return self._ep.value

    def load(self):
        return self._ep.load()


def iter_entry_points(group: str):
    eps = entry_points()
    try:
        group_eps = eps.select(group=group)  # py3.10+
    except Exception:
        try:
            # older importlib.metadata returns a dict keyed by group
            group_eps = eps.get(group, [])  # type: ignore
        except Exception:
            # last resort: scan whatever iterable we got
            group_eps = [e for e in eps if getattr(e, "group", None) == group]
    for e in group_eps:
        yield EP(e)


def load_entry_point(group: str, name: str):
    for ep in iter_entry_points(group):
        if ep.name == name:
            return ep.load()
    raise LookupError(f"entry point {group}:{name} not found")


def parse_requirement(req_str: str) -> Requirement:
    return Requirement(req_str)


def installed_distributions():
    for dist in distributions():
        yield dist


def dist_metadata(name: str):
    try:
        return distribution(name).metadata
    except PackageNotFoundError:
        return None


def dist_files(name: str):
    try:
        return distribution(name).files
    except PackageNotFoundError:
        return None


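# Illustrative sketch of how the compatibility layer above replaces common
# pkg_resources calls. "httpx" is just a package this module already depends on;
# any installed distribution name would work the same way.
def _example_compat_usage() -> None:
    # pkg_resources.get_distribution("httpx").version  ->  get_version("httpx")
    print("httpx version:", get_version("httpx"))
    # pkg_resources.iter_entry_points("console_scripts")  ->  iter_entry_points(...)
    for ep in iter_entry_points("console_scripts"):
        print("first console script entry point:", ep.name)
        break

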
# -------------------- Safe JSON Helpers --------------------
def _make_json_safe(obj):
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if isinstance(obj, dict):
        new = {}
        for k, v in obj.items():
            try:
                key = str(k)
            except Exception:
                key = repr(k)
            new[key] = _make_json_safe(v)
        return new
    if isinstance(obj, (list, tuple, set)):
        return [_make_json_safe(i) for i in obj]
    try:
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
    except Exception:
        pass
    try:
        import httpx as _httpx
        if isinstance(obj, _httpx.Response):
            try:
                text_snippet = obj.text[:1000]
            except Exception:
                text_snippet = None
            return {
                "status_code": obj.status_code,
                "url": str(obj.url) if hasattr(obj, "url") else None,
                "headers": dict(obj.headers) if hasattr(obj, "headers") else None,
                "text_snippet": text_snippet
            }
    except Exception:
        pass
    try:
        return str(obj)
    except Exception:
        return repr(obj)


def safe_json(obj):
    try:
        safe = _make_json_safe(obj)
        json.dumps(safe, ensure_ascii=False)
        return safe
    except Exception as e:
        logger.exception("safe_json conversion failed")
        return {
            "error": "safe_json_conversion_failed",
            "error_str": str(e),
            "repr": repr(obj)[:2000]
        }


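# Illustrative sketch: safe_json() normalizes values that json.dumps cannot handle
# directly (datetimes become ISO strings, sets become lists, non-string keys become
# strings). The payload below is made up for demonstration only.
def _example_safe_json() -> None:
    payload = {"when": datetime.utcnow(), "tags": {"cdn", "waf"}, 404: "not found"}
    print(json.dumps(safe_json(payload), ensure_ascii=False, indent=2))

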
# -------------------- UUID Generator --------------------
def generate_scan_id():
    import uuid
    return str(uuid.uuid4())


# -------------------- Stealth Mode Enhancements --------------------
def get_random_user_agent():
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edg/120.0.0.0"
    ]
    return random.choice(user_agents)


def get_realistic_headers(url: Optional[str] = None):
    # small random delay to avoid a perfectly regular request cadence
    time.sleep(random.uniform(0.02, 0.15))
    domain = urlparse(url).netloc if url else "example.com"
    user_agent = get_random_user_agent()
    accept_headers = {
        "Chrome": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Firefox": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Safari": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Edge": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Opera": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
    }
    browser = "Chrome"
    if "Firefox" in user_agent:
        browser = "Firefox"
    elif "Safari" in user_agent and "Chrome" not in user_agent:
        browser = "Safari"
    elif "Edg" in user_agent or "Edge" in user_agent:
        browser = "Edge"
    languages = ["en-US,en;q=0.9", "en-GB,en;q=0.9", "ar-JO,ar;q=0.9,en;q=0.8", "fr-FR,fr;q=0.9,en;q=0.8"]
    encodings = ["gzip, deflate, br", "gzip, deflate", "gzip, br", "deflate, br"]
    headers = {
        "User-Agent": user_agent,
        "Accept": accept_headers.get(browser, accept_headers["Chrome"]),
        "Accept-Language": random.choice(languages),
        "Accept-Encoding": random.choice(encodings),
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Referer": f"https://www.google.com/search?q={domain.replace('.', '+')}",
        "Cache-Control": "max-age=0"
    }
    return headers


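# Illustrative sketch: the randomized headers above are meant to be passed to the
# HTTP client on every request. The target URL is a placeholder.
# (run with: asyncio.run(_example_realistic_headers()))
async def _example_realistic_headers() -> None:
    url = "https://example.com/"
    async with httpx.AsyncClient(follow_redirects=True) as client:
        resp = await client.get(url, headers=get_realistic_headers(url))
        print(resp.status_code, resp.headers.get("server"))

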
# -------------------- Evidence storage --------------------
def store_raw_evidence(content: bytes, prefix: str = "body") -> Dict[str, Any]:
    sha = hashlib.sha256(content).hexdigest()
    filename = f"{prefix}_{sha}.bin"
    path = os.path.join(EVIDENCE_DIR, filename)
    try:
        if not os.path.exists(path):
            with open(path, "wb") as fh:
                fh.write(content)
        return {"path": path, "sha256": sha, "timestamp": datetime.utcnow().isoformat() + "Z"}
    except Exception as e:
        logger.debug(f"Failed to store evidence: {e}")
        return {"error": str(e)}


# -------------------- Retry/backoff wrapper (async) --------------------
async def async_request_with_retry(method: str, url: str, client: httpx.AsyncClient, max_retries: int = 4,
                                   base_delay: float = 0.5, timeout: int = 15, headers: dict = None):
    attempt = 0
    while attempt <= max_retries:
        try:
            attempt += 1
            resp = await client.request(method, url, timeout=timeout, headers=headers)
            if resp.status_code == 429 or (500 <= resp.status_code < 600 and resp.status_code != 501):
                raise httpx.HTTPStatusError("Retryable status", request=resp.request, response=resp)
            return resp
        except Exception as e:
            if attempt > max_retries:
                logger.debug(f"Request failed (max retries) for {url}: {e}")
                return None
            # exponential backoff with full jitter
            sleep = base_delay * (2 ** (attempt - 1))
            jitter = random.uniform(0, sleep)
            await asyncio.sleep(jitter)
    return None


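# Illustrative sketch of the retry helper: a single client is reused and None is
# returned once retries are exhausted, so callers must check the result.
# The URL is a placeholder.
async def _example_retry_fetch() -> None:
    url = "https://example.com/"
    async with httpx.AsyncClient(follow_redirects=True) as client:
        resp = await async_request_with_retry("GET", url, client, headers=get_realistic_headers(url))
        if resp is None:
            print("request gave up after retries")
        else:
            print("status:", resp.status_code)

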
# -------------------- WHOIS --------------------
def whois_lookup(domain: str) -> dict:
    try:
        w = whois.whois(domain)
        return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "data": safe_json(w)}
    except Exception as e:
        logger.debug(f"whois_lookup error: {e}")
        return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "error": str(e)}


# -------------------- DNS --------------------
@lru_cache(maxsize=256)
def get_dns_records(domain: str) -> Dict[str, List[str]]:
    records = defaultdict(list)
    try:
        for rtype in ("A", "AAAA", "CNAME", "MX", "NS", "TXT"):
            try:
                answers = dns.resolver.resolve(domain, rtype, lifetime=5)
                for r in answers:
                    records[rtype].append(str(r).strip())
            except Exception:
                continue
    except Exception as e:
        logger.debug(f"get_dns_records error: {e}")
    return dict(records)


def resolve_cname_chain(hostname: str, max_depth: int = 6) -> List[str]:
    chain = []
    try:
        resolver = dns.resolver.Resolver()
        resolver.lifetime = 5
        curr = hostname
        for _ in range(max_depth):
            try:
                answers = resolver.resolve(curr, "CNAME")
                if not answers:
                    break
                target = str(answers[0].target).rstrip(".")
                chain.append(target)
                curr = target
            except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.NoNameservers):
                break
            except Exception:
                break
    except Exception as e:
        logger.debug(f"resolve_cname_chain error for {hostname}: {e}")
    return chain


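# Illustrative sketch: get_dns_records() is cached per domain, and any CNAME it
# returns can be expanded with resolve_cname_chain(). The domain is a placeholder.
def _example_dns_lookup() -> None:
    records = get_dns_records("example.com")
    print("A records:", records.get("A", []))
    for cname in records.get("CNAME", []):
        print("CNAME chain:", resolve_cname_chain(cname))

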
# -------------------- SSL/TLS info --------------------
def get_ssl_info(domain: str) -> Dict[str, Any]:
    res = {
        "valid": False,
        "issuer": None,
        "subject": None,
        "not_before": None,
        "not_after": None,
        "expired": None,
        "san": [],
        "raw_pem": None,
        "error": None
    }
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with ctx.wrap_socket(sock, server_hostname=domain) as ss:
                der = ss.getpeercert(binary_form=True)
                pem = ssl.DER_cert_to_PEM_cert(der)
                res["raw_pem"] = pem
                x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
                res["issuer"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v
                                 for k, v in x509.get_issuer().get_components()}
                res["subject"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v
                                  for k, v in x509.get_subject().get_components()}
                not_before = x509.get_notBefore()
                not_after = x509.get_notAfter()
                res["not_before"] = not_before.decode() if isinstance(not_before, bytes) else str(not_before)
                res["not_after"] = not_after.decode() if isinstance(not_after, bytes) else str(not_after)
                for i in range(x509.get_extension_count()):
                    ext = x509.get_extension(i)
                    if ext.get_short_name() == b'subjectAltName':
                        res["san"] = [s.strip() for s in str(ext).split(',')]
                res["valid"] = True
                try:
                    dt = datetime.strptime(res["not_after"][:14], "%Y%m%d%H%M%S")
                    res["expired"] = dt < datetime.utcnow()
                except Exception:
                    res["expired"] = None
    except Exception as e:
        res["error"] = str(e)
        logger.debug(f"get_ssl_info error for {domain}: {e}")
    return res


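# Illustrative sketch: get_ssl_info() reports validity dates as ASN.1 time strings
# (YYYYMMDDHHMMSSZ), so expiry can be re-derived the same way the function does it.
# The domain is a placeholder.
def _example_ssl_expiry() -> None:
    info = get_ssl_info("example.com")
    if info.get("valid") and info.get("not_after"):
        not_after = datetime.strptime(info["not_after"][:14], "%Y%m%d%H%M%S")
        print("certificate expires:", not_after.isoformat(), "expired:", info["expired"])
    else:
        print("TLS probe failed:", info.get("error"))

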
# -------------------- Robots (try https then http, handle encodings/charset/compression) --------------------
async def analyze_robots(domain: str) -> Dict[str, Any]:
    tried = []
    async with httpx.AsyncClient(follow_redirects=True) as client:
        for scheme in ("https://", "http://"):
            url = f"{scheme}{domain}/robots.txt"
            tried.append(url)
            headers = get_realistic_headers(url)
            r = await async_request_with_retry("GET", url, client, headers=headers, timeout=10)
            if not r:
                continue
            if r.status_code == 200:
                raw = r.content or b""
                ev = store_raw_evidence(raw, prefix="robots")
                text = None
                # if content is still compressed (gzip / brotli)
                try:
                    if raw.startswith(b'\x1f\x8b'):
                        try:
                            text = gzip.decompress(raw).decode('utf-8', errors='replace')
                        except Exception:
                            try:
                                text = r.text
                            except Exception:
                                text = None
                    elif brotli and 'br' in (r.headers.get('content-encoding') or '').lower():
                        try:
                            text = brotli.decompress(raw).decode('utf-8', errors='replace')
                        except Exception:
                            text = None
                    else:
                        text = None
                except Exception:
                    text = None

                # try charset_normalizer
                if text is None and from_bytes:
                    try:
                        best = from_bytes(raw).best()
                        if best:
                            text = str(best)
                    except Exception:
                        text = None
                if text is None:
                    try:
                        text = raw.decode(r.encoding or "utf-8", errors="replace")
                    except Exception:
                        try:
                            text = r.text
                        except Exception:
                            text = raw.decode("utf-8", errors="replace")

                # sanitize and parse lines
                rules = []
                sitemaps = []
                for line in text.splitlines():
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    parts = (line.split(":", 1) + [""])[:2]
                    k = parts[0].strip().lower()
                    v = parts[1].strip()
                    if k == "sitemap":
                        sitemaps.append(v)
                    else:
                        rules.append({"directive": k, "value": v})
                return {"exists": True, "content_snippet": text[:8000], "rules": rules, "sitemaps": sitemaps, "fetched_from": url, "raw_evidence": ev}
    return {"exists": False, "tried": tried, "error": "robots not found or unreachable (checked https and http)"}


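# Illustrative sketch: analyze_robots() only needs the bare domain and reports any
# discovered sitemaps alongside the parsed directives. The domain is a placeholder.
async def _example_robots() -> None:
    info = await analyze_robots("example.com")
    if info.get("exists"):
        print("sitemaps:", info.get("sitemaps"))
        print("first rules:", info.get("rules", [])[:5])
    else:
        print("no robots.txt:", info.get("error"))

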
# -------------------- Extract links & resources --------------------
def extract_links_and_scripts(html: str, base_url: str) -> dict:
    if not html:
        return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []}
    try:
        soup = BeautifulSoup(html, "lxml")
        results = {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []}
        base_domain = urlparse(base_url).netloc.split(":")[0] if base_url else ""
        for s in soup.find_all("script", src=True):
            src = s["src"].strip()
            full = urljoin(base_url, src)
            results["js_links"].append(full)
        for l in soup.find_all("link", rel=lambda r: r and "stylesheet" in r, href=True):
            href = l["href"].strip()
            full = urljoin(base_url, href)
            results["css_links"].append(full)
        for m in soup.find_all("meta"):
            results["meta_tags"].append({k: m.get(k) for k in ("name", "property", "content", "http-equiv") if m.get(k)})
        for a in soup.find_all("a", href=True):
            href = a["href"].strip()
            if href.startswith(("mailto:", "tel:", "javascript:", "#")):
                continue
            full = urljoin(base_url, href)
            try:
                netloc = urlparse(full).netloc.split(":")[0]
            except Exception:
                netloc = ""
            if netloc == base_domain:
                results["internal_links"].append(full)
            else:
                results["external_links"].append(full)
        for img in soup.find_all("img", src=True):
            src = img["src"].strip()
            full = urljoin(base_url, src)
            results["image_links"].append(full)
        for form in soup.find_all("form", action=True):
            action = form["action"].strip()
            full = urljoin(base_url, action)
            results["form_links"].append(full)
            if "/api/" in full or "/graphql" in full:
                results["api_links"].append(full)
        # de-duplicate while preserving order
        for k in ("js_links", "css_links", "internal_links", "external_links", "image_links", "form_links", "api_links"):
            results[k] = list(dict.fromkeys(results[k]))
        return results
    except Exception as e:
        logger.debug(f"extract_links error: {e}")
        return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []}


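# Illustrative sketch: the extractor works on any HTML string plus the URL it was
# fetched from (used to resolve relative paths and split internal/external links).
# The markup and domains below are made up.
def _example_extract_links() -> None:
    html = (
        '<html><head><script src="/app.js"></script></head>'
        '<body><a href="/about">About</a><a href="https://cdn.example.net/x">CDN</a></body></html>'
    )
    data = extract_links_and_scripts(html, "https://example.com/")
    print("js:", data["js_links"])
    print("internal:", data["internal_links"])
    print("external:", data["external_links"])

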
# -------------------- Playwright render (returns content, headers, final_url) --------------------
async def get_dynamic_html(url: str, timeout: int = 20) -> Tuple[str, Dict[str, str], str]:
    try:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(args=["--no-sandbox"], headless=True)
            page = await browser.new_page()
            await page.set_extra_http_headers(get_realistic_headers(url))
            # navigate and capture main response
            resp = await page.goto(url, wait_until="networkidle", timeout=timeout * 1000)
            await asyncio.sleep(0.25)
            content = await page.content()
            # extract headers from the main response if available
            headers = {}
            final_url = url
            try:
                if resp:
                    headers = resp.headers or {}
                    final_url = resp.url or page.url
                else:
                    final_url = page.url
            except Exception:
                headers = {}
            await browser.close()
            headers = {str(k): str(v) for k, v in (headers or {}).items()}
            return content or "", headers, final_url or url
    except Exception as e:
        logger.debug(f"Playwright error: {e}")
        return "", {}, url


# -------------------- Static fetch --------------------
async def fetch_static(url: str, timeout: int = 15) -> Optional[httpx.Response]:
    headers = get_realistic_headers(url)
    async with httpx.AsyncClient(follow_redirects=True) as client:
        resp = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers)
        return resp


# -------------------- Wappalyzer helpers: DB validation --------------------
def _iter_values_recursively(obj):
    if isinstance(obj, dict):
        for v in obj.values():
            yield from _iter_values_recursively(v)
    elif isinstance(obj, list):
        for i in obj:
            yield from _iter_values_recursively(i)
    elif isinstance(obj, str):
        yield obj


def validate_wappalyzer_db(path: str) -> List[Tuple[str, str, str]]:
    """
    Validate regex patterns inside a Wappalyzer technologies.json file.
    Returns list of tuples: (technology_name, pattern_string, error_message)
    """
    bad = []
    try:
        with open(path, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except Exception as e:
        logger.debug(f"validate_wappalyzer_db: failed to load JSON: {e}")
        return bad
    for tech_name, tech_def in data.items():
        try:
            for s in _iter_values_recursively(tech_def):
                if not isinstance(s, str):
                    continue
                # quick skip for short tokens unlikely to be regex
                if len(s) < 4:
                    continue
                try:
                    re.compile(s)
                except re.error as rex:
                    bad.append((tech_name, s, str(rex)))
                except Exception:
                    # ignore other compile-time issues
                    continue
        except Exception:
            continue
    return bad


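# Illustrative sketch: validate_wappalyzer_db() can be run against a locally bundled
# technologies.json before handing it to Wappalyzer, so broken patterns are logged
# instead of failing during analysis. The file path is an assumption.
def _example_validate_db() -> None:
    tech_path = os.path.join(os.path.dirname(__file__), "technologies.json")
    if os.path.exists(tech_path):
        for tech, pattern, err in validate_wappalyzer_db(tech_path)[:5]:
            print(f"{tech}: bad pattern {pattern!r} ({err})")

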
# -------------------- Wappalyzer / BuiltWith / JS/CSS heuristics --------------------
def compute_tech_confidence_from_wappalyzer(data: dict) -> int:
    confidence = 50
    detection = data.get("detection", {})
    if isinstance(detection, dict):
        if "headers" in detection:
            confidence = max(confidence, 85)
        if "script" in detection or "js" in detection:
            confidence = max(confidence, 80)
        if "meta" in detection:
            confidence = max(confidence, 75)
    return confidence


async def detect_technologies_wappalyzer(url: str, html: str, headers: dict) -> list:
    try:
        webpage = WebPage(url, html or "", headers or {})
        # try Wappalyzer.latest() but be resilient
        try:
            w = Wappalyzer.latest()
        except Exception:
            # fallback to local DB if available (with validation)
            tech_path = os.path.join(os.path.dirname(__file__), "technologies.json")
            if os.path.exists(tech_path):
                try:
                    # validate DB first to log problematic regexes
                    bad = validate_wappalyzer_db(tech_path)
                    if bad:
                        logger.warning(f"Wappalyzer DB contains {len(bad)} invalid regex patterns (showing up to 10).")
                        for tname, patt, err in bad[:10]:
                            logger.warning(f"Invalid regex in Wappalyzer DB - {tname}: pattern={patt!r} error={err}")
                    # load the local DB via latest(technologies_file=...) (python-Wappalyzer API)
                    w = Wappalyzer.latest(technologies_file=tech_path)
                except Exception as e2:
                    logger.debug(f"Fallback Wappalyzer load failed: {e2}")
                    return []
            else:
                logger.debug("Wappalyzer DB not available and no local fallback")
                return []
        # analyze, but guard against regex runtime errors inside w.analyze
        try:
            results = w.analyze_with_categories(webpage) or {}
        except re.error:
            logger.exception("Wappalyzer analyze raised a regex error - likely a faulty pattern in DB.")
            return []
        except Exception as e:
            logger.debug(f"Wappalyzer analyze failed: {e}")
            return []

        detected = []
        for name, data in results.items():
            if not isinstance(data, dict):
                continue
            confidence = compute_tech_confidence_from_wappalyzer(data)
            prov = []
            det = data.get("detected", {})
            if det:
                prov.append("wappalyzer-detected")
            categories = data.get("categories", [])
            detected.append({
                "name": name,
                "version": data.get("version", "Unknown"),
                "categories": categories,
                "confidence": confidence,
                "source": "Wappalyzer",
                "provenance": prov
            })
        detected.sort(key=lambda x: x["confidence"], reverse=True)
        return detected
    except Exception as e:
        logger.debug(f"Wappalyzer error: {e}")
        return []


async def detect_technologies_builtwith(url: str) -> list:
    try:
        raw = builtwith.builtwith(url)
        out = []
        for cat, techs in (raw or {}).items():
            for t in techs:
                confidence = 70
                if "cdn" in cat.lower():
                    confidence = 90
                if "framework" in cat.lower():
                    confidence = 90
                out.append({
                    "name": t,
                    "category": cat,
                    "confidence": confidence,
                    "source": "BuiltWith",
                    "provenance": ["builtwith-api"]
                })
        out.sort(key=lambda x: x["confidence"], reverse=True)
        return out
    except Exception as e:
        logger.debug(f"BuiltWith error: {e}")
        return []


async def fetch_resource_content(url: str, timeout: int = 10) -> str:
    try:
        headers = get_realistic_headers(url)
        async with httpx.AsyncClient(follow_redirects=True) as client:
            r = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers)
            if r and r.status_code == 200:
                return r.text or ""
    except Exception as e:
        logger.debug(f"Failed to fetch resource {url}: {e}")
    return ""


async def detect_js_technologies(js_links: List[str], base_url: str, html: str) -> list:
    detected = []
    content = " ".join(js_links or []) + " " + (html or "")
    content_l = content.lower()
    indicators = {
        "jQuery": r"jquery[\w-]*\.js|jquery-ui|\$\.fn\.jquery|window\.jquery",
        "React": r"react[\w-]*\.js|react-dom|__react_devtools_global_hook__|data-reactroot",
        "Angular": r"angular[\w-]*\.js|ng-app|angular\.module",
        "Vue.js": r"vue[\w-]*\.js|__vue_devtools_global_hook__|vue-router"
    }
    for tech, pattern in indicators.items():
        try:
            if re.search(pattern, content_l):
                detected.append({"name": tech, "confidence": 70, "source": "JS Heuristics", "provenance": ["inline", "links"]})
        except re.error:
            # fallback: substring check
            if pattern.lower() in content_l:
                detected.append({"name": tech, "confidence": 60, "source": "JS Heuristics (fallback)", "provenance": ["inline", "links"]})

    # fetch the scripts themselves (bounded concurrency) and re-check the indicators
    sem = asyncio.Semaphore(10)

    async def _fetch(url_):
        async with sem:
            return await fetch_resource_content(url_)

    tasks = [_fetch(url_) for url_ in (js_links or [])]

    contents = []
    if tasks:
        try:
            contents = await asyncio.gather(*tasks)
        except Exception:
            contents = []

    for c in (contents or []):
        c_l = (c or "").lower()
        for tech, pattern in indicators.items():
            try:
                if re.search(pattern, c_l):
                    if not any(d["name"] == tech for d in detected):
                        detected.append({"name": tech, "confidence": 85, "source": "JS Heuristics", "provenance": ["resource_content"]})
            except re.error:
                if pattern.lower() in c_l:
                    if not any(d["name"] == tech for d in detected):
                        detected.append({"name": tech, "confidence": 75, "source": "JS Heuristics (fallback)", "provenance": ["resource_content"]})
    return detected


async def detect_css_technologies(css_links: List[str], html: str) -> list:
    detected = []
    content = " ".join(css_links or []) + " " + (html or "")
    content_l = content.lower()
    indicators = {
        "Bootstrap": r"bootstrap[\w-]*\.css|class=['\"].*col-",
        # improved Tailwind detection: look for class attributes containing tw- (utility prefix) or grid-cols-, flex-, etc.
        "Tailwind CSS": r"tailwind\.min\.css|class=['\"][^'\"]*\btw-|class=['\"].*grid-cols-|class=['\"].*flex-",
        "Materialize": r"materialize[\w-]*\.css"
    }
    for tech, pattern in indicators.items():
        try:
            if re.search(pattern, content_l):
                detected.append({"name": tech, "confidence": 70, "source": "CSS Heuristics", "provenance": ["links_or_inline"]})
        except re.error:
            if pattern.lower() in content_l:
                detected.append({"name": tech, "confidence": 60, "source": "CSS Heuristics (fallback)", "provenance": ["links_or_inline"]})

    # fetch the stylesheets themselves (bounded concurrency) and re-check the indicators
    sem = asyncio.Semaphore(8)

    async def _fetch(url_):
        async with sem:
            return await fetch_resource_content(url_)

    tasks = [_fetch(url_) for url_ in (css_links or [])]

    contents = []
    if tasks:
        try:
            contents = await asyncio.gather(*tasks)
        except Exception:
            contents = []

    for c in (contents or []):
        c_l = (c or "").lower()
        for tech, pattern in indicators.items():
            try:
                if re.search(pattern, c_l):
                    if not any(d["name"] == tech for d in detected):
                        detected.append({"name": tech, "confidence": 85, "source": "CSS Heuristics", "provenance": ["resource_content"]})
            except re.error:
                if pattern.lower() in c_l:
                    if not any(d["name"] == tech for d in detected):
                        detected.append({"name": tech, "confidence": 75, "source": "CSS Heuristics (fallback)", "provenance": ["resource_content"]})
    return detected


# -------------------- CMS detection --------------------
def compute_confidence_from_evidence(evidence: List[Dict[str, Any]]) -> float:
    if not evidence:
        return 0.0
    total_possible = sum(float(e.get("weight", 0.0)) for e in evidence)
    if total_possible <= 0:
        return 0.0
    found = sum(float(e.get("weight", 0.0)) for e in evidence if e.get("found"))
    return min(1.0, found / total_possible)


def detect_cms(html: str, headers: dict, url: str, extracted_data: dict = None) -> list:
    detected_cms = []
    html_lower = (html or "").lower()
    headers_lower = {k.lower(): v for k, v in (headers or {}).items()}
    extracted_data = extracted_data or {}
    js_links = " ".join(extracted_data.get("js_links", []))
    form_links = " ".join(extracted_data.get("form_links", []))
    image_links = " ".join(extracted_data.get("image_links", []))
    cms_signatures = {
        "WordPress": [
            {"type": "path", "pattern": r"wp-content", "weight": 0.23},
            {"type": "path", "pattern": r"wp-includes", "weight": 0.22},
            {"type": "api", "pattern": r"wp-json", "weight": 0.18},
            {"type": "meta", "pattern": r"<meta name=\"generator\" content=\"wordpress", "weight": 0.12},
            {"type": "cookie", "pattern": r"wordpress_logged_in_", "weight": 0.12},
            {"type": "admin", "pattern": r"/wp-admin/", "weight": 0.13}
        ],
        "Joomla": [
            {"type": "meta", "pattern": r"meta name=\"generator\" content=\"joomla", "weight": 0.35},
            {"type": "path", "pattern": r"media\/com_content", "weight": 0.25},
            {"type": "admin", "pattern": r"\/administrator\/", "weight": 0.35}
        ],
        "Drupal": [
            {"type": "path", "pattern": r"sites\/default\/files", "weight": 0.35},
            {"type": "path", "pattern": r"\/core\/misc\/drupal\.js", "weight": 0.3},
            {"type": "meta", "pattern": r"<meta name=\"generator\" content=\"drupal", "weight": 0.35}
        ],
        "Shopify": [
            {"type": "domain", "pattern": r"cdn\.shopify\.com", "weight": 0.45},
            {"type": "domain", "pattern": r"myshopify\.com", "weight": 0.45},
            {"type": "script", "pattern": r"shopify", "weight": 0.1}
        ],
        "Magento": [
            {"type": "path", "pattern": r"mage\/", "weight": 0.3},
            {"type": "meta", "pattern": r"magento", "weight": 0.3},
            {"type": "admin", "pattern": r"/admin/", "weight": 0.2}
        ],
        "Wix": [
            {"type": "script", "pattern": r"wix\.com|wixstatic", "weight": 0.6},
            {"type": "meta", "pattern": r"wix", "weight": 0.4}
        ],
        "Squarespace": [
            {"type": "script", "pattern": r"squarespace", "weight": 0.6},
            {"type": "meta", "pattern": r"squarespace", "weight": 0.4}
        ],
        "Bitrix": [
            {"type": "path", "pattern": r"/bitrix/", "weight": 0.7}
        ]
    }
    for cms_name, sigs in cms_signatures.items():
        evidence = []
        for s in sigs:
            found = False
            typ = s["type"]
            pat = s["pattern"]
            try:
                if typ in ("path", "meta", "api"):
                    found = bool(re.search(pat, html_lower))
                elif typ == "cookie":
                    cookie_header = headers_lower.get("set-cookie", "")
                    found = bool(re.search(pat, cookie_header.lower()))
                elif typ == "domain":
                    combined = " ".join(list(headers_lower.values())) + " " + form_links.lower() + " " + js_links.lower() + " " + image_links.lower() + " " + url.lower()
                    found = bool(re.search(pat, combined))
                elif typ == "script":
                    found = bool(re.search(pat, js_links.lower()))
                elif typ == "admin":
                    found = bool(re.search(pat, html_lower)) or bool(re.search(pat, form_links.lower()))
            except re.error:
                if typ in ("path", "meta", "api", "script", "admin"):
                    found = pat.lower() in html_lower
                elif typ == "domain":
                    found = pat.lower() in (" ".join(list(headers_lower.values())) + " " + form_links + " " + js_links + " " + image_links + " " + url).lower()
            evidence.append({"type": typ, "pattern": pat, "weight": s.get("weight", 0.1), "found": found})
        confidence = compute_confidence_from_evidence(evidence)
        if confidence > 0:
            detected_cms.append({
                "name": cms_name,
                "confidence": round(confidence, 3),
                "evidence": evidence,
                "source": "CMS Heuristics",
                "provenance": [e for e in evidence if e["found"]]
            })
    x_gen = headers_lower.get("x-generator", "") or headers_lower.get("server", "")
    if x_gen:
        if "joomla" in x_gen.lower():
            if not any(d["name"] == "Joomla" for d in detected_cms):
                detected_cms.append({"name": "Joomla", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]})
        elif "wordpress" in x_gen.lower() or "wp-" in x_gen.lower():
            if not any(d["name"] == "WordPress" for d in detected_cms):
                detected_cms.append({"name": "WordPress", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]})
    detected_cms.sort(key=lambda x: x["confidence"], reverse=True)
    return detected_cms


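# Illustrative sketch of the weighted-evidence scoring used by detect_cms(): with the
# WordPress weights above, matching wp-content (0.23) and wp-json (0.18) out of a
# total weight of 1.00 yields a confidence of roughly 0.41.
def _example_cms_confidence() -> None:
    evidence = [
        {"type": "path", "pattern": "wp-content", "weight": 0.23, "found": True},
        {"type": "api", "pattern": "wp-json", "weight": 0.18, "found": True},
        {"type": "path", "pattern": "wp-includes", "weight": 0.22, "found": False},
        {"type": "meta", "pattern": "generator wordpress", "weight": 0.12, "found": False},
        {"type": "cookie", "pattern": "wordpress_logged_in_", "weight": 0.12, "found": False},
        {"type": "admin", "pattern": "/wp-admin/", "weight": 0.13, "found": False},
    ]
    print(round(compute_confidence_from_evidence(evidence), 3))  # -> 0.41

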
# -------------------- Security Headers --------------------
def analyze_security_headers(headers: dict) -> Dict[str, Any]:
    headers = {k.lower(): v for k, v in (headers or {}).items()}
    security = {
        "x-frame-options": headers.get("x-frame-options"),
        "x-xss-protection": headers.get("x-xss-protection"),
        "x-content-type-options": headers.get("x-content-type-options"),
        "strict-transport-security": headers.get("strict-transport-security"),
        "content-security-policy": headers.get("content-security-policy"),
        "referrer-policy": headers.get("referrer-policy")
    }
    results = {}
    for header, value in security.items():
        if value:
            status = "Implemented"
            if header == "x-frame-options":
                if value.lower() in ["deny", "sameorigin"]:
                    status = "Secure"
                else:
                    status = "Weak"
            results[header] = {"status": status, "value": value}
    return results


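# Illustrative sketch: analyze_security_headers() only reports headers that are
# present, so entries missing from its output mean "not implemented". The header
# values below are made up.
def _example_security_headers() -> None:
    sample = {"X-Frame-Options": "SAMEORIGIN", "Strict-Transport-Security": "max-age=63072000"}
    for name, verdict in analyze_security_headers(sample).items():
        print(f"{name}: {verdict['status']} ({verdict['value']})")

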
# -------------------- Payment Method Detection --------------------
def detect_payment_methods(html: str, extracted_data: dict = None) -> list:
    detected_methods = []
    html_lower = (html or "").lower()
    extracted_data = extracted_data or {}
    js_links = " ".join(extracted_data.get("js_links", [])).lower()
    form_links = " ".join(extracted_data.get("form_links", [])).lower()
    image_links = " ".join(extracted_data.get("image_links", [])).lower()
    combined = " ".join([html_lower, js_links, form_links, image_links])
    payment_patterns = {
        "Visa": r"\bvisa\b|visa-logo|/visa\.(svg|png|jpg|gif)",
        "Mastercard": r"mastercard|/mastercard\.(svg|png|jpg|gif)|master-card|master card",
        "American Express": r"american[\s-]*express|amex|/amex\.(svg|png|jpg|gif)",
        "PayPal": r"paypal\.com|paypal-button|www\.paypalobjects\.com|paypalcheckout|paypal\.me",
        "Stripe": r"js\.stripe\.com|stripe\.com|Stripe\.(setPublishableKey|card)|stripe-v3|stripe-elements",
        "Apple Pay": r"apple[\s-]*pay|apple-pay",
        "Google Pay": r"google[\s-]*pay|pay.google.com|google-pay",
        "Shop Pay": r"shopify\.com\/shop_pay|shopify|shop-pay",
        "Discover": r"discover|discover-logo|/discover\.(svg|png|jpg|gif)",
        "UnionPay": r"unionpay|union-pay",
        "JCB": r"\bjcb\b",
        "Alipay": r"alipay|alipayjsbridge|alipay\.com",
        "WeChat Pay": r"wechatpay|weixin\.qq\.com|wechat[\s-]*pay",
        "Square": r"squareup\.com|square\.(js|cdn)|sq-",
        "Authorize.Net": r"authorize\.net|secure2.authorize\.net",
        "Braintree": r"braintree\.gateway|braintree\.js|braintree",
        "Adyen": r"adyen|checkoutshopper|adyen-checkout",
        "Worldpay": r"worldpay|secure\.worldpay",
        "SagePay": r"sagepay|opayo",
        "Klarna": r"klarna|klarna-checkout",
        "Amazon Pay": r"amazonpay|static-na\.amzn\.com|amazon-pay",
        "Payoneer": r"payoneer",
        "Razorpay": r"razorpay|checkout\.razorpay\.com",
        "2Checkout": r"2checkout|2co",
        "Mollie": r"mollie|checkout\.mollie",
        "PayU": r"payu|payu\.com",
        "MercadoPago": r"mercadopago|mercadopago\.com",
        "CyberSource": r"cybersource|ics2wsa",
        "Afterpay": r"afterpay|clearpay",
        "Paystack": r"paystack|js\.paystack\.co",
        "ePDQ": r"epdq|ogone",
        "Checkout.com": r"checkout\.com|checkoutjs",
        "GreenPay": r"greenpay"
    }
    for method, pattern in payment_patterns.items():
        try:
            if re.search(pattern, combined, re.I):
                if method not in detected_methods:
                    detected_methods.append(method)
        except re.error:
            if pattern.lower() in combined:
                if method not in detected_methods:
                    detected_methods.append(method)
    checkout_indicators = [r"/checkout", r"/cart", r"/pay", r"/payment", r"/order", r"/billing"]
    for pat in checkout_indicators:
        if re.search(pat, form_links + html_lower):
            if "E-Commerce/Checkout" not in detected_methods:
                detected_methods.append("E-Commerce/Checkout")
    return detected_methods


# -------------------- Tracker and Analytics Detection --------------------
def detect_trackers_and_analytics(html: str, js_links: list = None, meta_tags: list = None) -> list:
    detected_trackers = []
    html_lower = (html or "").lower()
    # patterns are matched against lowercased text, so they are kept lowercase too
    tracker_patterns = {
        "Google Analytics": r"google-analytics\.com/|gtag\.js|analytics\.js",
        "Google Tag Manager": r"googletagmanager\.com",
        "Facebook Pixel": r"connect\.facebook\.net/en_us/fbevents\.js|fbq\(",
        "Hotjar": r"hotjar\.com|hjid",
        "Matomo (Piwik)": r"matomo\.js",
        "TikTok Pixel": r"ttq\.load"
    }
    for tracker, pattern in tracker_patterns.items():
        if re.search(pattern, html_lower):
            detected_trackers.append(tracker)
    all_js_links = " ".join([link.lower() for link in (js_links or [])])
    for tracker, pattern in tracker_patterns.items():
        if re.search(pattern, all_js_links):
            if tracker not in detected_trackers:
                detected_trackers.append(tracker)
    meta_content = " ".join([tag.get('content', '').lower() for tag in (meta_tags or [])])
    for tracker, pattern in tracker_patterns.items():
        if re.search(pattern, meta_content):
            if tracker not in detected_trackers:
                detected_trackers.append(tracker)
    return detected_trackers


# -------------------- IP info --------------------
def get_ip_info(ip: str) -> Dict:
    res = {"source": "ipwhois", "timestamp": datetime.utcnow().isoformat() + "Z"}
    try:
        obj = ipwhois.IPWhois(ip).lookup_rdap(depth=1)
        res["asn"] = obj.get("asn")
        res["asn_cidr"] = obj.get("asn_cidr")
        res["asn_country_code"] = obj.get("asn_country_code")
        res["asn_description"] = obj.get("asn_description")
        res["network"] = obj.get("network")
    except Exception as e:
        logger.debug(f"IPWhois lookup failed for {ip}: {e}")
        res["error"] = str(e)
    return res


# -------------------- WAF detection --------------------
def detect_waf_subprocess(url: str) -> dict:
    result = {"detected": False, "provider": None, "confidence": 0.0, "evidence": []}
    # first try the external wafw00f tool, if it is installed
    try:
        proc = subprocess.run(["wafw00f", "-a", url], capture_output=True, text=True, timeout=20)
        out = (proc.stdout or "") + (proc.stderr or "")
        if proc.returncode == 0 and out:
            for ln in out.splitlines():
                for provider in ["Cloudflare", "Imperva", "Akamai", "Fastly", "Sucuri", "F5", "ModSecurity", "AWS WAF", "Fortinet", "Barracuda", "Incapsula"]:
                    if provider.lower() in ln.lower():
                        result.update({"detected": True, "provider": provider, "confidence": 0.9, "evidence": ["wafw00f-output"]})
                        return result
    except Exception:
        pass
    # fall back to header/body/cookie heuristics
    try:
        try:
            r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url))
            headers = {k.lower(): v for k, v in dict(r.headers).items()}
            body_snippet = (r.text or "")[:3000]
            cookie_keys = " ".join(r.cookies.keys()) if hasattr(r, "cookies") else ""
        except Exception:
            headers = {}
            body_snippet = ""
            cookie_keys = ""
        header_indicators = {
            "Cloudflare": ["cf-ray", "server: cloudflare", "cf-cache-status", "cf-request-id"],
            "Imperva": ["x-iinfo", "incapsula", "visid_incap_"],
            "Akamai": ["x-akamai-transformed", "akamai", "akamaiedge", "akamaitechnologies"],
            "Fastly": ["x-served-by", "x-cache", "x-fastly-backend-request-id"],
            "Sucuri": ["x-sucuri-cache", "x-sucuri-id"],
            "F5": ["bigipserver", "x-lb"],
            "ModSecurity": ["mod_security", "mod_sec"],
            "AWS WAF": ["x-amzn-requestid", "x-amz-cf-id"],
            "Fortinet": ["fortigate", "f5-"],
            "Barracuda": ["barracuda"],
            "Incapsula": ["visid_incap_"]
        }
        for provider, sigs in header_indicators.items():
            for sig in sigs:
                try:
                    if ":" in sig:
                        hname, hv = [s.strip() for s in sig.split(":", 1)]
                        hv = hv.lower()
                        if headers.get(hname) and hv in headers.get(hname, "").lower():
                            result.update({"detected": True, "provider": provider, "confidence": 0.75, "evidence": [f"header:{hname}"]})
                            return result
                    else:
                        if any(sig in h for h in headers.keys()):
                            result.update({"detected": True, "provider": provider, "confidence": 0.7, "evidence": [f"header_contains:{sig}"]})
                            return result
                        if sig in body_snippet.lower():
                            result.update({"detected": True, "provider": provider, "confidence": 0.6, "evidence": ["body_snippet"]})
                            return result
                        if re.search(re.escape(sig), cookie_keys, re.I):
                            result.update({"detected": True, "provider": provider, "confidence": 0.65, "evidence": ["cookie_name"]})
                            return result
                except Exception:
                    continue
        challenge_patterns = [r"attention required", r"access denied", r"please enable cookies", r"security check", r"verify you are a human", r"challenge.*cloudflare"]
        for pat in challenge_patterns:
            if re.search(pat, body_snippet, re.I):
                result.update({"detected": True, "provider": "Unknown (challenge page)", "confidence": 0.5, "evidence": ["challenge_pattern"]})
                return result
    except Exception as e:
        logger.debug(f"WAF detection error heuristics: {e}")
    return result


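# Illustrative sketch: detect_waf_subprocess() shells out to the wafw00f CLI when it
# is on PATH and otherwise falls back to the header/body/cookie heuristics above, so
# it can be called without the external tool installed. The URL is a placeholder.
def _example_waf_check() -> None:
    verdict = detect_waf_subprocess("https://example.com/")
    if verdict["detected"]:
        print(f"WAF: {verdict['provider']} (confidence {verdict['confidence']})")
    else:
        print("no WAF detected")

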
# -------------------- CDN detection --------------------
def detect_cdn_from_headers_and_dns(headers: dict, dns_records: dict, ip: str = None, extracted_data: dict = None) -> dict:
    detected = {"source": None, "provider": None, "confidence": 0, "reasons": []}
    headers_lower = {k.lower(): v for k, v in (headers or {}).items()}
    extracted_data = extracted_data or {}
    cdn_header_signatures = {
        "Cloudflare": ["cf-ray", "cf-cache-status", "server: cloudflare", "cf-request-id"],
        "Akamai": ["x-akamai-transformed", "x-akamai-request-id", "akamai"],
        "Amazon CloudFront": ["x-amz-cf-id", "via: 1.1 cloudfront", "x-cache"],
        "Fastly": ["x-served-by", "x-fastly-backend-request-id", "x-cache"],
        "Sucuri": ["x-sucuri-cache", "x-sucuri-id"],
        "Google Cloud CDN": ["x-goog-gfe-response-headers", "x-google-gfe"],
        "Incapsula": ["x-iinfo", "visid_incap_"],
        "Azure CDN": ["cdn-io", "azureedge", "azurefd", "akadns"],
        "Netlify": ["netlify"],
        "Cloudflare Stream": ["cf-stream"],
        "BunnyCDN": ["bunnycdn"],
        "StackPath": ["stackpathcdn"],
        "KeyCDN": ["x-keycdn"],
        "CDN77": ["cdn77"],
        "Akamai EdgeKey": ["edgekey.net"]
    }
    for provider, sigs in cdn_header_signatures.items():
        for sig in sigs:
            if any(sig in h for h in headers_lower.keys()) or any(sig in v.lower() for v in headers_lower.values()):
                detected.update({"source": "Headers", "provider": provider, "confidence": 95})
                detected["reasons"].append(f"header signature matched {sig}")
                return detected
    cname_records = dns_records.get("CNAME", []) if dns_records else []
    try:
        candidate_host = cname_records[0] if cname_records else None
        cname_chain = resolve_cname_chain(candidate_host) if candidate_host else []
        cname_patterns = {
            "Cloudflare": r"cloudflare|cloudfront|cloudflare.net",
            "Akamai": r"akamai|akamaiedge|akamaitechnologies|edgekey\.net|akamaiedge\.net",
            "Amazon CloudFront": r"cloudfront\.net",
            "Fastly": r"fastly\.net|fastly",
            "Incapsula": r"incapsula|imperva",
            "Sucuri": r"sucuri\.net|sucuri",
            "Azure CDN": r"azureedge|azurefd|z6rungcdn|azure",
            "Netlify": r"netlify\.app|netlify",
            "BunnyCDN": r"bunnycdn",
            "StackPath": r"stackpathdns",
            "KeyCDN": r"kccdn|kxcdn",
            "CDN77": r"cdn77",
        }
        for provider, pattern in cname_patterns.items():
            for cname in (cname_records + cname_chain):
                if re.search(pattern, cname, re.I):
                    detected.update({"source": "DNS CNAME", "provider": provider, "confidence": 85})
                    detected["reasons"].append(f"CNAME {cname} matches {provider}")
                    return detected
    except Exception as e:
        logger.debug(f"CDN CNAME check error: {e}")
    try:
        asset_hosts = set()
        for linklist in ("js_links", "css_links", "image_links", "form_links"):
            for a in extracted_data.get(linklist, []):
                try:
                    p = urlparse(a)
                    if p.hostname:
                        asset_hosts.add(p.hostname.lower())
                except Exception:
                    continue
        asset_hosts_list = list(asset_hosts)
        asset_host_patterns = {
            "Cloudflare": ["cloudflare", "cdn-cdn.cloudflare", "cloudflare.net", "cdn-cgi"],
            "Akamai": ["akamai.net", "akamaiedge", "akamaitechnologies", "edgekey.net"],
            "Fastly": ["fastly.net", "fastly"],
            "Amazon CloudFront": ["cloudfront.net", "amazonaws.com"],
            "Netlify": ["netlify.app", "netlify"],
            "BunnyCDN": ["b-cdn.net", "bunnycdn"],
            "Google Cloud CDN": ["googleusercontent.com", "googleapis.com"],
            "KeyCDN": ["kxcdn", "kccdn"],
            "CDN77": ["cdn77"],
            "StackPath": ["stackpathcdn", "stackpathdns"]
        }
        for provider, pats in asset_host_patterns.items():
            for pat in pats:
                for ah in asset_hosts_list:
                    if pat in ah:
                        detected.update({"source": "Asset Hosts", "provider": provider, "confidence": 80})
                        detected["reasons"].append(f"asset host {ah} contains {pat}")
                        return detected
    except Exception as e:
        logger.debug(f"Asset host analysis error: {e}")
    return detected


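# Illustrative sketch: the CDN heuristic combines response headers, DNS CNAMEs and
# asset hostnames, so it is normally fed from results gathered earlier in the scan.
# The domain and header values below are placeholders.
def _example_cdn_check() -> None:
    headers = {"CF-RAY": "8c1234abcd", "Server": "cloudflare"}
    dns_records = get_dns_records("example.com")
    verdict = detect_cdn_from_headers_and_dns(headers, dns_records)
    print(verdict["provider"], verdict["confidence"], verdict["reasons"])

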
# -------------------- Main async scan (IMPROVED) --------------------
async def main_async_scan(url: str):
    scan_start = datetime.utcnow().isoformat() + "Z"
    try:
        logger.info(f"Starting scan for {url}")
        # Step 1: Try Playwright render (get content + headers)
        dynamic_html, dynamic_headers, dynamic_final_url = await get_dynamic_html(url)
        final_html = dynamic_html or ""
        final_headers = dynamic_headers or {}
        final_url = dynamic_final_url or url
        static_response = None

        # If no dynamic content, try static fetch (async)
        if not final_html:
            logger.info("Dynamic fetch empty; attempting static fetch...")
            static_response = await fetch_static(url)
            if static_response and static_response.status_code == 200:
                final_html = static_response.text or ""
                final_headers = dict(static_response.headers or {})
                final_url = str(static_response.url or url)
            else:
                # fallback sync attempt to capture headers/body
                try:
                    r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url))
                    if r.status_code == 200:
                        final_html = r.text or ""
                        final_headers = dict(r.headers or {})
                        final_url = str(r.url or url)
                    else:
                        logger.warning(f"Static fetch returned {r.status_code} for {url}")
                except Exception as e:
                    logger.debug(f"Sync fallback static fetch failed: {e}")
        else:
            # We have dynamic HTML; ensure we also have headers (use HEAD or GET if headers missing)
            if not final_headers:
                try:
                    head_resp = httpx.head(final_url, follow_redirects=True, timeout=8, headers=get_realistic_headers(final_url))
                    if head_resp and head_resp.status_code < 400:
                        final_headers = dict(head_resp.headers or {})
                    else:
                        r2 = httpx.get(final_url, follow_redirects=True, timeout=10, headers=get_realistic_headers(final_url))
                        if r2:
                            final_headers = dict(r2.headers or {})
                except Exception as e:
                    logger.debug(f"Failed to fetch headers fallback: {e}")

        # store raw evidence: headers + body
        raw_evidence = {}
        if final_html:
            raw_body_bytes = (final_html.encode("utf-8") if isinstance(final_html, str) else (final_html or b""))
            raw_evidence["body"] = store_raw_evidence(raw_body_bytes, prefix="body")
        if final_headers:
            try:
                hdr_bytes = json.dumps(dict(final_headers), ensure_ascii=False).encode("utf-8")
                raw_evidence["headers"] = store_raw_evidence(hdr_bytes, prefix="headers")
            except Exception:
                raw_evidence["headers"] = {"error": "failed_to_store_headers"}

        # Step 2: Extract links and resources (ensure final_url passed)
        logger.info("Extracting links and resources...")
        extracted_data = extract_links_and_scripts(final_html or "", final_url)
        js_links = extracted_data.get("js_links", [])
        css_links = extracted_data.get("css_links", [])

        # Step 3: Run detection tasks concurrently
        logger.info("Detecting technologies (Wappalyzer/BuiltWith/JS/CSS heuristics)...")
        tasks = [
            detect_technologies_wappalyzer(final_url, final_html or "", final_headers),
            detect_technologies_builtwith(final_url),
            detect_js_technologies(js_links, final_url, final_html or ""),
            detect_css_technologies(css_links, final_html or "")
        ]
        wappalyzer_res, builtwith_res, js_res, css_res = await asyncio.gather(*tasks)

        # Step 4: Combine technologies, merging duplicates and keeping the best confidence
        all_tech = (wappalyzer_res or []) + (builtwith_res or []) + (js_res or []) + (css_res or [])
        tech_map: Dict[str, Any] = {}
        for tech in all_tech:
            name = tech.get("name")
            if not name:
                continue
            existing = tech_map.get(name)
            confidence = float(tech.get("confidence", 50))
            if existing:
                existing_conf = float(existing.get("confidence", 0))
                existing["confidence"] = max(existing_conf, confidence)
                existing_sources = set([s.strip() for s in str(existing.get("source", "")).split(",") if s])
                incoming_source = tech.get("source") or ""
                if incoming_source and incoming_source not in existing_sources:
                    existing_sources.add(incoming_source)
                existing["source"] = ", ".join(sorted(existing_sources))
                existing_prov = set(existing.get("provenance", []) or [])
                incoming_prov = set(tech.get("provenance", []) or [])
                existing["provenance"] = list(existing_prov.union(incoming_prov))
                if tech.get("version") and existing.get("version") in (None, "Unknown"):
                    existing["version"] = tech.get("version")
            else:
                tech_map[name] = {
                    "name": name,
                    "version": tech.get("version", "Unknown"),
                    "confidence": confidence,
                    "source": tech.get("source", ""),
                    "provenance": tech.get("provenance", []) or []
                }
        combined_tech = list(tech_map.values())
        combined_tech.sort(key=lambda x: x.get("confidence", 0), reverse=True)

        # Step 5: DNS and SSL
        parsed = urlparse(final_url)
        domain = parsed.netloc.split(":")[0] if parsed.netloc else ""
        dns_records = get_dns_records(domain) if domain else {}
        ssl_info = {}
        if parsed.scheme == "https" and domain:
            ssl_info = get_ssl_info(domain)

        # Step 6: IP info
        ip_info = {}
        if dns_records.get("A"):
            ip = dns_records["A"][0] if isinstance(dns_records["A"], list) and dns_records["A"] else dns_records["A"]
            ip_info = get_ip_info(ip)

        # Step 7: robots.txt
        robots_info = await analyze_robots(domain) if domain else {"exists": False, "tried": [], "error": "no domain"}

        # Step 8: Security headers and CMS detection
        security_headers = analyze_security_headers(final_headers)
        cms_info = detect_cms(final_html or "", final_headers or {}, final_url, extracted_data=extracted_data)

        # Step 9: payments and trackers
        payment_methods_info = detect_payment_methods(final_html or "", extracted_data=extracted_data)
        trackers_info = detect_trackers_and_analytics(final_html or "", js_links=extracted_data.get("js_links", []), meta_tags=extracted_data.get("meta_tags", []))

        # Step 10: WAF & CDN heuristics
        waf_info = detect_waf_subprocess(final_url)
        cdn_info = detect_cdn_from_headers_and_dns(final_headers or {}, dns_records or {}, ip_info.get("asn_cidr") if ip_info else None, extracted_data=extracted_data)

        # Inference rules for Cloudflare
        try:
            if (not cdn_info.get("provider")) and waf_info.get("provider") and "cloudflare" in (waf_info.get("provider") or "").lower():
                cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 90, "reasons": ["waf indicates Cloudflare"]})
            elif (not cdn_info.get("provider")) and ip_info and ip_info.get("asn_description") and "cloudflare" in str(ip_info.get("asn_description")).lower():
                cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 85, "reasons": ["ip whois ASN indicates Cloudflare"]})
            else:
                ns_list = dns_records.get("NS", []) or []
                if not cdn_info.get("provider"):
                    for ns in ns_list:
                        if "cloudflare" in ns.lower():
                            cdn_info.update({"source": "dns", "provider": "Cloudflare", "confidence": 85, "reasons": [f"NS {ns} indicates Cloudflare"]})
                            break
        except Exception:
            pass

        # Build final report
        title = "No Title"
        try:
            soup = BeautifulSoup(final_html or "", "lxml")
            if soup.title and soup.title.string:
                title = soup.title.string.strip()
        except Exception:
            title = "No Title"

        report = {
            "scan_id": generate_scan_id(),
            "scanned_at": scan_start,
            "url": final_url,
            "title": title,
            "raw_evidence": raw_evidence,
            "technologies": combined_tech,
            "links_and_resources": extracted_data,
            "dns_records": dns_records,
            "ssl_info": ssl_info,
            "ip_info": ip_info,
            "robots_info": robots_info,
            "security_headers": security_headers,
            "cms_info": cms_info,
            "payment_methods": payment_methods_info,
            "trackers_and_analytics": trackers_info,
            "waf_info": waf_info,
            "cdn_info": cdn_info,
            "headers": final_headers,
            "notes": "Report contains provenance (raw_evidence paths) and normalized confidence scores (0-100 for technologies)."
        }

        # Normalize confidence to 0-100 for technologies
        for t in report["technologies"]:
            try:
                t_conf = float(t.get("confidence", 50))
                if 0 <= t_conf <= 1:
                    t["confidence"] = int(round(t_conf * 100))
                else:
                    t["confidence"] = int(round(min(max(t_conf, 0), 100)))
            except Exception:
                t["confidence"] = 50

        return safe_json(report)

    except Exception as e:
        logger.exception("Main scan failed")
        return safe_json({"error": "Main scan failed", "details": str(e), "scanned_at": scan_start})


# -------------------- Convenience wrapper used by analyze_site.py --------------------
async def run_scan_for_url(url: str, render_js: bool = False, scan_id: Optional[str] = None) -> Dict[str, Any]:
    try:
        report = await main_async_scan(url)
        if not isinstance(report, dict):
            report = {"error": "invalid_report", "details": "Scanner returned non-dict result", "raw": str(report)}
        report.setdefault("scanned_url", report.get("url", url))
        if scan_id:
            report["scan_id"] = scan_id
        report.setdefault("url", report.get("scanned_url"))
        report.setdefault("technologies", report.get("technologies", []))
        report.setdefault("dns_records", report.get("dns_records", {}))
        report.setdefault("robots_info", report.get("robots_info", {"exists": False}))
        report.setdefault("headers", report.get("headers", {}))
        # compatibility aliases
        report.setdefault("waf", report.get("waf_info"))
        report.setdefault("cdn", report.get("cdn_info"))
        report.setdefault("payments", report.get("payment_methods"))
        return report
    except Exception as e:
        logger.exception("run_scan_for_url wrapper failed")
        return safe_json({"error": "run_scan_for_url_failed", "details": str(e), "scanned_url": url})


if __name__ == '__main__':
    # quick smoke test when running standalone (performs a real network scan):
    #   python utils.py
    # equivalent to:
    #   python -c "import asyncio, utils; asyncio.run(utils.main_async_scan('https://example.com'))"
    test_url = "https://www.google.com"
    result = asyncio.run(main_async_scan(test_url))
    print(json.dumps(result, ensure_ascii=False, indent=2))