"""SuperRecon API — FastAPI service exposing a rate-limited website
reconnaissance endpoint backed by an async ``run_scan`` implementation.

Safety features: SSRF protection (private/loopback targets are rejected),
a concurrency semaphore, per-client rate limiting, and a scan timeout.
"""

import asyncio
import ipaddress
import logging
import os
from datetime import datetime, timezone
from typing import List, Optional
from urllib.parse import urlparse

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel, HttpUrl
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

# Load environment variables from a .env file (no-op if the file is absent).
load_dotenv()

# Logging configuration; LOG_LEVEL env var overrides the default INFO level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("SuperReconAPI")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))

# FastAPI application instance.
app = FastAPI(
    title="SuperRecon API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS configuration — trim values and handle an empty/unset list.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec; set CORS_ALLOW_ORIGINS explicitly
# in production deployments.
raw_origins = os.getenv("CORS_ALLOW_ORIGINS", "")
if raw_origins.strip() == "":
    allow_origins: List[str] = ["*"]
else:
    allow_origins = [o.strip() for o in raw_origins.split(",") if o.strip()]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Rate limiter keyed on the caller's remote address.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(429, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# Environment-driven tunables.
MAX_CONCURRENT_SCANS = int(os.getenv("MAX_CONCURRENT_SCANS", "8"))
SCAN_TIMEOUT = int(os.getenv("SCAN_TIMEOUT", "180"))  # seconds
rate_limit = os.getenv("RATE_LIMIT", "15/minute")

# Caps the number of scans running at any one time.
scan_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SCANS)

# Import run_scan flexibly (app.analyze_site or analyze_site); fail early
# with a clear message if the scan function cannot be found.
try:
    from app.analyze_site import run_scan  # type: ignore
except Exception:
    try:
        from analyze_site import run_scan  # type: ignore
    except Exception as e:
        logger.exception("Cannot import run_scan from app.analyze_site or analyze_site.")
        raise ImportError(
            "Could not import run_scan from app.analyze_site or analyze_site."
        ) from e


class ReconRequest(BaseModel):
    """Validated payload for a scan request."""

    url: HttpUrl
    render_js: Optional[bool] = True


@app.get("/", response_class=HTMLResponse)
async def index():
    """Landing page with brief usage instructions (Arabic UI text)."""
    # The original markup was corrupted in transit; reconstructed here as
    # minimal valid HTML preserving the visible user-facing strings.
    html = """<!DOCTYPE html>
<html dir="rtl" lang="ar">
  <head>
    <meta charset="utf-8">
    <title>SuperRecon API</title>
  </head>
  <body>
    <h1>✅ SuperRecon API جاهز</h1>
    <p>استخدم الرابط التالي لفحص موقع:</p>
    <p><code>/recon?url=https://example.com</code></p>
    <p><a href="/docs">عرض وثائق API</a></p>
  </body>
</html>"""
    return HTMLResponse(content=html)


