Files
smeserver-gitutils/webhook_endpoint.py

721 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import json
import base64
import sqlite3
import logging
import hmac
import hashlib
import contextlib
from typing import Optional, Tuple, Dict, Any, List
import httpx
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Github, Auth
#
# .env contents (with keys obscured)
#
# WEBHOOK_SECRET=xxxxxxxxxxxxxxxxxxx
# LOG_LEVEL=INFO
# # GitHub (set GITHUB_DRY_RUN=true during early tests to avoid PR writes)
# GITHUB_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# GITHUB_DRY_RUN=false
# # Bugzilla base
# BUGZILLA_BASE_URL=https://bugs.koozali.org
# BUGZILLA_PRODUCT=SME Server 11.x
# BUGZILLA_VERSION=11.beta1
# BUGZILLA_COMPONENT_DEFAULT=Github PR
# # Auth: prefer API key
# BUGZILLA_AUTH_MODE=api_key
# BUGZILLA_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# # (or JSON-RPC login if no API key)
# # BUGZILLA_AUTH_MODE=basic
# BUGZILLA_USER=githubpr
# # BUGZILLA_PASSWORD=your_password
# # Behavior
# BUGZILLA_ATTACH_DIFF=true
# STATE_PATH=/home/githubpr/app/webhook/state/webhook_state.sqlite
# GITEA_BASE_URL=https://src.koozali.org
# GITEA_ORG=smeserver
# BUGZILLA_CC_ENABLE=true
# BUGZILLA_CC_ALLOW_NOREPLY=false
#
# ---------------- Logging (UTC) ----------------
class UTCFormatter(logging.Formatter):
    """Log formatter whose ``%(asctime)s`` is an ISO-8601 UTC timestamp."""

    def formatTime(self, record, datefmt=None):
        """Return ``record.created`` as ``YYYY-MM-DDTHH:MM:SSZ``.

        ``datefmt`` is accepted for signature compatibility with
        ``logging.Formatter.formatTime`` but is not used.
        """
        from datetime import datetime, timezone

        moment = datetime.fromtimestamp(record.created, tz=timezone.utc)
        stamp = moment.isoformat(timespec="seconds")
        # isoformat() spells UTC as "+00:00"; emit the shorter "Z" suffix.
        return stamp.replace("+00:00", "Z")
def configure_logging():
    """Install a single UTC-stamped stream handler on the root logger.

    Handlers already attached to the root logger are removed. The level comes
    from the LOG_LEVEL environment variable (default "INFO"); a value that is
    not an attribute of the ``logging`` module falls back to ``logging.INFO``.
    """
    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
    stream = logging.StreamHandler()
    stream.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s"))
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(stream)
    root_logger.setLevel(getattr(logging, level_name, logging.INFO))


# Configure logging at import time, before the module-level logger is used.
configure_logging()
logger = logging.getLogger(__name__)
# ---------------- Config via env ----------------
def _env_text(name: str, default: str = "") -> str:
    """Return environment variable *name* (or *default*) with surrounding whitespace stripped."""
    return os.environ.get(name, default).strip()


def _env_flag(name: str, default: str) -> bool:
    """Return True only when the variable (or *default*) equals "true", ignoring case."""
    return _env_text(name, default).lower() == "true"


GITHUB_TOKEN = _env_text("GITHUB_TOKEN")
if not GITHUB_TOKEN:
    logger.warning("GITHUB_TOKEN is not set. You must set it unless GITHUB_DRY_RUN=true.")
WEBHOOK_SECRET = _env_text("WEBHOOK_SECRET")
if not WEBHOOK_SECRET:
    logger.warning("WEBHOOK_SECRET is not set. Webhook signature verification will fail.")

# Canonical links (only a trailing "/" is trimmed from the base URL)
GITEA_BASE_URL = os.environ.get("GITEA_BASE_URL", "https://src.koozali.org").rstrip("/")
GITEA_ORG = _env_text("GITEA_ORG", "smeserver") or "smeserver"

# Bugzilla config
BUGZILLA_BASE_URL = os.environ.get("BUGZILLA_BASE_URL", "https://bugs.koozali.org").rstrip("/")
BUGZILLA_AUTH_MODE = _env_text("BUGZILLA_AUTH_MODE").lower()  # "api_key" or "basic" (or auto)
BUGZILLA_API_KEY = _env_text("BUGZILLA_API_KEY")
BUGZILLA_USER = _env_text("BUGZILLA_USER")
BUGZILLA_PASSWORD = _env_text("BUGZILLA_PASSWORD")

