#!/usr/bin/env python3
"""GitHub-to-Bugzilla webhook bridge for read-only GitHub mirrors.

Receives GitHub ``pull_request`` webhooks, creates (or updates) a Bugzilla
bug for each PR, optionally attaches the PR patch, comments on the PR and
closes it. The PR -> bug mapping is persisted in a small SQLite database.

All configuration is read from environment variables at import time.
"""
import base64
import contextlib
import datetime as dt
import hashlib
import hmac
import json
import logging
import os
import sqlite3
from datetime import timezone
from typing import Any, Dict, Optional, Tuple

import httpx
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Auth, Github


# ---------------- Logging (UTC) ----------------
class UTCFormatter(logging.Formatter):
    """Log formatter that renders ``%(asctime)s`` as ISO-8601 UTC with a ``Z`` suffix."""

    def formatTime(self, record, datefmt=None):
        # ``datefmt`` is intentionally ignored: the output format is fixed.
        ts = dt.datetime.fromtimestamp(record.created, tz=timezone.utc)
        return ts.isoformat(timespec="seconds").replace("+00:00", "Z")


def configure_logging():
    """Replace the root logger's handlers with one UTC-formatted stream handler.

    The level comes from the ``LOG_LEVEL`` environment variable (default
    ``INFO``); unknown level names fall back to ``INFO``.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s"))
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    level = os.environ.get("LOG_LEVEL", "INFO").upper()
    root.setLevel(getattr(logging, level, logging.INFO))


configure_logging()
logger = logging.getLogger(__name__)

# ---------------- Config via env ----------------
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "").strip()
if not GITHUB_TOKEN:
    logger.warning("GITHUB_TOKEN is not set. You must set it unless GITHUB_DRY_RUN=true.")

WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "").strip()
if not WEBHOOK_SECRET:
    logger.warning("WEBHOOK_SECRET is not set. Webhook signature verification will fail.")

# Canonical links
GITEA_BASE_URL = os.environ.get("GITEA_BASE_URL", "https://src.koozali.org").rstrip("/")
GITEA_ORG = os.environ.get("GITEA_ORG", "smeserver").strip() or "smeserver"

# Bugzilla config
BUGZILLA_BASE_URL = os.environ.get("BUGZILLA_BASE_URL", "https://bugs.koozali.org").rstrip("/")
BUGZILLA_AUTH_MODE = os.environ.get("BUGZILLA_AUTH_MODE", "").strip().lower()  # "api_key" or "basic" (or auto)
BUGZILLA_API_KEY = os.environ.get("BUGZILLA_API_KEY", "").strip()
BUGZILLA_USER = os.environ.get("BUGZILLA_USER", "").strip()
BUGZILLA_PASSWORD = os.environ.get("BUGZILLA_PASSWORD", "").strip()

# REQUIRED fields for bug creation
BUGZILLA_PRODUCT = os.environ.get("BUGZILLA_PRODUCT", "").strip()
BUGZILLA_VERSION = os.environ.get("BUGZILLA_VERSION", "").strip()

# Component default (as requested)
BUGZILLA_COMPONENT_DEFAULT = os.environ.get("BUGZILLA_COMPONENT_DEFAULT", "e-smith-*/smeserver-* packages").strip()
# In case we still need a fallback name for unexpected errors
BUGZILLA_COMPONENT_FALLBACK = os.environ.get("BUGZILLA_COMPONENT_FALLBACK", BUGZILLA_COMPONENT_DEFAULT).strip()

# Custom field settings
CF_PACKAGE_FIELD = os.environ.get("BUGZILLA_CF_PACKAGE_FIELD", "cf_package").strip()
CF_PACKAGE_FALLBACK = os.environ.get("BUGZILLA_CF_PACKAGE_FALLBACK", "---").strip()

# Behavior toggles
BUGZILLA_ATTACH_DIFF = os.environ.get("BUGZILLA_ATTACH_DIFF", "true").strip().lower() == "true"
BUGZILLA_FAILURE_LABEL = os.environ.get("BUGZILLA_FAILURE_LABEL", "bugzilla-needed").strip()
GITHUB_DRY_RUN = os.environ.get("GITHUB_DRY_RUN", "false").strip().lower() == "true"
if GITHUB_DRY_RUN:
    logger.info("GITHUB_DRY_RUN is enabled. The server will not comment/close PRs.")

# SQLite for PR→Bug mapping
STATE_PATH = os.environ.get("STATE_PATH", "./webhook_state.sqlite")

# Actions of the pull_request event that this service reacts to.
_HANDLED_ACTIONS = ("opened", "reopened", "synchronize", "ready_for_review")


# ---------------- State store ----------------
class State:
    """SQLite-backed mapping from a PR key (``owner/repo#number``) to a Bugzilla bug id."""

    def __init__(self, path: str):
        """Open (or create) the database at *path* and ensure the schema exists."""
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self._init()

    def _init(self):
        """Create the ``pr_map`` table if it does not exist yet."""
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS pr_map (
                    pr_key TEXT PRIMARY KEY,
                    bug_id INTEGER NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )""")

    def get_bug(self, pr_key: str) -> Optional[int]:
        """Return the bug id recorded for *pr_key*, or ``None`` if there is none."""
        row = self.conn.execute(
            "SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,)
        ).fetchone()
        return row[0] if row else None

    def set_bug(self, pr_key: str, bug_id: int):
        """Insert or update the bug id for *pr_key*, stamping the current UTC time.

        On update only ``bug_id`` and ``updated_at`` change; ``created_at``
        keeps the value from the first insert.
        """
        now = (
            dt.datetime.now(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )
        with self.conn:
            self.conn.execute("""
                INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(pr_key) DO UPDATE SET
                    bug_id=excluded.bug_id,
                    updated_at=excluded.updated_at
                """, (pr_key, bug_id, now, now))


# ---------------- Bugzilla hybrid client ----------------
class BugzillaHybrid:
    """Bugzilla client that prefers the REST API and falls back to JSON-RPC.

    The transport is probed once at construction. If any REST call later
    fails, the instance switches to JSON-RPC for that call and for all
    subsequent calls.
    """

    def __init__(self):
        """Probe the server for REST support and log which auth mode is configured."""
        self.base = BUGZILLA_BASE_URL
        self.mode = None  # "rest" or "jsonrpc"
        self.rest_base = None
        self.rpc_url = f"{self.base}/jsonrpc.cgi"
        self._token: Optional[str] = None  # for basic via JSON-RPC
        self._detect_mode()

        # Log auth status
        if BUGZILLA_AUTH_MODE in ("api_key", "") and BUGZILLA_API_KEY:
            logger.info("Bugzilla auth: API key mode")
        elif BUGZILLA_AUTH_MODE == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD):
            logger.info("Bugzilla auth: basic/login (JSON-RPC token) mode")
        else:
            logger.warning("Bugzilla auth NOT configured yet (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD).")

    def _detect_mode(self):
        """Set ``self.mode``: REST if ``/rest`` or ``/rest.cgi`` answers 200, else JSON-RPC."""
        for suffix in ("/rest", "/rest.cgi"):
            try:
                r = httpx.get(f"{self.base}{suffix}/version", timeout=10)
            except Exception as e:
                # A failed probe is expected on servers without REST; keep
                # going, but leave a trace for troubleshooting.
                logger.debug(f"Bugzilla REST probe at {suffix} failed: {e}")
                continue
            if r.status_code == 200:
                self.mode = "rest"
                self.rest_base = f"{self.base}{suffix}"
                logger.info(f"Bugzilla REST detected at {suffix}")
                return

        # Fallback to JSON-RPC
        self.mode = "jsonrpc"
        logger.warning("Bugzilla REST not available; falling back to JSON-RPC")

    # ---------- REST helpers ----------
    def _rest_headers(self) -> Dict[str, str]:
        """Return JSON request headers, including the API key header when one is configured."""
        h = {"Accept": "application/json", "Content-Type": "application/json"}
        # Prefer API key if available
        if BUGZILLA_API_KEY:
            h["X-BUGZILLA-API-KEY"] = BUGZILLA_API_KEY
        return h

    def _rest_auth(self) -> Optional[Tuple[str, str]]:
        """Return ``(user, password)`` only when auth mode is explicitly ``basic``; else ``None``."""
        if BUGZILLA_AUTH_MODE == "basic" and BUGZILLA_USER and BUGZILLA_PASSWORD:
            return (BUGZILLA_USER, BUGZILLA_PASSWORD)
        return None

    # ---------- JSON-RPC helpers ----------
    def _rpc_call(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]:
        """POST one JSON-RPC request (no auth added) and return its ``result``.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP response.
            RuntimeError: when the response carries a JSON-RPC ``error``.
        """
        body = {"method": method, "params": [params_obj], "id": req_id}
        r = httpx.post(self.rpc_url, headers={"Content-Type": "application/json"}, json=body, timeout=60)
        r.raise_for_status()
        data = r.json()
        if data.get("error"):
            err = data["error"]
            raise RuntimeError(f"Bugzilla RPC error {err.get('code')}: {err.get('message')}")
        return data.get("result", {})

    def _rpc(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]:
        """Call *method* over JSON-RPC with credentials injected into the params.

        Uses the API key when available (and the mode is unset or ``api_key``);
        otherwise logs in once with user/password and reuses the cached token.

        Raises:
            RuntimeError: when no credentials are configured.
        """
        # Decide auth automatically
        mode = BUGZILLA_AUTH_MODE
        if mode in ("", "api_key") and BUGZILLA_API_KEY:
            params_obj = dict(params_obj, Bugzilla_api_key=BUGZILLA_API_KEY)
        elif mode == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD):
            if not self._token:
                self._token = self._login_rpc()
            params_obj = dict(params_obj, Bugzilla_token=self._token)
        else:
            raise RuntimeError("Bugzilla auth not configured (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD)")
        return self._rpc_call(method, params_obj, req_id=req_id)

    def _login_rpc(self) -> str:
        """Log in via ``User.login`` and return the session token.

        Raises:
            RuntimeError: when user/password are missing or no token is returned.
        """
        if not (BUGZILLA_USER and BUGZILLA_PASSWORD):
            raise RuntimeError("Bugzilla basic auth requires BUGZILLA_USER and BUGZILLA_PASSWORD")
        data = self._rpc_call("User.login", {"login": BUGZILLA_USER, "password": BUGZILLA_PASSWORD}, req_id=0)
        token = data.get("token")
        if not token:
            raise RuntimeError("Bugzilla login did not return a token")
        logger.info("Bugzilla JSON-RPC login succeeded")
        return token

    # ---------- Public API ----------
    def create_bug(self, summary: str, description: str, component: str) -> int:
        """Create a bug and return its id.

        An empty *component* is replaced by ``BUGZILLA_COMPONENT_DEFAULT``.
        Tries REST first (when active); on any REST failure switches the
        client to JSON-RPC and retries there.

        Raises:
            RuntimeError: when the JSON-RPC result contains no bug id (or on
                RPC/auth errors raised by the JSON-RPC helpers).
        """
        # The same field set is used by both transports.
        params = {
            "product": BUGZILLA_PRODUCT,
            "component": component or BUGZILLA_COMPONENT_DEFAULT,
            "version": BUGZILLA_VERSION,
            "summary": summary,
            "description": description,
        }
        if self.mode == "rest":
            try:
                url = f"{self.rest_base}/bug"
                r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=dict(params), timeout=60)
                if r.status_code in (404, 405):
                    raise httpx.HTTPStatusError("REST create not available", request=r.request, response=r)
                r.raise_for_status()
                data = r.json()
                bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
                if not bug_id:
                    raise RuntimeError(f"REST create result missing id: {data}")
                return int(bug_id)
            except Exception as e:
                logger.warning(f"Bugzilla REST create failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"

        # JSON-RPC
        res = self._rpc("Bug.create", params, req_id=1)
        bug_id = res.get("id") or (res.get("bugs") and res["bugs"][0]["id"])
        if not bug_id:
            raise RuntimeError(f"RPC create result missing id: {res}")
        return int(bug_id)

    def add_attachment(self, bug_id: int, filename: str, summary: str, content_bytes: bytes):
        """Attach *content_bytes* to *bug_id* as a ``text/x-patch`` patch.

        Tries REST first (when active); on any REST failure switches the
        client to JSON-RPC and retries there.
        """
        # The same field set is used by both transports.
        params = {
            "ids": [bug_id],
            "data": base64.b64encode(content_bytes).decode("ascii"),
            "file_name": filename,
            "summary": summary,
            "content_type": "text/x-patch",
            "is_patch": True,
        }
        if self.mode == "rest":
            try:
                url = f"{self.rest_base}/bug/{bug_id}/attachment"
                r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=dict(params), timeout=120)
                if r.status_code in (404, 405):
                    raise httpx.HTTPStatusError("REST attachment not available", request=r.request, response=r)
                r.raise_for_status()
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST add_attachment failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"

        self._rpc("Bug.add_attachment", params, req_id=3)

    def add_comment(self, bug_id: int, comment: str):
        """Add *comment* to *bug_id*, via REST first and JSON-RPC on REST failure."""
        if self.mode == "rest":
            try:
                url = f"{self.rest_base}/bug/{bug_id}/comment"
                payload = {"comment": comment}
                r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=60)
                if r.status_code in (404, 405):
                    raise httpx.HTTPStatusError("REST add_comment not available", request=r.request, response=r)
                r.raise_for_status()
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST add_comment failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"

        self._rpc("Bug.add_comment", {"id": bug_id, "comment": comment}, req_id=2)

    def update_bug_fields(self, bug_id: int, fields: Dict[str, Any]):
        """Set *fields* on *bug_id*, via REST first and JSON-RPC on REST failure."""
        # REST first
        if self.mode == "rest":
            try:
                url = f"{self.rest_base}/bug/{bug_id}"
                r = httpx.put(url, headers=self._rest_headers(), auth=self._rest_auth(), json=fields, timeout=60)
                if r.status_code in (404, 405):
                    raise httpx.HTTPStatusError("REST update not available", request=r.request, response=r)
                r.raise_for_status()
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST update failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"

        # JSON-RPC fallback
        params = {"ids": [bug_id]}
        params.update(fields)
        self._rpc("Bug.update", params, req_id=5)


# ---------------- GitHub client (PyGithub) ----------------
gh = Github(auth=Auth.Token(GITHUB_TOKEN)) if GITHUB_TOKEN else Github()
state = State(STATE_PATH)
bz = BugzillaHybrid()
app = FastAPI()


# ---------------- Helpers ----------------
def verify_signature(secret: str, body: bytes, signature: str):
    """Validate a GitHub ``X-Hub-Signature-256`` header against *body*.

    Raises:
        HTTPException: 401 when the signature is missing, when no secret is
            configured, or when the signature does not match.
    """
    if not signature:
        raise HTTPException(status_code=401, detail="Missing signature")
    if not secret:
        # Fail closed: an HMAC keyed with the empty string can be computed by
        # anyone, so accepting it would let unauthenticated callers through.
        logger.error("WEBHOOK_SECRET is not set; rejecting signed webhook delivery")
        raise HTTPException(status_code=401, detail="Invalid signature")
    expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str,
    # which a hostile header value could otherwise use to trigger a 500.
    if not hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid signature")


def pr_comment_success(gitea_repo_url: str, bug_id: int) -> str:
    """Return the PR comment posted after a bug was created for the PR."""
    return (
        "Thanks for the contribution!\n\n"
        "This repository is a read-only mirror of the canonical repo on Gitea:\n"
        f"- Canonical: {gitea_repo_url}\n"
        f"- Please file and discuss changes in Bugzilla: {BUGZILLA_BASE_URL}\n\n"
        f"We created Bug {bug_id} to track this proposal:\n"
        f"- {BUGZILLA_BASE_URL}/show_bug.cgi?id={bug_id}\n\n"
        "This pull request will be closed here. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n"
    )


def pr_comment_failure(gitea_repo_url: str, repo: str, pr_url: str, target_branch: str) -> str:
    """Return the PR comment posted when automatic bug creation failed.

    *repo* is accepted for interface compatibility but is not used in the text.
    """
    # Uses default component name and required product/version in guidance
    return (
        "Thanks for the contribution!\n\n"
        "This repository is a read-only mirror of the canonical repo on Gitea:\n"
        f"- Canonical: {gitea_repo_url}\n\n"
        "We were unable to create a Bugzilla ticket automatically at this time.\n"
        f"Please open a bug at {BUGZILLA_BASE_URL} (Product: {BUGZILLA_PRODUCT}, Version: {BUGZILLA_VERSION}, Component: {BUGZILLA_COMPONENT_DEFAULT}) and include:\n"
        f"- GitHub PR: {pr_url}\n"
        f"- Target branch: {target_branch}\n"
        "- Summary and rationale for the change\n\n"
        "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n"
    )


def fetch_pr_patch(owner: str, repo: str, pr_num: int) -> bytes:
    """Download the PR as a patch from the GitHub API and return the raw bytes.

    Raises:
        httpx.HTTPStatusError: on a non-2xx response.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_num}"
    headers = {"Accept": "application/vnd.github.v3.patch"}
    if GITHUB_TOKEN:
        headers["Authorization"] = f"token {GITHUB_TOKEN}"
    r = httpx.get(url, headers=headers, timeout=120)
    r.raise_for_status()
    return r.content


