# NOTE: removed pasted file-viewer residue ("275 أسطر / 13 KiB / Python") — not valid Python.
# analyze_site.py
# Updated to match the improved utils.py (compat_resources, run_scan_for_url, etc.)

import asyncio
import logging
import sys
from typing import Any, Dict, Optional

# Try flexible imports so this file works whether utils.py is at project root
# or inside the `app` package.
try:
    # Preferred when utils is inside the `app` package (app/utils.py)
    from app.utils import safe_json, run_scan_for_url, generate_scan_id
except Exception:
    try:
        # Fallback to top-level utils.py
        from utils import safe_json, run_scan_for_url, generate_scan_id  # type: ignore
    except Exception as e:
        raise ImportError("Could not import required utilities (safe_json, run_scan_for_url, generate_scan_id).") from e


# Module-level logger. Configure a handler only when none exists yet so that
# re-importing this module does not duplicate log output.
logger = logging.getLogger("SuperRecon")
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)  # use the level constant rather than the string "INFO"
async def run_scan(target_url: str, render_js: bool = False) -> Dict[str, Any]:
    """
    Orchestrates a full site scan for a single URL using run_scan_for_url from utils.

    Args:
        target_url: URL to scan.
        render_js: Whether the scanner should render JavaScript.

    Returns:
        The raw report (dict) or a safe_json-wrapped error dict.
    """
    scan_id = generate_scan_id()
    # Lazy %-style args keep formatting off the hot path; rendered text is unchanged.
    logger.info("Starting scan %s for URL: %s (render_js=%s)", scan_id, target_url, render_js)

    try:
        # run_scan_for_url already accepts scan_id and render_js and returns a dict
        result = await run_scan_for_url(target_url, render_js=render_js, scan_id=scan_id)
        logger.info("Scan %s completed successfully for %s.", scan_id, target_url)
        # Normalize to a dict and stamp identifying fields without overwriting
        # anything the scanner already set.
        if not isinstance(result, dict):
            result = {
                "error": "invalid_report",
                "details": "Scanner returned non-dict result",
                "raw": str(result),
            }
        result.setdefault("scan_id", scan_id)
        result.setdefault("scanned_url", result.get("url", target_url))
        return result
    except Exception as exc:
        logger.error("Scan %s failed with error: %s", scan_id, exc, exc_info=True)
        return safe_json({
            "error": "Scan failed",
            "details": str(exc),
            "scan_id": scan_id,
            "scanned_url": target_url,
        })
def _fmt_confidence(conf: Optional[Any]) -> str:
|
|
try:
|
|
if conf is None:
|
|
return "0%"
|
|
# If float in [0,1], convert to percent
|
|
if isinstance(conf, float) and 0.0 <= conf <= 1.0:
|
|
return f"{int(round(conf * 100))}%"
|
|
# else try numeric
|
|
val = int(float(conf))
|
|
if 0 <= val <= 100:
|
|
return f"{val}%"
|
|
return f"{max(0, min(val, 100))}%"
|
|
except Exception:
|
|
try:
|
|
return f"{int(conf)}%"
|
|
except Exception:
|
|
return str(conf)
|
|
|
|
|
|
def _confidence_emoji(conf_str: str) -> str:
    """Pick a rating emoji for a formatted confidence string like "85%".

    Fixes a crash in the original inline `int(confidence.strip("%"))`, which
    raised ValueError whenever _fmt_confidence fell back to a non-numeric
    str() form; such values now count as 0.
    """
    try:
        pct = int(float(conf_str.strip().rstrip("%")))
    except (TypeError, ValueError):
        pct = 0
    if pct > 90:
        return "⭐"
    if pct > 70:
        return "👍"
    return "🧐"


