From fa8dd2d075c54064d64faba90736b84309daa2cc Mon Sep 17 00:00:00 2001
From: ghaymah_dev
Date: Wed, 29 Oct 2025 13:53:03 +0000
Subject: [PATCH] Update main.py
---
main.py | 444 ++++++++++++++++++++++++--------------------------------
1 file changed, 191 insertions(+), 253 deletions(-)
diff --git a/main.py b/main.py
index 10bb72a..783bc19 100644
--- a/main.py
+++ b/main.py
@@ -1,3 +1,5 @@
+# بسم الله الرحمن الرحيم
+
import os
import gzip
import json
@@ -21,15 +23,20 @@ load_dotenv()
app = FastAPI(
title="DataForSEO API Service",
version="1.0.0",
- description="Comprehensive DataForSEO API integration including Domain Keywords and AI Mode SERP - بسم الله الرحمن الرحيم"
+ description="Comprehensive DataForSEO API integration including Ranked Keywords and SERP Analysis - بسم الله الرحمن الرحيم"
)
+# Cache for storing results (optional)
+cached_results = {}
+
# Request models
class DomainRequest(BaseModel):
domain: str
- country_code: Optional[str] = "us"
- language_code: Optional[str] = "en"
- limit: Optional[int] = 100
+ country_code: Optional[str] = "US"
+ language: Optional[str] = "en"
+ limit: Optional[int] = 500
+ offset: Optional[int] = 0
+ min_search_volume: Optional[int] = 20
class SERPRequest(BaseModel):
keyword: str
@@ -37,6 +44,24 @@ class SERPRequest(BaseModel):
language_code: Optional[str] = "en"
# Response models
+class RankedKeyword(BaseModel):
+ keyword: str
+ position: float
+ search_volume: int
+ cpc: float
+ competition: float
+ url: str
+ country_code: str
+ language: str
+
+class RankedKeywordsResponse(BaseModel):
+ domain: str
+ total_results: int
+ page: int
+ per_page: int
+ results: List[RankedKeyword]
+ message: str
+
class KeywordRanking(BaseModel):
keyword: str
position: float
@@ -83,6 +108,7 @@ async def root():
"message": "DataForSEO API Service - بسم الله الرحمن الرحيم",
"endpoints": {
"health": "/health",
+ "ranked_keywords": "/get_ranked_kw_for_domain",
"domain_keywords": "/domain-keywords",
"serp_search": "/api/search",
"export_csv": "/export-keywords-csv",
@@ -104,7 +130,159 @@ async def health_check():
except Exception as e:
raise HTTPException(status_code=500, detail=f"DataForSEO connection failed: {str(e)}")
-# AI Mode SERP Search Endpoint
+# Ranked Keywords Endpoint - GET and POST methods
+@app.api_route("/get_ranked_kw_for_domain", methods=["GET", "POST"])
+async def get_ranked_keywords(request_data: Optional[DomainRequest] = None):
+ """
+ Get ranked keywords for a domain with search volume >= 20
+ Returns first 100 results إن شاء الله
+ """
+ try:
+ # Handle both GET and POST requests
+ if request_data:
+ # POST request with JSON body
+ domain = request_data.domain
+ country_code = request_data.country_code
+ language = request_data.language
+ limit = request_data.limit
+ min_search_volume = request_data.min_search_volume
+ else:
+ # GET request with query parameters
+ domain = request.query_params.get('domain')
+ country_code = request.query_params.get('country_code', 'US')
+ language = request.query_params.get('language', 'en')
+ limit = int(request.query_params.get('limit', 500))
+ min_search_volume = int(request.query_params.get('min_search_volume', 20))
+
+ if not domain:
+ raise HTTPException(status_code=400, detail="Domain parameter is required")
+
+ logger.info(f"Fetching ranked keywords for domain: {domain}, country: {country_code}")
+
+ # Initialize DataForSEO client
+ client = get_dfs_client()
+
+ # Prepare post data according to the API requirements
+ post_data = {
+ 1112: {
+ "domain": domain,
+ "country_code": country_code,
+ "language": language,
+ "limit": limit,
+ "offset": 0,
+ "orderby": "position,asc",
+ "filters": [["search_volume", ">=", min_search_volume]]
+ }
+ }
+
+ # Make API request
+ response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data})
+
+ # Process and cache results
+ location_keywords = process_ranked_keywords(response)
+ cached_results[domain] = location_keywords
+
+ # Return first 100 results إن شاء الله
+ results_to_return = location_keywords[:100]
+
+ return RankedKeywordsResponse(
+ domain=domain,
+ total_results=len(location_keywords),
+ page=1,
+ per_page=100,
+ results=results_to_return,
+ message=f"تم جلب {len(results_to_return)} كلمة مفتاحية بنجاح بفضل الله"
+ )
+
+ except Exception as e:
+ logger.error(f"Error in get_ranked_keywords: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"Error fetching ranked keywords: {str(e)}")
+
+def process_ranked_keywords(response: Dict) -> List[RankedKeyword]:
+ """
+ Process the ranked keywords API response
+ """
+ try:
+ location_keywords = []
+
+ if "results" in response:
+ for task_id, task_data in response["results"].items():
+ if "items" in task_data:
+ for item in task_data["items"]:
+ ranked_keyword = RankedKeyword(
+ keyword=item.get("keyword", ""),
+ position=float(item.get("position", 0)),
+ search_volume=int(item.get("search_volume", 0)),
+ cpc=float(item.get("cpc", 0)),
+ competition=float(item.get("competition", 0)),
+ url=item.get("url", ""),
+ country_code=item.get("country_code", "US"),
+ language=item.get("language", "en")
+ )
+ location_keywords.append(ranked_keyword)
+
+ # Sort by position (ascending)
+ location_keywords.sort(key=lambda x: x.position)
+
+ return location_keywords
+
+ except Exception as e:
+ logger.error(f"Error processing ranked keywords: {str(e)}")
+ return []
+
+# Export ranked keywords to CSV
+@app.post("/export-ranked-keywords-csv")
+async def export_ranked_keywords_csv(request_data: DomainRequest):
+ """
+ Export ranked keywords to CSV file
+ """
+ try:
+ client = get_dfs_client()
+
+ # Prepare post data
+ post_data = {
+ 1112: {
+ "domain": request_data.domain,
+ "country_code": request_data.country_code,
+ "language": request_data.language,
+ "limit": request_data.limit,
+ "offset": 0,
+ "orderby": "position,asc",
+ "filters": [["search_volume", ">=", request_data.min_search_volume]]
+ }
+ }
+
+ # Make API request
+ response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data})
+
+ # Process results
+ location_keywords = process_ranked_keywords(response)
+
+ if not location_keywords:
+ raise HTTPException(status_code=404, detail="No ranked keywords found for this domain")
+
+ # Convert to DataFrame
+ df = pd.DataFrame([keyword.dict() for keyword in location_keywords])
+
+ # Save to CSV in exports directory
+ exports_dir = "/app/exports"
+ os.makedirs(exports_dir, exist_ok=True)
+ filename = f"{request_data.domain}_ranked_keywords.csv"
+ filepath = os.path.join(exports_dir, filename)
+ df.to_csv(filepath, index=False, encoding='utf-8')
+
+ return {
+ "domain": request_data.domain,
+ "filename": filename,
+ "total_keywords": len(location_keywords),
+ "download_url": f"/download-csv/{filename}",
+ "message": f"تم تصدير {len(location_keywords)} كلمة مفتاحية إلى {filename} بنجاح"
+ }
+
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Export error: {str(e)}")
+
+# Existing endpoints (keep your previous implementations)
@app.get("/api/search")
async def search_ai_mode(
keyword: str = Query(..., description="Search keyword"),
@@ -114,232 +292,16 @@ async def search_ai_mode(
"""
AI Mode SERP Search - Get comprehensive search results for a keyword
"""
- # Validate required parameters
- if not keyword:
- raise HTTPException(status_code=400, detail="keyword parameter is required")
-
- if not location_code:
- raise HTTPException(status_code=400, detail="location_code parameter is required")
-
- try:
- client = get_dfs_client()
+ # Your existing implementation here
+ pass
- # Prepare request data
- post_data = dict()
- post_data[len(post_data)] = dict(
- language_code=language_code,
- location_code=location_code,
- keyword=keyword
- )
-
- # Make API call
- logger.info(f"Making AI Mode API call for keyword: {keyword}, location_code: {location_code}")
- response = client.post("/v3/serp/google/ai_mode/live/advanced", post_data)
-
- logger.info(f"Raw API response status: {response.get('status_code')}")
-
- # Process response
- if response.get("status_code") == 20000:
- # Extract and simplify the response
- simplified_response = extract_simplified_data(response)
- return simplified_response
- else:
- logger.error(f"API error: {response.get('status_code')} - {response.get('status_message')}")
- raise HTTPException(
- status_code=500,
- detail=f"API error. Code: {response.get('status_code')} Message: {response.get('status_message')}"
- )
-
- except Exception as e:
- logger.error(f"Error in AI Mode search: {str(e)}")
- raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")
-
-def extract_simplified_data(response: Dict) -> Dict:
- """
- Extract and simplify the AI Mode SERP response data
- """
- try:
- simplified_items = []
-
- if "tasks" in response and response["tasks"]:
- for task in response["tasks"]:
- if "result" in task and task["result"]:
- for result in task["result"]:
- # Extract basic search info
- search_info = {
- "keyword": result.get("keyword", ""),
- "location_code": result.get("location_code"),
- "language_code": result.get("language_code"),
- "check_url": result.get("check_url"),
- "datetime": result.get("datetime")
- }
-
- # Extract items
- if "items" in result:
- for item in result["items"]:
- simplified_item = {
- "type": item.get("type", "unknown"),
- "title": item.get("title", ""),
- "url": item.get("url", ""),
- "description": item.get("description"),
- "position": item.get("position"),
- "rank_group": item.get("rank_group")
- }
-
- # Handle different item types
- if item.get("type") == "organic":
- simplified_item.update({
- "domain": item.get("domain"),
- "breadcrumb": item.get("breadcrumb"),
- "website_name": item.get("website_name")
- })
- elif item.get("type") == "paid":
- simplified_item.update({
- "domain": item.get("domain"),
- "price": item.get("price"),
- "currency": item.get("currency")
- })
- elif item.get("type") == "featured_snippet":
- simplified_item.update({
- "domain": item.get("domain"),
- "website_name": item.get("website_name")
- })
-
- simplified_items.append(simplified_item)
-
- return {
- "search_metadata": search_info,
- "total_results": len(simplified_items),
- "items": simplified_items,
- "message": "تم جلب نتائج البحث بنجاح بفضل الله"
- }
-
- except Exception as e:
- logger.error(f"Error simplifying response data: {str(e)}")
- return {
- "search_metadata": {},
- "total_results": 0,
- "items": [],
- "message": "حدث خطأ في معالجة البيانات"
- }
-
-# Existing Domain Keywords Endpoints
@app.post("/domain-keywords", response_model=DomainResponse)
async def get_domain_keywords(request: DomainRequest):
"""
Get all ranking keywords for a domain and their positions in Google search
"""
- try:
- client = get_dfs_client()
-
- # Get domain rankings using DataForSEO Domain Analytics API
- rankings = await get_domain_rankings(
- client,
- request.domain,
- request.country_code,
- request.language_code,
- request.limit
- )
-
- return DomainResponse(
- domain=request.domain,
- total_keywords=len(rankings),
- keywords=rankings,
- message="تم جلب البيانات بنجاح بفضل الله"
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}")
-
-async def get_domain_rankings(client, domain: str, country_code: str = "us",
- language_code: str = "en", limit: int = 100) -> List[KeywordRanking]:
- """
- Fetch domain ranking keywords from DataForSEO using Domain Analytics API
- """
- try:
- # Prepare the POST data for Domain Analytics API
- post_data = [
- {
- "target": domain,
- "limit": limit,
- "country_code": country_code.upper(),
- "language_code": language_code.lower(),
- "filters": [
- ["country_code", "=", country_code.upper()],
- "and",
- ["language_code", "=", language_code.lower()]
- ]
- }
- ]
-
- # Make API request to Domain Analytics endpoint
- response = client.post("/v3/dataforseo_labs/domain_metrics_by_categories", post_data)
-
- rankings = []
-
- # Process the response
- if response and 'tasks' in response:
- for task in response['tasks']:
- if 'result' in task and task['result']:
- for item in task['result']:
- # Extract keyword data from the response
- keyword_data = item.get('metrics', {})
-
- rankings.append(KeywordRanking(
- keyword=item.get('key', 'N/A'),
- position=float(keyword_data.get('pos', 0)),
- url=item.get('domain', ''),
- search_volume=keyword_data.get('search_volume', 0),
- cpc=float(keyword_data.get('cpc', 0)),
- competition=float(keyword_data.get('competition', 0)),
- country_code=country_code
- ))
-
- return rankings
-
- except Exception as e:
- logger.error(f"Error in get_domain_rankings: {str(e)}")
- return []
-
-@app.post("/export-keywords-csv")
-async def export_keywords_csv(request: DomainRequest):
- """
- Export domain keywords to CSV
- """
- try:
- client = get_dfs_client()
-
- rankings = await get_domain_rankings(
- client,
- request.domain,
- request.country_code,
- request.language_code,
- request.limit
- )
-
- if not rankings:
- raise HTTPException(status_code=404, detail="No keywords found for this domain")
-
- # Convert to DataFrame
- df = pd.DataFrame([keyword.dict() for keyword in rankings])
-
- # Save to CSV in exports directory
- exports_dir = "/app/exports"
- os.makedirs(exports_dir, exist_ok=True)
- filename = f"{request.domain}_{request.country_code}_keywords.csv"
- filepath = os.path.join(exports_dir, filename)
- df.to_csv(filepath, index=False, encoding='utf-8')
-
- return {
- "domain": request.domain,
- "filename": filename,
- "total_keywords": len(rankings),
- "download_url": f"/download-csv/{filename}",
- "message": f"تم تصدير {len(rankings)} كلمة مفتاحية إلى {filename} بنجاح"
- }
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Export error: {str(e)}")
+ # Your existing implementation here
+ pass
@app.get("/download-csv/{filename}")
async def download_csv(filename: str):
@@ -355,34 +317,10 @@ async def download_csv(filename: str):
headers={'Content-Disposition': f'attachment; filename={filename}'}
)
else:
- raise HTTPException(status_code=404, detail="File not found. Please generate the CSV first using /export-keywords-csv")
-
-@app.get("/keywords-by-country/{domain}")
-async def get_keywords_by_country(domain: str, country_code: str = "us", limit: int = 50):
- """
- Quick endpoint to get keywords for a domain by country
- """
- try:
- client = get_dfs_client()
-
- rankings = await get_domain_rankings(
- client,
- domain,
- country_code,
- "en", # Default language
- limit
- )
-
- return {
- "domain": domain,
- "country_code": country_code,
- "total_keywords": len(rankings),
- "keywords": rankings[:10] if rankings else [] # Return first 10 for quick view
- }
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
+ raise HTTPException(status_code=404, detail="File not found")
if __name__ == "__main__":
import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
+ uvicorn.run(app, host="0.0.0.0", port=8000)
+
+
\ No newline at end of file