# Files
# domain-api/main.py
# 2025-10-29 13:53:03 +00:00
#
# 326 lines
# 11 KiB
# Python

# بسم الله الرحمن الرحيم
import os
import gzip
import json
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import List, Dict, Optional, Any
import pandas as pd
from dotenv import load_dotenv
from client import RestClient
import logging
# Configure logging
# Root logging is set to INFO so the logger.info()/logger.error() calls made by
# the handlers below are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
# get_dfs_client() reads the DataForSEO credentials through os.getenv(), so this
# has to run before the first request is served.
load_dotenv()
# Initialize FastAPI app
app = FastAPI(
    title="DataForSEO API Service",
    version="1.0.0",
    description="Comprehensive DataForSEO API integration including Ranked Keywords and SERP Analysis - بسم الله الرحمن الرحيم"
)
# Cache for storing results (optional)
# Keyed by domain; get_ranked_keywords() stores the processed keyword list here.
# NOTE(review): nothing in the visible code reads or evicts entries, so this
# grows with every distinct domain requested — confirm it is still needed.
cached_results: Dict[str, List[Any]] = {}
# Request models
class DomainRequest(BaseModel):
    """Request body describing a ranked-keywords lookup for a single domain."""

    # Domain whose ranked keywords are requested; sent upstream as "domain".
    domain: str
    # Sent upstream as "country_code".
    country_code: Optional[str] = "US"
    # Sent upstream as "language".
    language: Optional[str] = "en"
    # Sent upstream as "limit".
    limit: Optional[int] = 500
    # NOTE(review): accepted here, but the handlers in this file always send
    # "offset": 0 upstream, so this value is currently ignored.
    offset: Optional[int] = 0
    # Lower bound used in the upstream ["search_volume", ">=", ...] filter.
    min_search_volume: Optional[int] = 20
class SERPRequest(BaseModel):
    """Request body for a SERP search: keyword, location code and language code.

    NOTE(review): no handler in the visible part of this file accepts this model.
    """

    keyword: str
    location_code: int
    language_code: Optional[str] = "en"
# Response models
class RankedKeyword(BaseModel):
    """One ranked keyword row, as built by process_ranked_keywords()."""

    keyword: str
    # Ranking position; process_ranked_keywords() sorts its output by this field.
    position: float
    search_volume: int
    cpc: float
    competition: float
    # URL reported for the keyword in the upstream item.
    url: str
    country_code: str
    language: str
class RankedKeywordsResponse(BaseModel):
    """Response returned by the /get_ranked_kw_for_domain endpoint."""

    domain: str
    # Number of keywords processed, which can exceed len(results).
    total_results: int
    # get_ranked_keywords() always fills these with 1 and 100 respectively.
    page: int
    per_page: int
    # The slice of keywords actually returned to the caller.
    results: List[RankedKeyword]
    # Human-readable status message.
    message: str
class KeywordRanking(BaseModel):
    """One keyword entry inside a DomainResponse; only keyword, position and url are required."""

    keyword: str
    position: float
    url: str
    search_volume: Optional[int] = 0
    cpc: Optional[float] = 0.0
    competition: Optional[float] = 0.0
    # NOTE(review): default is lowercase "us" while the other models in this
    # file default to "US" — confirm which casing consumers expect.
    country_code: Optional[str] = "us"
class DomainResponse(BaseModel):
    """Response model declared for the /domain-keywords endpoint."""

    domain: str
    total_keywords: int
    keywords: List[KeywordRanking]
    # Human-readable status message.
    message: str
class SERPItem(BaseModel):
    """One entry of a SERP result list, used as the element type of SERPResponse.items."""

    type: str
    title: str
    url: str
    description: Optional[str] = None
    position: Optional[int] = None
    rank_group: Optional[int] = None
class SERPResponse(BaseModel):
    """SERP search result: the queried keyword/location, the items and free-form metadata.

    NOTE(review): no handler in the visible part of this file returns this model.
    """

    keyword: str
    location_code: int
    total_results: int
    items: List[SERPItem]
    # Arbitrary key/value metadata; its schema is not defined in this file.
    search_metadata: Dict[str, Any]