def _render_single_report(report: Dict[str, Any]) -> str:
    """Render one scan-report dict into its Arabic markdown section.

    Tolerates missing or oddly-typed fields: every section falls back to a
    "not found" line instead of raising.
    """
    url = report.get("scanned_url", report.get("url", "URL غير معروف"))
    scan_id = report.get("scan_id", "")
    # Fixed: the original did report.get("scanned_at", report.get("scanned_at", ...)),
    # a redundant fallback onto the same key.
    scanned_at = report.get("scanned_at", "غير معروف")

    parts = ["---\n\n", f"### **🌐 تقرير الفحص لصفحة: {url}**\n"]
    if scan_id:
        parts.append(f"- **معرّف الفحص:** `{scan_id}`\n")
    if scanned_at:
        parts.append(f"- **وقت الفحص:** {scanned_at}\n")
    parts.append("\n")

    # Security Headers — each value may be a {"status", "value"} dict or a scalar.
    parts.append("**🛡️ رؤوس الأمان (Security Headers):**\n")
    sec_headers = report.get("security_headers", {})
    if sec_headers:
        for h, d in sec_headers.items():
            if isinstance(d, dict):
                status = d.get("status", "")
                value = d.get("value", "")
                parts.append(f" - **{h}**: {status} — `{value}`\n")
            else:
                parts.append(f" - **{h}**: {d}\n")
    else:
        parts.append(" - لم يتم العثور على رؤوس أمان أساسية.\n")
    parts.append("\n")

    # DNS Records — coerce each record to str so join never raises.
    parts.append("**📡 معلومات DNS:**\n")
    dns_records = report.get("dns_records", {})
    if dns_records:
        for rtype, records in dns_records.items():
            if isinstance(records, (list, tuple)) and records:
                recs_display = ", ".join(str(r) for r in records)
            else:
                recs_display = str(records)
            parts.append(f" - **{rtype}**: {recs_display}\n")
    else:
        parts.append(" - لا توجد سجلات DNS أو لم يتم استردادها.\n")
    parts.append("\n")

    # SSL Info — issuer may be a dict (CN key) or a plain string.
    parts.append("**🔒 شهادة SSL:**\n")
    ssl_info = report.get("ssl_info", {}) or {}
    if ssl_info.get("valid"):
        not_after = ssl_info.get("not_after", "غير معروف")
        issuer = ssl_info.get("issuer") or {}
        issuer_cn = issuer.get("CN") if isinstance(issuer, dict) else issuer
        parts.append(f" - ✅ صالحة حتى: {not_after}\n")
        parts.append(f" - جهة الإصدار: {issuer_cn if issuer_cn else issuer}\n")
    elif ssl_info.get("error"):
        parts.append(f" - ❌ خطأ في فحص الشهادة: {ssl_info.get('error')}\n")
    else:
        parts.append(" - ❌ غير مفعلة أو غير متاحة.\n")
    parts.append("\n")

    # Technologies — sorted by confidence, highest first.
    parts.append("**🛠️ التقنيات المكتشفة:**\n")
    techs = report.get("technologies", []) or []
    if techs:
        def _conf_key(t: Dict[str, Any]) -> float:
            # Fixed: the original key crashed (TypeError) on None or
            # non-numeric confidence values when sorting.
            try:
                return float(t.get("confidence") or 0)
            except (TypeError, ValueError):
                return 0.0

        for t in sorted(techs, key=_conf_key, reverse=True):
            name = t.get("name", "غير معروف")
            confidence = _fmt_confidence(t.get("confidence", 0))
            category = t.get("categories") or t.get("category") or []
            if isinstance(category, (list, tuple)):
                cat_display = ", ".join(str(c) for c in category) if category else "غير محدد"
            else:
                cat_display = str(category)
            source = t.get("source", "غير معروف")
            version = t.get("version", "") or ""
            emoji = _confidence_emoji(confidence)
            parts.append(f" - {emoji} **{name}**")
            if version:
                parts.append(f" (الإصدار: {version})")
            parts.append("\n")
            parts.append(f" - **الفئة**: {cat_display}\n")
            parts.append(f" - **الثقة**: {confidence}\n")
            parts.append(f" - **المصدر**: {source}\n")
    else:
        parts.append(" - لم يتم العثور على تقنيات.\n")
    parts.append("\n")

    # Robots.txt
    parts.append("**🤖 ملف Robots.txt:**\n")
    robots_info = report.get("robots_info", {}) or {}
    if robots_info.get("exists"):
        parts.append(f" - ✅ **موجود** في: {robots_info.get('fetched_from')}\n")
        sitemaps = robots_info.get("sitemaps")
        if sitemaps:
            # Fixed: guard against a non-list value (a bare string would be
            # joined character-by-character by the original code).
            if isinstance(sitemaps, (list, tuple)):
                sitemap_display = ", ".join(str(s) for s in sitemaps)
            else:
                sitemap_display = str(sitemaps)
            parts.append(f" - **Sitemaps**: {sitemap_display}\n")
        if robots_info.get("rules"):
            parts.append(" - **قواعد**: يحتوي على قواعد Allow/Disallow.\n")
    else:
        tried = robots_info.get("tried") or []
        if tried:
            parts.append(f" - ❌ غير موجود بعد محاولة الوصول إلى: {', '.join(str(t) for t in tried)}\n")
        else:
            parts.append(" - ❌ غير موجود أو لم يتم فحصه.\n")
    parts.append("\n")

    # Payment Methods — entries may be dicts with a "name" key or plain values.
    parts.append("**💳 طرق الدفع:**\n")
    payment_methods = report.get("payment_methods", []) or []
    if payment_methods:
        names = [
            (method.get("name") or str(method)) if isinstance(method, dict) else str(method)
            for method in payment_methods
        ]
        parts.append(f" - تم العثور على: {', '.join(names)}\n")
    else:
        parts.append(" - لم يتم العثور على طرق دفع معروفة.\n")
    parts.append("\n")

    # Trackers & Analytics — fixed: coerce entries to str; the original join
    # raised TypeError if the scanner returned dicts here.
    parts.append("**📈 المتتبعات (Trackers & Analytics):**\n")
    trackers_info = report.get("trackers_and_analytics", []) or []
    if trackers_info:
        parts.append(" - " + ", ".join(str(t) for t in trackers_info) + "\n")
    else:
        parts.append(" - لا توجد متتبعات معروفة.\n")
    parts.append("\n")

    # WAF & CDN (heuristic detections)
    parts.append("**🛡️ WAF و CDN (استدلالي):**\n")
    waf = report.get("waf_info") or report.get("waf") or {}
    if waf and waf.get("detected"):
        parts.append(f" - WAF مكتشف: {waf.get('provider')} (ثقة: {_fmt_confidence(waf.get('confidence'))})\n")
    else:
        parts.append(" - لا يوجد WAF واضح أو لم يتم اكتشافه.\n")
    cdn = report.get("cdn_info") or report.get("cdn") or {}
    if cdn and cdn.get("provider"):
        parts.append(f" - CDN مفترض/مكتشف: {cdn.get('provider')} (ثقة: {_fmt_confidence(cdn.get('confidence'))})\n")
    else:
        parts.append(" - لا يوجد CDN واضح.\n")
    parts.append("\n")

    # Final notes — fixed: tolerate raw_evidence / body being None or non-dict.
    parts.append("**📝 ملاحظات:**\n")
    raw_evidence = report.get("raw_evidence") or {}
    body_info = raw_evidence.get("body") if isinstance(raw_evidence, dict) else None
    raw_path = body_info.get("path", "غير متوفر") if isinstance(body_info, dict) else "غير متوفر"
    parts.append(f"- مسار الأدلة الخام محفوظ في: {raw_path} (إن وُجد)\n")
    parts.append("\n\n")

    return "".join(parts)


