From c144d20f7c1a0ff1b120d5fe510a546a4e6329af Mon Sep 17 00:00:00 2001
From: ghaymah_dev
Date: Wed, 29 Oct 2025 13:35:42 +0000
Subject: [PATCH] Update main.py
---
main.py | 319 +++++++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 261 insertions(+), 58 deletions(-)
diff --git a/main.py b/main.py
index f37c4f5..10bb72a 100644
--- a/main.py
+++ b/main.py
@@ -1,46 +1,50 @@
import os
-from fastapi import FastAPI, HTTPException
+import gzip
+import json
+from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
-from dataforseo_sdk import DataForSeoClient
-from dataforseo_sdk.config import DFSConfig
-import asyncio
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Any
import pandas as pd
from dotenv import load_dotenv
+from client import RestClient
+import logging
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Initialize FastAPI app
-app = FastAPI(title="Domain Ranking Keywords API", version="1.0.0")
+app = FastAPI(
+ title="DataForSEO API Service",
+ version="1.0.0",
+ description="Comprehensive DataForSEO API integration including Domain Keywords and AI Mode SERP - بسم الله الرحمن الرحيم"
+)
-# DataForSEO configuration
-class DataForSEOConfig:
- def __init__(self):
- self.login = os.getenv('DATAFORSEO_API_LOGIN')
- self.password = os.getenv('DATAFORSEO_API_PASSWORD')
-
- if not self.login or not self.password:
- raise ValueError("Please set DATAFORSEO_API_LOGIN and DATAFORSEO_API_PASSWORD environment variables")
-
- self.config = DFSConfig(login=self.login, password=self.password)
-
-# Request model
+# Request models
class DomainRequest(BaseModel):
domain: str
country_code: Optional[str] = "us"
language_code: Optional[str] = "en"
- limit: Optional[int] = 1000
+ limit: Optional[int] = 100
-# Response model
+class SERPRequest(BaseModel):
+ keyword: str
+ location_code: int
+ language_code: Optional[str] = "en"
+
+# Response models
class KeywordRanking(BaseModel):
keyword: str
position: float
url: str
- search_volume: Optional[int]
- cpc: Optional[float]
- competition: Optional[float]
+ search_volume: Optional[int] = 0
+ cpc: Optional[float] = 0.0
+ competition: Optional[float] = 0.0
+ country_code: Optional[str] = "us"
class DomainResponse(BaseModel):
domain: str
@@ -48,19 +52,178 @@ class DomainResponse(BaseModel):
keywords: List[KeywordRanking]
message: str
+class SERPItem(BaseModel):
+ type: str
+ title: str
+ url: str
+ description: Optional[str] = None
+ position: Optional[int] = None
+ rank_group: Optional[int] = None
+
+class SERPResponse(BaseModel):
+ keyword: str
+ location_code: int
+ total_results: int
+ items: List[SERPItem]
+ search_metadata: Dict[str, Any]
+
# Initialize DataForSEO client
def get_dfs_client():
- config = DataForSEOConfig()
- return DataForSeoClient(config.config)
+ username = os.getenv('DATAFORSEO_API_LOGIN')
+ password = os.getenv('DATAFORSEO_API_PASSWORD')
+
+ if not username or not password:
+ raise ValueError("Please set DATAFORSEO_API_LOGIN and DATAFORSEO_API_PASSWORD environment variables")
+
+ return RestClient(username, password)
@app.get("/")
async def root():
- return {"message": "Domain Ranking Keywords API - بسم الله الرحمن الرحيم"}
+ return {
+ "message": "DataForSEO API Service - بسم الله الرحمن الرحيم",
+ "endpoints": {
+ "health": "/health",
+ "domain_keywords": "/domain-keywords",
+ "serp_search": "/api/search",
+ "export_csv": "/export-keywords-csv",
+ "download_csv": "/download-csv/{filename}"
+ }
+ }
@app.get("/health")
async def health_check():
- return {"status": "healthy", "message": "API is running"}
+ try:
+ client = get_dfs_client()
+ # Test API connection with a simple request
+ test_response = client.get("/v3/applications/user")
+ return {
+ "status": "healthy",
+ "message": "API is running and DataForSEO connection is working",
+ "dataforseo_status": "connected"
+ }
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"DataForSEO connection failed: {str(e)}")
+# AI Mode SERP Search Endpoint
+@app.get("/api/search")
+async def search_ai_mode(
+ keyword: str = Query(..., description="Search keyword"),
+ location_code: int = Query(..., description="Location code (integer)"),
+ language_code: str = Query("en", description="Language code")
+):
+ """
+ AI Mode SERP Search - Get comprehensive search results for a keyword
+ """
+ # Validate required parameters
+ if not keyword:
+ raise HTTPException(status_code=400, detail="keyword parameter is required")
+
+ if not location_code:
+ raise HTTPException(status_code=400, detail="location_code parameter is required")
+
+ try:
+ client = get_dfs_client()
+
+ # Prepare request data
+ post_data = dict()
+ post_data[len(post_data)] = dict(
+ language_code=language_code,
+ location_code=location_code,
+ keyword=keyword
+ )
+
+ # Make API call
+ logger.info(f"Making AI Mode API call for keyword: {keyword}, location_code: {location_code}")
+ response = client.post("/v3/serp/google/ai_mode/live/advanced", post_data)
+
+ logger.info(f"Raw API response status: {response.get('status_code')}")
+
+ # Process response
+ if response.get("status_code") == 20000:
+ # Extract and simplify the response
+ simplified_response = extract_simplified_data(response)
+ return simplified_response
+ else:
+ logger.error(f"API error: {response.get('status_code')} - {response.get('status_message')}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"API error. Code: {response.get('status_code')} Message: {response.get('status_message')}"
+ )
+
+ except Exception as e:
+ logger.error(f"Error in AI Mode search: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")
+
+def extract_simplified_data(response: Dict) -> Dict:
+ """
+ Extract and simplify the AI Mode SERP response data
+ """
+ try:
+ simplified_items = []
+
+ if "tasks" in response and response["tasks"]:
+ for task in response["tasks"]:
+ if "result" in task and task["result"]:
+ for result in task["result"]:
+ # Extract basic search info
+ search_info = {
+ "keyword": result.get("keyword", ""),
+ "location_code": result.get("location_code"),
+ "language_code": result.get("language_code"),
+ "check_url": result.get("check_url"),
+ "datetime": result.get("datetime")
+ }
+
+ # Extract items
+ if "items" in result:
+ for item in result["items"]:
+ simplified_item = {
+ "type": item.get("type", "unknown"),
+ "title": item.get("title", ""),
+ "url": item.get("url", ""),
+ "description": item.get("description"),
+ "position": item.get("position"),
+ "rank_group": item.get("rank_group")
+ }
+
+ # Handle different item types
+ if item.get("type") == "organic":
+ simplified_item.update({
+ "domain": item.get("domain"),
+ "breadcrumb": item.get("breadcrumb"),
+ "website_name": item.get("website_name")
+ })
+ elif item.get("type") == "paid":
+ simplified_item.update({
+ "domain": item.get("domain"),
+ "price": item.get("price"),
+ "currency": item.get("currency")
+ })
+ elif item.get("type") == "featured_snippet":
+ simplified_item.update({
+ "domain": item.get("domain"),
+ "website_name": item.get("website_name")
+ })
+
+ simplified_items.append(simplified_item)
+
+ return {
+ "search_metadata": search_info,
+ "total_results": len(simplified_items),
+ "items": simplified_items,
+ "message": "تم جلب نتائج البحث بنجاح بفضل الله"
+ }
+
+ except Exception as e:
+ logger.error(f"Error simplifying response data: {str(e)}")
+ return {
+ "search_metadata": {},
+ "total_results": 0,
+ "items": [],
+ "message": "حدث خطأ في معالجة البيانات"
+ }
+
+# Existing Domain Keywords Endpoints
@app.post("/domain-keywords", response_model=DomainResponse)
async def get_domain_keywords(request: DomainRequest):
"""
@@ -69,7 +232,7 @@ async def get_domain_keywords(request: DomainRequest):
try:
client = get_dfs_client()
- # Get domain rankings using DataForSEO
+ # Get domain rankings using DataForSEO Domain Analytics API
rankings = await get_domain_rankings(
client,
request.domain,
@@ -89,44 +252,55 @@ async def get_domain_keywords(request: DomainRequest):
raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}")
async def get_domain_rankings(client, domain: str, country_code: str = "us",
- language_code: str = "en", limit: int = 1000) -> List[KeywordRanking]:
+ language_code: str = "en", limit: int = 100) -> List[KeywordRanking]:
"""
- Fetch domain ranking keywords from DataForSEO
+ Fetch domain ranking keywords from DataForSEO using Domain Analytics API
"""
try:
- # Using DataForSEO's domain analytics service
- domain_analytics = client.domain_analytics()
-
- # Get keywords for the domain
- response = domain_analytics.get_domain_metrics_by_categories(
- target=domain,
- country_code=country_code,
- language_code=language_code,
- limit=limit
- )
+ # Prepare the POST data for Domain Analytics API
+ post_data = [
+ {
+ "target": domain,
+ "limit": limit,
+ "country_code": country_code.upper(),
+ "language_code": language_code.lower(),
+ "filters": [
+ ["country_code", "=", country_code.upper()],
+ "and",
+ ["language_code", "=", language_code.lower()]
+ ]
+ }
+ ]
+
+ # Make API request to Domain Analytics endpoint
+ response = client.post("/v3/dataforseo_labs/domain_metrics_by_categories", post_data)
rankings = []
- if response and hasattr(response, 'tasks'):
- for task in response.tasks:
- if hasattr(task, 'result') and task.result:
- for item in task.result:
+ # Process the response
+ if response and 'tasks' in response:
+ for task in response['tasks']:
+ if 'result' in task and task['result']:
+ for item in task['result']:
+ # Extract keyword data from the response
+ keyword_data = item.get('metrics', {})
+
rankings.append(KeywordRanking(
- keyword=item.keyword if hasattr(item, 'keyword') else "N/A",
- position=float(item.pos) if hasattr(item, 'pos') else 0.0,
- url=item.url if hasattr(item, 'url') else "",
- search_volume=int(item.search_volume) if hasattr(item, 'search_volume') else 0,
- cpc=float(item.cpc) if hasattr(item, 'cpc') else 0.0,
- competition=float(item.competition) if hasattr(item, 'competition') else 0.0
+ keyword=item.get('key', 'N/A'),
+ position=float(keyword_data.get('pos', 0)),
+ url=item.get('domain', ''),
+ search_volume=keyword_data.get('search_volume', 0),
+ cpc=float(keyword_data.get('cpc', 0)),
+ competition=float(keyword_data.get('competition', 0)),
+ country_code=country_code
))
return rankings
except Exception as e:
- print(f"Error in get_domain_rankings: {str(e)}")
+ logger.error(f"Error in get_domain_rankings: {str(e)}")
return []
-# Export to CSV endpoint
@app.post("/export-keywords-csv")
async def export_keywords_csv(request: DomainRequest):
"""
@@ -143,28 +317,30 @@ async def export_keywords_csv(request: DomainRequest):
request.limit
)
+ if not rankings:
+ raise HTTPException(status_code=404, detail="No keywords found for this domain")
+
# Convert to DataFrame
df = pd.DataFrame([keyword.dict() for keyword in rankings])
# Save to CSV in exports directory
exports_dir = "/app/exports"
os.makedirs(exports_dir, exist_ok=True)
- filename = f"{request.domain}_keywords.csv"
+ filename = f"{request.domain}_{request.country_code}_keywords.csv"
filepath = os.path.join(exports_dir, filename)
df.to_csv(filepath, index=False, encoding='utf-8')
return {
"domain": request.domain,
"filename": filename,
- "filepath": filepath,
"total_keywords": len(rankings),
- "message": f"تم تصدير البيانات إلى {filename} بنجاح"
+ "download_url": f"/download-csv/{filename}",
+ "message": f"تم تصدير {len(rankings)} كلمة مفتاحية إلى {filename} بنجاح"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Export error: {str(e)}")
-# Download CSV file endpoint
@app.get("/download-csv/{filename}")
async def download_csv(filename: str):
"""
@@ -175,10 +351,37 @@ async def download_csv(filename: str):
return FileResponse(
path=filepath,
filename=filename,
- media_type='text/csv'
+ media_type='text/csv',
+ headers={'Content-Disposition': f'attachment; filename={filename}'}
)
else:
- raise HTTPException(status_code=404, detail="File not found")
+ raise HTTPException(status_code=404, detail="File not found. Please generate the CSV first using /export-keywords-csv")
+
+@app.get("/keywords-by-country/{domain}")
+async def get_keywords_by_country(domain: str, country_code: str = "us", limit: int = 50):
+ """
+ Quick endpoint to get keywords for a domain by country
+ """
+ try:
+ client = get_dfs_client()
+
+ rankings = await get_domain_rankings(
+ client,
+ domain,
+ country_code,
+ "en", # Default language
+ limit
+ )
+
+ return {
+ "domain": domain,
+ "country_code": country_code,
+ "total_keywords": len(rankings),
+ "keywords": rankings[:10] if rankings else [] # Return first 10 for quick view
+ }
+
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
if __name__ == "__main__":
import uvicorn