# Initialize DataForSEO client
def get_dfs_client():
    """Build a DataForSEO ``RestClient`` from credentials in the environment.

    Reads ``DATAFORSEO_API_LOGIN`` and ``DATAFORSEO_API_PASSWORD``.

    Returns:
        A ``RestClient`` constructed with the login and the password.

    Raises:
        ValueError: If either variable is unset or empty.
    """
    login = os.getenv('DATAFORSEO_API_LOGIN')
    secret = os.getenv('DATAFORSEO_API_PASSWORD')
    if login and secret:
        return RestClient(login, secret)
    # Fail with an actionable message rather than building a client that
    # cannot authenticate.
    raise ValueError("Please set DATAFORSEO_API_LOGIN and DATAFORSEO_API_PASSWORD environment variables")
@app.get("/")
async def root():
    """Return the service banner together with an index of the available routes."""
    # NOTE(review): "export_csv" points at "/export-keywords-csv", while the
    # export route visible in this file is "/export-ranked-keywords-csv" —
    # confirm the advertised path exists before relying on this index.
    route_index = {
        "health": "/health",
        "ranked_keywords": "/get_ranked_kw_for_domain",
        "domain_keywords": "/domain-keywords",
        "serp_search": "/api/search",
        "export_csv": "/export-keywords-csv",
        "download_csv": "/download-csv/{filename}",
    }
    banner = "DataForSEO API Service - بسم الله الرحمن الرحيم"
    return {"message": banner, "endpoints": route_index}
@app.get("/health")
async def health_check():
    """Report whether the service can reach DataForSEO.

    Builds a client from the environment credentials and issues one request to
    ``/v3/applications/user``.

    Returns:
        A dict with ``status``, ``message`` and ``dataforseo_status`` when the
        request completes without raising.

    Raises:
        HTTPException: 500 when the client cannot be built or the request raises.
    """
    try:
        client = get_dfs_client()
        # Test API connection with a simple request. Only success or failure of
        # the call matters, so the response is deliberately not kept.
        # NOTE(review): the response body is not inspected; if RestClient.get
        # does not raise on rejected credentials this still reports "healthy".
        client.get("/v3/applications/user")
    except Exception as e:
        # Record the traceback server-side and keep the cause chained.
        logger.exception("DataForSEO health check failed")
        raise HTTPException(status_code=500, detail=f"DataForSEO connection failed: {str(e)}") from e
    return {
        "status": "healthy",
        "message": "API is running and DataForSEO connection is working",
        "dataforseo_status": "connected"
    }
# Ranked Keywords Endpoint - GET and POST methods
@app.api_route("/get_ranked_kw_for_domain", methods=["GET", "POST"])
async def get_ranked_keywords(
    request_data: Optional[DomainRequest] = None,
    domain: Optional[str] = Query(None, description="Domain to look up (used when no JSON body is sent)"),
    country_code: str = Query("US", description="Country code"),
    language: str = Query("en", description="Language code"),
    limit: int = Query(500, description="Maximum number of keywords requested upstream"),
    min_search_volume: int = Query(20, description="Minimum search volume a keyword must have"),
):
    """
    Get ranked keywords for a domain, filtered by a minimum search volume.

    Parameters come from the JSON body (``request_data``) when one is sent,
    otherwise from the query string. The full processed list is stored in
    ``cached_results`` under the domain; only the first 100 keywords are
    returned.

    Args:
        request_data: Optional JSON body; when present it takes precedence over
            the query parameters.
        domain: Domain to look up when no body is sent.
        country_code: Country code forwarded upstream (default "US").
        language: Language forwarded upstream (default "en").
        limit: Upstream result limit (default 500).
        min_search_volume: Lower bound for the search-volume filter (default 20).

    Returns:
        RankedKeywordsResponse with at most 100 keywords sorted by position.

    Raises:
        HTTPException: 400 when no domain is supplied; 500 when the upstream
            call or the processing fails.
    """
    # Handle both request styles: a JSON body wins, query parameters are the
    # fallback. The query values are declared in the signature because no
    # `request` object exists in this handler's scope.
    if request_data is not None:
        domain = request_data.domain
        country_code = request_data.country_code
        language = request_data.language
        limit = request_data.limit
        min_search_volume = request_data.min_search_volume

    # Validate before entering the try block so the 400 is not rewritten to 500.
    if not domain:
        raise HTTPException(status_code=400, detail="Domain parameter is required")

    try:
        logger.info(f"Fetching ranked keywords for domain: {domain}, country: {country_code}")
        # Initialize DataForSEO client
        client = get_dfs_client()
        # Prepare post data according to the API requirements.
        # NOTE(review): the task key 1112 is kept from the original; its
        # meaning is not visible in this file. DomainRequest.offset is not
        # forwarded — the upstream offset is always 0.
        post_data = {
            1112: {
                "domain": domain,
                "country_code": country_code,
                "language": language,
                "limit": limit,
                "offset": 0,
                "orderby": "position,asc",
                "filters": [["search_volume", ">=", min_search_volume]]
            }
        }
        # Make API request
        response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data})
        # Process and cache results
        location_keywords = process_ranked_keywords(response)
        cached_results[domain] = location_keywords
        # Return only the first 100 results
        results_to_return = location_keywords[:100]
        return RankedKeywordsResponse(
            domain=domain,
            total_results=len(location_keywords),
            page=1,
            per_page=100,
            results=results_to_return,
            message=f"تم جلب {len(results_to_return)} كلمة مفتاحية بنجاح بفضل الله"
        )
    except Exception as e:
        # logger.exception keeps the traceback in the server log.
        logger.exception("Error in get_ranked_keywords: %s", e)
        raise HTTPException(status_code=500, detail=f"Error fetching ranked keywords: {str(e)}") from e