# REQUIRED fields (the webhook route answers 500 while either is empty)
BUGZILLA_PRODUCT = _env_text("BUGZILLA_PRODUCT")
BUGZILLA_VERSION = _env_text("BUGZILLA_VERSION")

# Component default
BUGZILLA_COMPONENT_DEFAULT = _env_text("BUGZILLA_COMPONENT_DEFAULT", "e-smith-*/smeserver-* packages")
BUGZILLA_COMPONENT_FALLBACK = _env_text("BUGZILLA_COMPONENT_FALLBACK", BUGZILLA_COMPONENT_DEFAULT)

# Custom field settings
CF_PACKAGE_FIELD = _env_text("BUGZILLA_CF_PACKAGE_FIELD", "cf_package")
CF_PACKAGE_FALLBACK = _env_text("BUGZILLA_CF_PACKAGE_FALLBACK", "---")

# CC behavior
BUGZILLA_CC_ENABLE = _env_flag("BUGZILLA_CC_ENABLE", "true")
BUGZILLA_CC_ALLOW_NOREPLY = _env_flag("BUGZILLA_CC_ALLOW_NOREPLY", "false")
BUGZILLA_CC_FALLBACK_COMMENT = _env_flag("BUGZILLA_CC_FALLBACK_COMMENT", "true")
AUTHOR_EMAIL_MARKER = _env_text("BUGZILLA_AUTHOR_EMAIL_MARKER", "[mirror-bot:author-email]")

# Behavior toggles
BUGZILLA_ATTACH_DIFF = _env_flag("BUGZILLA_ATTACH_DIFF", "true")
BUGZILLA_FAILURE_LABEL = _env_text("BUGZILLA_FAILURE_LABEL", "bugzilla-needed")
GITHUB_DRY_RUN = _env_flag("GITHUB_DRY_RUN", "false")
if GITHUB_DRY_RUN:
    logger.info("GITHUB_DRY_RUN is enabled. The server will not comment/close PRs.")