def _attach_pr_patch(bug_id: int, owner: str, repo: str, pr_num: int, head_sha: str, summary: str):
    """Fetch the current PR patch and attach it to *bug_id*; errors propagate."""
    patch = fetch_pr_patch(owner, repo, pr_num)
    bz.add_attachment(
        bug_id=bug_id,
        filename=f"PR-{pr_num}-{head_sha}.patch",
        summary=summary,
        content_bytes=patch,
    )


def _set_package_field(bug_id: int, repo: str):
    """Set the package custom field to *repo*, or to the fallback value if that fails (best effort)."""
    try:
        bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: repo})
        logger.info(f"Set {CF_PACKAGE_FIELD}={repo} on bug {bug_id}")
    except Exception as e_cf:
        logger.warning(f"Failed to set {CF_PACKAGE_FIELD} to '{repo}' on bug {bug_id}: {e_cf}; falling back to '{CF_PACKAGE_FALLBACK}'")
        with contextlib.suppress(Exception):
            bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: CF_PACKAGE_FALLBACK})
            logger.info(f"Set {CF_PACKAGE_FIELD}={CF_PACKAGE_FALLBACK} on bug {bug_id}")


def _github_writes_skipped(gh_repo, pr_num: int, dry_run_message: str) -> bool:
    """Return True (after logging why) when no GitHub write should be attempted."""
    if GITHUB_DRY_RUN:
        logger.info(dry_run_message)
        return True
    if gh_repo is None:
        # Not a dry run: the repo lookup failed earlier, so say so instead of
        # emitting a misleading "[GITHUB_DRY_RUN]" line.
        logger.warning(f"Skipping GitHub update for PR #{pr_num}: repository object unavailable")
        return True
    return False


