Add main.py
هذا الالتزام موجود في:
185
main.py
Normal file
185
main.py
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
import os
|
||||||
|
from fastapi import FastAPI, HTTPException
|
||||||
|
from fastapi.responses import FileResponse
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from dataforseo_sdk import DataForSeoClient
|
||||||
|
from dataforseo_sdk.config import DFSConfig
|
||||||
|
import asyncio
|
||||||
|
from typing import List, Dict, Optional
|
||||||
|
import pandas as pd
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# Load environment variables
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
# Initialize FastAPI app
|
||||||
|
app = FastAPI(title="Domain Ranking Keywords API", version="1.0.0")
|
||||||
|
|
||||||
|
# DataForSEO configuration
|
||||||
|
class DataForSEOConfig:
|
||||||
|
def __init__(self):
|
||||||
|
self.login = os.getenv('DATAFORSEO_API_LOGIN')
|
||||||
|
self.password = os.getenv('DATAFORSEO_API_PASSWORD')
|
||||||
|
|
||||||
|
if not self.login or not self.password:
|
||||||
|
raise ValueError("Please set DATAFORSEO_API_LOGIN and DATAFORSEO_API_PASSWORD environment variables")
|
||||||
|
|
||||||
|
self.config = DFSConfig(login=self.login, password=self.password)
|
||||||
|
|
||||||
|
# Request model
|
||||||
|
class DomainRequest(BaseModel):
|
||||||
|
domain: str
|
||||||
|
country_code: Optional[str] = "us"
|
||||||
|
language_code: Optional[str] = "en"
|
||||||
|
limit: Optional[int] = 1000
|
||||||
|
|
||||||
|
# Response model
|
||||||
|
class KeywordRanking(BaseModel):
|
||||||
|
keyword: str
|
||||||
|
position: float
|
||||||
|
url: str
|
||||||
|
search_volume: Optional[int]
|
||||||
|
cpc: Optional[float]
|
||||||
|
competition: Optional[float]
|
||||||
|
|
||||||
|
class DomainResponse(BaseModel):
|
||||||
|
domain: str
|
||||||
|
total_keywords: int
|
||||||
|
keywords: List[KeywordRanking]
|
||||||
|
message: str
|
||||||
|
|
||||||
|
# Initialize DataForSEO client
|
||||||
|
def get_dfs_client():
|
||||||
|
config = DataForSEOConfig()
|
||||||
|
return DataForSeoClient(config.config)
|
||||||
|
|
||||||
|
@app.get("/")
|
||||||
|
async def root():
|
||||||
|
return {"message": "Domain Ranking Keywords API - بسم الله الرحمن الرحيم"}
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
async def health_check():
|
||||||
|
return {"status": "healthy", "message": "API is running"}
|
||||||
|
|
||||||
|
@app.post("/domain-keywords", response_model=DomainResponse)
|
||||||
|
async def get_domain_keywords(request: DomainRequest):
|
||||||
|
"""
|
||||||
|
Get all ranking keywords for a domain and their positions in Google search
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = get_dfs_client()
|
||||||
|
|
||||||
|
# Get domain rankings using DataForSEO
|
||||||
|
rankings = await get_domain_rankings(
|
||||||
|
client,
|
||||||
|
request.domain,
|
||||||
|
request.country_code,
|
||||||
|
request.language_code,
|
||||||
|
request.limit
|
||||||
|
)
|
||||||
|
|
||||||
|
return DomainResponse(
|
||||||
|
domain=request.domain,
|
||||||
|
total_keywords=len(rankings),
|
||||||
|
keywords=rankings,
|
||||||
|
message="تم جلب البيانات بنجاح بفضل الله"
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}")
|
||||||
|
|
||||||
|
async def get_domain_rankings(client, domain: str, country_code: str = "us",
|
||||||
|
language_code: str = "en", limit: int = 1000) -> List[KeywordRanking]:
|
||||||
|
"""
|
||||||
|
Fetch domain ranking keywords from DataForSEO
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Using DataForSEO's domain analytics service
|
||||||
|
domain_analytics = client.domain_analytics()
|
||||||
|
|
||||||
|
# Get keywords for the domain
|
||||||
|
response = domain_analytics.get_domain_metrics_by_categories(
|
||||||
|
target=domain,
|
||||||
|
country_code=country_code,
|
||||||
|
language_code=language_code,
|
||||||
|
limit=limit
|
||||||
|
)
|
||||||
|
|
||||||
|
rankings = []
|
||||||
|
|
||||||
|
if response and hasattr(response, 'tasks'):
|
||||||
|
for task in response.tasks:
|
||||||
|
if hasattr(task, 'result') and task.result:
|
||||||
|
for item in task.result:
|
||||||
|
rankings.append(KeywordRanking(
|
||||||
|
keyword=item.keyword if hasattr(item, 'keyword') else "N/A",
|
||||||
|
position=float(item.pos) if hasattr(item, 'pos') else 0.0,
|
||||||
|
url=item.url if hasattr(item, 'url') else "",
|
||||||
|
search_volume=int(item.search_volume) if hasattr(item, 'search_volume') else 0,
|
||||||
|
cpc=float(item.cpc) if hasattr(item, 'cpc') else 0.0,
|
||||||
|
competition=float(item.competition) if hasattr(item, 'competition') else 0.0
|
||||||
|
))
|
||||||
|
|
||||||
|
return rankings
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error in get_domain_rankings: {str(e)}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Export to CSV endpoint
|
||||||
|
@app.post("/export-keywords-csv")
|
||||||
|
async def export_keywords_csv(request: DomainRequest):
|
||||||
|
"""
|
||||||
|
Export domain keywords to CSV
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = get_dfs_client()
|
||||||
|
|
||||||
|
rankings = await get_domain_rankings(
|
||||||
|
client,
|
||||||
|
request.domain,
|
||||||
|
request.country_code,
|
||||||
|
request.language_code,
|
||||||
|
request.limit
|
||||||
|
)
|
||||||
|
|
||||||
|
# Convert to DataFrame
|
||||||
|
df = pd.DataFrame([keyword.dict() for keyword in rankings])
|
||||||
|
|
||||||
|
# Save to CSV in exports directory
|
||||||
|
exports_dir = "/app/exports"
|
||||||
|
os.makedirs(exports_dir, exist_ok=True)
|
||||||
|
filename = f"{request.domain}_keywords.csv"
|
||||||
|
filepath = os.path.join(exports_dir, filename)
|
||||||
|
df.to_csv(filepath, index=False, encoding='utf-8')
|
||||||
|
|
||||||
|
return {
|
||||||
|
"domain": request.domain,
|
||||||
|
"filename": filename,
|
||||||
|
"filepath": filepath,
|
||||||
|
"total_keywords": len(rankings),
|
||||||
|
"message": f"تم تصدير البيانات إلى {filename} بنجاح"
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Export error: {str(e)}")
|
||||||
|
|
||||||
|
# Download CSV file endpoint
|
||||||
|
@app.get("/download-csv/{filename}")
|
||||||
|
async def download_csv(filename: str):
|
||||||
|
"""
|
||||||
|
Download generated CSV file
|
||||||
|
"""
|
||||||
|
filepath = f"/app/exports/{filename}"
|
||||||
|
if os.path.exists(filepath):
|
||||||
|
return FileResponse(
|
||||||
|
path=filepath,
|
||||||
|
filename=filename,
|
||||||
|
media_type='text/csv'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise HTTPException(status_code=404, detail="File not found")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import uvicorn
|
||||||
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
المرجع في مشكلة جديدة
حظر مستخدم