def process_ranked_keywords(response: Dict) -> List[RankedKeyword]:
    """
    Convert a ranked-keywords API response into a sorted list of RankedKeyword.

    Expects ``response["results"]`` to be a mapping of task id to task data,
    each task optionally carrying an ``"items"`` list of dicts. Missing or
    null fields fall back to the same defaults as before ("" / 0 / "US" /
    "en"). An item that still cannot be converted is logged and skipped, so
    one bad row no longer discards the whole result set.

    Args:
        response: Parsed API response.

    Returns:
        Keywords sorted by ascending position; an empty list when the response
        has no usable results. Never raises.
    """
    results = response.get("results") if isinstance(response, dict) else None
    if not isinstance(results, dict):
        if results is not None:
            logger.error(
                "Error processing ranked keywords: unexpected 'results' type %s",
                type(results).__name__,
            )
        return []

    location_keywords = []
    for task_data in results.values():
        items = task_data.get("items") if isinstance(task_data, dict) else None
        if not isinstance(items, list):
            continue
        for item in items:
            try:
                # `or <default>` also covers keys that are present but null,
                # which item.get(key, default) alone would pass through as None.
                ranked_keyword = RankedKeyword(
                    keyword=item.get("keyword") or "",
                    position=float(item.get("position") or 0),
                    search_volume=int(item.get("search_volume") or 0),
                    cpc=float(item.get("cpc") or 0),
                    competition=float(item.get("competition") or 0),
                    url=item.get("url") or "",
                    country_code=item.get("country_code") or "US",
                    language=item.get("language") or "en",
                )
            except (AttributeError, TypeError, ValueError) as e:
                # AttributeError: item is not a dict. TypeError/ValueError:
                # failed numeric conversion; pydantic's ValidationError is a
                # ValueError subclass.
                logger.warning("Skipping malformed ranked keyword item %r: %s", item, e)
                continue
            location_keywords.append(ranked_keyword)

    # Sort by position (ascending)
    location_keywords.sort(key=lambda kw: kw.position)
    return location_keywords