# SQLite for PR→Bug mapping (path is used exactly as given)
STATE_PATH = os.environ.get("STATE_PATH", "./webhook_state.sqlite")
# ---------------- State store ----------------
class State:
    """SQLite-backed store mapping a pull-request key to a Bugzilla bug id."""

    def __init__(self, path: str):
        """Open (or create) the database at *path* and make sure the table exists."""
        # check_same_thread=False permits use of the connection from threads
        # other than the one that opened it.
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self._init()

    def _init(self):
        """Create the ``pr_map`` table when it is missing."""
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS pr_map (
                pr_key TEXT PRIMARY KEY,
                bug_id INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )"""
        )
        self.conn.commit()

    def get_bug(self, pr_key: str) -> Optional[int]:
        """Return the bug id stored for *pr_key*, or None when there is no row."""
        found = self.conn.execute(
            "SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,)
        ).fetchone()
        if not found:
            return None
        return found[0]

    def set_bug(self, pr_key: str, bug_id: int):
        """Insert the mapping, or update ``bug_id``/``updated_at`` of an existing row.

        ``created_at`` of an existing row is left untouched. Timestamps are UTC,
        second precision, with a trailing "Z".
        """
        from datetime import datetime, timezone

        stamp = (
            datetime.now(timezone.utc)
            .isoformat(timespec="seconds")
            .replace("+00:00", "Z")
        )
        self.conn.execute(
            """
            INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
            """,
            (pr_key, bug_id, stamp, stamp),
        )
        self.conn.commit()
# ---------------- Bugzilla hybrid client ----------------
class BugzillaHybrid:
    """Bugzilla client that prefers the REST API and falls back to JSON-RPC.

    The transport is probed once at construction. When a REST call later
    fails for any reason, the instance logs a warning, switches ``mode`` to
    "jsonrpc" and performs that call (and every later one) over JSON-RPC.
    """

    # REST mount points probed by _detect_mode(), in order of preference.
    _REST_PREFIXES = ("/rest", "/rest.cgi")

    def __init__(self):
        """Probe the server for REST support and log which auth mode applies."""
        self.base = BUGZILLA_BASE_URL
        self.mode = None  # "rest" or "jsonrpc"
        self.rest_base = None
        self.rpc_url = f"{self.base}/jsonrpc.cgi"
        self._token: Optional[str] = None  # login token for basic auth via JSON-RPC
        self._detect_mode()
        if BUGZILLA_AUTH_MODE in ("api_key", "") and BUGZILLA_API_KEY:
            logger.info("Bugzilla auth: API key mode")
        elif BUGZILLA_AUTH_MODE == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD):
            logger.info("Bugzilla auth: basic/login (JSON-RPC token) mode")
        else:
            logger.warning("Bugzilla auth NOT configured yet (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD).")

    def _detect_mode(self):
        """Select "rest" when ``<prefix>/version`` answers 200, else "jsonrpc"."""
        for prefix in self._REST_PREFIXES:
            try:
                r = httpx.get(f"{self.base}{prefix}/version", timeout=10)
            except Exception as e:
                # A failed probe is expected on servers without REST; keep
                # going, but leave a trace for troubleshooting.
                logger.debug(f"Bugzilla REST probe at {prefix} failed: {e}")
                continue
            if r.status_code == 200:
                self.mode = "rest"
                self.rest_base = f"{self.base}{prefix}"
                logger.info(f"Bugzilla REST detected at {prefix}")
                return
        self.mode = "jsonrpc"
        logger.warning("Bugzilla REST not available; falling back to JSON-RPC")

    # ---------- REST helpers ----------
    def _rest_headers(self) -> Dict[str, str]:
        """Return JSON headers, plus the API-key header when a key is configured."""
        h = {"Accept": "application/json", "Content-Type": "application/json"}
        if BUGZILLA_API_KEY:
            h["X-BUGZILLA-API-KEY"] = BUGZILLA_API_KEY
        return h

    def _rest_auth(self) -> Optional[Tuple[str, str]]:
        """Return (user, password) only in explicit "basic" mode with both set."""
        if BUGZILLA_AUTH_MODE == "basic" and BUGZILLA_USER and BUGZILLA_PASSWORD:
            return (BUGZILLA_USER, BUGZILLA_PASSWORD)
        return None

    def _rest_send(self, verb: str, path: str, what: str,
                   payload: Optional[Dict[str, Any]] = None, timeout: int = 60):
        """Send one REST request and return the response.

        Raises ``httpx.HTTPStatusError`` for 404/405 (reported as
        "REST <what> not available") and for any other error status.
        """
        r = httpx.request(
            verb,
            f"{self.rest_base}{path}",
            headers=self._rest_headers(),
            auth=self._rest_auth(),
            json=payload,
            timeout=timeout,
        )
        if r.status_code in (404, 405):
            raise httpx.HTTPStatusError(f"REST {what} not available", request=r.request, response=r)
        r.raise_for_status()
        return r

    @staticmethod
    def _bug_id_from(result: Dict[str, Any], label: str) -> int:
        """Extract the new bug id from a create response (``id`` or ``bugs[0].id``)."""
        bug_id = result.get("id") or (result.get("bugs") and result["bugs"][0]["id"])
        if not bug_id:
            raise RuntimeError(f"{label} create result missing id: {result}")
        return int(bug_id)

    @staticmethod
    def _comment_texts(data: Dict[str, Any], bug_id: int) -> List[str]:
        """Return the string ``text`` of each comment listed under ``bugs[<bug_id>]``."""
        entry = (data.get("bugs") or {}).get(str(bug_id)) or {}
        return [
            c.get("text")
            for c in (entry.get("comments") or [])
            if isinstance(c.get("text"), str)
        ]

    # ---------- JSON-RPC helpers ----------
    def _rpc_call(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]:
        """POST one JSON-RPC request and return its ``result`` object.

        Raises ``RuntimeError`` when the response carries an ``error`` member,
        and whatever ``raise_for_status()`` raises for an HTTP error status.
        """
        body = {"method": method, "params": [params_obj], "id": req_id}
        r = httpx.post(self.rpc_url, headers={"Content-Type": "application/json"}, json=body, timeout=60)
        r.raise_for_status()
        data = r.json()
        if data.get("error"):
            err = data["error"]
            raise RuntimeError(f"Bugzilla RPC error {err.get('code')}: {err.get('message')}")
        return data.get("result", {})

    def _rpc(self, method: str, params_obj: Dict[str, Any], req_id: int = 1) -> Dict[str, Any]:
        """Call *method* with credentials merged into a copy of *params_obj*.

        An API key is used when one is set and the auth mode is "api_key" or
        unset; otherwise a login token is obtained (once) and sent. Raises
        ``RuntimeError`` when neither form of credentials is configured.
        """
        mode = BUGZILLA_AUTH_MODE
        if mode in ("", "api_key") and BUGZILLA_API_KEY:
            params_obj = dict(params_obj, Bugzilla_api_key=BUGZILLA_API_KEY)
        elif mode == "basic" or (BUGZILLA_USER and BUGZILLA_PASSWORD):
            if not self._token:
                self._token = self._login_rpc()
            params_obj = dict(params_obj, Bugzilla_token=self._token)
        else:
            raise RuntimeError("Bugzilla auth not configured (set BUGZILLA_API_KEY or BUGZILLA_USER/BUGZILLA_PASSWORD)")
        return self._rpc_call(method, params_obj, req_id=req_id)

    def _login_rpc(self) -> str:
        """Log in through ``User.login`` and return the session token."""
        if not (BUGZILLA_USER and BUGZILLA_PASSWORD):
            raise RuntimeError("Bugzilla basic auth requires BUGZILLA_USER and BUGZILLA_PASSWORD")
        data = self._rpc_call("User.login", {"login": BUGZILLA_USER, "password": BUGZILLA_PASSWORD}, req_id=0)
        token = data.get("token")
        if not token:
            raise RuntimeError("Bugzilla login did not return a token")
        logger.info("Bugzilla JSON-RPC login succeeded")
        return token

    # ---------- Public API ----------
    def create_bug(self, summary: str, description: str, component: str) -> int:
        """Create a bug in the configured product/version and return its id.

        An empty *component* is replaced by BUGZILLA_COMPONENT_DEFAULT.
        """
        # The same field set is valid for both transports.
        fields = {
            "product": BUGZILLA_PRODUCT,
            "component": component or BUGZILLA_COMPONENT_DEFAULT,
            "version": BUGZILLA_VERSION,
            "summary": summary,
            "description": description,
        }
        if self.mode == "rest":
            try:
                data = self._rest_send("POST", "/bug", "create", fields, timeout=60).json()
                return self._bug_id_from(data, "REST")
            except Exception as e:
                logger.warning(f"Bugzilla REST create failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"
        res = self._rpc("Bug.create", fields, req_id=1)
        return self._bug_id_from(res, "RPC")

    def add_attachment(self, bug_id: int, filename: str, summary: str, content_bytes: bytes):
        """Attach *content_bytes* to the bug as a base64-encoded ``text/x-patch`` patch."""
        attachment = {
            "ids": [bug_id],
            "data": base64.b64encode(content_bytes).decode("ascii"),
            "file_name": filename,
            "summary": summary,
            "content_type": "text/x-patch",
            "is_patch": True,
        }
        if self.mode == "rest":
            try:
                self._rest_send("POST", f"/bug/{bug_id}/attachment", "attachment", attachment, timeout=120)
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST add_attachment failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"
        self._rpc("Bug.add_attachment", attachment, req_id=3)

    def add_comment(self, bug_id: int, comment: str):
        """Append *comment* to the bug."""
        if self.mode == "rest":
            try:
                self._rest_send("POST", f"/bug/{bug_id}/comment", "add_comment", {"comment": comment}, timeout=60)
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST add_comment failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"
        self._rpc("Bug.add_comment", {"id": bug_id, "comment": comment}, req_id=2)

    def update_bug_fields(self, bug_id: int, fields: Dict[str, Any]):
        """Update the bug with the given field/value mapping."""
        if self.mode == "rest":
            try:
                self._rest_send("PUT", f"/bug/{bug_id}", "update", fields, timeout=60)
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST update failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"
        self._rpc("Bug.update", {"ids": [bug_id], **fields}, req_id=5)

    def add_cc(self, bug_id: int, emails: List[str]):
        """Add *emails* to the bug's CC list; an empty list is a no-op."""
        if not emails:
            return
        change = {"cc": {"add": emails}}
        if self.mode == "rest":
            try:
                self._rest_send("PUT", f"/bug/{bug_id}", "update", change, timeout=30)
                return
            except Exception as e:
                logger.warning(f"Bugzilla REST add_cc failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"
        self._rpc("Bug.update", {"ids": [bug_id], **change}, req_id=6)

    def list_comments(self, bug_id: int) -> List[str]:
        """Return the text of every comment on the bug."""
        if self.mode == "rest":
            try:
                r = self._rest_send("GET", f"/bug/{bug_id}/comment", "list_comments", timeout=30)
                return self._comment_texts(r.json(), bug_id)
            except Exception as e:
                logger.warning(f"Bugzilla REST list_comments failed, switching to JSON-RPC: {e}")
                self.mode = "jsonrpc"
        res = self._rpc("Bug.comments", {"ids": [bug_id]}, req_id=7)
        return self._comment_texts(res, bug_id)

    def add_or_update_author_email_note(self, bug_id: int, email: str):
        """Post a marker comment with the author email if not already present."""
        try:
            existing = self.list_comments(bug_id)
        except Exception as e:
            logger.warning(f"Could not list comments for bug {bug_id}: {e}")
            existing = []
        # Idempotent: a comment holding both the marker and this email means
        # the note was already posted.
        if any(AUTHOR_EMAIL_MARKER in text and email in text for text in existing):
            return
        self.add_comment(
            bug_id,
            f"{AUTHOR_EMAIL_MARKER} PR author email could not be added to CC (not registered): {email}",
        )
# ---------------- GitHub client (PyGithub) ----------------
# Module-level singletons shared by the route handlers.
if GITHUB_TOKEN:
    gh = Github(auth=Auth.Token(GITHUB_TOKEN))
else:
    # No token configured: construct the client without credentials.
    gh = Github()
state = State(STATE_PATH)
bz = BugzillaHybrid()
app = FastAPI()
# ---------------- Helpers ----------------
def verify_signature(secret: str, body: bytes, signature: str):
    """Validate a ``sha256=<hex>`` HMAC signature header against the raw body.

    Args:
        secret: Shared webhook secret used as the HMAC key.
        body: Raw request body exactly as received.
        signature: Value of the signature header, or a falsy value if absent.

    Raises:
        HTTPException: 500 when *secret* is empty, 401 when the signature is
            missing or does not match.
    """
    if not secret:
        # With an empty key anyone can compute a "valid" HMAC, so refuse
        # outright instead of accepting forgeable signatures.
        raise HTTPException(status_code=500, detail="Webhook secret not configured")
    if not signature:
        raise HTTPException(status_code=401, detail="Missing signature")
    expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str,
    # which would turn a garbage header into a 500 instead of a 401.
    if not hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid signature")
def pr_comment_success(gitea_repo_url: str, bug_id: int) -> str:
    """Build the PR comment posted after a Bugzilla bug was created.

    The text names the canonical Gitea repository, links the new bug and
    announces that the pull request will be closed.
    """
    bug_link = f"{BUGZILLA_BASE_URL}/show_bug.cgi?id={bug_id}"
    lines = [
        "Thanks for the contribution!",
        "",
        "This repository is a read-only mirror of the canonical repo on Gitea:",
        f"- Canonical: {gitea_repo_url}",
        f"- Please file and discuss changes in Bugzilla: {BUGZILLA_BASE_URL}",
        "",
        f"We created Bug {bug_id} to track this proposal:",
        f"- {bug_link}",
        "",
        "This pull request will be closed here. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.",
        "",  # yields the trailing newline
    ]
    return "\n".join(lines)
def pr_comment_failure(gitea_repo_url: str, repo: str, pr_url: str, target_branch: str) -> str:
    """Build the PR comment posted when no Bugzilla bug could be created.

    The text asks the contributor to file the bug by hand and lists the
    product, version and component to use. *repo* is accepted but does not
    appear in the message.
    """
    filing_hint = (
        f"Please open a bug at {BUGZILLA_BASE_URL} "
        f"(Product: {BUGZILLA_PRODUCT}, Version: {BUGZILLA_VERSION}, "
        f"Component: {BUGZILLA_COMPONENT_DEFAULT}) and include:"
    )
    lines = [
        "Thanks for the contribution!",
        "",
        "This repository is a read-only mirror of the canonical repo on Gitea:",
        f"- Canonical: {gitea_repo_url}",
        "",
        "We were unable to create a Bugzilla ticket automatically at this time.",
        filing_hint,
        f"- GitHub PR: {pr_url}",
        f"- Target branch: {target_branch}",
        "- Summary and rationale for the change",
        "",
        "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.",
        "",  # yields the trailing newline
    ]
    return "\n".join(lines)
def fetch_pr_patch(owner: str, repo: str, pr_num: int) -> bytes:
    """Download a pull request from the GitHub API in patch format.

    Returns the raw response body. An error status raises through
    ``raise_for_status()``.
    """
    request_headers = {"Accept": "application/vnd.github.v3.patch"}
    if GITHUB_TOKEN:
        request_headers["Authorization"] = f"token {GITHUB_TOKEN}"
    response = httpx.get(
        f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_num}",
        headers=request_headers,
        timeout=120,
    )
    response.raise_for_status()
    return response.content
