# SuperReconn/app/utils.py
# SuperRecon utils - improved (compat_resources + Wappalyzer DB validation + regex fixes)
import os
import re
import json
import socket
import logging
import ssl
import gzip
import OpenSSL
import dns.resolver
import httpx
from urllib.parse import urljoin, urlparse, quote_plus
from bs4 import BeautifulSoup
from datetime import datetime, date, timezone
from collections import defaultdict
from typing import List, Dict, Any, Optional, Tuple
import asyncio
import random
import ipaddress
import ipwhois
import time
from functools import lru_cache
from playwright.async_api import async_playwright
import whois
from Wappalyzer import Wappalyzer, WebPage
import builtwith
import subprocess
import hashlib
# optional import for charset detection (best-effort)
try:
from charset_normalizer import from_bytes
except Exception:
from_bytes = None
# optional brotli decompress
try:
import brotli
except Exception:
brotli = None
# -------------------- Logger setup --------------------
logger = logging.getLogger("SuperRecon.utils")
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(os.environ.get("SUPERR_LOG_LEVEL", "INFO"))
# Directory to store raw evidence
EVIDENCE_DIR = os.environ.get("SUPERR_EVIDENCE_DIR", "./evidence")
os.makedirs(EVIDENCE_DIR, exist_ok=True)
# -------------------- Compatibility layer (replacement for pkg_resources) --------------------
# Provides: get_version, resource_bytes, resource_text, resource_path (context manager),
# iter_entry_points, load_entry_point, parse_requirement, installed_distributions, dist_metadata
try:
# importlib.metadata (stdlib) with backport fallback
from importlib.metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError # type: ignore
except Exception:
from importlib_metadata import distribution, distributions, entry_points, version as _version, PackageNotFoundError # type: ignore
# importlib.resources with backport fallback
try:
from importlib.resources import files, as_file, read_binary, read_text # type: ignore
except Exception:
from importlib_resources import files, as_file, read_binary, read_text # type: ignore
from contextlib import contextmanager
from packaging.requirements import Requirement
from packaging.version import Version, InvalidVersion
def get_version(package_name: str) -> Optional[str]:
try:
return _version(package_name)
except Exception:
return None
def resource_bytes(package: str, resource: str) -> bytes:
return read_binary(package, resource)
def resource_text(package: str, resource: str, encoding: str = "utf-8") -> str:
return read_text(package, resource, encoding=encoding)
@contextmanager
def resource_path(package: str, resource: str):
"""
Yields a filesystem Path for resource if possible.
Usage:
with resource_path('mypkg', 'data/file.txt') as p:
open(p)...
"""
p = files(package).joinpath(resource)
with as_file(p) as fp:
yield fp
class EP:
def __init__(self, ep):
self._ep = ep
@property
def name(self):
return self._ep.name
@property
def value(self):
return self._ep.value
def load(self):
return self._ep.load()
def iter_entry_points(group: str):
eps = entry_points()
try:
group_eps = eps.select(group=group) # py3.10+
except Exception:
try:
group_eps = [e for e in eps if getattr(e, "group", None) == group]
except Exception:
group_eps = eps.get(group, []) # type: ignore
for e in group_eps:
yield EP(e)
def load_entry_point(group: str, name: str):
for ep in iter_entry_points(group):
if ep.name == name:
return ep.load()
raise LookupError(f"entry point {group}:{name} not found")
def parse_requirement(req_str: str) -> Requirement:
return Requirement(req_str)
def installed_distributions():
for dist in distributions():
yield dist
def dist_metadata(name: str):
try:
return distribution(name).metadata
except PackageNotFoundError:
return None
def dist_files(name: str):
try:
return distribution(name).files
except PackageNotFoundError:
return None
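# Illustrative usage of the compatibility layer above (not called by the scanner):
# it shows how common pkg_resources idioms map onto these helpers. The package and
# entry-point group names below ("httpx", "console_scripts") are only examples and
# may be empty or missing in a given environment.
def _example_compat_usage() -> dict:
    return {
        "httpx_version": get_version("httpx"),
        "console_scripts": [ep.name for ep in iter_entry_points("console_scripts")],
        "installed_count": sum(1 for _ in installed_distributions()),
    }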
# -------------------- Safe JSON Helpers --------------------
def _make_json_safe(obj):
if obj is None or isinstance(obj, (bool, int, float, str)):
return obj
if isinstance(obj, dict):
new = {}
for k, v in obj.items():
try:
key = str(k)
except Exception:
key = repr(k)
new[key] = _make_json_safe(v)
return new
if isinstance(obj, (list, tuple, set)):
return [_make_json_safe(i) for i in obj]
try:
if isinstance(obj, (datetime, date)):
return obj.isoformat()
except Exception:
pass
try:
import httpx as _httpx
if isinstance(obj, _httpx.Response):
try:
text_snippet = obj.text[:1000]
except Exception:
text_snippet = None
return {
"status_code": obj.status_code,
"url": str(obj.url) if hasattr(obj, "url") else None,
"headers": dict(obj.headers) if hasattr(obj, "headers") else None,
"text_snippet": text_snippet
}
except Exception:
pass
try:
return str(obj)
except Exception:
return repr(obj)
def safe_json(obj):
try:
safe = _make_json_safe(obj)
json.dumps(safe, ensure_ascii=False)
return safe
except Exception as e:
logger.exception("safe_json conversion failed")
return {
"error": "safe_json_conversion_failed",
"error_str": str(e),
"repr": repr(obj)[:2000]
}
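# Minimal sketch of what safe_json does with objects json.dumps would normally
# reject (datetimes, sets, arbitrary instances). The sample payload is invented.
def _example_safe_json() -> str:
    sample = {"seen_at": datetime.utcnow(), "ports": {80, 443}, "resolver": object()}
    return json.dumps(safe_json(sample), ensure_ascii=False)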
# -------------------- UUID Generator --------------------
def generate_scan_id():
import uuid
return str(uuid.uuid4())
# -------------------- Stealth Mode Enhancements --------------------
def get_random_user_agent():
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edg/120.0.0.0"
]
return random.choice(user_agents)
def get_realistic_headers(url: Optional[str] = None):
    time.sleep(random.uniform(0.02, 0.15))
    domain = urlparse(url).netloc if url else "example.com"
user_agent = get_random_user_agent()
accept_headers = {
"Chrome": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Firefox": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Safari": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Edge": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Opera": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
}
browser = "Chrome"
if "Firefox" in user_agent:
browser = "Firefox"
elif "Safari" in user_agent and "Chrome" not in user_agent:
browser = "Safari"
elif "Edg" in user_agent or "Edge" in user_agent:
browser = "Edge"
languages = ["en-US,en;q=0.9", "en-GB,en;q=0.9", "ar-JO,ar;q=0.9,en;q=0.8", "fr-FR,fr;q=0.9,en;q=0.8"]
encodings = ["gzip, deflate, br", "gzip, deflate", "gzip, br", "deflate, br"]
headers = {
"User-Agent": user_agent,
"Accept": accept_headers.get(browser, accept_headers["Chrome"]),
"Accept-Language": random.choice(languages),
"Accept-Encoding": random.choice(encodings),
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"DNT": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Referer": f"https://www.google.com/search?q={domain.replace('.', '+')}",
"Cache-Control": "max-age=0"
}
return headers
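# Usage sketch for the stealth helpers (illustrative only; the default URL is a
# placeholder, not a real target): randomized, browser-consistent headers passed
# to a plain httpx GET.
def _example_stealth_request(url: str = "https://example.com") -> Optional[int]:
    try:
        r = httpx.get(url, headers=get_realistic_headers(url), follow_redirects=True, timeout=10)
        return r.status_code
    except Exception:
        return None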
# -------------------- Evidence storage --------------------
def store_raw_evidence(content: bytes, prefix: str = "body") -> Dict[str, Any]:
sha = hashlib.sha256(content).hexdigest()
filename = f"{prefix}_{sha}.bin"
path = os.path.join(EVIDENCE_DIR, filename)
try:
if not os.path.exists(path):
with open(path, "wb") as fh:
fh.write(content)
return {"path": path, "sha256": sha, "timestamp": datetime.utcnow().isoformat() + "Z"}
except Exception as e:
logger.debug(f"Failed to store evidence: {e}")
return {"error": str(e)}
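# Sketch of the content-addressed evidence store: identical payloads hash to the
# same sha256-named file, so repeated scans do not duplicate evidence on disk.
def _example_evidence_dedup() -> bool:
    first = store_raw_evidence(b"User-agent: *\nDisallow:", prefix="robots")
    second = store_raw_evidence(b"User-agent: *\nDisallow:", prefix="robots")
    return first.get("path") == second.get("path")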
# -------------------- Retry/backoff wrapper (async) --------------------
async def async_request_with_retry(method: str, url: str, client: httpx.AsyncClient, max_retries: int = 4,
base_delay: float = 0.5, timeout: int = 15, headers: dict = None):
attempt = 0
while attempt <= max_retries:
try:
attempt += 1
resp = await client.request(method, url, timeout=timeout, headers=headers)
if resp.status_code == 429 or (500 <= resp.status_code < 600 and resp.status_code != 501):
raise httpx.HTTPStatusError("Retryable status", request=resp.request, response=resp)
return resp
except Exception as e:
if attempt > max_retries:
logger.debug(f"Request failed (max retries) for {url}: {e}")
return None
sleep = base_delay * (2 ** (attempt - 1))
jitter = random.uniform(0, sleep)
await asyncio.sleep(jitter)
return None
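# Usage sketch for the retry wrapper (not called by the scanner itself): a single
# shared AsyncClient per batch of requests, with exponential backoff and jitter
# handled inside async_request_with_retry. The default URL is a placeholder.
async def _example_retrying_get(url: str = "https://example.com") -> Optional[int]:
    async with httpx.AsyncClient(follow_redirects=True) as client:
        resp = await async_request_with_retry("GET", url, client, headers=get_realistic_headers(url))
        return resp.status_code if resp is not None else None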
# -------------------- WHOIS --------------------
def whois_lookup(domain: str) -> dict:
try:
w = whois.whois(domain)
return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "data": safe_json(w)}
except Exception as e:
logger.debug(f"whois_lookup error: {e}")
return {"source": "python-whois", "timestamp": datetime.utcnow().isoformat() + "Z", "error": str(e)}
# -------------------- DNS --------------------
@lru_cache(maxsize=256)
def get_dns_records(domain: str) -> Dict[str, List[str]]:
records = defaultdict(list)
try:
for rtype in ("A", "AAAA", "CNAME", "MX", "NS", "TXT"):
try:
answers = dns.resolver.resolve(domain, rtype, lifetime=5)
for r in answers:
records[rtype].append(str(r).strip())
except Exception:
continue
except Exception as e:
logger.debug(f"get_dns_records error: {e}")
return dict(records)
def resolve_cname_chain(hostname: str, max_depth: int = 6) -> List[str]:
chain = []
try:
resolver = dns.resolver.Resolver()
resolver.lifetime = 5
curr = hostname
for _ in range(max_depth):
try:
answers = resolver.resolve(curr, "CNAME")
if not answers:
break
target = str(answers[0].target).rstrip(".")
chain.append(target)
curr = target
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.NoNameservers):
break
except Exception:
break
except Exception as e:
logger.debug(f"resolve_cname_chain error for {hostname}: {e}")
return chain
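# Sketch combining the two DNS helpers above: fetch the cached record sets, then
# follow the CNAME chain of the bare hostname. "example.com" is a placeholder.
def _example_dns_overview(domain: str = "example.com") -> dict:
    return {"records": get_dns_records(domain), "cname_chain": resolve_cname_chain(domain)}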
# -------------------- SSL/TLS info --------------------
def get_ssl_info(domain: str) -> Dict[str, Any]:
res = {
"valid": False,
"issuer": None,
"subject": None,
"not_before": None,
"not_after": None,
"expired": None,
"san": [],
"raw_pem": None,
"error": None
}
try:
ctx = ssl.create_default_context()
with socket.create_connection((domain, 443), timeout=5) as sock:
with ctx.wrap_socket(sock, server_hostname=domain) as ss:
der = ss.getpeercert(binary_form=True)
pem = ssl.DER_cert_to_PEM_cert(der)
res["raw_pem"] = pem
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
res["issuer"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v
for k, v in x509.get_issuer().get_components()}
res["subject"] = {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v
for k, v in x509.get_subject().get_components()}
not_before = x509.get_notBefore()
not_after = x509.get_notAfter()
res["not_before"] = not_before.decode() if isinstance(not_before, bytes) else str(not_before)
res["not_after"] = not_after.decode() if isinstance(not_after, bytes) else str(not_after)
for i in range(x509.get_extension_count()):
ext = x509.get_extension(i)
if ext.get_short_name() == b'subjectAltName':
res["san"] = [s.strip() for s in str(ext).split(',')]
res["valid"] = True
try:
dt = datetime.strptime(res["not_after"][:14], "%Y%m%d%H%M%S")
res["expired"] = dt < datetime.utcnow()
except Exception:
res["expired"] = None
except Exception as e:
res["error"] = str(e)
logger.debug(f"get_ssl_info error for {domain}: {e}")
return res
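# Sketch: a quick expiry check layered on get_ssl_info. Returns None when the
# certificate could not be fetched or parsed; the default domain is a placeholder.
def _example_cert_expired(domain: str = "example.com") -> Optional[bool]:
    info = get_ssl_info(domain)
    return info.get("expired") if info.get("valid") else None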
# -------------------- Robots (try https then http, handle encodings/charset/compression) --------------------
async def analyze_robots(domain: str) -> Dict[str, Any]:
tried = []
async with httpx.AsyncClient(follow_redirects=True) as client:
for scheme in ("https://", "http://"):
url = f"{scheme}{domain}/robots.txt"
tried.append(url)
headers = get_realistic_headers(url)
r = await async_request_with_retry("GET", url, client, headers=headers, timeout=10)
if not r:
continue
if r.status_code == 200:
raw = r.content or b""
ev = store_raw_evidence(raw, prefix="robots")
text = None
# if content is compressed (gzip)
try:
if raw.startswith(b'\x1f\x8b'):
try:
text = gzip.decompress(raw).decode('utf-8', errors='replace')
except Exception:
try:
text = r.text
except Exception:
text = None
                    elif brotli and 'br' in ((r.headers.get('content-encoding') or '').lower()):
try:
text = brotli.decompress(raw).decode('utf-8', errors='replace')
except Exception:
text = None
else:
text = None
except Exception:
text = None
# try charset_normalizer
if text is None and from_bytes:
try:
result = from_bytes(raw)
best = result.best()
if best:
                            text = str(best)  # CharsetMatch exposes the decoded text via str()
except Exception:
text = None
if text is None:
try:
text = raw.decode(r.encoding or "utf-8", errors="replace")
except Exception:
try:
text = r.text
except Exception:
text = raw.decode("utf-8", errors="replace")
# sanitize and parse lines
rules = []
sitemaps = []
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parts = (line.split(":", 1) + [""])[:2]
k = parts[0].strip().lower()
v = parts[1].strip()
if k == "sitemap":
sitemaps.append(v)
else:
rules.append({"directive": k, "value": v})
return {"exists": True, "content_snippet": text[:8000], "rules": rules, "sitemaps": sitemaps, "fetched_from": url, "raw_evidence": ev}
return {"exists": False, "tried": tried, "error": "robots not found or unreachable (checked https and http)"}
# -------------------- Extract links & resources --------------------
def extract_links_and_scripts(html: str, base_url: str) -> dict:
if not html:
return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []}
try:
soup = BeautifulSoup(html, "lxml")
results = {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []}
base_domain = urlparse(base_url).netloc.split(":")[0] if base_url else ""
for s in soup.find_all("script", src=True):
src = s["src"].strip()
full = urljoin(base_url, src)
results["js_links"].append(full)
for l in soup.find_all("link", rel=lambda r: r and "stylesheet" in r, href=True):
href = l["href"].strip()
full = urljoin(base_url, href)
results["css_links"].append(full)
for m in soup.find_all("meta"):
results["meta_tags"].append({k: m.get(k) for k in ("name", "property", "content", "http-equiv") if m.get(k)})
for a in soup.find_all("a", href=True):
href = a["href"].strip()
if href.startswith(("mailto:", "tel:", "javascript:", "#")):
continue
full = urljoin(base_url, href)
try:
netloc = urlparse(full).netloc.split(":")[0]
except Exception:
netloc = ""
if netloc == base_domain:
results["internal_links"].append(full)
else:
results["external_links"].append(full)
for img in soup.find_all("img", src=True):
src = img["src"].strip()
full = urljoin(base_url, src)
results["image_links"].append(full)
for form in soup.find_all("form", action=True):
action = form["action"].strip()
full = urljoin(base_url, action)
results["form_links"].append(full)
if "/api/" in full or "/graphql" in full:
results["api_links"].append(full)
for k in ("js_links", "css_links", "internal_links", "external_links", "image_links", "form_links", "api_links"):
results[k] = list(dict.fromkeys(results[k]))
return results
except Exception as e:
logger.debug(f"extract_links error: {e}")
return {"js_links": [], "css_links": [], "internal_links": [], "external_links": [], "image_links": [], "form_links": [], "api_links": [], "meta_tags": []}
# -------------------- Playwright render (returns content, headers, final_url) --------------------
async def get_dynamic_html(url: str, timeout: int = 20) -> Tuple[str, Dict[str, str], str]:
try:
async with async_playwright() as pw:
browser = await pw.chromium.launch(args=["--no-sandbox"], headless=True)
page = await browser.new_page()
await page.set_extra_http_headers(get_realistic_headers(url))
# navigate and capture main response
resp = await page.goto(url, wait_until="networkidle", timeout=timeout * 1000)
await asyncio.sleep(0.25)
content = await page.content()
# extract headers from the main response if available
headers = {}
final_url = url
try:
if resp:
headers = resp.headers or {}
final_url = resp.url or page.url
else:
final_url = page.url
except Exception:
headers = {}
await browser.close()
headers = {str(k): str(v) for k, v in (headers or {}).items()}
return content or "", headers, final_url or url
except Exception as e:
logger.debug(f"Playwright error: {e}")
return "", {}, url
# -------------------- Static fetch --------------------
async def fetch_static(url: str, timeout: int = 15) -> Optional[httpx.Response]:
headers = get_realistic_headers(url)
async with httpx.AsyncClient(follow_redirects=True) as client:
resp = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers)
return resp
# -------------------- Wappalyzer helpers: DB validation --------------------
def _iter_values_recursively(obj):
if isinstance(obj, dict):
for k, v in obj.items():
yield from _iter_values_recursively(v)
elif isinstance(obj, list):
for i in obj:
yield from _iter_values_recursively(i)
elif isinstance(obj, str):
yield obj
def validate_wappalyzer_db(path: str) -> List[Tuple[str, str, str]]:
"""
Validate regex patterns inside a Wappalyzer technologies.json file.
Returns list of tuples: (technology_name, pattern_string, error_message)
"""
bad = []
try:
with open(path, "r", encoding="utf-8") as fh:
data = json.load(fh)
except Exception as e:
logger.debug(f"validate_wappalyzer_db: failed to load JSON: {e}")
return bad
for tech_name, tech_def in data.items():
try:
for s in _iter_values_recursively(tech_def):
if not isinstance(s, str):
continue
# quick skip for short tokens unlikely to be regex
if len(s) < 4:
continue
try:
re.compile(s)
except re.error as rex:
bad.append((tech_name, s, str(rex)))
except Exception:
# ignore other compile-time issues
continue
except Exception:
continue
return bad
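# Sketch: validating a local technologies.json before handing it to Wappalyzer and
# surfacing unusable patterns in the log. The filename below is an assumption; the
# project may or may not ship such a file next to this module.
def _example_validate_local_db(path: str = "technologies.json") -> int:
    bad = validate_wappalyzer_db(path) if os.path.exists(path) else []
    for tech, pattern, err in bad[:5]:
        logger.warning(f"Wappalyzer DB issue in {tech}: {pattern!r} ({err})")
    return len(bad)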
# -------------------- Wappalyzer / BuiltWith / JS/CSS heuristics --------------------
def compute_tech_confidence_from_wappalyzer(data: dict) -> int:
confidence = 50
detection = data.get("detection", {})
if isinstance(detection, dict):
if "headers" in detection:
confidence = max(confidence, 85)
if "script" in detection or "js" in detection:
confidence = max(confidence, 80)
if "meta" in detection:
confidence = max(confidence, 75)
return confidence
async def detect_technologies_wappalyzer(url: str, html: str, headers: dict) -> list:
try:
webpage = WebPage(url, html or "", headers or {})
# try Wappalyzer.latest() but be resilient
try:
w = Wappalyzer.latest()
except Exception as e:
# fallback to local DB if available (with validation)
tech_path = os.path.join(os.path.dirname(__file__), "technologies.json")
if os.path.exists(tech_path):
try:
                    # validate DB first to log problematic regexes
bad = validate_wappalyzer_db(tech_path)
if bad:
logger.warning(f"Wappalyzer DB contains {len(bad)} invalid regex patterns (showing up to 10).")
for tname, patt, err in bad[:10]:
logger.warning(f"Invalid regex in Wappalyzer DB - {tname}: pattern={patt!r} error={err}")
                    w = Wappalyzer.latest(technologies_file=tech_path)
except Exception as e2:
logger.debug(f"Fallback Wappalyzer load failed: {e2}")
return []
else:
logger.debug("Wappalyzer DB not available and no local fallback")
return []
# analyze, but guard against regex runtime errors inside w.analyze
try:
results = w.analyze_with_categories(webpage) or {}
except re.error as rex:
logger.exception("Wappalyzer analyze raised a regex error — likely a faulty pattern in DB.")
return []
except Exception as e:
logger.debug(f"Wappalyzer analyze failed: {e}")
return []
detected = []
for name, data in results.items():
if not isinstance(data, dict):
continue
confidence = compute_tech_confidence_from_wappalyzer(data)
prov = []
det = data.get("detected", {})
if det:
prov.append("wappalyzer-detected")
categories = data.get("categories", [])
detected.append({
"name": name,
"version": data.get("version", "Unknown"),
"categories": categories,
"confidence": confidence,
"source": "Wappalyzer",
"provenance": prov
})
detected.sort(key=lambda x: x["confidence"], reverse=True)
return detected
except Exception as e:
logger.debug(f"Wappalyzer error: {e}")
return []
async def detect_technologies_builtwith(url: str) -> list:
try:
raw = builtwith.builtwith(url)
out = []
for cat, techs in (raw or {}).items():
for t in techs:
confidence = 70
if "cdn" in cat.lower():
confidence = 90
if "framework" in cat.lower():
confidence = 90
out.append({
"name": t,
"category": cat,
"confidence": confidence,
"source": "BuiltWith",
"provenance": ["builtwith-api"]
})
out.sort(key=lambda x: x["confidence"], reverse=True)
return out
except Exception as e:
logger.debug(f"BuiltWith error: {e}")
return []
async def fetch_resource_content(url: str, timeout: int = 10) -> str:
try:
headers = get_realistic_headers(url)
async with httpx.AsyncClient(follow_redirects=True) as client:
r = await async_request_with_retry("GET", url, client, timeout=timeout, headers=headers)
if r and r.status_code == 200:
return r.text or ""
except Exception as e:
logger.debug(f"Failed to fetch resource {url}: {e}")
return ""
return ""
async def detect_js_technologies(js_links: List[str], base_url: str, html: str) -> list:
detected = []
content = " ".join(js_links or []) + " " + (html or "")
content_l = content.lower()
indicators = {
"jQuery": r"jquery[\w-]*\.js|jquery-ui|\$\.fn\.jquery|window\.jquery",
"React": r"react[\w-]*\.js|react-dom|__react_devtools_global_hook__|data-reactroot",
"Angular": r"angular[\w-]*\.js|ng-app|angular\.module",
"Vue.js": r"vue[\w-]*\.js|__vue_devtools_global_hook__|vue-router"
}
for tech, pattern in indicators.items():
try:
if re.search(pattern, content_l):
detected.append({"name": tech, "confidence": 70, "source": "JS Heuristics", "provenance": ["inline", "links"]})
except re.error:
# fallback: substring check
if pattern.lower() in content_l:
detected.append({"name": tech, "confidence": 60, "source": "JS Heuristics (fallback)", "provenance": ["inline", "links"]})
sem = asyncio.Semaphore(10)
async def _fetch(url_):
async with sem:
return await fetch_resource_content(url_)
tasks = []
for url_ in (js_links or []):
tasks.append(_fetch(url_))
contents = []
if tasks:
try:
contents = await asyncio.gather(*tasks)
except Exception:
contents = []
for c in (contents or []):
c_l = (c or "").lower()
for tech, pattern in indicators.items():
try:
if re.search(pattern, c_l):
if not any(d["name"] == tech for d in detected):
detected.append({"name": tech, "confidence": 85, "source": "JS Heuristics", "provenance": ["resource_content"]})
except re.error:
if pattern.lower() in c_l:
if not any(d["name"] == tech for d in detected):
detected.append({"name": tech, "confidence": 75, "source": "JS Heuristics (fallback)", "provenance": ["resource_content"]})
return detected
async def detect_css_technologies(css_links: List[str], html: str) -> list:
detected = []
content = " ".join(css_links or []) + " " + (html or "")
content_l = content.lower()
indicators = {
"Bootstrap": r"bootstrap[\w-]*\.css|class=['\"].*col-",
# improved Tailwind detection: look for class attributes containing tw- (utility prefix) or grid-cols, flex- etc.
"Tailwind CSS": r"tailwind\.min\.css|class=['\"][^'\"]*\btw-|class=['\"].*grid-cols-|class=['\"].*flex-",
"Materialize": r"materialize[\w-]*\.css"
}
for tech, pattern in indicators.items():
try:
if re.search(pattern, content_l):
detected.append({"name": tech, "confidence": 70, "source": "CSS Heuristics", "provenance": ["links_or_inline"]})
except re.error:
if pattern.lower() in content_l:
detected.append({"name": tech, "confidence": 60, "source": "CSS Heuristics (fallback)", "provenance": ["links_or_inline"]})
sem = asyncio.Semaphore(8)
async def _fetch(url_):
async with sem:
return await fetch_resource_content(url_)
tasks = []
for url_ in (css_links or []):
tasks.append(_fetch(url_))
contents = []
if tasks:
try:
contents = await asyncio.gather(*tasks)
except Exception:
contents = []
for c in (contents or []):
c_l = (c or "").lower()
for tech, pattern in indicators.items():
try:
if re.search(pattern, c_l):
if not any(d["name"] == tech for d in detected):
detected.append({"name": tech, "confidence": 85, "source": "CSS Heuristics", "provenance": ["resource_content"]})
except re.error:
if pattern.lower() in c_l:
if not any(d["name"] == tech for d in detected):
detected.append({"name": tech, "confidence": 75, "source": "CSS Heuristics (fallback)", "provenance": ["resource_content"]})
return detected
# -------------------- CMS detection --------------------
def compute_confidence_from_evidence(evidence: List[Dict[str, Any]]) -> float:
if not evidence:
return 0.0
total_possible = sum(float(e.get("weight", 0.0)) for e in evidence)
if total_possible <= 0:
return 0.0
found = sum(float(e.get("weight", 0.0)) for e in evidence if e.get("found"))
return min(1.0, found / total_possible)
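# Worked example of the evidence-weighting maths used by detect_cms below: two of
# three signatures found gives 0.41 matched weight out of 0.53 possible, i.e. a
# confidence of roughly 0.77. The weights mirror the WordPress signature table.
def _example_cms_confidence() -> float:
    evidence = [
        {"type": "path", "pattern": "wp-content", "weight": 0.23, "found": True},
        {"type": "api", "pattern": "wp-json", "weight": 0.18, "found": True},
        {"type": "meta", "pattern": "generator wordpress", "weight": 0.12, "found": False},
    ]
    return compute_confidence_from_evidence(evidence)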
def detect_cms(html: str, headers: dict, url: str, extracted_data: dict = None) -> list:
detected_cms = []
html_lower = (html or "").lower()
headers_lower = {k.lower(): v for k, v in (headers or {}).items()}
extracted_data = extracted_data or {}
js_links = " ".join(extracted_data.get("js_links", []))
form_links = " ".join(extracted_data.get("form_links", []))
image_links = " ".join(extracted_data.get("image_links", []))
cms_signatures = {
"WordPress": [
{"type": "path", "pattern": r"wp-content", "weight": 0.23},
{"type": "path", "pattern": r"wp-includes", "weight": 0.22},
{"type": "api", "pattern": r"wp-json", "weight": 0.18},
{"type": "meta", "pattern": r"<meta name=\"generator\" content=\"wordpress", "weight": 0.12},
{"type": "cookie", "pattern": r"wordpress_logged_in_", "weight": 0.12},
{"type": "admin", "pattern": r"/wp-admin/", "weight": 0.13}
],
"Joomla": [
{"type": "meta", "pattern": r"meta name=\"generator\" content=\"joomla", "weight": 0.35},
{"type": "path", "pattern": r"media\/com_content", "weight": 0.25},
{"type": "admin", "pattern": r"\/administrator\/", "weight": 0.35}
],
"Drupal": [
{"type": "path", "pattern": r"sites\/default\/files", "weight": 0.35},
{"type": "path", "pattern": r"\/core\/misc\/drupal\.js", "weight": 0.3},
{"type": "meta", "pattern": r"<meta name=\"generator\" content=\"drupal", "weight": 0.35}
],
"Shopify": [
{"type": "domain", "pattern": r"cdn\.shopify\.com", "weight": 0.45},
{"type": "domain", "pattern": r"myshopify\.com", "weight": 0.45},
{"type": "script", "pattern": r"shopify", "weight": 0.1}
],
"Magento": [
{"type": "path", "pattern": r"mage\/", "weight": 0.3},
{"type": "meta", "pattern": r"magento", "weight": 0.3},
{"type": "admin", "pattern": r"/admin/", "weight": 0.2}
],
"Wix": [
{"type": "script", "pattern": r"wix\.com|wixstatic", "weight": 0.6},
{"type": "meta", "pattern": r"wix", "weight": 0.4}
],
"Squarespace": [
{"type": "script", "pattern": r"squarespace", "weight": 0.6},
{"type": "meta", "pattern": r"squarespace", "weight": 0.4}
],
"Bitrix": [
{"type": "path", "pattern": r"/bitrix/", "weight": 0.7}
]
}
for cms_name, sigs in cms_signatures.items():
evidence = []
for s in sigs:
found = False
typ = s["type"]
pat = s["pattern"]
try:
if typ in ("path", "meta", "api"):
found = bool(re.search(pat, html_lower))
elif typ == "cookie":
cookie_header = headers_lower.get("set-cookie", "")
found = bool(re.search(pat, cookie_header.lower()))
elif typ == "domain":
combined = " ".join(list(headers_lower.values())) + " " + form_links.lower() + " " + js_links.lower() + " " + image_links.lower() + " " + url.lower()
found = bool(re.search(pat, combined))
elif typ == "script":
found = bool(re.search(pat, js_links.lower()))
elif typ == "admin":
found = bool(re.search(pat, html_lower)) or bool(re.search(pat, form_links.lower()))
except re.error:
if typ in ("path", "meta", "api", "script", "admin"):
found = pat.lower() in html_lower
elif typ == "domain":
found = pat.lower() in (" ".join(list(headers_lower.values())) + " " + form_links + " " + js_links + " " + image_links + " " + url).lower()
evidence.append({"type": typ, "pattern": pat, "weight": s.get("weight", 0.1), "found": found})
confidence = compute_confidence_from_evidence(evidence)
if confidence > 0:
detected_cms.append({
"name": cms_name,
"confidence": round(confidence, 3),
"evidence": evidence,
"source": "CMS Heuristics",
"provenance": [e for e in evidence if e["found"]]
})
x_gen = headers_lower.get("x-generator", "") or headers_lower.get("server", "")
if x_gen:
if "joomla" in x_gen.lower():
if not any(d["name"] == "Joomla" for d in detected_cms):
detected_cms.append({"name": "Joomla", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]})
elif "wordpress" in x_gen.lower() or "wp-" in x_gen.lower():
if not any(d["name"] == "WordPress" for d in detected_cms):
detected_cms.append({"name": "WordPress", "confidence": 1.0, "evidence": [{"type": "header", "value": x_gen}], "source": "Headers", "provenance": [{"type": "header", "value": x_gen}]})
detected_cms.sort(key=lambda x: x["confidence"], reverse=True)
return detected_cms
# -------------------- Security Headers --------------------
def analyze_security_headers(headers: dict) -> Dict[str, Any]:
headers = {k.lower(): v for k, v in (headers or {}).items()}
security = {
"x-frame-options": headers.get("x-frame-options"),
"x-xss-protection": headers.get("x-xss-protection"),
"x-content-type-options": headers.get("x-content-type-options"),
"strict-transport-security": headers.get("strict-transport-security"),
"content-security-policy": headers.get("content-security-policy"),
"referrer-policy": headers.get("referrer-policy")
}
results = {}
for header, value in security.items():
if value:
status = "Implemented"
if header == "x-frame-options":
if value.lower() in ["deny", "sameorigin"]:
status = "Secure"
else:
status = "Weak"
results[header] = {"status": status, "value": value}
return results
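# Sketch of the header audit on a hand-written header set (purely illustrative):
# only headers actually present come back, each tagged Implemented/Secure/Weak.
def _example_security_audit() -> dict:
    return analyze_security_headers({
        "X-Frame-Options": "SAMEORIGIN",
        "Strict-Transport-Security": "max-age=63072000; includeSubDomains",
    })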
# -------------------- Payment Method Detection --------------------
def detect_payment_methods(html: str, extracted_data: dict = None) -> list:
detected_methods = []
html_lower = (html or "").lower()
extracted_data = extracted_data or {}
js_links = " ".join(extracted_data.get("js_links", [])).lower()
form_links = " ".join(extracted_data.get("form_links", [])).lower()
image_links = " ".join(extracted_data.get("image_links", [])).lower()
combined = " ".join([html_lower, js_links, form_links, image_links])
payment_patterns = {
"Visa": r"\bvisa\b|visa-logo|/visa\.(svg|png|jpg|gif)",
"Mastercard": r"mastercard|/mastercard\.(svg|png|jpg|gif)|master-card|master card",
"American Express": r"american[\s-]*express|amex|/amex\.(svg|png|jpg|gif)",
"PayPal": r"paypal\.com|paypal-button|www\.paypalobjects\.com|paypalcheckout|paypal\.me",
"Stripe": r"js\.stripe\.com|stripe\.com|Stripe\.(setPublishableKey|card)|stripe-v3|stripe-elements",
"Apple Pay": r"apple[\s-]*pay|apple-pay",
"Google Pay": r"google[\s-]*pay|pay.google.com|google-pay",
"Shop Pay": r"shopify\.com\/shop_pay|shopify|shop-pay",
"Discover": r"discover|discover-logo|/discover\.(svg|png|jpg|gif)",
"UnionPay": r"unionpay|union-pay",
"JCB": r"\bjcb\b",
"Alipay": r"alipay|alipayjsbridge|alipay\.com",
"WeChat Pay": r"wechatpay|weixin\.qq\.com|wechat[\s-]*pay",
"Square": r"squareup\.com|square\.(js|cdn)|sq-",
"Authorize.Net": r"authorize\.net|secure2.authorize\.net",
"Braintree": r"braintree\.gateway|braintree\.js|braintree",
"Adyen": r"adyen|checkoutshopper|adyen-checkout",
"Worldpay": r"worldpay|secure\.worldpay",
"SagePay": r"sagepay|opayo",
"Klarna": r"klarna|klarna-checkout",
"Amazon Pay": r"amazonpay|static-na\.amzn\.com|amazon-pay",
"Payoneer": r"payoneer",
"Razorpay": r"razorpay|checkout\.razorpay\.com",
"2Checkout": r"2checkout|2co",
"Mollie": r"mollie|checkout\.mollie",
"PayU": r"payu|payu\.com",
"MercadoPago": r"mercadopago|mercadopago\.com",
"CyberSource": r"cybersource|ics2wsa",
"Afterpay": r"afterpay|clearpay",
"Paystack": r"paystack|js\.paystack\.co",
"ePDQ": r"epdq|ogone",
"Checkout.com": r"checkout\.com|checkoutjs",
"GreenPay": r"greenpay"
}
for method, pattern in payment_patterns.items():
try:
if re.search(pattern, combined, re.I):
if method not in detected_methods:
detected_methods.append(method)
except re.error:
if pattern.lower() in combined:
if method not in detected_methods:
detected_methods.append(method)
checkout_indicators = [r"/checkout", r"/cart", r"/pay", r"/payment", r"/order", r"/billing"]
for pat in checkout_indicators:
if re.search(pat, form_links + html_lower):
if "E-Commerce/Checkout" not in detected_methods:
detected_methods.append("E-Commerce/Checkout")
return detected_methods
# -------------------- Tracker and Analytics Detection --------------------
def detect_trackers_and_analytics(html: str, js_links: list = None, meta_tags: list = None) -> list:
detected_trackers = []
html_lower = (html or "").lower()
tracker_patterns = {
"Google Analytics": r"google-analytics\.com/|gtag\.js|analytics\.js",
"Google Tag Manager": r"googletagmanager\.com",
"Facebook Pixel": r"connect\.facebook\.net/en_US/fbevents\.js|fbq\(",
"Hotjar": r"hotjar\.com|hjid",
"Matomo (Piwik)": r"matomo\.js",
"TikTok Pixel": r"ttq\.load"
}
for tracker, pattern in tracker_patterns.items():
if re.search(pattern, html_lower):
detected_trackers.append(tracker)
all_js_links = " ".join([link.lower() for link in (js_links or [])])
for tracker, pattern in tracker_patterns.items():
if re.search(pattern, all_js_links):
if tracker not in detected_trackers:
detected_trackers.append(tracker)
meta_content = " ".join([tag.get('content', '').lower() for tag in (meta_tags or [])])
for tracker, pattern in tracker_patterns.items():
if re.search(pattern, meta_content):
if tracker not in detected_trackers:
detected_trackers.append(tracker)
return detected_trackers
# -------------------- IP info --------------------
def get_ip_info(ip: str) -> Dict:
res = {"source": "ipwhois", "timestamp": datetime.utcnow().isoformat() + "Z"}
try:
obj = ipwhois.IPWhois(ip).lookup_rdap(depth=1)
res["asn"] = obj.get("asn")
res["asn_cidr"] = obj.get("asn_cidr")
res["asn_country_code"] = obj.get("asn_country_code")
res["asn_description"] = obj.get("asn_description")
res["network"] = obj.get("network")
except Exception as e:
logger.debug(f"IPWhois lookup failed for {ip}: {e}")
res["error"] = str(e)
return res
# -------------------- WAF detection --------------------
def detect_waf_subprocess(url: str) -> dict:
result = {"detected": False, "provider": None, "confidence": 0.0, "evidence": []}
try:
proc = subprocess.run(["wafw00f", "-a", url], capture_output=True, text=True, timeout=20)
out = (proc.stdout or "") + (proc.stderr or "")
if proc.returncode == 0 and out:
lines = out.splitlines()
for ln in lines:
for provider in ["Cloudflare", "Imperva", "Akamai", "Fastly", "Sucuri", "F5", "ModSecurity", "AWS WAF", "Fortinet", "Barracuda", "Incapsula"]:
if provider.lower() in ln.lower():
result.update({"detected": True, "provider": provider, "confidence": 0.9, "evidence": ["wafw00f-output"]})
return result
except Exception:
pass
try:
parsed = urlparse(url)
try:
r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url))
headers = {k.lower(): v for k, v in dict(r.headers).items()}
body_snippet = (r.text or "")[:3000]
cookie_keys = " ".join([c.name for c in getattr(r, "cookies", [])]) if hasattr(r, "cookies") else ""
except Exception as e:
headers = {}
body_snippet = ""
cookie_keys = ""
header_indicators = {
"Cloudflare": ["cf-ray", "server: cloudflare", "cf-cache-status", "cf-request-id"],
"Imperva": ["x-iinfo", "incapsula", "visid_incap_"],
"Akamai": ["x-akamai-transformed", "akamai", "akamaiedge", "akamaitechnologies"],
"Fastly": ["x-served-by", "x-cache", "x-fastly-backend-request-id"],
"Sucuri": ["x-sucuri-cache", "x-sucuri-id"],
"F5": ["bigipserver", "x-lb"],
"ModSecurity": ["mod_security", "mod_sec"],
"AWS WAF": ["x-amzn-requestid", "x-amz-cf-id"],
"Fortinet": ["fortigate", "f5-"],
"Barracuda": ["barracuda"],
"Incapsula": ["visid_incap_"]
}
for provider, sigs in header_indicators.items():
for sig in sigs:
try:
if ":" in sig:
hname, hv = [s.strip() for s in sig.split(":", 1)]
hv = hv.lower()
if headers.get(hname) and hv in headers.get(hname, "").lower():
result.update({"detected": True, "provider": provider, "confidence": 0.75, "evidence": [f"header:{hname}"]})
return result
else:
if any(sig in h for h in headers.keys()):
result.update({"detected": True, "provider": provider, "confidence": 0.7, "evidence": [f"header_contains:{sig}"]})
return result
if sig in body_snippet.lower():
result.update({"detected": True, "provider": provider, "confidence": 0.6, "evidence": ["body_snippet"]})
return result
if re.search(re.escape(sig), cookie_keys, re.I):
result.update({"detected": True, "provider": provider, "confidence": 0.65, "evidence": ["cookie_name"]})
return result
except Exception:
continue
challenge_patterns = [r"attention required", r"access denied", r"please enable cookies", r"security check", r"verify you are a human", r"challenge.*cloudflare"]
for pat in challenge_patterns:
if re.search(pat, body_snippet, re.I):
result.update({"detected": True, "provider": "Unknown (challenge page)", "confidence": 0.5, "evidence": ["challenge_pattern"]})
return result
except Exception as e:
logger.debug(f"WAF detection error heuristics: {e}")
return result
# -------------------- CDN detection --------------------
def detect_cdn_from_headers_and_dns(headers: dict, dns_records: dict, ip: str = None, extracted_data: dict = None) -> dict:
detected = {"source": None, "provider": None, "confidence": 0, "reasons": []}
headers_lower = {k.lower(): v for k, v in (headers or {}).items()}
extracted_data = extracted_data or {}
cdn_header_signatures = {
"Cloudflare": ["cf-ray", "cf-cache-status", "server: cloudflare", "cf-request-id"],
"Akamai": ["x-akamai-transformed", "x-akamai-request-id", "akamai"],
"Amazon CloudFront": ["x-amz-cf-id", "via: 1.1 cloudfront", "x-cache"],
"Fastly": ["x-served-by", "x-fastly-backend-request-id", "x-cache"],
"Sucuri": ["x-sucuri-cache", "x-sucuri-id"],
"Google Cloud CDN": ["x-goog-gfe-response-headers", "x-google-gfe"],
"Incapsula": ["x-iinfo", "visid_incap_"],
"Azure CDN": ["cdn-io", "azureedge", "azurefd", "akadns"],
"Netlify": ["netlify"],
"Cloudflare Stream": ["cf-stream"],
"BunnyCDN": ["bunnycdn"],
"StackPath": ["stackpathcdn"],
"KeyCDN": ["x-keycdn"],
"CDN77": ["cdn77"],
"Akamai EdgeKey": ["edgekey.net"]
}
for provider, sigs in cdn_header_signatures.items():
for sig in sigs:
if any(sig in h for h in headers_lower.keys()) or any(sig in v.lower() for v in headers_lower.values()):
detected.update({"source": "Headers", "provider": provider, "confidence": 95})
detected["reasons"].append(f"header signature matched {sig}")
return detected
cname_records = dns_records.get("CNAME", []) if dns_records else []
try:
candidate_host = cname_records[0] if cname_records else None
cname_chain = resolve_cname_chain(candidate_host) if candidate_host else []
cname_patterns = {
"Cloudflare": r"cloudflare|cloudfront|cloudflare.net",
"Akamai": r"akamai|akamaiedge|akamaitechnologies|edgekey\.net|akamaiedge\.net",
"Amazon CloudFront": r"cloudfront\.net",
"Fastly": r"fastly\.net|fastly",
"Incapsula": r"incapsula|imperva",
"Sucuri": r"sucuri\.net|sucuri",
"Azure CDN": r"azureedge|azurefd|z6rungcdn|azure",
"Netlify": r"netlify\.app|netlify",
"BunnyCDN": r"bunnycdn",
"StackPath": r"stackpathdns",
"KeyCDN": r"kccdn|kxcdn",
"CDN77": r"cdn77",
}
for provider, pattern in cname_patterns.items():
for cname in (cname_records + cname_chain):
if re.search(pattern, cname, re.I):
detected.update({"source": "DNS CNAME", "provider": provider, "confidence": 85})
detected["reasons"].append(f"CNAME {cname} matches {provider}")
return detected
except Exception as e:
logger.debug(f"CDN CNAME check error: {e}")
try:
asset_hosts = set()
for linklist in ("js_links", "css_links", "image_links", "form_links"):
for a in extracted_data.get(linklist, []):
try:
p = urlparse(a)
if p.hostname:
asset_hosts.add(p.hostname.lower())
except Exception:
continue
asset_hosts_list = list(asset_hosts)
asset_host_patterns = {
"Cloudflare": ["cloudflare", "cdn-cdn.cloudflare", "cloudflare.net", "cdn-cgi"],
"Akamai": ["akamai.net", "akamaiedge", "akamaitechnologies", "edgekey.net"],
"Fastly": ["fastly.net", "fastly"],
"Amazon CloudFront": ["cloudfront.net", "amazonaws.com"],
"Netlify": ["netlify.app", "netlify"],
"BunnyCDN": ["b-cdn.net", "bunnycdn"],
"Google Cloud CDN": ["googleusercontent.com", "googleapis.com"],
"KeyCDN": ["kxcdn", "kccdn"],
"CDN77": ["cdn77"],
"StackPath": ["stackpathcdn", "stackpathdns"]
}
for provider, pats in asset_host_patterns.items():
for pat in pats:
for ah in asset_hosts_list:
if pat in ah:
detected.update({"source": "Asset Hosts", "provider": provider, "confidence": 80})
detected["reasons"].append(f"asset host {ah} contains {pat}")
return detected
except Exception as e:
logger.debug(f"Asset host analysis error: {e}")
return detected
# -------------------- Main async scan (IMPROVED) --------------------
async def main_async_scan(url: str):
scan_start = datetime.utcnow().isoformat() + "Z"
try:
logger.info(f"Starting scan for {url}")
# Step 1: Try Playwright render (get content + headers)
dynamic_html, dynamic_headers, dynamic_final_url = await get_dynamic_html(url)
final_html = dynamic_html or ""
final_headers = dynamic_headers or {}
final_url = dynamic_final_url or url
static_response = None
# If no dynamic content, try static fetch (async)
if not final_html:
logger.info("Dynamic fetch empty; attempting static fetch...")
static_response = await fetch_static(url)
if static_response and static_response.status_code == 200:
final_html = static_response.text or ""
final_headers = dict(static_response.headers or {})
final_url = str(static_response.url or url)
else:
# fallback sync attempt to capture headers/body
try:
r = httpx.get(url, follow_redirects=True, timeout=10, headers=get_realistic_headers(url))
if r.status_code == 200:
final_html = r.text or ""
final_headers = dict(r.headers or {})
final_url = str(r.url or url)
else:
logger.warning(f"Static fetch returned {r.status_code} for {url}")
except Exception as e:
logger.debug(f"Sync fallback static fetch failed: {e}")
else:
# We have dynamic HTML; ensure we also have headers (use static fetch or HEAD if headers missing)
if not final_headers:
try:
head_resp = httpx.head(final_url, follow_redirects=True, timeout=8, headers=get_realistic_headers(final_url))
if head_resp and head_resp.status_code < 400:
final_headers = dict(head_resp.headers or {})
else:
r2 = httpx.get(final_url, follow_redirects=True, timeout=10, headers=get_realistic_headers(final_url))
if r2:
final_headers = dict(r2.headers or {})
except Exception as e:
logger.debug(f"Failed to fetch headers fallback: {e}")
# store raw evidence: headers + body
raw_evidence = {}
if final_html:
raw_body_bytes = (final_html.encode("utf-8") if isinstance(final_html, str) else (final_html or b""))
raw_evidence["body"] = store_raw_evidence(raw_body_bytes, prefix="body")
if final_headers:
try:
hdr_bytes = json.dumps(dict(final_headers), ensure_ascii=False).encode("utf-8")
raw_evidence["headers"] = store_raw_evidence(hdr_bytes, prefix="headers")
except Exception:
raw_evidence["headers"] = {"error": "failed_to_store_headers"}
# Step 2: Extract links and resources (ensure final_url passed)
logger.info("Extracting links and resources...")
extracted_data = extract_links_and_scripts(final_html or "", final_url)
js_links = extracted_data.get("js_links", [])
css_links = extracted_data.get("css_links", [])
# Step 3: Run detection tasks concurrently
logger.info("Detecting technologies (Wappalyzer/BuiltWith/JS/CSS heuristics)...")
tasks = [
detect_technologies_wappalyzer(final_url, final_html or "", final_headers),
detect_technologies_builtwith(final_url),
detect_js_technologies(js_links, final_url, final_html or ""),
detect_css_technologies(css_links, final_html or "")
]
wappalyzer_res, builtwith_res, js_res, css_res = await asyncio.gather(*tasks)
# Step 4: Combine technologies
all_tech = (wappalyzer_res or []) + (builtwith_res or []) + (js_res or []) + (css_res or [])
tech_map: Dict[str, Any] = {}
for tech in all_tech:
name = tech.get("name")
if not name:
continue
existing = tech_map.get(name)
confidence = float(tech.get("confidence", 50))
if existing:
existing_conf = float(existing.get("confidence", 0))
existing["confidence"] = max(existing_conf, confidence)
existing_sources = set([s.strip() for s in str(existing.get("source", "")).split(",") if s])
incoming_source = tech.get("source") or ""
if incoming_source and incoming_source not in existing_sources:
existing_sources.add(incoming_source)
existing["source"] = ", ".join(sorted(existing_sources))
existing_prov = set(existing.get("provenance", []) or [])
incoming_prov = set(tech.get("provenance", []) or [])
existing["provenance"] = list(existing_prov.union(incoming_prov))
if tech.get("version") and existing.get("version") in (None, "Unknown"):
existing["version"] = tech.get("version")
else:
tech_map[name] = {
"name": name,
"version": tech.get("version", "Unknown"),
"confidence": confidence,
"source": tech.get("source", ""),
"provenance": tech.get("provenance", []) or []
}
combined_tech = list(tech_map.values())
combined_tech.sort(key=lambda x: x.get("confidence", 0), reverse=True)
# Step 5: DNS and SSL
parsed = urlparse(final_url)
domain = parsed.netloc.split(":")[0] if parsed.netloc else ""
dns_records = get_dns_records(domain) if domain else {}
ssl_info = {}
if parsed.scheme == "https" and domain:
ssl_info = get_ssl_info(domain)
# Step 6: IP info
ip_info = {}
if dns_records.get("A"):
ip = dns_records["A"][0] if isinstance(dns_records["A"], list) and dns_records["A"] else dns_records["A"]
ip_info = get_ip_info(ip)
# Step 7: robots.txt
robots_info = await analyze_robots(domain) if domain else {"exists": False, "tried": [], "error": "no domain"}
# Step 8: Security headers and CMS detection
security_headers = analyze_security_headers(final_headers)
cms_info = detect_cms(final_html or "", final_headers or {}, final_url, extracted_data=extracted_data)
# Step 9: payments and trackers
payment_methods_info = detect_payment_methods(final_html or "", extracted_data=extracted_data)
trackers_info = detect_trackers_and_analytics(final_html or "", js_links=extracted_data.get("js_links", []), meta_tags=extracted_data.get("meta_tags", []))
# Step 10: WAF & CDN heuristics
waf_info = detect_waf_subprocess(final_url)
cdn_info = detect_cdn_from_headers_and_dns(final_headers or {}, dns_records or {}, ip_info.get("asn_cidr") if ip_info else None, extracted_data=extracted_data)
# Inference rules for Cloudflare
try:
if (not cdn_info.get("provider")) and waf_info.get("provider") and "cloudflare" in (waf_info.get("provider") or "").lower():
cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 90, "reasons": ["waf indicates Cloudflare"]})
elif (not cdn_info.get("provider")) and ip_info and ip_info.get("asn_description") and "cloudflare" in str(ip_info.get("asn_description")).lower():
cdn_info.update({"source": "inferred", "provider": "Cloudflare", "confidence": 85, "reasons": ["ip whois ASN indicates Cloudflare"]})
else:
ns_list = dns_records.get("NS", []) or []
if (not cdn_info.get("provider")):
for ns in ns_list:
if "cloudflare" in ns.lower():
cdn_info.update({"source": "dns", "provider": "Cloudflare", "confidence": 85, "reasons": [f"NS {ns} indicates Cloudflare"]})
break
except Exception:
pass
# Build final report
title = "No Title"
try:
soup = BeautifulSoup(final_html or "", "lxml")
if soup.title and soup.title.string:
title = soup.title.string.strip()
except Exception:
title = "No Title"
report = {
"scan_id": generate_scan_id(),
"scanned_at": scan_start,
"url": final_url,
"title": title,
"raw_evidence": raw_evidence,
"technologies": combined_tech,
"links_and_resources": extracted_data,
"dns_records": dns_records,
"ssl_info": ssl_info,
"ip_info": ip_info,
"robots_info": robots_info,
"security_headers": security_headers,
"cms_info": cms_info,
"payment_methods": payment_methods_info,
"trackers_and_analytics": trackers_info,
"waf_info": waf_info,
"cdn_info": cdn_info,
"headers": final_headers,
"notes": "Report contains provenance (raw_evidence paths) and normalized confidence scores (0-100 for technologies)."
}
# Normalize confidence to 0-100 for technologies
for t in report["technologies"]:
try:
t_conf = float(t.get("confidence", 50))
if 0 <= t_conf <= 1:
t["confidence"] = int(round(t_conf * 100))
else:
t["confidence"] = int(round(min(max(t_conf, 0), 100)))
except Exception:
t["confidence"] = 50
return safe_json(report)
except Exception as e:
logger.exception("Main scan failed")
return safe_json({"error": "Main scan failed", "details": str(e), "scanned_at": scan_start})
# -------------------- Convenience wrapper used by analyze_site.py --------------------
async def run_scan_for_url(url: str, render_js: bool = False, scan_id: Optional[str] = None) -> Dict[str, Any]:
try:
report = await main_async_scan(url)
if not isinstance(report, dict):
report = {"error": "invalid_report", "details": "Scanner returned non-dict result", "raw": str(report)}
report.setdefault("scanned_url", report.get("url", url))
if scan_id:
report["scan_id"] = scan_id
report.setdefault("url", report.get("scanned_url"))
report.setdefault("technologies", report.get("technologies", []))
report.setdefault("dns_records", report.get("dns_records", {}))
report.setdefault("robots_info", report.get("robots_info", {"exists": False}))
report.setdefault("headers", report.get("headers", {}))
# compatibility aliases
report.setdefault("waf", report.get("waf_info"))
report.setdefault("cdn", report.get("cdn_info"))
report.setdefault("payments", report.get("payment_methods"))
return report
except Exception as e:
logger.exception("run_scan_for_url wrapper failed")
return safe_json({"error": "run_scan_for_url_failed", "details": str(e), "scanned_url": url})
if __name__ == '__main__':
    # quick smoke test when running standalone:
    #   python utils.py [url]   (defaults to https://www.google.com)
    import sys
    test_url = sys.argv[1] if len(sys.argv) > 1 else "https://www.google.com"
    print(json.dumps(asyncio.run(main_async_scan(test_url)), ensure_ascii=False, indent=2))