After standalone Testing

This commit is contained in:
2025-09-18 07:21:59 +02:00
parent 69e6e0659f
commit 8a7c8ad75a

519
webhook_endpoint.py Normal file
View File

@@ -0,0 +1,519 @@
#!/usr/bin/env python3
import os
import json
import base64
import sqlite3
import logging
import hmac
import hashlib
import contextlib
from typing import Optional, Tuple, Dict, Any
import httpx
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Github, Auth
# ---------------- Logging (UTC) ----------------
class UTCFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
import datetime as dt
from datetime import timezone
ts = dt.datetime.fromtimestamp(record.created, tz=timezone.utc)
return ts.isoformat(timespec="seconds").replace("+00:00", "Z")
def configure_logging():
handler = logging.StreamHandler()
handler.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s"))
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
level = os.environ.get("LOG_LEVEL", "INFO").upper()
root.setLevel(getattr(logging, level, logging.INFO))
configure_logging()
logger = logging.getLogger(__name__)
# ---------------- Config via env ----------------
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "").strip()
if not GITHUB_TOKEN:
logger.warning("GITHUB_TOKEN is not set. You must set it unless GITHUB_DRY_RUN=true.")
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "").strip()
if not WEBHOOK_SECRET:
logger.warning("WEBHOOK_SECRET is not set. Webhook signature verification will fail.")
# Canonical links
GITEA_BASE_URL = os.environ.get("GITEA_BASE_URL", "https://src.koozali.org").rstrip("/")
GITEA_ORG = os.environ.get("GITEA_ORG", "smeserver").strip() or "smeserver"
# Bugzilla config
BUGZILLA_BASE_URL = os.environ.get("BUGZILLA_BASE_URL", "https://bugs.koozali.org").rstrip("/")
BUGZILLA_AUTH_MODE = os.environ.get("BUGZILLA_AUTH_MODE", "").strip().lower() # "api_key" or "basic" (or auto)
BUGZILLA_API_KEY = os.environ.get("BUGZILLA_API_KEY", "").strip()
BUGZILLA_USER = os.environ.get("BUGZILLA_USER", "").strip()
BUGZILLA_PASSWORD = os.environ.get("BUGZILLA_PASSWORD", "").strip()
# REQUIRED fields for bug creation
BUGZILLA_PRODUCT = os.environ.get("BUGZILLA_PRODUCT", "").strip()
BUGZILLA_VERSION = os.environ.get("BUGZILLA_VERSION", "").strip()
# Component default (as requested)
BUGZILLA_COMPONENT_DEFAULT = os.environ.get("BUGZILLA_COMPONENT_DEFAULT", "e-smith-*/smeserver-* packages").strip()
# In case we still need a fallback name for unexpected errors
BUGZILLA_COMPONENT_FALLBACK = os.environ.get("BUGZILLA_COMPONENT_FALLBACK", BUGZILLA_COMPONENT_DEFAULT).strip()
# Custom field settings
CF_PACKAGE_FIELD = os.environ.get("BUGZILLA_CF_PACKAGE_FIELD", "cf_package").strip()
CF_PACKAGE_FALLBACK = os.environ.get("BUGZILLA_CF_PACKAGE_FALLBACK", "---").strip()
# Behavior toggles
BUGZILLA_ATTACH_DIFF = os.environ.get("BUGZILLA_ATTACH_DIFF", "true").strip().lower() == "true"
BUGZILLA_FAILURE_LABEL = os.environ.get("BUGZILLA_FAILURE_LABEL", "bugzilla-needed").strip()
GITHUB_DRY_RUN = os.environ.get("GITHUB_DRY_RUN", "false").strip().lower() == "true"
if GITHUB_DRY_RUN:
logger.info("GITHUB_DRY_RUN is enabled. The server will not comment/close PRs.")
# SQLite for PR→Bug mapping
STATE_PATH = os.environ.get("STATE_PATH", "./webhook_state.sqlite")
# ---------------- State store ----------------
class State:
def __init__(self, path: str):
self.conn = sqlite3.connect(path, check_same_thread=False)
self._init()
def _init(self):
c = self.conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS pr_map (
pr_key TEXT PRIMARY KEY,
bug_id INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)""")
self.conn.commit()
def get_bug(self, pr_key: str) -> Optional[int]:
c = self.conn.cursor()
c.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,))
row = c.fetchone()
return row[0] if row else None
def set_bug(self, pr_key: str, bug_id: int):
import datetime as dt
from datetime import timezone
now = dt.datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
c = self.conn.cursor()
c.execute("""
INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
""", (pr_key, bug_id, now, now))
self.conn.commit()
# ---------------- Bugzilla hybrid client ----------------
class BugzillaHybrid:
def __init__(self):
self.base = BUGZILLA_BASE_URL
self.mode = None # "rest" or "jsonrpc"
self.rest_base = None
self.rpc_url = f"{self.base}/jsonrpc.cgi"
self._token: Optional[str] = None # for basic via JSON-RPC
self._detect_mode()
# Log auth status
if BUGZILLA_AUTH_MODE in ("api_key", "") and BUGZILLA_API_KEY:
logger.info("Bugzilla auth: API key mode")
elif BUGZILLA_AUTH_MODE == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD):
logger.info("Bugzilla auth: basic/login (JSON-RPC token) mode")
else:
logger.warning("Bugzilla auth NOT configured yet (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD).")
def _detect_mode(self):
# Try REST at /rest then /rest.cgi
try:
r = httpx.get(f"{self.base}/rest/version", timeout=10)
if r.status_code == 200:
self.mode = "rest"
self.rest_base = f"{self.base}/rest"
logger.info("Bugzilla REST detected at /rest")
return
except Exception:
pass
try:
r = httpx.get(f"{self.base}/rest.cgi/version", timeout=10)
if r.status_code == 200:
self.mode = "rest"
self.rest_base = f"{self.base}/rest.cgi"
logger.info("Bugzilla REST detected at /rest.cgi")
return
except Exception:
pass
# Fallback to JSON-RPC
self.mode = "jsonrpc"
logger.warning("Bugzilla REST not available; falling back to JSON-RPC")
# ---------- REST helpers ----------
def _rest_headers(self) -> Dict[str, str]:
h = {"Accept": "application/json", "Content-Type": "application/json"}
# Prefer API key if available
if BUGZILLA_API_KEY:
h["X-BUGZILLA-API-KEY"] = BUGZILLA_API_KEY
return h
def _rest_auth(self) -> Optional[Tuple[str, str]]:
if BUGZILLA_AUTH_MODE == "basic" and BUGZILLA_USER and BUGZILLA_PASSWORD:
return (BUGZILLA_USER, BUGZILLA_PASSWORD)
return None
# ---------- JSON-RPC helpers ----------
def _rpc_call(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]:
body = {"method": method, "params": [params_obj], "id": req_id}
r = httpx.post(self.rpc_url, headers={"Content-Type": "application/json"}, json=body, timeout=60)
r.raise_for_status()
data = r.json()
if data.get("error"):
err = data["error"]
raise RuntimeError(f"Bugzilla RPC error {err.get('code')}: {err.get('message')}")
return data.get("result", {})
def _rpc(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]:
# Decide auth automatically
mode = BUGZILLA_AUTH_MODE
if (mode in ("", "api_key") and BUGZILLA_API_KEY):
params_obj = dict(params_obj, Bugzilla_api_key=BUGZILLA_API_KEY)
elif (mode == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD)):
if not self._token:
self._token = self._login_rpc()
params_obj = dict(params_obj, Bugzilla_token=self._token)
else:
raise RuntimeError("Bugzilla auth not configured (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD)")
return self._rpc_call(method, params_obj, req_id=req_id)
def _login_rpc(self) -> str:
if not (BUGZILLA_USER and BUGZILLA_PASSWORD):
raise RuntimeError("Bugzilla basic auth requires BUGZILLA_USER and BUGZILLA_PASSWORD")
data = self._rpc_call("User.login", {"login": BUGZILLA_USER, "password": BUGZILLA_PASSWORD}, req_id=0)
token = data.get("token")
if not token:
raise RuntimeError("Bugzilla login did not return a token")
logger.info("Bugzilla JSON-RPC login succeeded")
return token
# ---------- Public API ----------
def create_bug(self, summary: str, description: str, component: str) -> int:
if self.mode == "rest":
try:
url = f"{self.rest_base}/bug"
payload = {
"product": BUGZILLA_PRODUCT,
"component": component or BUGZILLA_COMPONENT_DEFAULT,
"version": BUGZILLA_VERSION,
"summary": summary,
"description": description,
}
r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=60)
if r.status_code in (404, 405):
raise httpx.HTTPStatusError("REST create not available", request=r.request, response=r)
r.raise_for_status()
data = r.json()
bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
if not bug_id:
raise RuntimeError(f"REST create result missing id: {data}")
return int(bug_id)
except Exception as e:
logger.warning(f"Bugzilla REST create failed, switching to JSON-RPC: {e}")
self.mode = "jsonrpc"
# JSON-RPC
params = {
"product": BUGZILLA_PRODUCT,
"component": component or BUGZILLA_COMPONENT_DEFAULT,
"version": BUGZILLA_VERSION,
"summary": summary,
"description": description,
}
res = self._rpc("Bug.create", params, req_id=1)
bug_id = res.get("id") or (res.get("bugs") and res["bugs"][0]["id"])
if not bug_id:
raise RuntimeError(f"RPC create result missing id: {res}")
return int(bug_id)
def add_attachment(self, bug_id: int, filename: str, summary: str, content_bytes: bytes):
if self.mode == "rest":
try:
url = f"{self.rest_base}/bug/{bug_id}/attachment"
payload = {
"ids": [bug_id],
"data": base64.b64encode(content_bytes).decode("ascii"),
"file_name": filename,
"summary": summary,
"content_type": "text/x-patch",
"is_patch": True,
}
r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=120)
if r.status_code in (404, 405):
raise httpx.HTTPStatusError("REST attachment not available", request=r.request, response=r)
r.raise_for_status()
return
except Exception as e:
logger.warning(f"Bugzilla REST add_attachment failed, switching to JSON-RPC: {e}")
self.mode = "jsonrpc"
params = {
"ids": [bug_id],
"data": base64.b64encode(content_bytes).decode("ascii"),
"file_name": filename,
"summary": summary,
"content_type": "text/x-patch",
"is_patch": True,
}
self._rpc("Bug.add_attachment", params, req_id=3)
def add_comment(self, bug_id: int, comment: str):
if self.mode == "rest":
try:
url = f"{self.rest_base}/bug/{bug_id}/comment"
payload = {"comment": comment}
r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=60)
if r.status_code in (404, 405):
raise httpx.HTTPStatusError("REST add_comment not available", request=r.request, response=r)
r.raise_for_status()
return
except Exception as e:
logger.warning(f"Bugzilla REST add_comment failed, switching to JSON-RPC: {e}")
self.mode = "jsonrpc"
self._rpc("Bug.add_comment", {"id": bug_id, "comment": comment}, req_id=2)
def update_bug_fields(self, bug_id: int, fields: Dict[str, Any]):
# REST first
if self.mode == "rest":
try:
url = f"{self.rest_base}/bug/{bug_id}"
r = httpx.put(url, headers=self._rest_headers(), auth=self._rest_auth(), json=fields, timeout=60)
if r.status_code in (404, 405):
raise httpx.HTTPStatusError("REST update not available", request=r.request, response=r)
r.raise_for_status()
return
except Exception as e:
logger.warning(f"Bugzilla REST update failed, switching to JSON-RPC: {e}")
self.mode = "jsonrpc"
# JSON-RPC fallback
params = {"ids": [bug_id]}
params.update(fields)
self._rpc("Bug.update", params, req_id=5)
# ---------------- GitHub client (PyGithub) ----------------
gh = Github(auth=Auth.Token(GITHUB_TOKEN)) if GITHUB_TOKEN else Github()
state = State(STATE_PATH)
bz = BugzillaHybrid()
app = FastAPI()
# ---------------- Helpers ----------------
def verify_signature(secret: str, body: bytes, signature: str):
if not signature:
raise HTTPException(status_code=401, detail="Missing signature")
expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, signature):
raise HTTPException(status_code=401, detail="Invalid signature")
def pr_comment_success(gitea_repo_url: str, bug_id: int) -> str:
return (
"Thanks for the contribution!\n\n"
"This repository is a read-only mirror of the canonical repo on Gitea:\n"
f"- Canonical: {gitea_repo_url}\n"
f"- Please file and discuss changes in Bugzilla: {BUGZILLA_BASE_URL}\n\n"
f"We created Bug {bug_id} to track this proposal:\n"
f"- {BUGZILLA_BASE_URL}/show_bug.cgi?id={bug_id}\n\n"
"This pull request will be closed here. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n"
)
def pr_comment_failure(gitea_repo_url: str, repo: str, pr_url: str, target_branch: str) -> str:
# Uses default component name and required product/version in guidance
return (
"Thanks for the contribution!\n\n"
"This repository is a read-only mirror of the canonical repo on Gitea:\n"
f"- Canonical: {gitea_repo_url}\n\n"
"We were unable to create a Bugzilla ticket automatically at this time.\n"
f"Please open a bug at {BUGZILLA_BASE_URL} (Product: {BUGZILLA_PRODUCT}, Version: {BUGZILLA_VERSION}, Component: {BUGZILLA_COMPONENT_DEFAULT}) and include:\n"
f"- GitHub PR: {pr_url}\n"
f"- Target branch: {target_branch}\n"
"- Summary and rationale for the change\n\n"
"This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n"
)
def fetch_pr_patch(owner: str, repo: str, pr_num: int) -> bytes:
url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_num}"
headers = {"Accept": "application/vnd.github.v3.patch"}
if GITHUB_TOKEN:
headers["Authorization"] = f"token {GITHUB_TOKEN}"
r = httpx.get(url, headers=headers, timeout=120)
r.raise_for_status()
return r.content
# ---------------- Routes ----------------
@app.get("/healthz")
async def healthz():
return PlainTextResponse("ok")
@app.post("/webhook/github")
async def github_webhook(
request: Request,
x_hub_signature_256: str = Header(None),
x_github_event: str = Header(None),
):
# Validate Bugzilla required env
if not BUGZILLA_PRODUCT or not BUGZILLA_VERSION:
raise HTTPException(status_code=500, detail="BUGZILLA_PRODUCT and BUGZILLA_VERSION must be set")
body = await request.body()
verify_signature(WEBHOOK_SECRET, body, x_hub_signature_256)
try:
payload = json.loads(body.decode("utf-8"))
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON")
if x_github_event != "pull_request":
return JSONResponse({"status": "ignored", "event": x_github_event})
action = payload.get("action")
if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
return JSONResponse({"status": "ignored", "action": action})
repo_full = payload["repository"]["full_name"] # owner/repo
owner = payload["repository"]["owner"]["login"]
repo = payload["repository"]["name"]
pr = payload["pull_request"]
pr_num = pr["number"]
pr_url = pr["html_url"]
pr_title = pr.get("title") or ""
pr_body = pr.get("body") or ""
base = pr["base"]
head = pr["head"]
target_branch = base["ref"]
head_sha = head["sha"][:7]
gitea_repo_url = f"{GITEA_BASE_URL}/{GITEA_ORG}/{repo}"
pr_key = f"{repo_full}#{pr_num}"
bug_id = state.get_bug(pr_key)
# Acquire repo object only if we will write to GitHub
gh_repo = None
if not GITHUB_DRY_RUN:
try:
gh_repo = gh.get_repo(repo_full)
except Exception as ge:
logger.warning(f"Could not access GitHub repo {repo_full}: {ge}")
if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
# Create Bugzilla bug
summary = f"[GH PR #{pr_num}] {GITEA_ORG}/{repo}: {pr_title}"
description = (
"Source\n"
f"- Canonical repo (Gitea): {gitea_repo_url}\n"
f"- GitHub mirror PR: {pr_url}\n\n"
"Project policy\n"
"This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla.\n\n"
"Submitters notes\n"
f"{pr_body}\n"
)
try:
# Use the requested default component
bug_id = bz.create_bug(summary, description, component=BUGZILLA_COMPONENT_DEFAULT)
state.set_bug(pr_key, bug_id)
logger.info(f"Created Bugzilla bug {bug_id} for {pr_key}")
# Set cf_package to repo name, fallback to "---" if that fails
try:
bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: repo})
logger.info(f"Set {CF_PACKAGE_FIELD}={repo} on bug {bug_id}")
except Exception as e_cf:
logger.warning(f"Failed to set {CF_PACKAGE_FIELD} to '{repo}' on bug {bug_id}: {e_cf}; falling back to '{CF_PACKAGE_FALLBACK}'")
with contextlib.suppress(Exception):
bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: CF_PACKAGE_FALLBACK})
logger.info(f"Set {CF_PACKAGE_FIELD}={CF_PACKAGE_FALLBACK} on bug {bug_id}")
except Exception as e:
logger.error(f"Bugzilla create failed for {pr_key}: {e}")
# Post failure comment and keep PR open (best effort)
if not GITHUB_DRY_RUN and gh_repo is not None:
try:
gh_repo.get_pull(pr_num).create_issue_comment(pr_comment_failure(gitea_repo_url, repo, pr_url, target_branch))
if BUGZILLA_FAILURE_LABEL:
with contextlib.suppress(Exception):
gh_repo.create_label(BUGZILLA_FAILURE_LABEL, "ededed")
with contextlib.suppress(Exception):
gh_repo.get_issue(pr_num).add_to_labels(BUGZILLA_FAILURE_LABEL)
except Exception as ge:
logger.warning(f"Could not post failure comment on PR #{pr_num}: {ge}")
else:
logger.info(f"[GITHUB_DRY_RUN] Would comment failure on PR #{pr_num}, label={BUGZILLA_FAILURE_LABEL}")
return JSONResponse({"status": "bugzilla_failed", "action": action})
# Attach patch if enabled
if BUGZILLA_ATTACH_DIFF:
try:
patch = fetch_pr_patch(owner, repo, pr_num)
bz.add_attachment(
bug_id=bug_id,
filename=f"PR-{pr_num}-{head_sha}.patch",
summary=f"Patch for PR #{pr_num} ({head_sha})",
content_bytes=patch
)
except Exception as e:
logger.warning(f"Attach patch failed for bug {bug_id}: {e}")
# Comment and close PR
comment = pr_comment_success(gitea_repo_url, bug_id)
if not GITHUB_DRY_RUN and gh_repo is not None:
try:
pr_obj = gh_repo.get_pull(pr_num)
pr_obj.create_issue_comment(comment)
if pr_obj.state != "closed":
pr_obj.edit(state="closed")
except Exception as ge:
logger.warning(f"Could not comment/close PR #{pr_num}: {ge}")
else:
logger.info(f"[GITHUB_DRY_RUN] Would post success comment and close PR #{pr_num}")
return JSONResponse({"status": "ok", "bug_id": bug_id})
elif bug_id is not None and action == "synchronize":
# Attach updated patch and ensure PR closed
try:
if BUGZILLA_ATTACH_DIFF:
patch = fetch_pr_patch(owner, repo, pr_num)
bz.add_attachment(
bug_id=bug_id,
filename=f"PR-{pr_num}-{head_sha}.patch",
summary=f"Updated patch for PR #{pr_num} ({head_sha})",
content_bytes=patch
)
if not GITHUB_DRY_RUN and gh_repo is not None:
try:
pr_obj = gh_repo.get_pull(pr_num)
if pr_obj.state != "closed":
pr_obj.edit(state="closed")
except Exception as ge:
logger.warning(f"Could not ensure PR #{pr_num} closed: {ge}")
else:
logger.info(f"[GITHUB_DRY_RUN] Would ensure PR #{pr_num} closed")
return JSONResponse({"status": "ok", "bug_id": bug_id})
except Exception as e:
logger.error(f"Attach update failed for bug {bug_id}: {e}")
if not GITHUB_DRY_RUN and gh_repo is not None:
with contextlib.suppress(Exception):
pr_obj = gh_repo.get_pull(pr_num)
if pr_obj.state != "closed":
pr_obj.edit(state="closed")
else:
logger.info(f"[GITHUB_DRY_RUN] Would ensure PR #{pr_num} closed (after attach failure)")
return JSONResponse({"status": "attach_failed", "bug_id": bug_id})
return JSONResponse({"status": "noop"})