SuperReconn/app/main.py

import os
import logging
import asyncio
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from slowapi.middleware import SlowAPIMiddleware
from pydantic import BaseModel, HttpUrl, ValidationError
from typing import Optional, List
from dotenv import load_dotenv
from datetime import datetime, timezone
from urllib.parse import urlparse
import ipaddress
# Load environment variables from the .env file
load_dotenv()
# Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("SuperReconAPI")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
# Create the FastAPI application
app = FastAPI(
    title="SuperRecon API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# CORS configuration: trim values and handle an empty list
raw_origins = os.getenv("CORS_ALLOW_ORIGINS", "")
if raw_origins.strip() == "":
    allow_origins: List[str] = ["*"]
else:
    allow_origins = [o.strip() for o in raw_origins.split(",") if o.strip()]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
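# Illustrative value for the CORS_ALLOW_ORIGINS variable parsed above (hypothetical
# origins, not taken from the project configuration):
#   CORS_ALLOW_ORIGINS=https://app.example.com,https://admin.example.com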
# Initialize the rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# Environment variables
MAX_CONCURRENT_SCANS = int(os.getenv("MAX_CONCURRENT_SCANS", "8"))
SCAN_TIMEOUT = int(os.getenv("SCAN_TIMEOUT", "180")) # seconds
rate_limit = os.getenv("RATE_LIMIT", "15/minute")
scan_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SCANS)
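# Illustrative .env entries for the settings read above; the numbers mirror the
# in-code defaults and are not project-specific values:
#   LOG_LEVEL=INFO
#   MAX_CONCURRENT_SCANS=8
#   SCAN_TIMEOUT=180
#   RATE_LIMIT=15/minute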
# Try to import run_scan flexibly (from app.analyze_site or analyze_site)
try:
    from app.analyze_site import run_scan  # type: ignore
except Exception:
    try:
        from analyze_site import run_scan  # type: ignore
    except Exception as e:
        # Fail early with a clear message if the scan function cannot be found
        logger.exception("Cannot import run_scan from app.analyze_site or analyze_site.")
        raise ImportError("Could not import run_scan from app.analyze_site or analyze_site.") from e
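# Assumed interface for run_scan, inferred only from the call site in recon_post
# below (analyze_site itself is not shown here):
#   async def run_scan(url: str, render_js: bool = True) -> dict: ...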
class ReconRequest(BaseModel):
    url: HttpUrl
    render_js: Optional[bool] = True
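# Example JSON body accepted by POST /recon for this model:
#   {"url": "https://example.com", "render_js": true}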
@app.get("/", response_class=HTMLResponse)
async def index():
html = """
<title>SuperRecon API</title>
<style>body { font-family: Arial, sans-serif; margin: 40px; }</style>
✅ SuperRecon API جاهز<br>
استخدم الرابط التالي لفحص موقع:<br>
/recon?url=https://example.com<br><br>
<a href="/docs">عرض وثائق API</a>
"""
return HTMLResponse(content=html)
@app.get("/health")
async def health():
return {
"status": "healthy",
"service": "SuperRecon API",
"version": "1.0.0",
"timestamp": datetime.now(timezone.utc).isoformat()
}
def _is_ip_private(ip_str: str) -> bool:
    """Returns True if ip_str is private/reserved/loopback/link-local/multicast."""
    try:
        ip_obj = ipaddress.ip_address(ip_str)
        return (
            ip_obj.is_private
            or ip_obj.is_loopback
            or ip_obj.is_link_local
            or ip_obj.is_reserved
            or ip_obj.is_multicast
            or ip_obj.is_unspecified
        )
    except Exception:
        return False
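# Behavior sketch, following standard ipaddress semantics:
#   _is_ip_private("127.0.0.1") -> True   (loopback)
#   _is_ip_private("10.0.0.5")  -> True   (private range)
#   _is_ip_private("8.8.8.8")   -> False  (public address)
#   _is_ip_private("example")   -> False  (unparseable input is treated as non-private)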
async def _ensure_not_local_target(parsed_url):
    """If the target resolves to private/loopback IPs, raise HTTPException (for safety)."""
    host = parsed_url.hostname
    if not host:
        raise HTTPException(status_code=400, detail="Invalid host in URL.")
    # if host is an IP literal
    try:
        ipaddress.ip_address(host)
        if _is_ip_private(host):
            raise HTTPException(status_code=400, detail="Scanning private/loopback addresses is not allowed.")
        return
    except ValueError:
        # hostname - resolve asynchronously using event loop resolver
        try:
            loop = asyncio.get_running_loop()
            # getaddrinfo returns list of tuples; we'll extract the sockaddr[0]
            infos = await loop.getaddrinfo(host, None)
            ips = set(sockaddr[0] for _, _, _, _, sockaddr in infos if sockaddr)
            if not ips:
                raise HTTPException(status_code=400, detail="Target hostname could not be resolved to any IP.")
            for ip in ips:
                if _is_ip_private(ip):
                    raise HTTPException(status_code=400, detail="Target resolves to private/loopback addresses; scanning is blocked.")
            return
        except HTTPException:
            raise
        except Exception as e:
            logger.debug(f"DNS resolution error for host {host}: {e}")
            raise HTTPException(status_code=400, detail="Target hostname could not be resolved.") from e
@app.get("/recon")
@limiter.limit(rate_limit)
async def recon_get(
request: Request,
url: str = Query(..., description="Target URL to analyze (e.g., https://example.com)"),
render_js: bool = Query(True, description="Render page with JavaScript before analysis")
):
# validate via pydantic model then dispatch to the POST handler
payload = ReconRequest(url=url, render_js=render_js)
return await recon_post(request, payload)
@app.post("/recon")
@limiter.limit(rate_limit)
async def recon_post(request: Request, payload: ReconRequest):
url_str = str(payload.url)
render_js = payload.render_js
if not url_str:
raise HTTPException(status_code=400, detail="Missing 'url' in payload")
# basic sanity: avoid extremely long URLs (simple DoS protection)
if len(url_str) > 4096:
raise HTTPException(status_code=400, detail="URL too long.")
parsed = urlparse(url_str)
if parsed.scheme.lower() not in ("http", "https"):
raise HTTPException(status_code=400, detail="Only http and https schemes are allowed.")
# Ensure the target is not local/private
await _ensure_not_local_target(parsed)
# get remote address (with fallback)
try:
remote_addr = get_remote_address(request)
except Exception:
try:
remote_addr = request.client.host # type: ignore
except Exception:
remote_addr = "unknown"
logger.info(f"Scan requested by {remote_addr} for {url_str} (render_js={render_js})")
async with scan_semaphore:
try:
logger.info(f"Starting scan for {url_str}")
# run_scan is expected to return a serializable dict (or safe_json already)
result = await asyncio.wait_for(
run_scan(url_str, render_js=render_js),
timeout=SCAN_TIMEOUT
)
logger.info(f"Scan completed for {url_str}")
# ensure result is JSON serializable; if not, wrap minimally
if not isinstance(result, dict):
logger.warning("run_scan returned non-dict result; coercing to dict.")
result = {"result": str(result)}
return JSONResponse(content=result)
except asyncio.TimeoutError:
logger.warning(f"Scan timed out for {url_str}")
return JSONResponse(
status_code=504,
content={
"success": False,
"error": "timeout",
"message": f"Scan timed out after {SCAN_TIMEOUT} seconds"
}
)
except HTTPException:
# re-raise HTTPException as-is (e.g., blocked target)
raise
except Exception as e:
logger.exception("Scan failed")
raise HTTPException(status_code=500, detail="Internal server error during scan.") from e
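# Minimal local entry point (a sketch, not part of the original module): assumes the
# uvicorn package is installed; the PORT variable is an assumption, with 8000 as the
# fallback. In deployment the app may instead be launched as `uvicorn app.main:app`.
if __name__ == "__main__":
    import uvicorn

    # Bind to all interfaces so the API is reachable beyond localhost.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8000")))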