هذا الالتزام موجود في:
2025-10-29 14:01:53 +00:00
الأصل fa8dd2d075
التزام 7aa481f88c

375
main.py
عرض الملف

@@ -1,326 +1,121 @@
# بسم الله الرحمن الرحيم # بسم الله الرحمن الرحيم
import os # بسم الله الرحمن الرحيم
import gzip
# بسم الله الرحمن الرحيم
from flask import Flask, request, jsonify
from flask_cors import CORS
import spacy
import re
from collections import defaultdict
import json import json
from fastapi import FastAPI, HTTPException, Query import os
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import List, Dict, Optional, Any
import pandas as pd
from dotenv import load_dotenv
from client import RestClient from client import RestClient
import logging
# Configure logging app = Flask(__name__)
logging.basicConfig(level=logging.INFO) CORS(app)
logger = logging.getLogger(__name__)
# Load environment variables # Global variable to store processed results
load_dotenv()
# Initialize FastAPI app
app = FastAPI(
title="DataForSEO API Service",
version="1.0.0",
description="Comprehensive DataForSEO API integration including Ranked Keywords and SERP Analysis - بسم الله الرحمن الرحيم"
)
# Cache for storing results (optional)
cached_results = {} cached_results = {}
# Request models
class DomainRequest(BaseModel):
domain: str
country_code: Optional[str] = "US"
language: Optional[str] = "en"
limit: Optional[int] = 500
offset: Optional[int] = 0
min_search_volume: Optional[int] = 20
class SERPRequest(BaseModel): def process_ranked_keywords(response_data):
keyword: str
location_code: int
language_code: Optional[str] = "en"
# Response models
class RankedKeyword(BaseModel):
keyword: str
position: float
search_volume: int
cpc: float
competition: float
url: str
country_code: str
language: str
class RankedKeywordsResponse(BaseModel):
domain: str
total_results: int
page: int
per_page: int
results: List[RankedKeyword]
message: str
class KeywordRanking(BaseModel):
keyword: str
position: float
url: str
search_volume: Optional[int] = 0
cpc: Optional[float] = 0.0
competition: Optional[float] = 0.0
country_code: Optional[str] = "us"
class DomainResponse(BaseModel):
domain: str
total_keywords: int
keywords: List[KeywordRanking]
message: str
class SERPItem(BaseModel):
type: str
title: str
url: str
description: Optional[str] = None
position: Optional[int] = None
rank_group: Optional[int] = None
class SERPResponse(BaseModel):
keyword: str
location_code: int
total_results: int
items: List[SERPItem]
search_metadata: Dict[str, Any]
# Initialize DataForSEO client
def get_dfs_client():
username = os.getenv('DATAFORSEO_API_LOGIN')
password = os.getenv('DATAFORSEO_API_PASSWORD')
if not username or not password:
raise ValueError("Please set DATAFORSEO_API_LOGIN and DATAFORSEO_API_PASSWORD environment variables")
return RestClient(username, password)
@app.get("/")
async def root():
return {
"message": "DataForSEO API Service - بسم الله الرحمن الرحيم",
"endpoints": {
"health": "/health",
"ranked_keywords": "/get_ranked_kw_for_domain",
"domain_keywords": "/domain-keywords",
"serp_search": "/api/search",
"export_csv": "/export-keywords-csv",
"download_csv": "/download-csv/{filename}"
}
}
@app.get("/health")
async def health_check():
try:
client = get_dfs_client()
# Test API connection with a simple request
test_response = client.get("/v3/applications/user")
return {
"status": "healthy",
"message": "API is running and DataForSEO connection is working",
"dataforseo_status": "connected"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"DataForSEO connection failed: {str(e)}")
# Ranked Keywords Endpoint - GET and POST methods
@app.api_route("/get_ranked_kw_for_domain", methods=["GET", "POST"])
async def get_ranked_keywords(request_data: Optional[DomainRequest] = None):
""" """
Get ranked keywords for a domain with search volume >= 20 Process the API response, filter for location keywords using spaCy, and sort by volume
Returns first 100 results إن شاء الله
""" """
try: all_keywords = []
# Handle both GET and POST requests
if request_data:
# POST request with JSON body
domain = request_data.domain
country_code = request_data.country_code
language = request_data.language
limit = request_data.limit
min_search_volume = request_data.min_search_volume
else:
# GET request with query parameters
domain = request.query_params.get('domain')
country_code = request.query_params.get('country_code', 'US')
language = request.query_params.get('language', 'en')
limit = int(request.query_params.get('limit', 500))
min_search_volume = int(request.query_params.get('min_search_volume', 20))
if not domain: for post_id, post_data in response_data.items():
raise HTTPException(status_code=400, detail="Domain parameter is required") if "ranked" in post_data:
for keyword_data in post_data["ranked"]:
all_keywords.append(keyword_data)
logger.info(f"Fetching ranked keywords for domain: {domain}, country: {country_code}") # Filter and sort keywords using spaCy location detection only
location_keywords, non_location_keywords = filter_and_sort_keywords_by_location(all_keywords)
# Initialize DataForSEO client # Save results to file
client = get_dfs_client() with open('results.json', 'w') as f:
json.dump({
# Prepare post data according to the API requirements 'location_keywords': location_keywords,
post_data = { 'non_location_keywords': non_location_keywords
1112: { }, f, indent=2)
"domain": domain,
"country_code": country_code,
"language": language,
"limit": limit,
"offset": 0,
"orderby": "position,asc",
"filters": [["search_volume", ">=", min_search_volume]]
}
}
# Make API request
response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data})
# Process and cache results
location_keywords = process_ranked_keywords(response)
cached_results[domain] = location_keywords
# Return first 100 results إن شاء الله
results_to_return = location_keywords[:100]
return RankedKeywordsResponse(
domain=domain,
total_results=len(location_keywords),
page=1,
per_page=100,
results=results_to_return,
message=f"تم جلب {len(results_to_return)} كلمة مفتاحية بنجاح بفضل الله"
)
except Exception as e:
logger.error(f"Error in get_ranked_keywords: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error fetching ranked keywords: {str(e)}")
def process_ranked_keywords(response: Dict) -> List[RankedKeyword]:
"""
Process the ranked keywords API response
"""
try:
location_keywords = []
if "results" in response:
for task_id, task_data in response["results"].items():
if "items" in task_data:
for item in task_data["items"]:
ranked_keyword = RankedKeyword(
keyword=item.get("keyword", ""),
position=float(item.get("position", 0)),
search_volume=int(item.get("search_volume", 0)),
cpc=float(item.get("cpc", 0)),
competition=float(item.get("competition", 0)),
url=item.get("url", ""),
country_code=item.get("country_code", "US"),
language=item.get("language", "en")
)
location_keywords.append(ranked_keyword)
# Sort by position (ascending)
location_keywords.sort(key=lambda x: x.position)
return location_keywords return location_keywords
except Exception as e: @app.route('/get_ranked_kw_for_domain', methods=['GET', 'POST'])
logger.error(f"Error processing ranked keywords: {str(e)}") def get_ranked_keywords():
return []
# Export ranked keywords to CSV
@app.post("/export-ranked-keywords-csv")
async def export_ranked_keywords_csv(request_data: DomainRequest):
"""
Export ranked keywords to CSV file
"""
try: try:
client = get_dfs_client() if request.method == 'POST':
data = request.get_json()
domain = data.get('domain')
else:
domain = request.args.get('domain')
if not domain:
return jsonify({'error': 'Domain parameter is required'}), 400
username = os.getenv('DATAFORSEO_API_LOGIN')
password = os.getenv('DATAFORSEO_API_PASSWORD')
# Initialize DataForSEO client
client = RestClient(username,password )
# Prepare post data
post_data = { post_data = {
1112: { 1112: {
"domain": request_data.domain, "domain": domain,
"country_code": request_data.country_code, "country_code": "US",
"language": request_data.language, "language": "en",
"limit": request_data.limit, "limit": 500,
"offset": 0, "offset": 0,
"orderby": "position,asc", "orderby": "position,asc",
"filters": [["search_volume", ">=", request_data.min_search_volume]] "filters": [["search_volume", ">=", 1]]
} }
} }
# Make API request
response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data}) response = client.post("/v2/kwrd_finder_ranked_keywords_get", {"data": post_data})
# Process results return response
location_keywords = process_ranked_keywords(response)
if not location_keywords:
raise HTTPException(status_code=404, detail="No ranked keywords found for this domain")
# Convert to DataFrame
df = pd.DataFrame([keyword.dict() for keyword in location_keywords])
# Save to CSV in exports directory
exports_dir = "/app/exports"
os.makedirs(exports_dir, exist_ok=True)
filename = f"{request_data.domain}_ranked_keywords.csv"
filepath = os.path.join(exports_dir, filename)
df.to_csv(filepath, index=False, encoding='utf-8')
return {
"domain": request_data.domain,
"filename": filename,
"total_keywords": len(location_keywords),
"download_url": f"/download-csv/{filename}",
"message": f"تم تصدير {len(location_keywords)} كلمة مفتاحية إلى {filename} بنجاح"
}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=f"Export error: {str(e)}") return jsonify({'error': str(e)}), 500
# Existing endpoints (keep your previous implementations) @app.route('/get_ranked_kw_for_domain/<path:domain>/page<int:page>', methods=['GET'])
@app.get("/api/search") def get_ranked_keywords_paginated(domain, page):
async def search_ai_mode( try:
keyword: str = Query(..., description="Search keyword"), # Decode the domain if it's URL encoded
location_code: int = Query(..., description="Location code (integer)"), import urllib.parse
language_code: str = Query("en", description="Language code") domain = urllib.parse.unquote(domain)
):
"""
AI Mode SERP Search - Get comprehensive search results for a keyword
"""
# Your existing implementation here
pass
@app.post("/domain-keywords", response_model=DomainResponse) if domain not in cached_results:
async def get_domain_keywords(request: DomainRequest): return jsonify({'error': 'Domain not found in cache. Please call the main endpoint first.'}), 404
"""
Get all ranking keywords for a domain and their positions in Google search
"""
# Your existing implementation here
pass
@app.get("/download-csv/{filename}") location_keywords = cached_results[domain]
async def download_csv(filename: str): per_page = 10
""" total_pages = (len(location_keywords) + per_page - 1) // per_page
Download generated CSV file
"""
filepath = f"/app/exports/{filename}"
if os.path.exists(filepath):
return FileResponse(
path=filepath,
filename=filename,
media_type='text/csv',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)
else:
raise HTTPException(status_code=404, detail="File not found")
if __name__ == "__main__": if page < 1 or page > total_pages:
import uvicorn return jsonify({'error': f'Page number must be between 1 and {total_pages}'}), 400
uvicorn.run(app, host="0.0.0.0", port=8000)
start_idx = (page - 1) * per_page
end_idx = start_idx + per_page
return jsonify({
'total_results': len(location_keywords),
'page': page,
'per_page': per_page,
'total_pages': total_pages,
'results': location_keywords[start_idx:end_idx]
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=8000)