def format_final_report(report_data: Dict[str, Any]) -> str:
    """
    Formats the raw scan report data into a human-readable, well-structured string (Arabic).

    Tolerant to different shapes of report_data: a wrapper with "full_report",
    a single report dict (detected via "scanned_url"/"url"), or a summary
    carrying a "reports" list.

    Args:
        report_data: Raw scan output in any of the accepted shapes.

    Returns:
        The full Arabic markdown report, or an error/warning message string.
    """
    if "error" in report_data:
        return f"❌ تقرير الفحص: حدث خطأ\n\n{report_data.get('details', 'لا يوجد تفاصيل')}"

    # Accept either {"full_report": [...]}, a single report dict, or {"reports": [...]}.
    full_reports = report_data.get("full_report")
    if not full_reports:
        if "scanned_url" in report_data or "url" in report_data:
            full_reports = [report_data]
        elif isinstance(report_data.get("reports"), list):
            full_reports = report_data.get("reports")
        else:
            return "⚠️ لم يتم العثور على تقارير فحص.\nقد يكون الموقع غير متاح أو لم يتم تنفيذ الفحص."

    # Build via list + join instead of repeated += (avoids quadratic concatenation).
    parts = ["✨ **تقرير فحص شامل للموقع** ✨\n\n", "---\n\n"]

    # Summary of scanned URLs (if available); fixed: `summary: None` no longer crashes.
    scanned_urls = (report_data.get("summary") or {}).get("scanned_urls", [])
    parts.append("**✅ الصفحات التي تم فحصها:**\n")
    if not scanned_urls:
        # Fall back to URLs collected from the individual reports.
        scanned_urls = [
            r.get("scanned_url") or r.get("url")
            for r in full_reports
            if r.get("scanned_url") or r.get("url")
        ]
    if scanned_urls:
        parts.append("\n".join(f"• {url}" for url in scanned_urls) + "\n\n")
    else:
        parts.append("• لم يتم توفير ملخص للروابط المفحوصة.\n\n")

    for report in full_reports:
        parts.append(_render_single_report(report))

    parts.append("---\n\n✨ تم الفحص بنجاح.\n")
    return "".join(parts)
if __name__ == "__main__":
    # CLI usage: python analyze_site.py <url> [render_js: true|false]
    if len(sys.argv) < 2:
        print("Usage: python analyze_site.py <url_to_scan> [render_js: true|false]")
    else:
        target = sys.argv[1]
        # Optional second argument toggles JavaScript rendering.
        js_flag = len(sys.argv) > 2 and sys.argv[2].lower() in ("true", "1", "yes", "y")
        try:
            report = asyncio.run(run_scan(target, render_js=js_flag))
            rendered = format_final_report(
                {"full_report": [report], "summary": {"scanned_urls": [target]}}
            )
            print(rendered)
        except Exception as exc:
            print("فشل تشغيل الفحص:", exc)
|