# Export ranked keywords to CSV
@app.post("/export-ranked-keywords-csv")
async def export_ranked_keywords_csv(request_data: DomainRequest):
    """
    Export the ranked keywords of a domain to a CSV file under /app/exports.

    Args:
        request_data: Lookup parameters (domain, country, language, limit,
            minimum search volume).

    Returns:
        A dict with the domain, the generated file name, the keyword count, a
        download URL and a status message.

    Raises:
        HTTPException: 404 when no keywords were found; 500 on any other
            failure (upstream call, processing, file write).
    """
    import re  # local import: only this handler needs it

    try:
        client = get_dfs_client()
        # Prepare post data.
        # NOTE(review): request_data.offset is not forwarded; offset is always 0.
        post_data = {
            1112: {
                "domain": request_data.domain,
                "country_code": request_data.country_code,
                "language": request_data.language,
                "limit": request_data.limit,
                "offset": 0,
                "orderby": "position,asc",
                "filters": [["search_volume", ">=", request_data.min_search_volume]]
            }
        }
        # Make API request
        response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data})
        # Process results
        location_keywords = process_ranked_keywords(response)
        if not location_keywords:
            raise HTTPException(status_code=404, detail="No ranked keywords found for this domain")
        # Convert to DataFrame
        df = pd.DataFrame([keyword.dict() for keyword in location_keywords])
        # Save to CSV in exports directory
        exports_dir = "/app/exports"
        os.makedirs(exports_dir, exist_ok=True)
        # The domain is caller-supplied: replace everything except word
        # characters, dots and hyphens so it cannot carry path separators and
        # escape the exports directory. Ordinary domains are left unchanged.
        safe_domain = re.sub(r"[^\w.-]", "_", request_data.domain)
        filename = f"{safe_domain}_ranked_keywords.csv"
        filepath = os.path.join(exports_dir, filename)
        df.to_csv(filepath, index=False, encoding='utf-8')
        return {
            "domain": request_data.domain,
            "filename": filename,
            "total_keywords": len(location_keywords),
            "download_url": f"/download-csv/{filename}",
            "message": f"تم تصدير {len(location_keywords)} كلمة مفتاحية إلى {filename} بنجاح"
        }
    except HTTPException:
        # Let the deliberate 404 above through instead of rewriting it to 500.
        raise
    except Exception as e:
        logger.exception("Error in export_ranked_keywords_csv: %s", e)
        raise HTTPException(status_code=500, detail=f"Export error: {str(e)}") from e
# Existing endpoints (keep your previous implementations)
@app.get("/api/search")
async def search_ai_mode(
    keyword: str = Query(..., description="Search keyword"),
    location_code: int = Query(..., description="Location code (integer)"),
    language_code: str = Query("en", description="Language code")
):
    """
    AI Mode SERP Search - Get comprehensive search results for a keyword

    Args:
        keyword: Search keyword (required).
        location_code: Location code (required, integer).
        language_code: Language code (default "en").

    Raises:
        HTTPException: 501 until the SERP implementation is restored here.
    """
    # TODO: restore the existing implementation here. Until then answer with an
    # explicit 501 instead of falling through and returning a 200 `null` body.
    raise HTTPException(status_code=501, detail="SERP search is not implemented")
@app.post("/domain-keywords", response_model=DomainResponse)
async def get_domain_keywords(request: DomainRequest):
    """
    Get all ranking keywords for a domain and their positions in Google search

    Args:
        request: Lookup parameters for the domain.

    Raises:
        HTTPException: 501 until the domain-keywords implementation is
            restored here.
    """
    # TODO: restore the existing implementation here. Returning None would fail
    # validation against response_model=DomainResponse and surface as an opaque
    # 500, so report the missing implementation explicitly.
    raise HTTPException(status_code=501, detail="Domain keywords lookup is not implemented")
@app.get("/download-csv/{filename}")
async def download_csv(filename: str):
    """
    Download a generated CSV file from /app/exports.

    Args:
        filename: Bare name of a file inside the exports directory.

    Returns:
        FileResponse serving the file as a ``text/csv`` attachment.

    Raises:
        HTTPException: 400 when ``filename`` is not a bare file name; 404 when
            no such regular file exists.
    """
    # Accept bare file names only, so the request cannot address anything
    # outside the exports directory.
    is_bare_name = (
        filename not in {".", ".."}
        and "\\" not in filename
        and os.path.basename(filename) == filename
    )
    if not is_bare_name:
        raise HTTPException(status_code=400, detail="Invalid filename")

    filepath = os.path.join("/app/exports", filename)
    # isfile rather than exists: a directory must not be handed to FileResponse.
    if not os.path.isfile(filepath):
        raise HTTPException(status_code=404, detail="File not found")

    # FileResponse derives a correctly quoted/encoded Content-Disposition
    # attachment header from `filename`, so no hand-built header is passed.
    return FileResponse(
        path=filepath,
        filename=filename,
        media_type='text/csv',
    )
# Script entry point: serve the app with uvicorn when this file is run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn
    # Listens on all interfaces (0.0.0.0), port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)