GitPasha MCP Server - Ready for deployment
هذا الالتزام موجود في:
511
server.py
Normal file
511
server.py
Normal file
@@ -0,0 +1,511 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import base64
|
||||
from typing import Optional, Dict, Any, List
|
||||
import httpx
|
||||
from dotenv import load_dotenv
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
# Setup
|
||||
load_dotenv()
|
||||
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
|
||||
LOG_FILE = os.getenv("LOG_FILE", "server.log")
|
||||
|
||||
logging.basicConfig(
|
||||
level=getattr(logging, LOG_LEVEL, logging.DEBUG),
|
||||
format="%(asctime)s | %(levelname)s | %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler(LOG_FILE, mode="a", encoding="utf-8"),
|
||||
logging.StreamHandler() # still prints to console
|
||||
],
|
||||
)
|
||||
|
||||
log = logging.getLogger("gitpasha")
|
||||
|
||||
mcp = FastMCP("Git Pasha Server")
|
||||
|
||||
API_KEY = os.getenv("GITPASHA_API_KEY", "")
|
||||
BASE_URL = os.getenv(
|
||||
"GITPASHA_BASE_URL",
|
||||
"https://app.gitpasha.com/api/v1").rstrip("/")
|
||||
|
||||
# "https://app.gitpasha.com/api/v1/repos/search?sort=stars&order=desc&limit=6"
|
||||
|
||||
if not API_KEY:
|
||||
log.warning("GITPASHA_API_KEY not set in .env")
|
||||
|
||||
|
||||
# Helpers
|
||||
def get_headers() -> Dict[str, str]:
|
||||
return {
|
||||
"Authorization": f"Bearer {API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def suggest_description(name: str) -> str:
|
||||
name = (name or "").strip()
|
||||
return f"{name or 'test'}: Repository for testing and experiments"
|
||||
|
||||
|
||||
def log_request(req: httpx.Request):
|
||||
safe_headers = {
|
||||
k: ("<hidden>" if k.lower() == "authorization" else v) for k,
|
||||
v in req.headers.items()
|
||||
}
|
||||
log.info(f"=> {req.method} {req.url}")
|
||||
log.debug(f"Headers: {safe_headers}")
|
||||
try:
|
||||
log.debug(f"Body: {req.content.decode()[:1000]}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def log_response(res: httpx.Response):
|
||||
log.info(f"<= {res.status_code} {res.request.method} {res.request.url}")
|
||||
log.debug(f"Response: {res.text[:2000]}")
|
||||
|
||||
|
||||
def build_client() -> httpx.Client:
|
||||
"""Create a sync HTTP client with logging hooks"""
|
||||
return httpx.Client(
|
||||
follow_redirects=False,
|
||||
timeout=30.0,
|
||||
event_hooks={"request": [log_request], "response": [log_response]},
|
||||
)
|
||||
|
||||
|
||||
def format_error(prefix: str, e: Exception) -> str:
|
||||
"""Format different error types"""
|
||||
if isinstance(e, httpx.HTTPStatusError):
|
||||
code = e.response.status_code
|
||||
text = e.response.text
|
||||
if code in (301, 302, 307, 308):
|
||||
return f"{prefix}: {code} - Redirect detected. Probably web UI, not API."
|
||||
if "CSRF" in text:
|
||||
return f"{prefix}: {code} - CSRF error. Wrong domain."
|
||||
if code in (401, 403):
|
||||
return f"{prefix}: {code} - Invalid or expired API key."
|
||||
return f"{prefix}: {code} - {text}"
|
||||
return f"{prefix}: {str(e)}"
|
||||
|
||||
|
||||
# Core API calls (sync)
|
||||
def api_create_repo(
|
||||
name: str,
|
||||
description: Optional[str] = None,
|
||||
private: bool = False
|
||||
) -> Dict[str, Any]:
|
||||
"""Create a repository"""
|
||||
payload = {
|
||||
"name": name,
|
||||
"description": description or suggest_description(name),
|
||||
"private": private
|
||||
}
|
||||
with build_client() as client:
|
||||
# Use /user/repos for creating a repository according to Gitea API
|
||||
res = client.post(
|
||||
f"{BASE_URL}/user/repos",
|
||||
headers=get_headers(),
|
||||
json=payload,
|
||||
)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
|
||||
def api_update_repo(
|
||||
repo: str, *, name: Optional[str] = None,
|
||||
description: Optional[str] = None, private: Optional[bool] = None
|
||||
) -> Dict[str, Any]:
|
||||
payload: Dict[str, Any] = {}
|
||||
if name is not None:
|
||||
payload["name"] = name
|
||||
if description is not None:
|
||||
payload["description"] = description
|
||||
if private is not None:
|
||||
payload["private"] = bool(private)
|
||||
if not payload:
|
||||
raise ValueError("No fields to update")
|
||||
|
||||
with build_client() as client:
|
||||
url = f"{BASE_URL}/repos/{repo}" # repo = "owner/repo"
|
||||
res = client.patch(url, headers=get_headers(), json=payload)
|
||||
if res.status_code == 405:
|
||||
res = client.put(url, headers=get_headers(), json=payload)
|
||||
res.raise_for_status()
|
||||
return res.json() if res.text else {"status": "ok"}
|
||||
|
||||
|
||||
def api_delete_repo(repo: str) -> Dict[str, Any]:
|
||||
"""Delete repository"""
|
||||
with build_client() as client:
|
||||
url = f"{BASE_URL}/repos/{repo}" # repo = "owner/repo"
|
||||
res = client.delete(url, headers=get_headers())
|
||||
if res.status_code in (200, 202, 204):
|
||||
return {"status": "deleted"}
|
||||
res.raise_for_status()
|
||||
return res.json() if res.text else {"status": "deleted"}
|
||||
|
||||
|
||||
def api_create_readme(repo: str, content: str) -> Dict[str, Any]:
|
||||
"""Create a README.md file in the repository."""
|
||||
with build_client() as client:
|
||||
# Use PUT on contents endpoint as per Gitea API
|
||||
url = f"{BASE_URL}/repos/{repo}/contents/README.md"
|
||||
# Encode content in base64 per API spec
|
||||
encoded = base64.b64encode(content.encode("utf-8")).decode("utf-8")
|
||||
body = {
|
||||
"message": "Add README",
|
||||
"content": encoded,
|
||||
}
|
||||
res = client.put(url, headers=get_headers(), json=body)
|
||||
if res.status_code < 300:
|
||||
return res.json()
|
||||
return {
|
||||
"error": "Failed to create README",
|
||||
"response": res.text,
|
||||
}
|
||||
|
||||
|
||||
# Issues
|
||||
def api_list_issues(
|
||||
repo: str,
|
||||
state: Optional[str] = None
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List issues. state can be 'open', 'closed', or 'all'"""
|
||||
params = {}
|
||||
if state:
|
||||
params["state"] = state
|
||||
with build_client() as client:
|
||||
url = f"{BASE_URL}/repos/{repo}/issues" # repo = "owner/repo"
|
||||
res = client.get(url, headers=get_headers(), params=params)
|
||||
res.raise_for_status()
|
||||
try:
|
||||
return res.json()
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def api_create_issue(
|
||||
repo: str,
|
||||
title: str,
|
||||
body: str = "",
|
||||
labels: Optional[List[str]] = None,
|
||||
assignees: Optional[List[str]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Create a new issue"""
|
||||
payload: Dict[str, Any] = {"title": title, "body": body}
|
||||
if labels:
|
||||
payload["labels"] = labels
|
||||
if assignees:
|
||||
payload["assignees"] = assignees
|
||||
with build_client() as client:
|
||||
url = f"{BASE_URL}/repos/{repo}/issues" # repo = "owner/repo"
|
||||
res = client.post(url, headers=get_headers(), json=payload)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
|
||||
def api_update_issue(
|
||||
repo: str,
|
||||
issue: str,
|
||||
title: Optional[str] = None,
|
||||
body: Optional[str] = None,
|
||||
state: Optional[str] = None,
|
||||
labels: Optional[List[str]] = None,
|
||||
assignees: Optional[List[str]] = None,
|
||||
comment: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Update an issue and/or add a comment"""
|
||||
out: Dict[str, Any] = {}
|
||||
with build_client() as client:
|
||||
# Update fields
|
||||
patch_payload: Dict[str, Any] = {}
|
||||
if title is not None:
|
||||
patch_payload["title"] = title
|
||||
if body is not None:
|
||||
patch_payload["body"] = body
|
||||
if state is not None:
|
||||
patch_payload["state"] = state # e.g., "open" or "closed"
|
||||
if labels is not None:
|
||||
patch_payload["labels"] = labels
|
||||
if assignees is not None:
|
||||
patch_payload["assignees"] = assignees
|
||||
|
||||
if patch_payload:
|
||||
url = f"{BASE_URL}/repos/{repo}/issues/{issue}"
|
||||
res = client.patch(url, headers=get_headers(), json=patch_payload)
|
||||
if res.status_code == 405:
|
||||
res = client.put(
|
||||
url,
|
||||
headers=get_headers(),
|
||||
json=patch_payload
|
||||
)
|
||||
res.raise_for_status()
|
||||
out["update"] = res.json() if res.text else {"status": "ok"}
|
||||
|
||||
# Optional comment
|
||||
if comment:
|
||||
url_c = f"{BASE_URL}/repos/{repo}/issues/{issue}/comments"
|
||||
res_c = client.post(
|
||||
url_c,
|
||||
headers=get_headers(),
|
||||
json={
|
||||
"body": comment
|
||||
}
|
||||
)
|
||||
res_c.raise_for_status()
|
||||
out["comment"] = res_c.json() if res_c.text else {
|
||||
"status": "commented"
|
||||
}
|
||||
|
||||
return out or {"status": "no-op"}
|
||||
|
||||
|
||||
# Pull Requests
|
||||
def api_open_pr(
|
||||
repo: str,
|
||||
title: str,
|
||||
head: str,
|
||||
base: str,
|
||||
body: str = ""
|
||||
) -> Dict[str, Any]:
|
||||
"""Open a pull request from head -> base"""
|
||||
payload = {"title": title, "head": head, "base": base, "body": body}
|
||||
with build_client() as client:
|
||||
url = f"{BASE_URL}/repos/{repo}/pulls" # repo = "owner/repo"
|
||||
res = client.post(url, headers=get_headers(), json=payload)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
|
||||
# MCP Tools
|
||||
@mcp.tool()
|
||||
def create_repo(
|
||||
name: str,
|
||||
description: str = "",
|
||||
private: bool = False,
|
||||
create_readme: bool = True,
|
||||
readme_content: str = "# Test Repository\n\nThis repository is for testing.\n",
|
||||
) -> str:
|
||||
"""Create repo and optional README"""
|
||||
if not API_KEY:
|
||||
return "GITPASHA_API_KEY not set"
|
||||
|
||||
desc = description or suggest_description(name)
|
||||
log.info(f"Creating repo: {name}")
|
||||
try:
|
||||
repo = api_create_repo(name, desc, private)
|
||||
repo_id = repo.get("full_name") or repo.get("name") or name # "owner/repo"
|
||||
repo_url = repo.get("html_url") or repo.get("web_url") or "N/A"
|
||||
readme_status = "skipped"
|
||||
if create_readme:
|
||||
try:
|
||||
r = api_create_readme(str(repo_id), readme_content)
|
||||
readme_status = "created" if "error" not in r else "failed"
|
||||
except Exception as e:
|
||||
readme_status = f"failed: {e}"
|
||||
result = {
|
||||
"status": "success",
|
||||
"name": name,
|
||||
"description": desc,
|
||||
"private": private,
|
||||
"repo_url": repo_url,
|
||||
"readme": readme_status
|
||||
}
|
||||
return json.dumps(result, ensure_ascii=False, indent=2)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("Repo creation failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def update_repo(
|
||||
repo: str,
|
||||
name: str = "",
|
||||
description: str = "",
|
||||
set_private: bool = False,
|
||||
private: bool = False,
|
||||
) -> str:
|
||||
"""Update a repository metadata"""
|
||||
try:
|
||||
new_name = name or None
|
||||
new_desc = description or None
|
||||
new_priv = private if set_private else None
|
||||
out = api_update_repo(
|
||||
repo,
|
||||
name=new_name,
|
||||
description=new_desc,
|
||||
private=new_priv
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "success",
|
||||
"repo": repo,
|
||||
"result": out
|
||||
},
|
||||
ensure_ascii=False
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("Repo update failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error while updating repo")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def delete_repo(repo: str) -> str:
|
||||
"""Delete a repository"""
|
||||
try:
|
||||
out = api_delete_repo(repo)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "success",
|
||||
"repo": repo,
|
||||
"result": out
|
||||
},
|
||||
ensure_ascii=False
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("Repo delete failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error while deleting repo")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
# Issues tools
|
||||
@mcp.tool()
|
||||
def issues_list(repo: str, state: str = "open") -> str:
|
||||
"""List issues. state: open|closed|all"""
|
||||
try:
|
||||
items = api_list_issues(
|
||||
repo,
|
||||
state if state in ("open", "closed", "all") else None)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "success",
|
||||
"count": len(items),
|
||||
"items": items
|
||||
},
|
||||
ensure_ascii=False
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("Issues list failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error while listing issues")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def issue_create(
|
||||
repo: str,
|
||||
title: str,
|
||||
body: str = "",
|
||||
labels_csv: str = "",
|
||||
assignees_csv: str = ""
|
||||
) -> str:
|
||||
"""Create issue. labels_csv and assignees_csv are comma-separated"""
|
||||
try:
|
||||
labels = [
|
||||
x.strip() for x in labels_csv.split(",") if x.strip()
|
||||
] if labels_csv else None
|
||||
assignees = [
|
||||
x.strip() for x in assignees_csv.split(",") if x.strip()
|
||||
] if assignees_csv else None
|
||||
item = api_create_issue(
|
||||
repo,
|
||||
title=title,
|
||||
body=body,
|
||||
labels=labels,
|
||||
assignees=assignees
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "success",
|
||||
"item": item
|
||||
},
|
||||
ensure_ascii=False
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("Issue create failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error while creating issue")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def issue_update(
|
||||
repo: str,
|
||||
issue: str,
|
||||
title: str = "",
|
||||
body: str = "",
|
||||
state: str = "",
|
||||
labels_csv: str = "",
|
||||
assignees_csv: str = "",
|
||||
comment: str = "",
|
||||
) -> str:
|
||||
"""Update issue fields and/or add a comment"""
|
||||
try:
|
||||
labels = [
|
||||
x.strip() for x in labels_csv.split(",") if x.strip()
|
||||
] if labels_csv else None
|
||||
assignees = [
|
||||
x.strip() for x in assignees_csv.split(",") if x.strip()
|
||||
] if assignees_csv else None
|
||||
st = state or None
|
||||
# Empty strings -> None (skip)
|
||||
t = title or None
|
||||
b = body or None
|
||||
c = comment or None
|
||||
out = api_update_issue(
|
||||
repo,
|
||||
issue,
|
||||
title=t,
|
||||
body=b,
|
||||
state=st,
|
||||
labels=labels,
|
||||
assignees=assignees,
|
||||
comment=c
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "success",
|
||||
"result": out
|
||||
},
|
||||
ensure_ascii=False
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("Issue update failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error while updating issue")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
# Pull Requests tools
|
||||
@mcp.tool()
|
||||
def pr_open(repo: str,
|
||||
title: str,
|
||||
head: str,
|
||||
base: str,
|
||||
body: str = ""
|
||||
) -> str:
|
||||
"""Open a pull request from head -> base"""
|
||||
try:
|
||||
pr = api_open_pr(repo, title=title, head=head, base=base, body=body)
|
||||
return json.dumps({"status": "success", "pr": pr}, ensure_ascii=False)
|
||||
except httpx.HTTPStatusError as e:
|
||||
return format_error("PR open failed", e)
|
||||
except Exception as e:
|
||||
log.exception("Unexpected error while opening PR")
|
||||
return f"{e}"
|
||||
|
||||
|
||||
# Entry Point
|
||||
if __name__ == "__main__":
|
||||
log.info(f"Starting MCP server | BASE_URL={BASE_URL}")
|
||||
mcp.run(transport="stdio")
|
||||
المرجع في مشكلة جديدة
حظر مستخدم