def _commit_author_email(commit_entry: Any) -> Optional[str]:
    """Return ``commit.author.email`` from a GitHub commit object, or None."""
    if not isinstance(commit_entry, dict):
        return None
    # Any level may be missing or JSON null, hence the "or {}" at each step.
    author = (commit_entry.get("commit") or {}).get("author") or {}
    return author.get("email") or None


def _first_from_header_email(patch_text: str) -> Optional[str]:
    """Return the address inside <...> on the first ``From: `` line, if it has an "@"."""
    for line in patch_text.splitlines():
        if line.startswith("From: ") and "<" in line and ">" in line:
            start = line.find("<") + 1
            candidate = line[start:line.find(">", start)].strip()
            # Only the first From: header is considered.
            return candidate if "@" in candidate else None
    return None


def derive_pr_author_email(owner: str, repo: str, pr_num: int, head_sha: str) -> Optional[str]:
    """Best-effort lookup of the PR author's email address.

    Three sources are tried in order, and the first non-empty address wins:
      1. the author of commit *head_sha*,
      2. the author of the first commit listed for the pull request,
      3. the first ``From:`` header of the pull request's patch.

    A failure in one source is logged at debug level and the next source is
    tried; the function itself never raises. Returns None when nothing is found.
    """
    headers = {"Accept": "application/vnd.github+json"}
    if GITHUB_TOKEN:
        headers["Authorization"] = f"token {GITHUB_TOKEN}"
    api = f"https://api.github.com/repos/{owner}/{repo}"

    try:
        r = httpx.get(f"{api}/commits/{head_sha}", headers=headers, timeout=20)
        if r.status_code == 200:
            email = _commit_author_email(r.json())
            if email:
                return email
    except Exception as e:
        logger.debug(f"Author email lookup via commit {head_sha} failed: {e}")

    try:
        r = httpx.get(f"{api}/pulls/{pr_num}/commits", headers=headers, timeout=20)
        if r.status_code == 200:
            commits = r.json()
            if commits:
                email = _commit_author_email(commits[0])
                if email:
                    return email
    except Exception as e:
        logger.debug(f"Author email lookup via PR #{pr_num} commits failed: {e}")

    try:
        patch_text = fetch_pr_patch(owner, repo, pr_num).decode("utf-8", "ignore")
        return _first_from_header_email(patch_text)
    except Exception as e:
        logger.debug(f"Author email lookup via PR #{pr_num} patch failed: {e}")
    return None
