الملفات
domain-api/main.py
2025-10-29 13:35:42 +00:00

388 أسطر
14 KiB
Python

import os
import gzip
import json
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import List, Dict, Optional, Any
import pandas as pd
from dotenv import load_dotenv
from client import RestClient
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Initialize FastAPI app
app = FastAPI(
title="DataForSEO API Service",
version="1.0.0",
description="Comprehensive DataForSEO API integration including Domain Keywords and AI Mode SERP - بسم الله الرحمن الرحيم"
)
# Request models
class DomainRequest(BaseModel):
domain: str
country_code: Optional[str] = "us"
language_code: Optional[str] = "en"
limit: Optional[int] = 100
class SERPRequest(BaseModel):
keyword: str
location_code: int
language_code: Optional[str] = "en"
# Response models
class KeywordRanking(BaseModel):
keyword: str
position: float
url: str
search_volume: Optional[int] = 0
cpc: Optional[float] = 0.0
competition: Optional[float] = 0.0
country_code: Optional[str] = "us"
class DomainResponse(BaseModel):
domain: str
total_keywords: int
keywords: List[KeywordRanking]
message: str
class SERPItem(BaseModel):
type: str
title: str
url: str
description: Optional[str] = None
position: Optional[int] = None
rank_group: Optional[int] = None
class SERPResponse(BaseModel):
keyword: str
location_code: int
total_results: int
items: List[SERPItem]
search_metadata: Dict[str, Any]
# Initialize DataForSEO client
def get_dfs_client():
username = os.getenv('DATAFORSEO_API_LOGIN')
password = os.getenv('DATAFORSEO_API_PASSWORD')
if not username or not password:
raise ValueError("Please set DATAFORSEO_API_LOGIN and DATAFORSEO_API_PASSWORD environment variables")
return RestClient(username, password)
@app.get("/")
async def root():
return {
"message": "DataForSEO API Service - بسم الله الرحمن الرحيم",
"endpoints": {
"health": "/health",
"domain_keywords": "/domain-keywords",
"serp_search": "/api/search",
"export_csv": "/export-keywords-csv",
"download_csv": "/download-csv/{filename}"
}
}
@app.get("/health")
async def health_check():
try:
client = get_dfs_client()
# Test API connection with a simple request
test_response = client.get("/v3/applications/user")
return {
"status": "healthy",
"message": "API is running and DataForSEO connection is working",
"dataforseo_status": "connected"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"DataForSEO connection failed: {str(e)}")
# AI Mode SERP Search Endpoint
@app.get("/api/search")
async def search_ai_mode(
keyword: str = Query(..., description="Search keyword"),
location_code: int = Query(..., description="Location code (integer)"),
language_code: str = Query("en", description="Language code")
):
"""
AI Mode SERP Search - Get comprehensive search results for a keyword
"""
# Validate required parameters
if not keyword:
raise HTTPException(status_code=400, detail="keyword parameter is required")
if not location_code:
raise HTTPException(status_code=400, detail="location_code parameter is required")
try:
client = get_dfs_client()
# Prepare request data
post_data = dict()
post_data[len(post_data)] = dict(
language_code=language_code,
location_code=location_code,
keyword=keyword
)
# Make API call
logger.info(f"Making AI Mode API call for keyword: {keyword}, location_code: {location_code}")
response = client.post("/v3/serp/google/ai_mode/live/advanced", post_data)
logger.info(f"Raw API response status: {response.get('status_code')}")
# Process response
if response.get("status_code") == 20000:
# Extract and simplify the response
simplified_response = extract_simplified_data(response)
return simplified_response
else:
logger.error(f"API error: {response.get('status_code')} - {response.get('status_message')}")
raise HTTPException(
status_code=500,
detail=f"API error. Code: {response.get('status_code')} Message: {response.get('status_message')}"
)
except Exception as e:
logger.error(f"Error in AI Mode search: {str(e)}")
raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")
def extract_simplified_data(response: Dict) -> Dict:
"""
Extract and simplify the AI Mode SERP response data
"""
try:
simplified_items = []
if "tasks" in response and response["tasks"]:
for task in response["tasks"]:
if "result" in task and task["result"]:
for result in task["result"]:
# Extract basic search info
search_info = {
"keyword": result.get("keyword", ""),
"location_code": result.get("location_code"),
"language_code": result.get("language_code"),
"check_url": result.get("check_url"),
"datetime": result.get("datetime")
}
# Extract items
if "items" in result:
for item in result["items"]:
simplified_item = {
"type": item.get("type", "unknown"),
"title": item.get("title", ""),
"url": item.get("url", ""),
"description": item.get("description"),
"position": item.get("position"),
"rank_group": item.get("rank_group")
}
# Handle different item types
if item.get("type") == "organic":
simplified_item.update({
"domain": item.get("domain"),
"breadcrumb": item.get("breadcrumb"),
"website_name": item.get("website_name")
})
elif item.get("type") == "paid":
simplified_item.update({
"domain": item.get("domain"),
"price": item.get("price"),
"currency": item.get("currency")
})
elif item.get("type") == "featured_snippet":
simplified_item.update({
"domain": item.get("domain"),
"website_name": item.get("website_name")
})
simplified_items.append(simplified_item)
return {
"search_metadata": search_info,
"total_results": len(simplified_items),
"items": simplified_items,
"message": "تم جلب نتائج البحث بنجاح بفضل الله"
}
except Exception as e:
logger.error(f"Error simplifying response data: {str(e)}")
return {
"search_metadata": {},
"total_results": 0,
"items": [],
"message": "حدث خطأ في معالجة البيانات"
}
# Existing Domain Keywords Endpoints
@app.post("/domain-keywords", response_model=DomainResponse)
async def get_domain_keywords(request: DomainRequest):
"""
Get all ranking keywords for a domain and their positions in Google search
"""
try:
client = get_dfs_client()
# Get domain rankings using DataForSEO Domain Analytics API
rankings = await get_domain_rankings(
client,
request.domain,
request.country_code,
request.language_code,
request.limit
)
return DomainResponse(
domain=request.domain,
total_keywords=len(rankings),
keywords=rankings,
message="تم جلب البيانات بنجاح بفضل الله"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}")
async def get_domain_rankings(client, domain: str, country_code: str = "us",
language_code: str = "en", limit: int = 100) -> List[KeywordRanking]:
"""
Fetch domain ranking keywords from DataForSEO using Domain Analytics API
"""
try:
# Prepare the POST data for Domain Analytics API
post_data = [
{
"target": domain,
"limit": limit,
"country_code": country_code.upper(),
"language_code": language_code.lower(),
"filters": [
["country_code", "=", country_code.upper()],
"and",
["language_code", "=", language_code.lower()]
]
}
]
# Make API request to Domain Analytics endpoint
response = client.post("/v3/dataforseo_labs/domain_metrics_by_categories", post_data)
rankings = []
# Process the response
if response and 'tasks' in response:
for task in response['tasks']:
if 'result' in task and task['result']:
for item in task['result']:
# Extract keyword data from the response
keyword_data = item.get('metrics', {})
rankings.append(KeywordRanking(
keyword=item.get('key', 'N/A'),
position=float(keyword_data.get('pos', 0)),
url=item.get('domain', ''),
search_volume=keyword_data.get('search_volume', 0),
cpc=float(keyword_data.get('cpc', 0)),
competition=float(keyword_data.get('competition', 0)),
country_code=country_code
))
return rankings
except Exception as e:
logger.error(f"Error in get_domain_rankings: {str(e)}")
return []
@app.post("/export-keywords-csv")
async def export_keywords_csv(request: DomainRequest):
"""
Export domain keywords to CSV
"""
try:
client = get_dfs_client()
rankings = await get_domain_rankings(
client,
request.domain,
request.country_code,
request.language_code,
request.limit
)
if not rankings:
raise HTTPException(status_code=404, detail="No keywords found for this domain")
# Convert to DataFrame
df = pd.DataFrame([keyword.dict() for keyword in rankings])
# Save to CSV in exports directory
exports_dir = "/app/exports"
os.makedirs(exports_dir, exist_ok=True)
filename = f"{request.domain}_{request.country_code}_keywords.csv"
filepath = os.path.join(exports_dir, filename)
df.to_csv(filepath, index=False, encoding='utf-8')
return {
"domain": request.domain,
"filename": filename,
"total_keywords": len(rankings),
"download_url": f"/download-csv/{filename}",
"message": f"تم تصدير {len(rankings)} كلمة مفتاحية إلى {filename} بنجاح"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Export error: {str(e)}")
@app.get("/download-csv/{filename}")
async def download_csv(filename: str):
"""
Download generated CSV file
"""
filepath = f"/app/exports/{filename}"
if os.path.exists(filepath):
return FileResponse(
path=filepath,
filename=filename,
media_type='text/csv',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)
else:
raise HTTPException(status_code=404, detail="File not found. Please generate the CSV first using /export-keywords-csv")
@app.get("/keywords-by-country/{domain}")
async def get_keywords_by_country(domain: str, country_code: str = "us", limit: int = 50):
"""
Quick endpoint to get keywords for a domain by country
"""
try:
client = get_dfs_client()
rankings = await get_domain_rankings(
client,
domain,
country_code,
"en", # Default language
limit
)
return {
"domain": domain,
"country_code": country_code,
"total_keywords": len(rankings),
"keywords": rankings[:10] if rankings else [] # Return first 10 for quick view
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)