الملفات
MCP_SERCVER_SEARCH/services/search.py

35 أسطر
936 B
Python

import httpx
import json
import os
from config import SERPER_URL, USER_AGENT
# Function: search docs via Google Serper API
async def search_web(query: str) -> dict | None:
# search query, 2 results only
payload = json.dumps({"q": query, "num": 2})
headers = {
"X-API-KEY": os.getenv("SERPER_API_KEY"),
"Content-Type": "application/json",
"User-Agent": USER_AGENT,
}
# Send POST request to Serper API
async with httpx.AsyncClient() as client:
try:
response = await client.post(
SERPER_URL,
headers=headers,
data=payload,
timeout=30.0
)
response.raise_for_status()
# return search results
return response.json()
except httpx.TimeoutException:
# return empty if timeout
return {
"organic": []
}