# ---------------- Routes ----------------
def _github_health() -> Dict[str, Any]:
    """Query GitHub's rate-limit endpoint and summarise the outcome as a dict."""
    status: Dict[str, Any] = {"ok": False}
    try:
        headers = {"Accept": "application/vnd.github+json"}
        if GITHUB_TOKEN:
            headers["Authorization"] = f"token {GITHUB_TOKEN}"
        resp = httpx.get("https://api.github.com/rate_limit", headers=headers, timeout=5)
        status["http_status"] = resp.status_code
        if resp.status_code != 200:
            status["error"] = resp.text[:200]
        else:
            core = (resp.json().get("resources") or {}).get("core") or {}
            status["remaining"] = core.get("remaining")
            status["reset"] = core.get("reset")
            status["ok"] = True
    except Exception as exc:
        status["error"] = str(exc)[:200]
    return status


def _bugzilla_health() -> Dict[str, Any]:
    """Ask Bugzilla for its version over REST when active, else over JSON-RPC."""
    status: Dict[str, Any] = {"ok": False}
    try:
        if getattr(bz, "mode", None) == "rest" and getattr(bz, "rest_base", None):
            resp = httpx.get(
                f"{bz.rest_base}/version",
                headers=bz._rest_headers(),
                auth=bz._rest_auth(),
                timeout=5,
            )
            status["http_status"] = resp.status_code
            if resp.status_code != 200:
                status["error"] = resp.text[:200]
            else:
                status["version"] = (resp.json() or {}).get("version")
                status["ok"] = True
        else:
            # Include API key if configured (not strictly required for version)
            rpc_params: Dict[str, Any] = {}
            if BUGZILLA_API_KEY:
                rpc_params["Bugzilla_api_key"] = BUGZILLA_API_KEY
            resp = httpx.post(
                bz.rpc_url,
                headers={"Content-Type": "application/json"},
                json={"method": "Bugzilla.version", "params": [rpc_params], "id": 99},
                timeout=5,
            )
            status["http_status"] = resp.status_code
            if resp.status_code != 200:
                status["error"] = resp.text[:200]
            else:
                data = resp.json()
                if data.get("error"):
                    status["error"] = str(data["error"])[:200]
                else:
                    status["version"] = (data.get("result") or {}).get("version")
                    status["ok"] = True
    except Exception as exc:
        status["error"] = str(exc)[:200]
    return status