def _notify_bugzilla_failure(gh_repo, pr_num: int, comment: str):
    """Post the failure comment on the PR and add the failure label (best effort)."""
    dry_msg = f"[GITHUB_DRY_RUN] Would comment failure on PR #{pr_num}, label={BUGZILLA_FAILURE_LABEL}"
    if _github_writes_skipped(gh_repo, pr_num, dry_msg):
        return
    try:
        gh_repo.get_pull(pr_num).create_issue_comment(comment)
        if BUGZILLA_FAILURE_LABEL:
            # The label may already exist; creation errors are irrelevant.
            with contextlib.suppress(Exception):
                gh_repo.create_label(BUGZILLA_FAILURE_LABEL, "ededed")
            with contextlib.suppress(Exception):
                gh_repo.get_issue(pr_num).add_to_labels(BUGZILLA_FAILURE_LABEL)
    except Exception as ge:
        logger.warning(f"Could not post failure comment on PR #{pr_num}: {ge}")


def _comment_and_close_pr(gh_repo, pr_num: int, comment: str):
    """Post *comment* on the PR and close it if still open (best effort)."""
    dry_msg = f"[GITHUB_DRY_RUN] Would post success comment and close PR #{pr_num}"
    if _github_writes_skipped(gh_repo, pr_num, dry_msg):
        return
    try:
        pr_obj = gh_repo.get_pull(pr_num)
        pr_obj.create_issue_comment(comment)
        if pr_obj.state != "closed":
            pr_obj.edit(state="closed")
    except Exception as ge:
        logger.warning(f"Could not comment/close PR #{pr_num}: {ge}")


