From 9e577a1abe4755ccb1399ed0cdfebcaa9871f02b Mon Sep 17 00:00:00 2001 From: Brian Read Date: Thu, 25 Sep 2025 12:41:31 +0200 Subject: [PATCH] Upload Enhanced webhook endpoint with gitea PR creation - not fully tested --- ...ea_pr_creation_not_really_fully _tested.py | 1026 +++++++++++++++++ 1 file changed, 1026 insertions(+) create mode 100644 TestPrograms/webhook_endpoint_with_gitea_pr_creation_not_really_fully _tested.py diff --git a/TestPrograms/webhook_endpoint_with_gitea_pr_creation_not_really_fully _tested.py b/TestPrograms/webhook_endpoint_with_gitea_pr_creation_not_really_fully _tested.py new file mode 100644 index 0000000..a6649d5 --- /dev/null +++ b/TestPrograms/webhook_endpoint_with_gitea_pr_creation_not_really_fully _tested.py @@ -0,0 +1,1026 @@ +#!/usr/bin/env python3 +import os +import json +import base64 +import sqlite3 +import logging +import hmac +import hashlib +import contextlib +import subprocess +from typing import Optional, Tuple, Dict, Any, List + +import httpx +from fastapi import FastAPI, Header, HTTPException, Request +from fastapi.responses import JSONResponse, PlainTextResponse +from github import Github, Auth +from pathlib import Path +from urllib.parse import quote as urlquote + +# ---------------- .env auto-loader ---------------- + +def _load_env_file(): + from os import environ + from pathlib import Path as _P + candidate_paths = [] + user_spec = environ.get("ENV_FILE") or environ.get("DOTENV_PATH") + if user_spec: + candidate_paths.append(_P(user_spec)) + here = _P(__file__).resolve().parent + candidate_paths += [here / ".env", Path.cwd() / ".env"] + + override = str(environ.get("ENV_OVERRIDE", "false")).strip().lower() == "true" + env_path = next((p for p in candidate_paths if p and p.is_file()), None) + if not env_path: + return + try: + from dotenv import load_dotenv # type: ignore + load_dotenv(dotenv_path=str(env_path), override=override, interpolate=True) + except Exception: + try: + with env_path.open("r", encoding="utf-8") as f: + for raw in f: + line = raw.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, val = line.split("=", 1) + key = key.strip() + val = val.strip() + if (val[:1], val[-1:]) in {('"', '"'), ("'", "'")}: + val = val[1:-1] + if override or key not in os.environ: + os.environ[key] = val + except Exception: + pass + +# ---------------- Logging (UTC) ---------------- + +class UTCFormatter(logging.Formatter): + def formatTime(self, record, datefmt=None): + import datetime as dt + from datetime import timezone + ts = dt.datetime.fromtimestamp(record.created, tz=timezone.utc) + return ts.isoformat(timespec="seconds").replace("+00:00", "Z") + +def configure_logging(): + handler = logging.StreamHandler() + handler.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s")) + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + level = os.environ.get("LOG_LEVEL", "INFO").upper() + root.setLevel(getattr(logging, level, logging.INFO)) + +_load_env_file() +configure_logging() +logger = logging.getLogger(__name__) + +# ---------------- Config via env ---------------- + +GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "").strip() +if not GITHUB_TOKEN: + logger.warning("GITHUB_TOKEN is not set. You must set it unless GITHUB_DRY_RUN=true.") + +WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "").strip() +if not WEBHOOK_SECRET: + logger.warning("WEBHOOK_SECRET is not set. Webhook signature verification will fail.") + +# Canonical links +GITEA_BASE_URL = os.environ.get("GITEA_BASE_URL", "http://gitea.bjsystems.co.uk").rstrip("/") +GITEA_ORG = os.environ.get("GITEA_ORG", "brianr").strip() or "brianr" + +# Bugzilla config +BUGZILLA_BASE_URL = os.environ.get("BUGZILLA_BASE_URL", "https://bugs.koozali.org").rstrip("/") +BUGZILLA_AUTH_MODE = os.environ.get("BUGZILLA_AUTH_MODE", "").strip().lower() # "api_key" or "basic" (or auto) +BUGZILLA_API_KEY = os.environ.get("BUGZILLA_API_KEY", "").strip() +BUGZILLA_USER = os.environ.get("BUGZILLA_USER", "").strip() +BUGZILLA_PASSWORD = os.environ.get("BUGZILLA_PASSWORD", "").strip() + +# REQUIRED fields +BUGZILLA_PRODUCT = os.environ.get("BUGZILLA_PRODUCT", "").strip() +BUGZILLA_VERSION = os.environ.get("BUGZILLA_VERSION", "").strip() + +# Component default +BUGZILLA_COMPONENT_DEFAULT = os.environ.get("BUGZILLA_COMPONENT_DEFAULT", "e-smith-*/smeserver-* packages").strip() +BUGZILLA_COMPONENT_FALLBACK = os.environ.get("BUGZILLA_COMPONENT_FALLBACK", BUGZILLA_COMPONENT_DEFAULT).strip() + +# Custom field settings +CF_PACKAGE_FIELD = os.environ.get("BUGZILLA_CF_PACKAGE_FIELD", "cf_package").strip() +CF_PACKAGE_FALLBACK = os.environ.get("BUGZILLA_CF_PACKAGE_FALLBACK", "---").strip() + +# CC behavior +BUGZILLA_CC_ENABLE = os.environ.get("BUGZILLA_CC_ENABLE", "true").strip().lower() == "true" +BUGZILLA_CC_ALLOW_NOREPLY = os.environ.get("BUGZILLA_CC_ALLOW_NOREPLY", "false").strip().lower() == "true" +BUGZILLA_CC_FALLBACK_COMMENT = os.environ.get("BUGZILLA_CC_FALLBACK_COMMENT", "true").strip().lower() == "true" +AUTHOR_EMAIL_MARKER = os.environ.get("BUGZILLA_AUTHOR_EMAIL_MARKER", "[mirror-bot:author-email]").strip() + +# Behavior toggles +BUGZILLA_ATTACH_DIFF = os.environ.get("BUGZILLA_ATTACH_DIFF", "true").strip().lower() == "true" +BUGZILLA_FAILURE_LABEL = os.environ.get("BUGZILLA_FAILURE_LABEL", "bugzilla-needed").strip() +GITHUB_DRY_RUN = os.environ.get("GITHUB_DRY_RUN", "false").strip().lower() == "true" +if GITHUB_DRY_RUN: + logger.info("GITHUB_DRY_RUN is enabled. The server will not comment/close PRs.") + +# SQLite for PR→Bug mapping +STATE_PATH = os.environ.get("STATE_PATH", "./webhook_state.sqlite") + +# Gitea PR mirroring +GITEA_MIRROR_PR_ENABLE = os.environ.get("GITEA_MIRROR_PR_ENABLE", "false").strip().lower() == "true" +GIT_CACHE_DIR = os.environ.get("GIT_CACHE_DIR", "./git-cache") +GITEA_API_TOKEN = os.environ.get("GITEA_API_TOKEN", os.environ.get("GITEA_TOKEN", "")).strip() +GITEA_PUSH_USER = os.environ.get("GITEA_PUSH_USER", "").strip() +GITEA_PUSH_TOKEN = os.environ.get("GITEA_PUSH_TOKEN", os.environ.get("GITEA_API_TOKEN", GITEA_API_TOKEN)).strip() +GITEA_PR_BRANCH_TEMPLATE = os.environ.get("GITEA_PR_BRANCH_TEMPLATE", "pr/github/{pr_number}").strip() +GITEA_LABEL_FROM_GH = os.environ.get("GITEA_LABEL_FROM_GH", "from-github").strip() + +# ---------------- State store ---------------- + +class State: + def __init__(self, path: str): + self.conn = sqlite3.connect(path, check_same_thread=False) + self._init() + + def _init(self): + c = self.conn.cursor() + c.execute(""" + CREATE TABLE IF NOT EXISTS pr_map ( + pr_key TEXT PRIMARY KEY, + bug_id INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + )""") + self.conn.commit() + + def get_bug(self, pr_key: str) -> Optional[int]: + c = self.conn.cursor() + c.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,)) + row = c.fetchone() + return row[0] if row else None + + def set_bug(self, pr_key: str, bug_id: int): + import datetime as dt + from datetime import timezone + now = dt.datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + c = self.conn.cursor() + c.execute(""" + INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at + """, (pr_key, bug_id, now, now)) + self.conn.commit() + +# ---------------- Gitea access (git + API) ---------------- + +def _run_git(cmd: list[str], cwd: Optional[str] = None) -> tuple[int, str, str]: + logger.debug(f"[git] {' '.join(cmd)} (cwd={cwd or os.getcwd()})") + p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + out, err = p.communicate() + if p.returncode != 0: + logger.warning(f"[git] failed rc={p.returncode} cmd={' '.join(cmd)} stderr={err.strip()}") + return p.returncode, out, err + +class GitWorker: + def __init__(self, cache_dir: str): + self.cache_dir = cache_dir + os.makedirs(cache_dir, exist_ok=True) + + def repo_path(self, owner: str, repo: str) -> str: + safe = f"{owner}--{repo}.git" + return os.path.join(self.cache_dir, safe) + + def ensure_bare_repo(self, owner: str, repo: str) -> str: + path = self.repo_path(owner, repo) + if not os.path.isdir(path): + rc, _, err = _run_git(["git", "init", "--bare", path]) + if rc != 0: + raise RuntimeError(f"git init --bare failed: {err}") + return path + + def set_remote(self, path: str, name: str, url: str): + rc, out, err = _run_git(["git", "remote"], cwd=path) + if rc != 0: + raise RuntimeError("git remote failed") + remotes = {r.strip() for r in out.strip().splitlines() if r.strip()} + if name in remotes: + _run_git(["git", "remote", "set-url", name, url], cwd=path) + else: + rc, _, err = _run_git(["git", "remote", "add", name, url], cwd=path) + if rc != 0: + raise RuntimeError(f"git remote add {name} failed: {err}") + + def fetch_pr_head(self, path: str, gh_url: str, pr_number: int): + self.set_remote(path, "gh", gh_url) + refspec = f"refs/pull/{pr_number}/head:refs/remotes/gh/pr/{pr_number}" + rc, _, err = _run_git(["git", "fetch", "--force", "gh", refspec], cwd=path) + if rc != 0: + raise RuntimeError(f"git fetch PR {pr_number} failed: {err}") + + def push_to_gitea_branch(self, path: str, gitea_url: str, pr_number: int, target_branch: str): + self.set_remote(path, "gitea", gitea_url) + src = f"refs/remotes/gh/pr/{pr_number}" + dst = f"refs/heads/{target_branch}" + rc, _, err = _run_git(["git", "push", "--force", "gitea", f"+{src}:{dst}"], cwd=path) + if rc != 0: + raise RuntimeError(f"git push to Gitea failed: {err}") + +class GiteaAPI: + def __init__(self, base_url: str, token: str): + self.base = base_url.rstrip("/") + self.token = token + + def _headers(self) -> dict: + return {"Authorization": f"token {self.token}"} if self.token else {} + + def get_repo(self, owner: str, repo: str) -> dict: + r = httpx.get(f"{self.base}/api/v1/repos/{owner}/{repo}", headers=self._headers(), timeout=20) + r.raise_for_status() + return r.json() + + def list_pulls(self, owner: str, repo: str, state: str = "open") -> List[dict]: + r = httpx.get(f"{self.base}/api/v1/repos/{owner}/{repo}/pulls", params={"state": state}, headers=self._headers(), timeout=20) + r.raise_for_status() + return r.json() + + def create_pull(self, owner: str, repo: str, title: str, body: str, head: str, base: str) -> dict: + payload = {"title": title, "body": body, "head": head, "base": base} + r = httpx.post(f"{self.base}/api/v1/repos/{owner}/{repo}/pulls", headers=self._headers(), json=payload, timeout=30) + if r.status_code in (200, 201): + return r.json() + logger.warning(f"Gitea create PR returned {r.status_code}: {r.text[:200]}") + r.raise_for_status() + return r.json() + + def comment_on_issue(self, owner: str, repo: str, pr_index: int, text: str): + payload = {"body": text} + r = httpx.post(f"{self.base}/api/v1/repos/{owner}/{repo}/issues/{pr_index}/comments", + headers=self._headers(), json=payload, timeout=20) + r.raise_for_status() + return r.json() + + def add_labels(self, owner: str, repo: str, pr_index: int, labels: list[str]): + if not labels: + return + payload = {"labels": labels} + r = httpx.post(f"{self.base}/api/v1/repos/{owner}/{repo}/issues/{pr_index}/labels", + headers=self._headers(), json=payload, timeout=20) + if r.status_code not in (200, 201): + logger.debug(f"Gitea add labels returned {r.status_code}: {r.text[:200]}") + +class PRMirror: + def __init__(self, gitea_api: GiteaAPI, git: GitWorker, gitea_org: str, gitea_base: str): + self.gitea = gitea_api + self.git = git + self.org = gitea_org + self.base_url = gitea_base + + def _build_gh_url(self, owner: str, repo: str) -> str: + if GITHUB_TOKEN: + return f"https://{urlquote(GITHUB_TOKEN)}:x-oauth-basic@github.com/{owner}/{repo}.git" + return f"https://github.com/{owner}/{repo}.git" + + def _build_gitea_push_url(self, org: str, repo: str) -> str: + if GITEA_PUSH_TOKEN: + user = urlquote(GITEA_PUSH_USER or "git") + pwd = urlquote(GITEA_PUSH_TOKEN) + return f"{self.base_url}/{org}/{repo}.git".replace("://", f"://{user}:{pwd}@") + return f"{self.base_url}/{org}/{repo}.git" + + def mirror(self, gh_owner: str, gh_repo: str, pr_number: int, pr_title: str, pr_body: str) -> Optional[int]: + try: + repo_meta = self.gitea.get_repo(self.org, gh_repo) + default_branch = repo_meta.get("default_branch") or "master" + path = self.git.ensure_bare_repo(gh_owner, gh_repo) + gh_url = self._build_gh_url(gh_owner, gh_repo) + red_gh = gh_url.split("@")[-1] if "@" in gh_url else gh_url + logger.debug(f"[mirror] fetch GH {red_gh} PR#{pr_number} into {path}") + self.git.fetch_pr_head(path, gh_url, pr_number) + target_branch = GITEA_PR_BRANCH_TEMPLATE.format(pr_number=pr_number) + gitea_push = self._build_gitea_push_url(self.org, gh_repo) + red_ge = gitea_push.split("@")[-1] if "@" in gitea_push else gitea_push + logger.debug(f"[mirror] push to Gitea {red_ge} -> {target_branch}") + self.git.push_to_gitea_branch(path, gitea_push, pr_number, target_branch) + # Locate existing PR + try: + pulls = self.gitea.list_pulls(self.org, gh_repo, state="open") + for p in pulls: + hb = p.get("head") or {} + head_ref = hb.get("ref") or p.get("head_branch") or "" + if head_ref == target_branch: + pr_index = p.get("number") or p.get("index") + logger.info(f"Gitea PR already exists (#{pr_index}) for branch {target_branch}") + return int(pr_index) if pr_index else None + except Exception as e: + logger.debug(f"List pulls failed or not supported: {e}") + body = f"{pr_body or ''}\n\nMirrored from GitHub PR #{pr_number}: https://github.com/{gh_owner}/{gh_repo}/pull/{pr_number}" + created = self.gitea.create_pull(self.org, gh_repo, pr_title or f"Mirror of GH PR #{pr_number}", + body, head=target_branch, base=default_branch) + pr_index = created.get("number") or created.get("index") + if pr_index: + with contextlib.suppress(Exception): + self.gitea.add_labels(self.org, gh_repo, int(pr_index), [GITEA_LABEL_FROM_GH]) + return int(pr_index) + return None + except Exception as e: + logger.warning(f"PR mirroring failed for {gh_owner}/{gh_repo}#{pr_number}: {e}") + return None + +pr_mirror = PRMirror(GiteaAPI(GITEA_BASE_URL, GITEA_API_TOKEN), GitWorker(GIT_CACHE_DIR), GITEA_ORG, GITEA_BASE_URL) if GITEA_MIRROR_PR_ENABLE else None +logger.debug(f"PR mirroring {'enabled' if pr_mirror else 'disabled'}") + +# ---------------- Bugzilla hybrid client ---------------- + +class BugzillaHybrid: + def __init__(self): + self.base = BUGZILLA_BASE_URL + self.mode = None + self.rest_base = None + self.rpc_url = f"{self.base}/jsonrpc.cgi" + self._token: Optional[str] = None + self._detect_mode() + if BUGZILLA_AUTH_MODE in ("api_key", "") and BUGZILLA_API_KEY: + logger.info("Bugzilla auth: API key mode") + elif BUGZILLA_AUTH_MODE == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD): + logger.info("Bugzilla auth: basic/login (JSON-RPC token) mode") + else: + logger.warning("Bugzilla auth NOT configured yet (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD).") + + def _detect_mode(self): + try: + r = httpx.get(f"{self.base}/rest/version", timeout=10) + if r.status_code == 200: + self.mode = "rest" + self.rest_base = f"{self.base}/rest" + logger.info("Bugzilla REST detected at /rest") + return + except Exception: + pass + try: + r = httpx.get(f"{self.base}/rest.cgi/version", timeout=10) + if r.status_code == 200: + self.mode = "rest" + self.rest_base = f"{self.base}/rest.cgi" + logger.info("Bugzilla REST detected at /rest.cgi") + return + except Exception: + pass + self.mode = "jsonrpc" + logger.warning("Bugzilla REST not available; falling back to JSON-RPC") + + def _rest_headers(self) -> Dict[str, str]: + h = {"Accept": "application/json", "Content-Type": "application/json"} + if BUGZILLA_API_KEY: + h["X-BUGZILLA-API-KEY"] = BUGZILLA_API_KEY + return h + + def _rest_auth(self) -> Optional[Tuple[str, str]]: + if BUGZILLA_AUTH_MODE == "basic" and BUGZILLA_USER and BUGZILLA_PASSWORD: + return (BUGZILLA_USER, BUGZILLA_PASSWORD) + return None + + def _rpc_call(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]: + body = {"method": method, "params": [params_obj], "id": req_id} + r = httpx.post(self.rpc_url, headers={"Content-Type": "application/json"}, json=body, timeout=60) + r.raise_for_status() + data = r.json() + if data.get("error"): + err = data["error"] + raise RuntimeError(f"Bugzilla RPC error {err.get('code')}: {err.get('message')}") + return data.get("result", {}) + + def _rpc(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]: + mode = BUGZILLA_AUTH_MODE + if (mode in ("", "api_key") and BUGZILLA_API_KEY): + params_obj = dict(params_obj, Bugzilla_api_key=BUGZILLA_API_KEY) + elif (mode == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD)): + if not self._token: + self._token = self._login_rpc() + params_obj = dict(params_obj, Bugzilla_token=self._token) + else: + raise RuntimeError("Bugzilla auth not configured (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD)") + return self._rpc_call(method, params_obj, req_id=req_id) + + def _login_rpc(self) -> str: + if not (BUGZILLA_USER and BUGZILLA_PASSWORD): + raise RuntimeError("Bugzilla basic auth requires BUGZILLA_USER and BUGZILLA_PASSWORD") + data = self._rpc_call("User.login", {"login": BUGZILLA_USER, "password": BUGZILLA_PASSWORD}, req_id=0) + token = data.get("token") + if not token: + raise RuntimeError("Bugzilla login did not return a token") + logger.info("Bugzilla JSON-RPC login succeeded") + return token + + def create_bug(self, summary: str, description: str, component: str) -> int: + if self.mode == "rest": + try: + url = f"{self.rest_base}/bug" + payload = { + "product": BUGZILLA_PRODUCT, + "component": component or BUGZILLA_COMPONENT_DEFAULT, + "version": BUGZILLA_VERSION, + "summary": summary, + "description": description, + } + r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=60) + if r.status_code in (404, 405): + raise httpx.HTTPStatusError("REST create not available", request=r.request, response=r) + r.raise_for_status() + data = r.json() + bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"]) + if not bug_id: + raise RuntimeError(f"REST create result missing id: {data}") + return int(bug_id) + except Exception as e: + logger.warning(f"Bugzilla REST create failed, switching to JSON-RPC: {e}") + self.mode = "jsonrpc" + params = { + "product": BUGZILLA_PRODUCT, + "component": component or BUGZILLA_COMPONENT_DEFAULT, + "version": BUGZILLA_VERSION, + "summary": summary, + "description": description, + } + res = self._rpc("Bug.create", params, req_id=1) + bug_id = res.get("id") or (res.get("bugs") and res["bugs"][0]["id"]) + if not bug_id: + raise RuntimeError(f"RPC create result missing id: {res}") + return int(bug_id) + + def add_attachment(self, bug_id: int, filename: str, summary: str, content_bytes: bytes): + if self.mode == "rest": + try: + url = f"{self.rest_base}/bug/{bug_id}/attachment" + payload = { + "ids": [bug_id], + "data": base64.b64encode(content_bytes).decode("ascii"), + "file_name": filename, + "summary": summary, + "content_type": "text/x-patch", + "is_patch": True, + } + r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=120) + if r.status_code in (404, 405): + raise httpx.HTTPStatusError("REST attachment not available", request=r.request, response=r) + r.raise_for_status() + return + except Exception as e: + logger.warning(f"Bugzilla REST add_attachment failed, switching to JSON-RPC: {e}") + self.mode = "jsonrpc" + params = { + "ids": [bug_id], + "data": base64.b64encode(content_bytes).decode("ascii"), + "file_name": filename, + "summary": summary, + "content_type": "text/x-patch", + "is_patch": True, + } + self._rpc("Bug.add_attachment", params, req_id=3) + + def add_comment(self, bug_id: int, comment: str): + if self.mode == "rest": + try: + url = f"{self.rest_base}/bug/{bug_id}/comment" + payload = {"comment": comment} + r = httpx.post(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=60) + if r.status_code in (404, 405): + raise httpx.HTTPStatusError("REST add_comment not available", request=r.request, response=r) + r.raise_for_status() + return + except Exception as e: + logger.warning(f"Bugzilla REST add_comment failed, switching to JSON-RPC: {e}") + self.mode = "jsonrpc" + self._rpc("Bug.add_comment", {"id": bug_id, "comment": comment}, req_id=2) + + def update_bug_fields(self, bug_id: int, fields: Dict[str, Any]): + if self.mode == "rest": + try: + url = f"{self.rest_base}/bug/{bug_id}" + r = httpx.put(url, headers=self._rest_headers(), auth=self._rest_auth(), json=fields, timeout=60) + if r.status_code in (404, 405): + raise httpx.HTTPStatusError("REST update not available", request=r.request, response=r) + r.raise_for_status() + return + except Exception as e: + logger.warning(f"Bugzilla REST update failed, switching to JSON-RPC: {e}") + self.mode = "jsonrpc" + params = {"ids": [bug_id]} + params.update(fields) + self._rpc("Bug.update", params, req_id=5) + + def add_cc(self, bug_id: int, emails: List[str]): + if not emails: + return + if self.mode == "rest": + try: + url = f"{self.rest_base}/bug/{bug_id}" + payload = {"cc": {"add": emails}} + r = httpx.put(url, headers=self._rest_headers(), auth=self._rest_auth(), json=payload, timeout=30) + if r.status_code in (404, 405): + raise httpx.HTTPStatusError("REST update not available", request=r.request, response=r) + r.raise_for_status() + return + except Exception as e: + logger.warning(f"Bugzilla REST add_cc failed, switching to JSON-RPC: {e}") + self.mode = "jsonrpc" + params = {"ids": [bug_id], "cc": {"add": emails}} + self._rpc("Bug.update", params, req_id=6) + + def list_comments(self, bug_id: int) -> List[str]: + texts: List[str] = [] + if self.mode == "rest": + try: + url = f"{self.rest_base}/bug/{bug_id}/comment" + r = httpx.get(url, headers=self._rest_headers(), auth=self._rest_auth(), timeout=30) + r.raise_for_status() + data = r.json() + bugs = data.get("bugs") or {} + key = str(bug_id) + if key in bugs: + for c in (bugs[key].get("comments") or []): + txt = c.get("text") + if isinstance(txt, str): + texts.append(txt) + return texts + except Exception as e: + logger.warning(f"Bugzilla REST list_comments failed, switching to JSON-RPC: {e}") + self.mode = "jsonrpc" + res = self._rpc("Bug.comments", {"ids": [bug_id]}, req_id=7) + bugs = res.get("bugs") or {} + key = str(bug_id) + if key in bugs: + for c in (bugs[key].get("comments") or []): + txt = c.get("text") + if isinstance(txt, str): + texts.append(txt) + return texts + + def add_or_update_author_email_note(self, bug_id: int, email: str): + try: + existing = self.list_comments(bug_id) + except Exception as e: + logger.warning(f"Could not list comments for bug {bug_id}: {e}") + existing = [] + marker_line = f"{AUTHOR_EMAIL_MARKER} PR author email could not be added to CC (not registered): {email}" + for text in existing: + if AUTHOR_EMAIL_MARKER in text and email in text: + return + self.add_comment(bug_id, marker_line) + +# ---------------- GitHub client ---------------- + +gh = Github(auth=Auth.Token(GITHUB_TOKEN)) if GITHUB_TOKEN else Github() +state = State(STATE_PATH) +bz = BugzillaHybrid() +app = FastAPI() + +# ---------------- Helpers ---------------- + +def verify_signature(secret: str, body: bytes, signature: str): + if not signature: + raise HTTPException(status_code=401, detail="Missing signature") + expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest() + if not hmac.compare_digest(expected, signature): + raise HTTPException(status_code=401, detail="Invalid signature") + +def pr_comment_success(gitea_repo_url: str, bug_id: int) -> str: + return ( + "Thanks for the contribution!\n\n" + "This repository is a read-only mirror of the canonical repo on Gitea:\n" + f"- Canonical: {gitea_repo_url}\n" + f"- Please file and discuss changes in Bugzilla: {BUGZILLA_BASE_URL}\n\n" + f"We created Bug {bug_id} to track this proposal:\n" + f"- {BUGZILLA_BASE_URL}/show_bug.cgi?id={bug_id}\n\n" + "This pull request will be closed here. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n" + ) + +def pr_comment_failure(gitea_repo_url: str, repo: str, pr_url: str, target_branch: str) -> str: + return ( + "Thanks for the contribution!\n\n" + "This repository is a read-only mirror of the canonical repo on Gitea:\n" + f"- Canonical: {gitea_repo_url}\n\n" + "We were unable to create a Bugzilla ticket automatically at this time.\n" + f"Please open a bug at {BUGZILLA_BASE_URL} (Product: {BUGZILLA_PRODUCT}, Version: {BUGZILLA_VERSION}, Component: {BUGZILLA_COMPONENT_DEFAULT}) and include:\n" + f"- GitHub PR: {pr_url}\n" + f"- Target branch: {target_branch}\n" + "- Summary and rationale for the change\n\n" + "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n" + ) + +def fetch_pr_patch(owner: str, repo: str, pr_num: int) -> bytes: + url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_num}" + headers = {"Accept": "application/vnd.github.v3.patch"} + if GITHUB_TOKEN: + headers["Authorization"] = f"token {GITHUB_TOKEN}" + r = httpx.get(url, headers=headers, timeout=120) + r.raise_for_status() + return r.content + +def derive_pr_author_email(owner: str, repo: str, pr_num: int, head_sha: str) -> Optional[str]: + try: + url = f"https://api.github.com/repos/{owner}/{repo}/commits/{head_sha}" + headers = {"Accept": "application/vnd.github+json"} + if GITHUB_TOKEN: + headers["Authorization"] = f"token {GITHUB_TOKEN}" + r = httpx.get(url, headers=headers, timeout=20) + if r.status_code == 200: + data = r.json() + email = (data.get("commit") or {}).get("author", {}).get("email") + if email: + return email + except Exception: + pass + try: + url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_num}/commits" + headers = {"Accept": "application/vnd.github+json"} + if GITHUB_TOKEN: + headers["Authorization"] = f"token {GITHUB_TOKEN}" + r = httpx.get(url, headers=headers, timeout=20) + if r.status_code == 200: + commits = r.json() + if commits: + email = (commits[0].get("commit") or {}).get("author", {}).get("email") + if email: + return email + except Exception: + pass + try: + patch = fetch_pr_patch(owner, repo, pr_num).decode("utf-8", "ignore") + for line in patch.splitlines(): + if line.startswith("From: ") and "<" in line and ">" in line: + cand = line[line.find("<")+1:line.find(">", line.find("<")+1)].strip() + if "@" in cand: + return cand + break + except Exception: + pass + return None + +# ---------------- Healthz ---------------- + +@app.get("/healthz") +async def healthz(): + github = {"ok": False} + bugzilla = {"ok": False} + gitea = {"ok": False} + + # GitHub + try: + gh_headers = {"Accept": "application/vnd.github+json"} + if GITHUB_TOKEN: + gh_headers["Authorization"] = f"token {GITHUB_TOKEN}" + r = httpx.get("https://api.github.com/rate_limit", headers=gh_headers, timeout=5) + github["http_status"] = r.status_code + if r.status_code == 200: + data = r.json() or {} + core = (data.get("resources") or {}).get("core") or {} + github.update({"remaining": core.get("remaining"), "reset": core.get("reset")}) + scopes_missing = [] + login = None + try: + ru = httpx.get("https://api.github.com/user", headers=gh_headers, timeout=5) + github["user_http_status"] = ru.status_code + if ru.status_code == 200: + login = (ru.json() or {}).get("login") + scopes_hdr = ru.headers.get("X-OAuth-Scopes") or r.headers.get("X-OAuth-Scopes") or "" + scopes = [s.strip() for s in scopes_hdr.split(",") if s.strip()] + required = ["repo"] + ([] if GITHUB_DRY_RUN else ["admin:repo_hook"]) + scopes_missing = [s for s in required if s not in scopes] if scopes else [] + github.update({"login": login, "scopes": scopes, "required_scopes": required, "missing_scopes": scopes_missing}) + except Exception as e: + github["user_error"] = str(e)[:200] + github["ok"] = (r.status_code == 200) and (GITHUB_DRY_RUN or (login is not None)) + if not GITHUB_DRY_RUN and scopes_missing: + github["ok"] = False + else: + github["error"] = r.text[:200] + except Exception as e: + github["error"] = str(e)[:200] + + # Bugzilla + try: + bz_mode = getattr(bz, "mode", None) + bugzilla["mode"] = bz_mode or "unknown" + if bz_mode == "rest" and getattr(bz, "rest_base", None): + rv = httpx.get(f"{bz.rest_base}/version", headers=bz._rest_headers(), auth=bz._rest_auth(), timeout=5) + bugzilla["http_status"] = rv.status_code + if rv.status_code == 200: + bugzilla["version"] = (rv.json() or {}).get("version") + bugzilla["ok"] = True + else: + bugzilla["error"] = rv.text[:200] + else: + payload = {"method": "Bugzilla.version", "params": [{}], "id": 99} + if BUGZILLA_API_KEY: + payload["params"][0]["Bugzilla_api_key"] = BUGZILLA_API_KEY + rv = httpx.post(bz.rpc_url, headers={"Content-Type": "application/json"}, json=payload, timeout=5) + bugzilla["http_status"] = rv.status_code + if rv.status_code == 200 and not (rv.json() or {}).get("error"): + bugzilla["version"] = (rv.json().get("result") or {}).get("version") + bugzilla["ok"] = True + else: + bugzilla["error"] = (rv.text if rv.status_code != 200 else str(rv.json().get("error")))[:200] + bugzilla["product"] = BUGZILLA_PRODUCT + bugzilla["version_field"] = BUGZILLA_VERSION + bugzilla["auth_mode"] = (BUGZILLA_AUTH_MODE or ("api_key" if BUGZILLA_API_KEY else "basic" if (BUGZILLA_USER and BUGZILLA_PASSWORD) else "unset")) + bugzilla["creds_present"] = bool(BUGZILLA_API_KEY or (BUGZILLA_USER and BUGZILLA_PASSWORD)) + try: + payload = {"method": "Product.get", "params": [{"names": [BUGZILLA_PRODUCT]}], "id": 100} + if BUGZILLA_API_KEY: + payload["params"][0]["Bugzilla_api_key"] = BUGZILLA_API_KEY + rp = httpx.post(bz.rpc_url, headers={"Content-Type": "application/json"}, json=payload, timeout=5) + if rp.status_code == 200 and not (rp.json() or {}).get("error"): + bugzilla["product_ok"] = True + else: + bugzilla["product_ok"] = False + bugzilla["product_error"] = (rp.text if rp.status_code != 200 else str(rp.json().get("error")))[:200] + except Exception as e: + bugzilla["product_error"] = str(e)[:200] + except Exception as e: + bugzilla["error"] = str(e)[:200] + + # Gitea + try: + gitea = {"ok": False, "base_url": GITEA_BASE_URL} + rv = httpx.get(f"{GITEA_BASE_URL.rstrip('/')}/api/v1/version", timeout=5) + gitea["http_status"] = rv.status_code + if rv.status_code == 200: + gitea["version"] = (rv.json() or {}).get("version") or rv.text.strip() + auth_ok = False + user_login = None + try: + ru = httpx.get(f"{GITEA_BASE_URL.rstrip('/')}/api/v1/user", + headers={"Authorization": f"token {GITEA_API_TOKEN}"} if GITEA_API_TOKEN else {}, + timeout=5) + gitea["user_http_status"] = ru.status_code + if ru.status_code == 200: + user_login = (ru.json() or {}).get("login") + auth_ok = True + else: + gitea["user_error"] = ru.text[:200] + except Exception as e: + gitea["user_error"] = str(e)[:200] + gitea["login"] = user_login + gitea["token_present"] = bool(GITEA_API_TOKEN) + health_repo = os.environ.get("GITEA_HEALTH_REPO", "").strip() + if GITEA_ORG and health_repo: + try: + rr = httpx.get(f"{GITEA_BASE_URL.rstrip('/')}/api/v1/repos/{GITEA_ORG}/{health_repo}", + headers={"Authorization": f"token {GITEA_API_TOKEN}"} if GITEA_API_TOKEN else {}, + timeout=5) + gitea["repo_http_status"] = rr.status_code + if rr.status_code == 200: + meta = rr.json() or {} + perms = meta.get("permissions") or {} + gitea["repo_ok"] = True + gitea["repo_name"] = f"{GITEA_ORG}/{health_repo}" + gitea["repo_permissions"] = perms + else: + gitea["repo_ok"] = False + gitea["repo_error"] = rr.text[:200] + except Exception as e: + gitea["repo_ok"] = False + gitea["repo_error"] = str(e)[:200] + gitea["mirror_enabled"] = GITEA_MIRROR_PR_ENABLE + gitea["push_creds_present"] = bool(GITEA_PUSH_USER and GITEA_PUSH_TOKEN) + gitea["ok"] = (rv.status_code == 200) and (not GITEA_MIRROR_PR_ENABLE or auth_ok) + else: + gitea["error"] = rv.text[:200] + gitea["ok"] = False + except Exception as e: + gitea["error"] = str(e)[:200] + gitea["ok"] = False + + overall_ok = bool(github.get("ok") and bugzilla.get("ok") and gitea.get("ok")) + status_code = 200 if overall_ok else 503 + + hints = [] + if GITEA_MIRROR_PR_ENABLE and not gitea.get("push_creds_present"): + hints.append("GITEA_MIRROR_PR_ENABLE is true but GITEA_PUSH_USER/TOKEN are missing.") + if not GITHUB_DRY_RUN and github.get("missing_scopes"): + hints.append(f"GitHub token missing scopes: {', '.join(github['missing_scopes'])}") + if not (BUGZILLA_API_KEY or (BUGZILLA_USER and BUGZILLA_PASSWORD)): + hints.append("Bugzilla credentials not set (API key or username/password).") + if os.environ.get("GITEA_HEALTH_REPO") and not gitea.get("repo_ok"): + hints.append("Gitea repo check failed; verify GITEA_HEALTH_REPO exists and token has access.") + + body = {"ok": overall_ok, "github": github, "bugzilla": bugzilla, "gitea": gitea, "hints": hints} + return JSONResponse(body, status_code=status_code) + +# ---------------- Webhook ---------------- + +@app.post("/webhook/github") +async def github_webhook( + request: Request, + x_hub_signature_256: str = Header(None), + x_github_event: str = Header(None), +): + if not BUGZILLA_PRODUCT or not BUGZILLA_VERSION: + raise HTTPException(status_code=500, detail="BUGZILLA_PRODUCT and BUGZILLA_VERSION must be set") + + body = await request.body() + verify_signature(WEBHOOK_SECRET, body, x_hub_signature_256) + + try: + payload = json.loads(body.decode("utf-8")) + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON") + + if x_github_event != "pull_request": + return JSONResponse({"status": "ignored", "event": x_github_event}) + + action = payload.get("action") + if action not in ("opened", "reopened", "synchronize", "ready_for_review"): + return JSONResponse({"status": "ignored", "action": action}) + + repo_full = payload["repository"]["full_name"] + owner = payload["repository"]["owner"]["login"] + repo = payload["repository"]["name"] + pr = payload["pull_request"] + pr_num = pr["number"] + pr_url = pr["html_url"] + pr_title = pr.get("title") or "" + pr_body = pr.get("body") or "" + base = pr["base"] + head = pr["head"] + target_branch = base["ref"] + head_sha = head["sha"][:7] + + gitea_repo_url = f"{GITEA_BASE_URL}/{GITEA_ORG}/{repo}" + pr_key = f"{repo_full}#{pr_num}" + bug_id = state.get_bug(pr_key) + + gh_repo = None + if not GITHUB_DRY_RUN: + try: + gh_repo = gh.get_repo(repo_full) + except Exception as ge: + logger.warning(f"Could not access GitHub repo {repo_full}: {ge}") + + # Creation-type events + if action in ("opened", "reopened", "ready_for_review"): + if bug_id is None: + summary = f"[GH PR #{pr_num}] {GITEA_ORG}/{repo}: {pr_title}" + description = ( + "Source\n" + f"- Canonical repo (Gitea): {gitea_repo_url}\n" + f"- GitHub mirror PR: {pr_url}\n\n" + "Project policy\n" + "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla.\n\n" + "Submitter’s notes\n" + f"{pr_body}\n" + ) + try: + bug_id = bz.create_bug(summary, description, component=BUGZILLA_COMPONENT_DEFAULT) + state.set_bug(pr_key, bug_id) + logger.info(f"Created Bugzilla bug {bug_id} for {pr_key}") + # cf_package + try: + bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: repo}) + logger.info(f"Set {CF_PACKAGE_FIELD}={repo} on bug {bug_id}") + except Exception as e_cf: + logger.warning(f"Failed to set {CF_PACKAGE_FIELD} to '{repo}' on bug {bug_id}: {e_cf}; falling back to '{CF_PACKAGE_FALLBACK}'") + with contextlib.suppress(Exception): + bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: CF_PACKAGE_FALLBACK}) + logger.info(f"Set {CF_PACKAGE_FIELD}={CF_PACKAGE_FALLBACK} on bug {bug_id}") + # CC + if BUGZILLA_CC_ENABLE: + email = derive_pr_author_email(owner, repo, pr_num, head_sha) + if email: + is_noreply = email.endswith("@users.noreply.github.com") + if not is_noreply or BUGZILLA_CC_ALLOW_NOREPLY: + try: + bz.add_cc(bug_id, [email]) + logger.info(f"Added CC {email} to bug {bug_id}") + except Exception as e_cc: + logger.warning(f"Failed to add CC {email} to bug {bug_id}: {e_cc}") + if BUGZILLA_CC_FALLBACK_COMMENT: + with contextlib.suppress(Exception): + bz.add_or_update_author_email_note(bug_id, email) + else: + logger.info(f"Skipping noreply CC {email} (set BUGZILLA_CC_ALLOW_NOREPLY=true to allow)") + else: + logger.info("Could not derive PR author email; CC not added") + except Exception as e: + logger.error(f"Bugzilla create failed for {pr_key}: {e}") + if not GITHUB_DRY_RUN and gh_repo is not None: + try: + gh_repo.get_pull(pr_num).create_issue_comment(pr_comment_failure(gitea_repo_url, repo, pr_url, target_branch)) + if BUGZILLA_FAILURE_LABEL: + with contextlib.suppress(Exception): + gh_repo.create_label(BUGZILLA_FAILURE_LABEL, "ededed") + with contextlib.suppress(Exception): + gh_repo.get_issue(pr_num).add_to_labels(BUGZILLA_FAILURE_LABEL) + except Exception as ge: + logger.warning(f"Could not post failure comment on PR #{pr_num}: {ge}") + else: + logger.info(f"[GITHUB_DRY_RUN] Would comment failure on PR #{pr_num}, label={BUGZILLA_FAILURE_LABEL}") + return JSONResponse({"status": "bugzilla_failed", "action": action}) + + if BUGZILLA_ATTACH_DIFF: + try: + patch = fetch_pr_patch(owner, repo, pr_num) + bz.add_attachment( + bug_id=bug_id, + filename=f"PR-{pr_num}-{head_sha}.patch", + summary=f"Patch for PR #{pr_num} ({head_sha})", + content_bytes=patch + ) + except Exception as e: + logger.warning(f"Attach patch failed for bug {bug_id}: {e}") + + # Mirror (create/update) for opened path + if pr_mirror: + logger.debug(f"Mirroring PR #{pr_num} to Gitea (opened path)") + with contextlib.suppress(Exception): + pr_idx = pr_mirror.mirror(owner, repo, pr_num, pr_title, pr_body or "") + if pr_idx: + logger.info(f"Gitea PR #{pr_idx} ensured for {GITEA_ORG}/{repo}") + + # Close GH PR (or dry-run log) + comment = pr_comment_success(gitea_repo_url, bug_id) + if not GITHUB_DRY_RUN and gh_repo is not None: + try: + pr_obj = gh_repo.get_pull(pr_num) + pr_obj.create_issue_comment(comment) + if pr_obj.state != "closed": + pr_obj.edit(state="closed") + except Exception as ge: + logger.warning(f"Could not comment/close PR #{pr_num}: {ge}") + else: + logger.info(f"[GITHUB_DRY_RUN] Would post success comment and close PR #{pr_num}") + return JSONResponse({"status": "ok", "bug_id": bug_id}) + else: + # Bug already exists: idempotent mirror and ensure GH PR closed + if pr_mirror: + logger.debug(f"Mirroring PR #{pr_num} to Gitea (opened idempotent)") + with contextlib.suppress(Exception): + pr_mirror.mirror(owner, repo, pr_num, pr_title, pr_body or "") + if not GITHUB_DRY_RUN: + with contextlib.suppress(Exception): + gh_repo = gh.get_repo(repo_full) + pr_obj = gh_repo.get_pull(pr_num) + if pr_obj.state != "closed": + pr_obj.edit(state="closed") + return JSONResponse({"status": "ok", "bug_id": bug_id}) + + # Update event + elif action == "synchronize": + if bug_id is None: + # Treat as missed 'opened': create bug now then mirror + summary = f"[GH PR #{pr_num}] {GITEA_ORG}/{repo}: {pr_title}" + description = ( + "Source\n" + f"- Canonical repo (Gitea): {gitea_repo_url}\n" + f"- GitHub mirror PR: {pr_url}\n\n" + "Project policy\n" + "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla.\n\n" + "Submitter’s notes\n" + f"{pr_body}\n" + ) + try: + bug_id = bz.create_bug(summary, description, component=BUGZILLA_COMPONENT_DEFAULT) + state.set_bug(pr_key, bug_id) + logger.info(f"(sync) Created Bugzilla bug {bug_id} for {pr_key}") + with contextlib.suppress(Exception): + bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: repo}) + except Exception as e: + logger.error(f"(sync) Bugzilla create failed for {pr_key}: {e}") + # Proceed to mirror anyway, but return bugzilla_failed + if pr_mirror: + logger.debug(f"Mirroring PR #{pr_num} to Gitea (sync, bug missing)") + with contextlib.suppress(Exception): + pr_mirror.mirror(owner, repo, pr_num, pr_title, pr_body or "") + return JSONResponse({"status": "bugzilla_failed", "action": action, "bug_id": 0}) + + # Attach updated patch if enabled + try: + if BUGZILLA_ATTACH_DIFF: + patch = fetch_pr_patch(owner, repo, pr_num) + bz.add_attachment( + bug_id=bug_id, + filename=f"PR-{pr_num}-{head_sha}.patch", + summary=f"Updated patch for PR #{pr_num} ({head_sha})", + content_bytes=patch + ) + except Exception as e: + logger.warning(f"(sync) Attach update failed for bug {bug_id}: {e}") + + # Mirror updated head (success path) + if pr_mirror: + logger.debug(f"Mirroring PR #{pr_num} to Gitea (synchronize)") + with contextlib.suppress(Exception): + pr_mirror.mirror(owner, repo, pr_num, pr_title, pr_body or "") + + # Ensure GH PR is closed + if not GITHUB_DRY_RUN and gh_repo is not None: + try: + pr_obj = gh_repo.get_pull(pr_num) + if pr_obj.state != "closed": + pr_obj.edit(state="closed") + except Exception as ge: + logger.warning(f"Could not ensure PR #{pr_num} closed: {ge}") + else: + logger.info(f"[GITHUB_DRY_RUN] Would ensure PR #{pr_num} closed") + + return JSONResponse({"status": "ok", "bug_id": bug_id}) + + return JSONResponse({"status": "noop"}) \ No newline at end of file