@app.get("/healthz")
async def healthz():
    """Report reachability of GitHub and Bugzilla.

    Responds 200 when both probes succeed and 503 otherwise. The body holds
    the overall flag plus each probe's details.
    """
    gh_status = _github_health()
    bz_status = _bugzilla_health()
    overall_ok = bool(gh_status.get("ok") and bz_status.get("ok"))
    return JSONResponse(
        {"ok": overall_ok, "github": gh_status, "bugzilla": bz_status},
        status_code=200 if overall_ok else 503,
    )
def _close_pull_if_open(pull) -> None:
    """Close the given pull-request object unless its state is already "closed"."""
    if pull.state != "closed":
        pull.edit(state="closed")


@app.post("/webhook/github")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str = Header(None),
    x_github_event: str = Header(None),
):
    """Mirror GitHub ``pull_request`` events into Bugzilla.

    Flow:
      * 500 when BUGZILLA_PRODUCT or BUGZILLA_VERSION is unset; the signature
        is verified against the raw body; 400 for a body that is not JSON or
        a pull_request payload missing required fields.
      * Events other than ``pull_request`` and actions other than opened,
        reopened, synchronize and ready_for_review are answered "ignored".
      * No bug recorded for the PR: create one, store the mapping, set the
        package custom field, CC the author, attach the patch, then comment
        on and close the PR. If creation fails, a failure comment and label
        are put on the PR instead and it is left open.
      * Bug already recorded and action is ``synchronize``: attach the updated
        patch and make sure the PR is closed.
      * Anything else: "noop".

    With GITHUB_DRY_RUN enabled, or when the repository could not be opened,
    GitHub writes are skipped and logged instead.
    """
    if not BUGZILLA_PRODUCT or not BUGZILLA_VERSION:
        raise HTTPException(status_code=500, detail="BUGZILLA_PRODUCT and BUGZILLA_VERSION must be set")
    body = await request.body()
    verify_signature(WEBHOOK_SECRET, body, x_hub_signature_256)
    try:
        payload = json.loads(body.decode("utf-8"))
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON") from exc
    if x_github_event != "pull_request":
        return JSONResponse({"status": "ignored", "event": x_github_event})
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Malformed pull_request payload")
    action = payload.get("action")
    if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
        return JSONResponse({"status": "ignored", "action": action})

    # Answer 400 rather than an unhandled 500 when a required field is absent.
    try:
        repository = payload["repository"]
        repo_full = repository["full_name"]
        owner = repository["owner"]["login"]
        repo = repository["name"]
        pr = payload["pull_request"]
        pr_num = pr["number"]
        pr_url = pr["html_url"]
        pr_title = pr.get("title") or ""
        pr_body = pr.get("body") or ""
        target_branch = pr["base"]["ref"]
        head_full_sha = pr["head"]["sha"]
        head_sha = head_full_sha[:7]  # short form, for file names and summaries only
    except (KeyError, TypeError, AttributeError) as exc:
        raise HTTPException(status_code=400, detail="Malformed pull_request payload") from exc

    gitea_repo_url = f"{GITEA_BASE_URL}/{GITEA_ORG}/{repo}"
    pr_key = f"{repo_full}#{pr_num}"
    bug_id = state.get_bug(pr_key)

    gh_repo = None
    if not GITHUB_DRY_RUN:
        try:
            gh_repo = gh.get_repo(repo_full)
        except Exception as ge:
            logger.warning(f"Could not access GitHub repo {repo_full}: {ge}")
    can_write_github = not GITHUB_DRY_RUN and gh_repo is not None

    # ---- No bug yet: create one (action is already one of the accepted four)
    if bug_id is None:
        summary = f"[GH PR #{pr_num}] {GITEA_ORG}/{repo}: {pr_title}"
        description = (
            "Source\n"
            f"- Canonical repo (Gitea): {gitea_repo_url}\n"
            f"- GitHub mirror PR: {pr_url}\n\n"
            "Project policy\n"
            "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla.\n\n"
            "Submitters notes\n"
            f"{pr_body}\n"
        )
        try:
            bug_id = bz.create_bug(summary, description, component=BUGZILLA_COMPONENT_DEFAULT)
            state.set_bug(pr_key, bug_id)
            logger.info(f"Created Bugzilla bug {bug_id} for {pr_key}")
            # Set cf_package to repo, fallback to '---'
            try:
                bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: repo})
                logger.info(f"Set {CF_PACKAGE_FIELD}={repo} on bug {bug_id}")
            except Exception as e_cf:
                logger.warning(f"Failed to set {CF_PACKAGE_FIELD} to '{repo}' on bug {bug_id}: {e_cf}; falling back to '{CF_PACKAGE_FALLBACK}'")
                with contextlib.suppress(Exception):
                    bz.update_bug_fields(bug_id, {CF_PACKAGE_FIELD: CF_PACKAGE_FALLBACK})
                    logger.info(f"Set {CF_PACKAGE_FIELD}={CF_PACKAGE_FALLBACK} on bug {bug_id}")
            # Add PR author to CC (or comment fallback)
            if BUGZILLA_CC_ENABLE:
                # Look the commit up by its full SHA; an abbreviation can be ambiguous.
                email = derive_pr_author_email(owner, repo, pr_num, head_full_sha)
                if not email:
                    logger.info("Could not derive PR author email; CC not added")
                elif email.endswith("@users.noreply.github.com") and not BUGZILLA_CC_ALLOW_NOREPLY:
                    logger.info(f"Skipping noreply CC {email} (set BUGZILLA_CC_ALLOW_NOREPLY=true to allow)")
                else:
                    try:
                        bz.add_cc(bug_id, [email])
                        logger.info(f"Added CC {email} to bug {bug_id}")
                    except Exception as e_cc:
                        logger.warning(f"Failed to add CC {email} to bug {bug_id}: {e_cc}")
                        if BUGZILLA_CC_FALLBACK_COMMENT:
                            with contextlib.suppress(Exception):
                                bz.add_or_update_author_email_note(bug_id, email)
        except Exception as e:
            logger.error(f"Bugzilla create failed for {pr_key}: {e}")
            if can_write_github:
                try:
                    gh_repo.get_pull(pr_num).create_issue_comment(
                        pr_comment_failure(gitea_repo_url, repo, pr_url, target_branch)
                    )
                    if BUGZILLA_FAILURE_LABEL:
                        # Creating the label raises if it already exists; either
                        # way, try to apply it afterwards.
                        with contextlib.suppress(Exception):
                            gh_repo.create_label(BUGZILLA_FAILURE_LABEL, "ededed")
                        with contextlib.suppress(Exception):
                            gh_repo.get_issue(pr_num).add_to_labels(BUGZILLA_FAILURE_LABEL)
                except Exception as ge:
                    logger.warning(f"Could not post failure comment on PR #{pr_num}: {ge}")
            else:
                logger.info(f"[GITHUB_DRY_RUN] Would comment failure on PR #{pr_num}, label={BUGZILLA_FAILURE_LABEL}")
            return JSONResponse({"status": "bugzilla_failed", "action": action})

        if BUGZILLA_ATTACH_DIFF:
            # A failed attachment does not stop the PR from being closed.
            try:
                patch = fetch_pr_patch(owner, repo, pr_num)
                bz.add_attachment(
                    bug_id=bug_id,
                    filename=f"PR-{pr_num}-{head_sha}.patch",
                    summary=f"Patch for PR #{pr_num} ({head_sha})",
                    content_bytes=patch,
                )
            except Exception as e:
                logger.warning(f"Attach patch failed for bug {bug_id}: {e}")

        comment = pr_comment_success(gitea_repo_url, bug_id)
        if can_write_github:
            try:
                pr_obj = gh_repo.get_pull(pr_num)
                pr_obj.create_issue_comment(comment)
                _close_pull_if_open(pr_obj)
            except Exception as ge:
                logger.warning(f"Could not comment/close PR #{pr_num}: {ge}")
        else:
            logger.info(f"[GITHUB_DRY_RUN] Would post success comment and close PR #{pr_num}")
        return JSONResponse({"status": "ok", "bug_id": bug_id})

    # ---- Bug exists: only new pushes need work
    if action != "synchronize":
        return JSONResponse({"status": "noop"})

    try:
        if BUGZILLA_ATTACH_DIFF:
            patch = fetch_pr_patch(owner, repo, pr_num)
            bz.add_attachment(
                bug_id=bug_id,
                filename=f"PR-{pr_num}-{head_sha}.patch",
                summary=f"Updated patch for PR #{pr_num} ({head_sha})",
                content_bytes=patch,
            )
        if can_write_github:
            try:
                _close_pull_if_open(gh_repo.get_pull(pr_num))
            except Exception as ge:
                logger.warning(f"Could not ensure PR #{pr_num} closed: {ge}")
        else:
            logger.info(f"[GITHUB_DRY_RUN] Would ensure PR #{pr_num} closed")
        return JSONResponse({"status": "ok", "bug_id": bug_id})
    except Exception as e:
        logger.error(f"Attach update failed for bug {bug_id}: {e}")
        if can_write_github:
            with contextlib.suppress(Exception):
                _close_pull_if_open(gh_repo.get_pull(pr_num))
        else:
            logger.info(f"[GITHUB_DRY_RUN] Would ensure PR #{pr_num} closed (after attach failure)")
        return JSONResponse({"status": "attach_failed", "bug_id": bug_id})