def _ensure_pr_closed(gh_repo, pr_num: int, dry_run_suffix: str = ""):
    """Close the PR if it is still open (best effort); failures are logged, not raised."""
    dry_msg = f"[GITHUB_DRY_RUN] Would ensure PR #{pr_num} closed{dry_run_suffix}"
    if _github_writes_skipped(gh_repo, pr_num, dry_msg):
        return
    try:
        pr_obj = gh_repo.get_pull(pr_num)
        if pr_obj.state != "closed":
            pr_obj.edit(state="closed")
    except Exception as ge:
        logger.warning(f"Could not ensure PR #{pr_num} closed: {ge}")


# ---------------- Routes ----------------
@app.get("/healthz")
async def healthz():
    """Liveness probe: always answers ``ok``."""
    return PlainTextResponse("ok")


@app.post("/webhook/github")
async def github_webhook(
    request: Request,
    x_hub_signature_256: Optional[str] = Header(None),
    x_github_event: Optional[str] = Header(None),
):
    """Handle a GitHub webhook delivery.

    Flow for ``pull_request`` events with a handled action:
      * no bug recorded for the PR -> create a bug, set the package field,
        optionally attach the patch, comment on and close the PR;
      * bug recorded and action is ``synchronize`` -> attach the updated
        patch and make sure the PR is closed;
      * anything else -> ``noop``.

    Raises:
        HTTPException: 401 for a bad/missing signature, 400 for invalid JSON
            or a malformed payload, 500 when required Bugzilla settings are
            missing.
    """
    # NOTE(review): the Bugzilla/GitHub/SQLite calls below are synchronous and
    # run inside this async handler, so they appear to block the event loop
    # while in flight — confirm whether that is acceptable for this deployment.

    # Authenticate the caller first so configuration problems are only
    # disclosed to deliveries that carry a valid signature.
    body = await request.body()
    verify_signature(WEBHOOK_SECRET, body, x_hub_signature_256)

    # Validate Bugzilla required env
    if not BUGZILLA_PRODUCT or not BUGZILLA_VERSION:
        raise HTTPException(status_code=500, detail="BUGZILLA_PRODUCT and BUGZILLA_VERSION must be set")

    try:
        payload = json.loads(body.decode("utf-8"))
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON") from None

    if x_github_event != "pull_request":
        return JSONResponse({"status": "ignored", "event": x_github_event})

    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Malformed pull_request payload")

    action = payload.get("action")
    if action not in _HANDLED_ACTIONS:
        return JSONResponse({"status": "ignored", "action": action})

    # A payload that passed signature verification should be well formed, but
    # answer 400 rather than crash with a 500 if a field is missing.
    try:
        repository = payload["repository"]
        repo_full = repository["full_name"]  # owner/repo
        owner = repository["owner"]["login"]
        repo = repository["name"]

        pr = payload["pull_request"]
        pr_num = pr["number"]
        pr_url = pr["html_url"]
        pr_title = pr.get("title") or ""
        pr_body = pr.get("body") or ""
        target_branch = pr["base"]["ref"]
        head_sha = pr["head"]["sha"][:7]
    except (KeyError, TypeError, AttributeError) as e:
        raise HTTPException(status_code=400, detail="Malformed pull_request payload") from e

    gitea_repo_url = f"{GITEA_BASE_URL}/{GITEA_ORG}/{repo}"

    pr_key = f"{repo_full}#{pr_num}"
    bug_id = state.get_bug(pr_key)

    # Acquire repo object only if we will write to GitHub
    gh_repo = None
    if not GITHUB_DRY_RUN:
        try:
            gh_repo = gh.get_repo(repo_full)
        except Exception as ge:
            logger.warning(f"Could not access GitHub repo {repo_full}: {ge}")

    if bug_id is None:
        # First time we see this PR (any handled action): create Bugzilla bug
        summary = f"[GH PR #{pr_num}] {GITEA_ORG}/{repo}: {pr_title}"
        description = (
            "Source\n"
            f"- Canonical repo (Gitea): {gitea_repo_url}\n"
            f"- GitHub mirror PR: {pr_url}\n\n"
            "Project policy\n"
            "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla.\n\n"
            "Submitter’s notes\n"
            f"{pr_body}\n"
        )
        try:
            # Use the requested default component
            bug_id = bz.create_bug(summary, description, component=BUGZILLA_COMPONENT_DEFAULT)
            state.set_bug(pr_key, bug_id)
            logger.info(f"Created Bugzilla bug {bug_id} for {pr_key}")
        except Exception as e:
            logger.error(f"Bugzilla create failed for {pr_key}: {e}")
            # Post failure comment and keep PR open (best effort)
            _notify_bugzilla_failure(
                gh_repo,
                pr_num,
                pr_comment_failure(gitea_repo_url, repo, pr_url, target_branch),
            )
            return JSONResponse({"status": "bugzilla_failed", "action": action})

        # Set cf_package to repo name, fallback to "---" if that fails
        _set_package_field(bug_id, repo)

        # Attach patch if enabled
        if BUGZILLA_ATTACH_DIFF:
            try:
                _attach_pr_patch(bug_id, owner, repo, pr_num, head_sha, f"Patch for PR #{pr_num} ({head_sha})")
            except Exception as e:
                logger.warning(f"Attach patch failed for bug {bug_id}: {e}")

        # Comment and close PR
        _comment_and_close_pr(gh_repo, pr_num, pr_comment_success(gitea_repo_url, bug_id))
        return JSONResponse({"status": "ok", "bug_id": bug_id})

    if action == "synchronize":
        # Attach updated patch and ensure PR closed
        attach_failed = False
        if BUGZILLA_ATTACH_DIFF:
            try:
                _attach_pr_patch(bug_id, owner, repo, pr_num, head_sha, f"Updated patch for PR #{pr_num} ({head_sha})")
            except Exception as e:
                logger.error(f"Attach update failed for bug {bug_id}: {e}")
                attach_failed = True

        # The PR is closed regardless of whether the attachment succeeded.
        _ensure_pr_closed(gh_repo, pr_num, " (after attach failure)" if attach_failed else "")
        if attach_failed:
            return JSONResponse({"status": "attach_failed", "bug_id": bug_id})
        return JSONResponse({"status": "ok", "bug_id": bug_id})

    return JSONResponse({"status": "noop"})