الملفات
GitPasha-MCP-Server/server.py

512 أسطر
15 KiB
Python

import os
import json
import logging
import base64
from typing import Optional, Dict, Any, List
import httpx
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Setup
load_dotenv()
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
LOG_FILE = os.getenv("LOG_FILE", "server.log")
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.DEBUG),
format="%(asctime)s | %(levelname)s | %(message)s",
handlers=[
logging.FileHandler(LOG_FILE, mode="a", encoding="utf-8"),
logging.StreamHandler() # still prints to console
],
)
log = logging.getLogger("gitpasha")
mcp = FastMCP("Git Pasha Server")
API_KEY = os.getenv("GITPASHA_API_KEY", "")
BASE_URL = os.getenv(
"GITPASHA_BASE_URL",
"https://app.gitpasha.com/api/v1").rstrip("/")
# "https://app.gitpasha.com/api/v1/repos/search?sort=stars&order=desc&limit=6"
if not API_KEY:
log.warning("GITPASHA_API_KEY not set in .env")
# Helpers
def get_headers() -> Dict[str, str]:
return {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def suggest_description(name: str) -> str:
name = (name or "").strip()
return f"{name or 'test'}: Repository for testing and experiments"
def log_request(req: httpx.Request):
safe_headers = {
k: ("<hidden>" if k.lower() == "authorization" else v) for k,
v in req.headers.items()
}
log.info(f"=> {req.method} {req.url}")
log.debug(f"Headers: {safe_headers}")
try:
log.debug(f"Body: {req.content.decode()[:1000]}")
except Exception:
pass
def log_response(res: httpx.Response):
log.info(f"<= {res.status_code} {res.request.method} {res.request.url}")
log.debug(f"Response: {res.text[:2000]}")
def build_client() -> httpx.Client:
"""Create a sync HTTP client with logging hooks"""
return httpx.Client(
follow_redirects=False,
timeout=30.0,
event_hooks={"request": [log_request], "response": [log_response]},
)
def format_error(prefix: str, e: Exception) -> str:
"""Format different error types"""
if isinstance(e, httpx.HTTPStatusError):
code = e.response.status_code
text = e.response.text
if code in (301, 302, 307, 308):
return f"{prefix}: {code} - Redirect detected. Probably web UI, not API."
if "CSRF" in text:
return f"{prefix}: {code} - CSRF error. Wrong domain."
if code in (401, 403):
return f"{prefix}: {code} - Invalid or expired API key."
return f"{prefix}: {code} - {text}"
return f"{prefix}: {str(e)}"
# Core API calls (sync)
def api_create_repo(
name: str,
description: Optional[str] = None,
private: bool = False
) -> Dict[str, Any]:
"""Create a repository"""
payload = {
"name": name,
"description": description or suggest_description(name),
"private": private
}
with build_client() as client:
# Use /user/repos for creating a repository according to Gitea API
res = client.post(
f"{BASE_URL}/user/repos",
headers=get_headers(),
json=payload,
)
res.raise_for_status()
return res.json()
def api_update_repo(
repo: str, *, name: Optional[str] = None,
description: Optional[str] = None, private: Optional[bool] = None
) -> Dict[str, Any]:
payload: Dict[str, Any] = {}
if name is not None:
payload["name"] = name
if description is not None:
payload["description"] = description
if private is not None:
payload["private"] = bool(private)
if not payload:
raise ValueError("No fields to update")
with build_client() as client:
url = f"{BASE_URL}/repos/{repo}" # repo = "owner/repo"
res = client.patch(url, headers=get_headers(), json=payload)
if res.status_code == 405:
res = client.put(url, headers=get_headers(), json=payload)
res.raise_for_status()
return res.json() if res.text else {"status": "ok"}
def api_delete_repo(repo: str) -> Dict[str, Any]:
"""Delete repository"""
with build_client() as client:
url = f"{BASE_URL}/repos/{repo}" # repo = "owner/repo"
res = client.delete(url, headers=get_headers())
if res.status_code in (200, 202, 204):
return {"status": "deleted"}
res.raise_for_status()
return res.json() if res.text else {"status": "deleted"}
def api_create_readme(repo: str, content: str) -> Dict[str, Any]:
"""Create a README.md file in the repository."""
with build_client() as client:
# Use PUT on contents endpoint as per Gitea API
url = f"{BASE_URL}/repos/{repo}/contents/README.md"
# Encode content in base64 per API spec
encoded = base64.b64encode(content.encode("utf-8")).decode("utf-8")
body = {
"message": "Add README",
"content": encoded,
}
res = client.put(url, headers=get_headers(), json=body)
if res.status_code < 300:
return res.json()
return {
"error": "Failed to create README",
"response": res.text,
}
# Issues
def api_list_issues(
repo: str,
state: Optional[str] = None
) -> List[Dict[str, Any]]:
"""List issues. state can be 'open', 'closed', or 'all'"""
params = {}
if state:
params["state"] = state
with build_client() as client:
url = f"{BASE_URL}/repos/{repo}/issues" # repo = "owner/repo"
res = client.get(url, headers=get_headers(), params=params)
res.raise_for_status()
try:
return res.json()
except Exception:
return []
def api_create_issue(
repo: str,
title: str,
body: str = "",
labels: Optional[List[str]] = None,
assignees: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Create a new issue"""
payload: Dict[str, Any] = {"title": title, "body": body}
if labels:
payload["labels"] = labels
if assignees:
payload["assignees"] = assignees
with build_client() as client:
url = f"{BASE_URL}/repos/{repo}/issues" # repo = "owner/repo"
res = client.post(url, headers=get_headers(), json=payload)
res.raise_for_status()
return res.json()
def api_update_issue(
repo: str,
issue: str,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[List[str]] = None,
assignees: Optional[List[str]] = None,
comment: Optional[str] = None
) -> Dict[str, Any]:
"""Update an issue and/or add a comment"""
out: Dict[str, Any] = {}
with build_client() as client:
# Update fields
patch_payload: Dict[str, Any] = {}
if title is not None:
patch_payload["title"] = title
if body is not None:
patch_payload["body"] = body
if state is not None:
patch_payload["state"] = state # e.g., "open" or "closed"
if labels is not None:
patch_payload["labels"] = labels
if assignees is not None:
patch_payload["assignees"] = assignees
if patch_payload:
url = f"{BASE_URL}/repos/{repo}/issues/{issue}"
res = client.patch(url, headers=get_headers(), json=patch_payload)
if res.status_code == 405:
res = client.put(
url,
headers=get_headers(),
json=patch_payload
)
res.raise_for_status()
out["update"] = res.json() if res.text else {"status": "ok"}
# Optional comment
if comment:
url_c = f"{BASE_URL}/repos/{repo}/issues/{issue}/comments"
res_c = client.post(
url_c,
headers=get_headers(),
json={
"body": comment
}
)
res_c.raise_for_status()
out["comment"] = res_c.json() if res_c.text else {
"status": "commented"
}
return out or {"status": "no-op"}
# Pull Requests
def api_open_pr(
repo: str,
title: str,
head: str,
base: str,
body: str = ""
) -> Dict[str, Any]:
"""Open a pull request from head -> base"""
payload = {"title": title, "head": head, "base": base, "body": body}
with build_client() as client:
url = f"{BASE_URL}/repos/{repo}/pulls" # repo = "owner/repo"
res = client.post(url, headers=get_headers(), json=payload)
res.raise_for_status()
return res.json()
# MCP Tools
@mcp.tool()
def create_repo(
name: str,
description: str = "",
private: bool = False,
create_readme: bool = True,
readme_content: str = "# Test Repository\n\nThis repository is for testing.\n",
) -> str:
"""Create repo and optional README"""
if not API_KEY:
return "GITPASHA_API_KEY not set"
desc = description or suggest_description(name)
log.info(f"Creating repo: {name}")
try:
repo = api_create_repo(name, desc, private)
repo_id = repo.get("full_name") or repo.get("name") or name # "owner/repo"
repo_url = repo.get("html_url") or repo.get("web_url") or "N/A"
readme_status = "skipped"
if create_readme:
try:
r = api_create_readme(str(repo_id), readme_content)
readme_status = "created" if "error" not in r else "failed"
except Exception as e:
readme_status = f"failed: {e}"
result = {
"status": "success",
"name": name,
"description": desc,
"private": private,
"repo_url": repo_url,
"readme": readme_status
}
return json.dumps(result, ensure_ascii=False, indent=2)
except httpx.HTTPStatusError as e:
return format_error("Repo creation failed", e)
except Exception as e:
log.exception("Unexpected error")
return f"{e}"
@mcp.tool()
def update_repo(
repo: str,
name: str = "",
description: str = "",
set_private: bool = False,
private: bool = False,
) -> str:
"""Update a repository metadata"""
try:
new_name = name or None
new_desc = description or None
new_priv = private if set_private else None
out = api_update_repo(
repo,
name=new_name,
description=new_desc,
private=new_priv
)
return json.dumps(
{
"status": "success",
"repo": repo,
"result": out
},
ensure_ascii=False
)
except httpx.HTTPStatusError as e:
return format_error("Repo update failed", e)
except Exception as e:
log.exception("Unexpected error while updating repo")
return f"{e}"
@mcp.tool()
def delete_repo(repo: str) -> str:
"""Delete a repository"""
try:
out = api_delete_repo(repo)
return json.dumps(
{
"status": "success",
"repo": repo,
"result": out
},
ensure_ascii=False
)
except httpx.HTTPStatusError as e:
return format_error("Repo delete failed", e)
except Exception as e:
log.exception("Unexpected error while deleting repo")
return f"{e}"
# Issues tools
@mcp.tool()
def issues_list(repo: str, state: str = "open") -> str:
"""List issues. state: open|closed|all"""
try:
items = api_list_issues(
repo,
state if state in ("open", "closed", "all") else None)
return json.dumps(
{
"status": "success",
"count": len(items),
"items": items
},
ensure_ascii=False
)
except httpx.HTTPStatusError as e:
return format_error("Issues list failed", e)
except Exception as e:
log.exception("Unexpected error while listing issues")
return f"{e}"
@mcp.tool()
def issue_create(
repo: str,
title: str,
body: str = "",
labels_csv: str = "",
assignees_csv: str = ""
) -> str:
"""Create issue. labels_csv and assignees_csv are comma-separated"""
try:
labels = [
x.strip() for x in labels_csv.split(",") if x.strip()
] if labels_csv else None
assignees = [
x.strip() for x in assignees_csv.split(",") if x.strip()
] if assignees_csv else None
item = api_create_issue(
repo,
title=title,
body=body,
labels=labels,
assignees=assignees
)
return json.dumps(
{
"status": "success",
"item": item
},
ensure_ascii=False
)
except httpx.HTTPStatusError as e:
return format_error("Issue create failed", e)
except Exception as e:
log.exception("Unexpected error while creating issue")
return f"{e}"
@mcp.tool()
def issue_update(
repo: str,
issue: str,
title: str = "",
body: str = "",
state: str = "",
labels_csv: str = "",
assignees_csv: str = "",
comment: str = "",
) -> str:
"""Update issue fields and/or add a comment"""
try:
labels = [
x.strip() for x in labels_csv.split(",") if x.strip()
] if labels_csv else None
assignees = [
x.strip() for x in assignees_csv.split(",") if x.strip()
] if assignees_csv else None
st = state or None
# Empty strings -> None (skip)
t = title or None
b = body or None
c = comment or None
out = api_update_issue(
repo,
issue,
title=t,
body=b,
state=st,
labels=labels,
assignees=assignees,
comment=c
)
return json.dumps(
{
"status": "success",
"result": out
},
ensure_ascii=False
)
except httpx.HTTPStatusError as e:
return format_error("Issue update failed", e)
except Exception as e:
log.exception("Unexpected error while updating issue")
return f"{e}"
# Pull Requests tools
@mcp.tool()
def pr_open(repo: str,
title: str,
head: str,
base: str,
body: str = ""
) -> str:
"""Open a pull request from head -> base"""
try:
pr = api_open_pr(repo, title=title, head=head, base=base, body=body)
return json.dumps({"status": "success", "pr": pr}, ensure_ascii=False)
except httpx.HTTPStatusError as e:
return format_error("PR open failed", e)
except Exception as e:
log.exception("Unexpected error while opening PR")
return f"{e}"
# Entry Point
if __name__ == "__main__":
log.info(f"Starting MCP server | BASE_URL={BASE_URL}")
mcp.run(transport="stdio")