@app.get("/health")
async def health():
    """Liveness probe returning service metadata and a UTC timestamp."""
    return {
        "status": "healthy",
        "service": "SuperRecon API",
        "version": "1.0.0",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def _is_ip_private(ip_str: str) -> bool:
    """Return True if *ip_str* is a private/reserved/loopback/link-local/
    multicast/unspecified address; False for public or unparseable input."""
    try:
        ip_obj = ipaddress.ip_address(ip_str)
    except ValueError:
        # ip_address raises ValueError for non-IP strings.
        return False
    return (
        ip_obj.is_private
        or ip_obj.is_loopback
        or ip_obj.is_link_local
        or ip_obj.is_reserved
        or ip_obj.is_multicast
        or ip_obj.is_unspecified
    )


async def _ensure_not_local_target(parsed_url) -> None:
    """Raise HTTPException(400) if the target is, or resolves to, a
    private/loopback address (SSRF protection); return None otherwise.

    Args:
        parsed_url: result of ``urllib.parse.urlparse`` on the target URL.

    Raises:
        HTTPException: 400 when the host is missing, unresolvable, or
            maps to a blocked address.
    """
    host = parsed_url.hostname
    if not host:
        raise HTTPException(status_code=400, detail="Invalid host in URL.")

    # Case 1: host is an IP literal — check it directly.
    try:
        ipaddress.ip_address(host)
    except ValueError:
        pass  # not an IP literal; fall through to DNS resolution
    else:
        if _is_ip_private(host):
            raise HTTPException(status_code=400, detail="Scanning private/loopback addresses is not allowed.")
        return

    # Case 2: hostname — resolve asynchronously via the event loop resolver.
    try:
        loop = asyncio.get_running_loop()
        # getaddrinfo returns (family, type, proto, canonname, sockaddr)
        # tuples; sockaddr[0] is the IP string for both IPv4 and IPv6.
        infos = await loop.getaddrinfo(host, None)
        ips = {sockaddr[0] for _, _, _, _, sockaddr in infos if sockaddr}
        if not ips:
            raise HTTPException(status_code=400, detail="Target hostname could not be resolved to any IP.")
        for ip in ips:
            if _is_ip_private(ip):
                raise HTTPException(status_code=400, detail="Target resolves to private/loopback addresses; scanning is blocked.")
    except HTTPException:
        raise
    except Exception as e:
        logger.debug("DNS resolution error for host %s: %s", host, e)
        raise HTTPException(status_code=400, detail="Target hostname could not be resolved.") from e


async def _perform_scan(request: Request, payload: ReconRequest):
    """Validate the target, then run the scan under the concurrency
    semaphore and timeout.  Shared by the GET and POST /recon handlers.

    Returns:
        JSONResponse with the scan result, or a 504 payload on timeout.

    Raises:
        HTTPException: 400 for invalid/blocked targets, 500 on scan failure.
    """
    url_str = str(payload.url)
    render_js = payload.render_js

    if not url_str:
        raise HTTPException(status_code=400, detail="Missing 'url' in payload")

    # Basic sanity: avoid extremely long URLs (simple DoS protection).
    if len(url_str) > 4096:
        raise HTTPException(status_code=400, detail="URL too long.")

    parsed = urlparse(url_str)
    if parsed.scheme.lower() not in ("http", "https"):
        raise HTTPException(status_code=400, detail="Only http and https schemes are allowed.")

    # Ensure the target is not local/private.
    await _ensure_not_local_target(parsed)

    # Caller address for audit logging (best-effort, never fatal).
    try:
        remote_addr = get_remote_address(request)
    except Exception:
        try:
            remote_addr = request.client.host  # type: ignore
        except Exception:
            remote_addr = "unknown"

    logger.info("Scan requested by %s for %s (render_js=%s)", remote_addr, url_str, render_js)

    async with scan_semaphore:
        try:
            logger.info("Starting scan for %s", url_str)
            # run_scan is expected to return a serializable dict.
            result = await asyncio.wait_for(
                run_scan(url_str, render_js=render_js),
                timeout=SCAN_TIMEOUT,
            )
            logger.info("Scan completed for %s", url_str)
            # Ensure the result is JSON serializable; if not, wrap minimally.
            if not isinstance(result, dict):
                logger.warning("run_scan returned non-dict result; coercing to dict.")
                result = {"result": str(result)}
            return JSONResponse(content=result)
        except asyncio.TimeoutError:
            logger.warning("Scan timed out for %s", url_str)
            return JSONResponse(
                status_code=504,
                content={
                    "success": False,
                    "error": "timeout",
                    "message": f"Scan timed out after {SCAN_TIMEOUT} seconds",
                },
            )
        except HTTPException:
            # Re-raise HTTPException as-is (e.g., blocked target).
            raise
        except Exception as e:
            logger.exception("Scan failed")
            raise HTTPException(status_code=500, detail="Internal server error during scan.") from e


@app.get("/recon")
@limiter.limit(rate_limit)
async def recon_get(
    request: Request,
    url: str = Query(..., description="Target URL to analyze (e.g., https://example.com)"),
    render_js: bool = Query(True, description="Render page with JavaScript before analysis"),
):
    """GET variant: validate query params via ReconRequest, then scan.

    Calls the shared helper directly rather than the decorated POST handler,
    so each GET request is counted once against the rate limit (the original
    dispatched through @limiter.limit-wrapped recon_post, double-counting).
    """
    payload = ReconRequest(url=url, render_js=render_js)
    return await _perform_scan(request, payload)


@app.post("/recon")
@limiter.limit(rate_limit)
async def recon_post(request: Request, payload: ReconRequest):
    """POST variant: JSON body validated by ReconRequest."""
    return await _perform_scan(request, payload)