1254 lines
57 KiB
Python
1254 lines
57 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import base64
|
||
import contextlib
|
||
import dataclasses
|
||
import datetime as dt
|
||
import functools
|
||
import hashlib
|
||
import hmac
|
||
import io
|
||
import json
|
||
import logging
|
||
import os
|
||
import signal
|
||
import sqlite3
|
||
import subprocess
|
||
import sys
|
||
import threading
|
||
import time
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
# Third-party deps:
|
||
# pip install PyYAML httpx PyGithub fastapi uvicorn
|
||
import httpx
|
||
import yaml
|
||
from fastapi import FastAPI, Header, HTTPException, Request
|
||
from fastapi.responses import JSONResponse, PlainTextResponse
|
||
from github import Github, GithubException, Auth
|
||
from github.Branch import Branch
|
||
from github.Repository import Repository
|
||
import uvicorn
|
||
from urllib.parse import quote as urlquote
|
||
from datetime import timezone
|
||
|
||
# --------------------------
|
||
# Time helpers
|
||
# --------------------------
|
||
|
||
def utc_now() -> dt.datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    now = dt.datetime.now(tz=timezone.utc)
    return now
|
||
|
||
def rfc3339_now() -> str:
    """Current UTC time as an RFC 3339 string, e.g. '2024-01-02T03:04:05Z'."""
    # Drop sub-second precision, then swap the '+00:00' offset for the 'Z' suffix.
    stamp = dt.datetime.now(timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
|
||
|
||
# --------------------------
|
||
# Logging setup
|
||
# --------------------------
|
||
|
||
class UTCFormatter(logging.Formatter):
    """logging.Formatter whose timestamps are rendered in UTC, RFC 3339 'Z' form."""

    converter = time.gmtime

    def formatTime(self, record, datefmt=None):
        # Render the record's creation time as e.g. "2024-01-02T03:04:05Z".
        stamp = dt.datetime.fromtimestamp(record.created, tz=timezone.utc)
        rendered = stamp.isoformat(timespec="seconds")
        return rendered.replace("+00:00", "Z")
|
||
|
||
def configure_logging(level_name: str = "INFO"):
    """Install a single stream handler with UTC timestamps on the root logger.

    Unknown level names fall back to INFO; any previously-installed root
    handlers are discarded so repeated calls do not duplicate output.
    """
    wanted = (level_name or "INFO").upper()
    level = getattr(logging, wanted, logging.INFO)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s"))
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
    root_logger.setLevel(level)
|
||
|
||
# Module-level logger for this service; handlers/level are installed by configure_logging().
logger = logging.getLogger(__name__)
|
||
|
||
# --------------------------
|
||
# Configuration and defaults
|
||
# --------------------------
|
||
|
||
# Default configuration tree. Values can be overridden by a YAML config file
# and/or the environment variables referenced below. Secrets (tokens,
# passwords, webhook secret) are always read from the environment.
DEFAULT_CONFIG = {
    "sync": {
        # Gitea is authoritative; GitHub is a read-only mirror of it.
        "source_of_truth": "gitea",
        "gitea": {
            "base_url": "https://src.koozali.org",
            "org": "smeserver",
            "token": os.environ.get("GITEA_TOKEN", ""),
            # Create the Gitea repo automatically when sync targets a missing one.
            "create_if_missing": True,
            "new_repo_defaults": {
                "visibility": "public",
                "default_branch": "main",
                # Seed newly created repos with an initial commit (auto_init).
                "seed_new_repo": True,
                "seed_commit": {
                    "author_name": "Koozali Mirror Bot",
                    "author_email": "noreply@koozali.org",
                    "message": "Initialize repository (mirror seed)",
                    "readme_content": "This repository is initialized by the Koozali mirror service.\nCanonical development happens on Gitea:\n- https://src.koozali.org/{org}/{repo}\nPlease file issues and patches via Bugzilla:\n- https://bugs.koozali.org\n",
                },
                "description_template": "Canonical repository for {org}/{repo}",
                "homepage_template": "https://src.koozali.org/{org}/{repo}",
            },
            # Configure a Gitea-side push mirror to GitHub when the API allows it.
            "push_mirror": {
                "enable": True
            }
        },
        "github": {
            "owner": "Koozali-SME-Server",  # set your GitHub org/user
            "auth": {
                "mode": "pat",
                "token": os.environ.get("GITHUB_TOKEN", ""),
            },
            "repo_defaults": {
                # "mirror_source": mirror inherits the Gitea repo's visibility.
                "mirror_visibility": "mirror_source",
                "topics": ["mirror", "read-only"],
                "description_template": "Read-only mirror of {gitea_repo_url}. Submit patches via Bugzilla: https://bugs.koozali.org",
                "homepage_template": "https://bugs.koozali.org",
            },
            # Best-effort branch protection applied to the mirror's default branch.
            "branch_protection": {
                "apply_to": "default_branch",
                "include_admins": True,
                # Force pushes must stay allowed or mirror updates would be blocked.
                "allow_force_pushes": True,
                "allow_deletions": False,
                "restrict_push": {
                    "users": ["<PAT-username>"],  # replaced at runtime
                    "teams": [],
                    "apps": []
                },
                "required_checks": [],
                "required_reviews": {
                    "required_approving_review_count": 0
                }
            },
            # Webhook that notifies this service about GitHub pull requests.
            "webhook": {
                "mode": "server",
                "url": os.environ.get("GITHUB_WEBHOOK_URL", "http://koozali.bjsystems.co.uk/webhook/github"),
                "secret": os.environ.get("WEBHOOK_SECRET", "change-me"),
                "events": ["pull_request"]
            }
        },
        # Local bare-mirror cache and SQLite state locations.
        "cache_path": os.environ.get("SYNC_CACHE_PATH", "./.sync-cache"),
        "state_path": os.environ.get("SYNC_STATE_PATH", "./sync_state.sqlite"),
        "concurrency": 4,
        # Seconds between full sync passes.
        "interval": 1800,
        # Repo name allow/deny lists; empty means "all repos".
        "include": [],
        "exclude": [],
        "skip_forks": True
    },
    "bugzilla": {
        "base_url": "https://bugs.koozali.org",
        "auth": {
            "mode": "basic",  # or "api_key"
            "username": "brianr",
            "password": os.environ.get("BUGZILLA_PASSWORD", ""),
        },
        "product": "SME11",
        # Component chosen per-repo; falls back when the component doesn't exist.
        "component_template": "{repo}",
        "component_fallback": "General",
        "auto_create_component": False,
        "groups": [],
        # Attach the PR diff/patch to the created bug.
        "attach_diff": True,
        "failure_policy": {
            # When bug creation fails: keep the PR open and label it instead.
            "close_pr_on_bugzilla_failure": False,
            "label_on_bugzilla_failure": "bugzilla-needed"
        },
        # str.format templates; placeholder names are filled by the PR handler.
        "templates": {
            "pr_comment_success":
                "Thanks for the contribution!\n\n"
                "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                "- Canonical: {gitea_repo_url}\n"
                "- Please file and discuss changes in Bugzilla: {bugzilla_base_url}\n\n"
                "We created Bug {bug_id} to track this proposal:\n"
                "- {bug_url}\n\n"
                "This pull request will be closed here. Please follow up on the Bugzilla ticket for review and next steps. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n",
            "pr_comment_failure":
                "Thanks for the contribution!\n\n"
                "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                "- Canonical: {gitea_repo_url}\n\n"
                "We were unable to create a Bugzilla ticket automatically at this time.\n"
                "Please open a bug at {bugzilla_base_url} (Product: SME11, Component: {repo}) and include:\n"
                "- GitHub PR: {pr_url}\n"
                "- Target branch: {target_branch}\n"
                "- Summary and rationale for the change\n\n"
                "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n",
            "bug_summary": "[GH PR #{pr_number}] {org}/{repo}: {pr_title}",
            "bug_body":
                "Source\n"
                "- Canonical repo (Gitea): {gitea_repo_url}\n"
                "- GitHub mirror PR: {pr_url}\n"
                "- Submitted by: {github_user} ({github_user_url})\n"
                "- Opened: {created_at}\n\n"
                "Target and branch info\n"
                "- Base branch: {target_branch} ({base_sha})\n"
                "- Head branch: {source_branch} ({head_sha})\n"
                "- Commits in PR: {commit_count}\n"
                "- Compare: {compare_url}\n\n"
                "Project policy\n"
                "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla. Any accepted changes will be applied to the Gitea repository and then mirrored back to GitHub.\n\n"
                "Submitter’s notes\n"
                "{pr_body}\n\n"
                "Attachments\n"
                "- The PR diff/patch is attached automatically by the mirror service. Subsequent updates to this PR will add new attachments to this bug.\n",
            "bug_update_comment":
                "Update from GitHub PR #{pr_number} for {org}/{repo}\n\n"
                "- New head: {head_sha} (base: {base_sha})\n"
                "- Commits in update: {commit_count}\n"
                "- Compare: {compare_url}\n"
                "- PR: {pr_url}\n\n"
                "A refreshed patch set has been attached to this bug. Original PR description follows (for context):\n"
                "{pr_body}\n",
            "pr_sync_short_comment":
                "Thanks for the update. We’ve attached a refreshed patch set to Bug {bug_id}: {bug_url}\n"
                "This PR remains closed; please continue discussion in Bugzilla.\n"
        }
    }
}
|
||
|
||
# --------------------------
|
||
# Utilities
|
||
# --------------------------
|
||
|
||
def run(cmd: List[str], cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
    """Run *cmd* (argv list, no shell) and capture its output.

    Args:
        cmd: Command and arguments; executed without a shell.
        cwd: Working directory for the child process, or None for the current one.
        env: Environment for the child process, or None to inherit.

    Returns:
        (returncode, stdout, stderr) with both streams decoded as text.
    """
    pretty = " ".join(cmd)
    logger.debug(f"Running command: {pretty} (cwd={cwd})")
    # subprocess.run replaces the manual Popen/communicate dance and guarantees
    # the child's pipes are cleaned up even if decoding raises.
    proc = subprocess.run(cmd, cwd=cwd, env=env, capture_output=True, text=True)
    if proc.returncode != 0:
        logger.error(f"Command failed ({proc.returncode}): {pretty}\nstderr: {proc.stderr.strip()}")
    else:
        logger.debug(f"Command succeeded: {pretty}")
    return proc.returncode, proc.stdout, proc.stderr
|
||
|
||
def ensure_dir(path: str):
    """Create *path* (including parents) if it does not already exist."""
    # exist_ok makes repeated calls idempotent.
    os.makedirs(path, exist_ok=True)
|
||
|
||
def safe_join(*parts) -> str:
    """Join path components and normalize the result (collapses '.', '..', extra separators)."""
    joined = os.path.join(*parts)
    return os.path.normpath(joined)
|
||
|
||
def hmac_sha256(secret: str, data: bytes) -> str:
    """Return a GitHub-style 'sha256=<hexdigest>' HMAC signature of *data* keyed by *secret*."""
    digest = hmac.new(secret.encode("utf-8"), data, hashlib.sha256).hexdigest()
    return f"sha256={digest}"
|
||
|
||
# --------------------------
|
||
# State (SQLite)
|
||
# --------------------------
|
||
|
||
class StateStore:
    """SQLite-backed persistence for PR→bug and Gitea→GitHub repo mappings.

    Uses WAL journal mode and check_same_thread=False so the single connection
    can be shared across threads (webhook server + sync loop).
    """

    def __init__(self, path: str):
        # path: SQLite database file location; parent directory is created on demand.
        self.path = path
        ensure_dir(os.path.dirname(os.path.abspath(path)) or ".")
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self._init()
        logger.debug(f"StateStore initialized at {path}")

    def _init(self):
        # Create the mapping tables on first use (idempotent).
        cur = self.conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS pr_map (
                pr_key TEXT PRIMARY KEY,
                bug_id INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS repo_map (
                gitea_repo_id INTEGER PRIMARY KEY,
                gitea_full_name TEXT NOT NULL,
                github_full_name TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        self.conn.commit()

    def set_pr_bug(self, pr_key: str, bug_id: int):
        """Upsert the Bugzilla bug id associated with a PR key."""
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
        """, (pr_key, bug_id, now, now))
        self.conn.commit()
        logger.debug(f"PR→Bug mapping set {pr_key} -> {bug_id}")

    def get_pr_bug(self, pr_key: str) -> Optional[int]:
        """Return the bug id mapped to *pr_key*, or None when unmapped."""
        cur = self.conn.cursor()
        cur.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,))
        row = cur.fetchone()
        return row[0] if row else None

    def set_repo_map(self, gitea_repo_id: int, gitea_full_name: str, github_full_name: str):
        """Upsert the Gitea→GitHub repo name mapping, keyed by Gitea repo id."""
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO repo_map (gitea_repo_id, gitea_full_name, github_full_name, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(gitea_repo_id) DO UPDATE SET gitea_full_name=excluded.gitea_full_name, github_full_name=excluded.github_full_name, updated_at=excluded.updated_at
        """, (gitea_repo_id, gitea_full_name, github_full_name, now))
        self.conn.commit()
        logger.debug(f"Repo mapping set {gitea_full_name} (id={gitea_repo_id}) -> {github_full_name}")

    def get_repo_map_by_id(self, gitea_repo_id: int) -> Optional[Tuple[str, str]]:
        """Return (gitea_full_name, github_full_name) for a Gitea repo id, or None."""
        cur = self.conn.cursor()
        cur.execute("SELECT gitea_full_name, github_full_name FROM repo_map WHERE gitea_repo_id=?", (gitea_repo_id,))
        row = cur.fetchone()
        return (row[0], row[1]) if row else None
|
||
|
||
# --------------------------
|
||
# Clients
|
||
# --------------------------
|
||
|
||
class GiteaClient:
    """Thin REST client for the Gitea API with layered auth fallbacks.

    A 401 response is retried with alternate auth schemes because a proxy in
    front of Gitea may strip or reject particular Authorization header forms:
    1) "Authorization: token <t>", 2) "Authorization: Bearer <t>",
    3) "?access_token=<t>" query parameter.
    """

    def __init__(self, base_url: str, token: str):
        import re
        self.base_url = base_url.rstrip("/")
        # Accept tokens pasted as "token XYZ" / "Bearer XYZ" and strip the scheme prefix.
        self.token = re.sub(r'^(token|bearer)\s+', '', (token or ''), flags=re.I).strip()
        logger.debug(f"Gitea token len={len(self.token)} prefix={(self.token[:6] + '...') if self.token else '<empty>'}")

    def _url(self, path: str) -> str:
        # path is expected to start with "/" (e.g. "/api/v1/...").
        return f"{self.base_url}{path}"

    def _request(self, method: str, path: str, *, json_body=None, params=None, timeout=30) -> httpx.Response:
        """Issue a request, retrying 401s with alternate auth schemes (see class docstring)."""
        url = self._url(path)
        # 1) token scheme
        headers = {"Authorization": f"token {self.token}"} if self.token else {}
        r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout, trust_env=False)
        logger.debug(f"Gitea {method} {path} auth=token -> {r.status_code}")
        if r.status_code == 401 and self.token:
            # 2) Bearer scheme
            headers = {"Authorization": f"Bearer {self.token}"}
            r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout, trust_env=False)
            logger.debug(f"Gitea {method} {path} auth=bearer -> {r.status_code}")
            if r.status_code == 401:
                # 3) Query param fallback (proxy may strip Authorization)
                qp = dict(params or {})
                qp["access_token"] = self.token
                r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout, trust_env=False)
                logger.debug(f"Gitea {method} {path} auth=query -> {r.status_code}")
        return r

    def get_repo(self, owner: str, repo: str):
        """Return the repo JSON, or None when the repo does not exist (404).

        Raises RuntimeError on 401; raises httpx.HTTPStatusError on other errors.
        """
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}")
        if r.status_code == 404:
            logger.info(f"Gitea repo {owner}/{repo} not found (404)")
            return None
        if r.status_code == 401:
            logger.error(f"Gitea 401 for GET /repos/{owner}/{repo}. Token may lack access or proxy strips Authorization.")
            raise RuntimeError(f"Gitea 401 for GET /repos/{owner}/{repo}. Token user may lack access to org '{owner}', or proxy is stripping Authorization.")
        r.raise_for_status()
        logger.debug(f"Gitea repo {owner}/{repo} fetched OK")
        return r.json()

    def create_org_repo(self, org: str, name: str, private: bool, description: str, homepage: str, default_branch: str, auto_init: bool):
        """Create a repository under an organization; returns the new repo JSON.

        Raises RuntimeError on auth failure or any non-2xx response.
        """
        payload = {
            "name": name,
            "private": private,
            "description": description,
            "website": homepage,
            "default_branch": default_branch,
            "auto_init": auto_init
        }
        logger.info(f"Creating Gitea repo {org}/{name} (private={private}, auto_init={auto_init}, default_branch={default_branch})")
        r = self._request("POST", f"/api/v1/orgs/{org}/repos", json_body=payload, timeout=60)
        if r.status_code == 401:
            logger.error(f"Gitea 401 creating repo in org '{org}'")
            raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user has permissions and token scopes include write:organization.")
        if r.status_code not in (200, 201):
            logger.error(f"Gitea create repo failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Gitea create repo failed: {r.status_code} {r.text}")
        logger.info(f"Gitea repo {org}/{name} created")
        return r.json()

    def list_org_repos(self, org: str):
        """Return all repos in *org*, following pagination (50 per page)."""
        out, page = [], 1
        logger.debug(f"Listing Gitea repos for org {org}")
        while True:
            r = self._request("GET", f"/api/v1/orgs/{org}/repos", params={"limit": 50, "page": page})
            r.raise_for_status()
            items = r.json()
            if not items:
                break
            out.extend(items)
            page += 1
        logger.info(f"Found {len(out)} repos in Gitea org {org}")
        return out

    def get_push_mirrors(self, owner: str, repo: str):
        """List push mirrors for a repo; [] when none exist or the API is unavailable (404)."""
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}/push_mirrors")
        if r.status_code == 404:
            logger.debug(f"Gitea push mirror list not available or none for {owner}/{repo}")
            return []
        r.raise_for_status()
        return r.json()

    def create_push_mirror(self, owner: str, repo: str, remote_address: str, username: Optional[str], password: Optional[str], sync_on_commit: bool = True, interval: str = "24h0m0s") -> bool:
        """Configure a push mirror on the Gitea repo; returns True on success.

        Best effort: returns False (without raising) when the server lacks the
        push-mirror API (404) or the request otherwise fails.
        """
        payload = {
            "remote_address": remote_address,
            "remote_username": username or "",
            "remote_password": password or "",
            "sync_on_commit": sync_on_commit,
            "interval": interval,
        }
        logger.info(f"Configuring Gitea push mirror for {owner}/{repo} -> GitHub (interval={interval}, sync_on_commit={sync_on_commit})")
        r = self._request("POST", f"/api/v1/repos/{owner}/{repo}/push_mirrors", json_body=payload, timeout=30)
        if r.status_code in (200, 201):
            logger.info(f"Gitea push mirror created for {owner}/{repo}")
            return True
        if r.status_code == 404:
            logger.warning("Gitea push mirror API not available on this server/version.")
            return False
        logger.error(f"Failed to create push mirror on Gitea {owner}/{repo}: {r.status_code} {r.text}")
        return False
|
||
|
||
class GitHubClient:
    """PyGithub-based client that manages read-only mirror repos on GitHub.

    Handles repo creation under either an organization or a user namespace,
    mirror-appropriate repo settings, webhooks, branch protection (via raw
    REST calls), and PR comment/close/label helpers.
    """

    def __init__(self, owner: str, token: str):
        # owner: the target GitHub org or user that hosts the mirrors.
        self.owner = owner
        self.gh = Github(auth=Auth.Token(token), per_page=100)
        self.token = token
        # Redacted token logging
        red_prefix = (token[:4] + '...') if token else '<empty>'
        logger.debug(f"GitHub token prefix={red_prefix} len={len(token) if token else 0}")
        try:
            # Resolve the authenticated login; used for push URLs.
            self.auth_user = self.gh.get_user().login
            logger.info(f"Authenticated to GitHub as {self.auth_user}, target owner={self.owner}")
        except Exception as e:
            logger.error(f"Failed to authenticate to GitHub: {e}")
            self.auth_user = "<unknown>"

    def make_push_url(self, repo_name: str) -> str:
        """Build an HTTPS push URL with URL-encoded embedded credentials."""
        user = self.auth_user or "git"
        return f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git"

    def get_or_create_repo(self, name: str, visibility: str, description: str, homepage: str) -> Repository:
        """Fetch (and refresh metadata of) or create {owner}/{name}.

        Tries the organization namespace first; a 404 from the org lookup
        falls back to the user namespace. Issues are always disabled on the
        mirror; visibility "private" maps to a private repo, anything else
        to public. Re-raises other GithubException errors.
        """
        logger.debug(f"Ensuring GitHub repo {self.owner}/{name} (visibility={visibility})")
        try:
            org = self.gh.get_organization(self.owner)
            repo = None
            with contextlib.suppress(GithubException):
                repo = org.get_repo(name)
            if repo is None:
                logger.info(f"Creating GitHub repo {self.owner}/{name} under organization")
                repo = org.create_repo(
                    name=name,
                    description=description or "",
                    homepage=homepage or "",
                    private=(visibility == "private"),
                    has_issues=False,
                    has_projects=False,
                    has_wiki=False,
                    auto_init=False
                )
            else:
                # Keep metadata in sync on every run.
                repo.edit(
                    name=name,
                    description=description or "",
                    homepage=homepage or "",
                    private=(visibility == "private"),
                    has_issues=False
                )
            return repo
        except GithubException as e:
            if e.status == 404:
                # Owner is not an organization (or not visible): use the user namespace.
                logger.info(f"Owner {self.owner} not an org or not visible; trying user namespace")
                user = self.gh.get_user(self.owner)
                repo = None
                with contextlib.suppress(GithubException):
                    repo = user.get_repo(name)
                if repo is None:
                    logger.info(f"Creating GitHub repo {self.owner}/{name} under user")
                    repo = user.create_repo(
                        name=name,
                        description=description or "",
                        homepage=homepage or "",
                        private=(visibility == "private"),
                        has_issues=False,
                        auto_init=False
                    )
                else:
                    repo.edit(
                        name=name,
                        description=description or "",
                        homepage=homepage or "",
                        private=(visibility == "private"),
                        has_issues=False
                    )
                return repo
            logger.error(f"GitHub get_or_create_repo error: {e}")
            raise

    def enforce_repo_settings(self, repo: Repository, topics: List[str]):
        """Disable issues, constrain merge methods, and set topics (best effort)."""
        logger.debug(f"Enforcing repo settings on {repo.full_name} (issues off, merge methods config)")
        try:
            repo.edit(
                allow_merge_commit=True,  # keep one enabled
                allow_squash_merge=False,
                allow_rebase_merge=False,
                allow_auto_merge=False,
                has_issues=False,
            )
        except GithubException as e:
            if getattr(e, "status", None) == 422:
                # GitHub rejects a config that would leave no merge method enabled.
                logger.warning(f"{repo.full_name}: 422 no_merge_method; retrying with squash merge method")
                repo.edit(
                    allow_merge_commit=False,
                    allow_squash_merge=True,
                    allow_rebase_merge=False,
                    has_issues=False,
                )
            else:
                logger.error(f"Failed to edit repo settings for {repo.full_name}: {e}")
                raise
        if topics:
            with contextlib.suppress(GithubException):
                repo.replace_topics(topics)
                logger.debug(f"Set topics on {repo.full_name}: {topics}")

    def ensure_webhook(self, repo, url: str, secret: str, events: List[str]):
        """Create or update the JSON webhook matching *url* on the repo."""
        desired_cfg = {
            "url": url,
            "content_type": "json",
            "secret": secret,
            "insecure_ssl": "0",
        }
        hooks = list(repo.get_hooks())
        for h in hooks:
            cfg = h.config or {}
            # An existing hook with the same delivery URL is updated in place.
            if cfg.get("url") == url:
                logger.info(f"Updating existing webhook on {repo.full_name}")
                h.edit("web", config=desired_cfg, events=events, active=True)
                return
        logger.info(f"Creating webhook on {repo.full_name}")
        repo.create_hook(name="web", config=desired_cfg, events=events, active=True)

    def ensure_branch_protection(
        self,
        repo,
        branch_name: str,
        include_admins: bool,
        allow_force_pushes: bool,
        allow_deletions: bool,
        restrict_users: list[str],
        restrict_teams: list[str],
        restrict_apps: list[str],
    ):
        """Apply best-effort branch protection via raw GitHub REST calls.

        Mirror pushes must stay possible: when force pushes were requested but
        the allow_force_pushes endpoint is missing (404), protection is removed
        entirely rather than risk blocking mirror updates. Push restrictions
        apply only to organization-owned repos. All failures are logged, not
        raised.
        """
        try:
            repo.get_branch(branch_name)
        except GithubException as e:
            # Nothing to protect if the branch does not exist yet.
            logger.warning(f"Branch {branch_name} not found on {repo.full_name}: {e}")
            return

        owner = repo.owner.login
        name = repo.name
        base = f"https://api.github.com/repos/{owner}/{name}/branches/{branch_name}"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/vnd.github+json, application/vnd.github.luke-cage-preview+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        # Base protection
        logger.debug(f"Applying base protection on {repo.full_name}@{branch_name} (enforce_admins={include_admins})")
        r = httpx.put(
            f"{base}/protection",
            headers=headers,
            json={
                "required_status_checks": None,
                "enforce_admins": bool(include_admins),
                "required_pull_request_reviews": None,
                "restrictions": None,
            },
            timeout=30,
        )
        if r.status_code not in (200, 201):
            logger.error(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} {r.text}")
            return

        # Allow force pushes
        r_fp = httpx.put(
            f"{base}/protection/allow_force_pushes",
            headers=headers,
            json={"enabled": bool(allow_force_pushes)},
            timeout=15,
        )
        if r_fp.status_code not in (200, 201):
            logger.warning(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}")

        # Allow deletions
        r_del = httpx.put(
            f"{base}/protection/allow_deletions",
            headers=headers,
            json={"enabled": bool(allow_deletions)},
            timeout=15,
        )
        if r_del.status_code not in (200, 201):
            logger.warning(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}")

        # If enabling force pushes failed, remove protection to avoid blocking mirror updates
        if allow_force_pushes and r_fp.status_code == 404:
            httpx.delete(f"{base}/protection", headers=headers, timeout=15)
            logger.warning(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).")
            return

        # Push restrictions (org repos only)
        is_org = getattr(repo.owner, "type", None) == "Organization"
        if is_org and (restrict_users or restrict_teams or restrict_apps):
            logger.debug(f"Applying push restrictions on {repo.full_name}@{branch_name} (users={restrict_users}, teams={restrict_teams}, apps={restrict_apps})")
            r_res = httpx.put(
                f"{base}/protection/restrictions",
                headers=headers,
                json={"users": restrict_users or [], "teams": restrict_teams or [], "apps": restrict_apps or []},
                timeout=30,
            )
            if r_res.status_code not in (200, 201):
                logger.warning(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}")
        logger.info(f"Branch protection applied (best effort) on {repo.full_name}@{branch_name}. Owner type: {getattr(repo.owner,'type',None)}")

    def close_pr_with_comment_and_label(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
        """Comment on a PR, optionally label it, then close it if still open."""
        pr = repo.get_pull(pr_number)
        pr.create_issue_comment(comment)
        if label:
            # Label creation can 422 if it already exists; both steps are best effort.
            with contextlib.suppress(GithubException):
                repo.create_label(name=label, color="ededed")
            with contextlib.suppress(GithubException):
                pr.as_issue().add_to_labels(label)
        if pr.state != "closed":
            pr.edit(state="closed")
        logger.info(f"PR #{pr_number} closed with comment on {repo.full_name}")

    def comment_on_pr(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
        """Comment on a PR and optionally label it, leaving its state unchanged."""
        pr = repo.get_pull(pr_number)
        pr.create_issue_comment(comment)
        if label:
            with contextlib.suppress(GithubException):
                repo.create_label(name=label, color="ededed")
            with contextlib.suppress(GithubException):
                pr.as_issue().add_to_labels(label)
        logger.info(f"Commented on PR #{pr_number} on {repo.full_name}")
|
||
|
||
# --------------------------
|
||
# Bugzilla integration
|
||
# --------------------------
|
||
|
||
class BugzillaClient:
    """REST client for Bugzilla: create bugs, attach patches, add comments."""

    def __init__(self, cfg: Dict[str, Any]):
        # cfg: the "bugzilla" section of the service configuration.
        self.base_url = cfg["base_url"].rstrip("/")
        # "basic" (username/password) or "api_key" (X-BUGZILLA-API-KEY header).
        self.auth_mode = cfg["auth"]["mode"]
        self.username = cfg["auth"].get("username", "")
        self.password = cfg["auth"].get("password", "")
        self.api_key = cfg["auth"].get("api_key", "")
        self.product = cfg["product"]
        self.component_template = cfg["component_template"]
        self.component_fallback = cfg["component_fallback"]
        self.auto_create_component = bool(cfg.get("auto_create_component", False))
        # Bugzilla groups restricting visibility of newly created bugs.
        self.groups = cfg.get("groups", [])
        self.attach_diff = bool(cfg.get("attach_diff", True))
        self.templates = cfg["templates"]
        logger.debug("Bugzilla client initialized")

    def _headers(self) -> Dict[str, str]:
        # API-key auth travels in a header; basic auth is handled by _auth().
        headers = {"Accept": "application/json"}
        if self.auth_mode == "api_key" and self.api_key:
            headers["X-BUGZILLA-API-KEY"] = self.api_key
        return headers

    def _auth(self) -> Optional[Tuple[str, str]]:
        # HTTP Basic credentials tuple, or None when API-key auth is in use.
        if self.auth_mode == "basic":
            return (self.username, self.password)
        return None

    def create_bug(self, summary: str, description: str, component: str, visibility_groups: Optional[List[str]] = None) -> int:
        """Create a bug and return its id.

        Raises RuntimeError when the request fails or the response carries no
        bug id.
        """
        url = f"{self.base_url}/rest/bug"
        payload = {
            "product": self.product,
            "component": component,
            "summary": summary,
            "description": description,
        }
        if visibility_groups:
            payload["groups"] = visibility_groups
        logger.info(f"Creating Bugzilla bug (product={self.product}, component={component})")
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
        if r.status_code not in (200, 201):
            logger.error(f"Bugzilla create bug failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Bugzilla create bug failed: {r.status_code} {r.text}")
        data = r.json()
        # Servers answer with either {"id": N} or {"bugs": [{"id": N}, ...]}.
        bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
        if not bug_id:
            logger.error(f"Bugzilla response missing bug id: {data}")
            raise RuntimeError(f"Bugzilla response missing bug id: {data}")
        logger.info(f"Bugzilla bug created: {bug_id}")
        return int(bug_id)

    def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes):
        """Attach *data_bytes* (flagged as a patch) to a bug; raises RuntimeError on failure."""
        url = f"{self.base_url}/rest/bug/{bug_id}/attachment"
        payload = {
            "ids": [bug_id],
            # The REST API requires base64-encoded attachment data.
            "data": base64.b64encode(data_bytes).decode("ascii"),
            "file_name": file_name,
            "summary": summary,
            "content_type": content_type,
            "is_patch": True
        }
        logger.debug(f"Adding attachment to Bugzilla bug {bug_id}: {file_name}")
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=120)
        if r.status_code not in (200, 201):
            logger.error(f"Bugzilla add attachment failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Bugzilla add attachment failed: {r.status_code} {r.text}")

    def add_comment(self, bug_id: int, comment: str):
        """Add a comment to an existing bug; raises RuntimeError on failure."""
        url = f"{self.base_url}/rest/bug/{bug_id}/comment"
        payload = {"comment": comment}
        logger.debug(f"Adding comment to Bugzilla bug {bug_id}")
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
        if r.status_code not in (200, 201):
            logger.error(f"Bugzilla add comment failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Bugzilla add comment failed: {r.status_code} {r.text}")
|
||
|
||
# --------------------------
|
||
# Git mirror operations
|
||
# --------------------------
|
||
|
||
class GitMirror:
    """Maintains local bare mirrors of Gitea repos and pushes them to GitHub."""

    def __init__(self, cache_dir: str):
        # cache_dir: directory that holds the bare "<org>--<repo>.git" clones.
        self.cache_dir = cache_dir
        ensure_dir(cache_dir)

    def local_path(self, org: str, repo: str) -> str:
        """Path of the local bare mirror for org/repo."""
        return safe_join(self.cache_dir, f"{org}--{repo}.git")

    def ensure_local_mirror(self, org: str, repo: str, gitea_clone_url: str):
        """Clone the repo as a bare mirror, or refresh an existing one.

        Raises RuntimeError when the clone or fetch fails; a failure to fix
        the origin URL is only logged.
        """
        path = self.local_path(org, repo)
        if not os.path.isdir(path):
            ensure_dir(os.path.dirname(path))
            logger.info(f"Cloning bare mirror for {org}/{repo}")
            rc, out, err = run(["git", "clone", "--mirror", gitea_clone_url, path])
            if rc != 0:
                raise RuntimeError(f"git clone --mirror failed: {err.strip()}")
        else:
            # Ensure origin URL is correct and fetch
            logger.debug(f"Updating origin URL for {org}/{repo} mirror")
            rc, out, err = run(["git", "remote", "set-url", "origin", gitea_clone_url], cwd=path)
            if rc != 0:
                logger.warning(f"Could not set origin URL for {org}/{repo}: {err.strip()}")
            logger.debug(f"Fetching updates for {org}/{repo}")
            rc, out, err = run(["git", "fetch", "--prune", "--tags", "--force"], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git fetch failed: {err.strip()}")

    def push_to_github(self, org: str, repo: str, github_url: str):
        """Force-push all branches and tags from the local mirror to GitHub.

        Creates or updates the "github" remote first. Raises RuntimeError
        when any git step fails.
        """
        path = self.local_path(org, repo)
        remotes_rc, remotes_out, _ = run(["git", "remote"], cwd=path)
        if remotes_rc != 0:
            raise RuntimeError("git remote list failed")
        remotes = set(remotes_out.strip().splitlines())
        if "github" not in remotes:
            logger.debug(f"Adding github remote for {org}/{repo}")
            rc, out, err = run(["git", "remote", "add", "github", github_url], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git remote add github failed: {err.strip()}")
        else:
            logger.debug(f"Updating github remote URL for {org}/{repo}")
            rc, out, err = run(["git", "remote", "set-url", "github", github_url], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git remote set-url github failed: {err.strip()}")
        # Branches and tags pushed separately; the leading "+" forces
        # non-fast-forward updates, --prune drops refs deleted upstream.
        for refspec in ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]:
            logger.info(f"Pushing {org}/{repo} refspec {refspec} to GitHub")
            rc, out, err = run(["git", "push", "--prune", "github", refspec], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git push failed for {refspec}: {err.strip()}")
|
||
|
||
# --------------------------
|
||
# Template rendering
|
||
# --------------------------
|
||
|
||
def render_template(tpl: str, vars: Dict[str, Any]) -> str:
    """Render *tpl* via str.format with *vars*, falling back to the raw template.

    A template referencing a missing key or containing a malformed placeholder
    is returned unchanged, so a bad template never crashes the sync flow.

    Args:
        tpl: str.format-style template string.
        vars: Mapping of placeholder names to values.

    Returns:
        The rendered string, or *tpl* itself if formatting fails.
    """
    try:
        return tpl.format(**vars)
    except (KeyError, IndexError, ValueError, AttributeError):
        # Narrowed from a bare `except Exception`: only formatting-related
        # failures fall back to the raw template; genuine programming errors
        # (e.g. TypeError from a non-mapping) still surface.
        return tpl
|
||
|
||
# --------------------------
|
||
# Sync engine
|
||
# --------------------------
|
||
|
||
@dataclasses.dataclass
class RepoContext:
    # Bundle of per-repo handles used by the sync engine after both sides exist.
    org: str  # Gitea organization name
    repo: str  # repository name (same on both sides)
    gitea_repo: Dict[str, Any]  # raw Gitea repo JSON as returned by the API
    github_repo: Repository  # PyGithub handle for the GitHub mirror repo
|
||
|
||
class SyncEngine:
|
||
    def __init__(self, cfg: Dict[str, Any], state: StateStore, gitea: GiteaClient, gh: GitHubClient, mirror: GitMirror):
        """Wire together configuration, state store, API clients, and the git mirror cache."""
        self.cfg = cfg
        self.state = state
        self.gitea = gitea
        self.gh = gh
        self.mirror = mirror
|
||
|
||
def _derive_texts(self, org: str, repo: str, gitea_http_url: str) -> Tuple[str, str, str, str]:
|
||
gcfg = self.cfg["sync"]["gitea"]["new_repo_defaults"]
|
||
hcfg = self.cfg["sync"]["github"]["repo_defaults"]
|
||
g_desc = render_template(gcfg.get("description_template", "") or "", {"org": org, "repo": repo})
|
||
g_home = render_template(gcfg.get("homepage_template", "") or "", {"org": org, "repo": repo})
|
||
gh_desc = render_template(hcfg.get("description_template", "") or "", {"gitea_repo_url": gitea_http_url})
|
||
gh_home = render_template(hcfg.get("homepage_template", "") or "", {"gitea_repo_url": gitea_http_url})
|
||
return g_desc, g_home, gh_desc, gh_home
|
||
|
||
def ensure_repos_and_mirror(self, org: str, repo: str) -> RepoContext:
|
||
logger.debug(f"[Stage 1] Checking source repo on Gitea: {org}/{repo}")
|
||
gitea_repo = self.gitea.get_repo(org, repo)
|
||
gcfg = self.cfg["sync"]["gitea"]
|
||
if gitea_repo is None:
|
||
if not gcfg.get("create_if_missing", False):
|
||
raise RuntimeError(f"Gitea repo {org}/{repo} not found and auto-creation disabled")
|
||
vis = gcfg["new_repo_defaults"]["visibility"]
|
||
default_branch = gcfg["new_repo_defaults"]["default_branch"]
|
||
auto_init = bool(gcfg["new_repo_defaults"]["seed_new_repo"])
|
||
g_desc, g_home, _, _ = self._derive_texts(org, repo, f"{self.gitea.base_url}/{org}/{repo}")
|
||
logger.info(f"Creating missing Gitea repo {org}/{repo}")
|
||
gitea_repo = self.gitea.create_org_repo(
|
||
org=org,
|
||
name=repo,
|
||
private=(vis == "private"),
|
||
description=g_desc,
|
||
homepage=g_home,
|
||
default_branch=default_branch,
|
||
auto_init=auto_init
|
||
)
|
||
logger.debug(f"[Stage 1] Completed for {org}/{repo}")
|
||
|
||
gitea_http_clone = gitea_repo.get("clone_url") or f"{self.gitea.base_url}/{org}/{repo}.git"
|
||
gitea_html_url = gitea_repo.get("html_url") or f"{self.gitea.base_url}/{org}/{repo}"
|
||
|
||
logger.debug(f"[Stage 2] Ensuring GitHub repo for {org}/{repo}")
|
||
mirror_vis_policy = self.cfg["sync"]["github"]["repo_defaults"]["mirror_visibility"]
|
||
visibility = "private" if (gitea_repo.get("private") and mirror_vis_policy == "mirror_source") else "public"
|
||
g_desc, g_home, gh_desc, gh_home = self._derive_texts(org, repo, gitea_html_url)
|
||
github_repo = self.gh.get_or_create_repo(name=repo, visibility=visibility, description=gh_desc, homepage=gh_home)
|
||
topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
|
||
self.gh.enforce_repo_settings(github_repo, topics=topics)
|
||
logger.debug(f"[Stage 2] Completed for {github_repo.full_name}")
|
||
|
||
logger.debug(f"[Stage 3] Ensuring webhook on {github_repo.full_name}")
|
||
wh = self.cfg["sync"]["github"]["webhook"]
|
||
if wh.get("mode") == "server":
|
||
self.gh.ensure_webhook(github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
|
||
logger.debug(f"[Stage 3] Completed")
|
||
|
||
logger.debug(f"[Stage 4] Mirroring content {org}/{repo} -> {github_repo.full_name}")
|
||
gitea_clone = gitea_http_clone
|
||
if gitea_repo.get("private"):
|
||
token = self.cfg["sync"]["gitea"]["token"]
|
||
if token:
|
||
gitea_clone = gitea_http_clone.replace("://", f"://{token}@")
|
||
self.mirror.ensure_local_mirror(org, repo, gitea_clone)
|
||
gh_push = self.gh.make_push_url(repo)
|
||
self.mirror.push_to_github(org, repo, gh_push)
|
||
logger.debug(f"[Stage 4] Completed")
|
||
|
||
logger.debug(f"[Stage 5] Applying branch protection on {github_repo.full_name}")
|
||
default_branch = gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
|
||
bp = self.cfg["sync"]["github"]["branch_protection"]
|
||
restrict_users = bp["restrict_push"].get("users", [])
|
||
restrict_teams = bp["restrict_push"].get("teams", [])
|
||
restrict_apps = bp["restrict_push"].get("apps", [])
|
||
if "<PAT-username>" in restrict_users and self.gh.auth_user != "<unknown>":
|
||
restrict_users = [u for u in restrict_users if u != "<PAT-username>"] + [self.gh.auth_user]
|
||
self.gh.ensure_branch_protection(
|
||
repo=github_repo,
|
||
branch_name=default_branch,
|
||
include_admins=bp.get("include_admins", True),
|
||
allow_force_pushes=bp.get("allow_force_pushes", True),
|
||
allow_deletions=bp.get("allow_deletions", False),
|
||
restrict_users=restrict_users,
|
||
restrict_teams=restrict_teams,
|
||
restrict_apps=restrict_apps
|
||
)
|
||
logger.debug(f"[Stage 5] Completed")
|
||
|
||
logger.debug(f"[Stage 6] Configuring Gitea push mirror (if enabled)")
|
||
if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True):
|
||
gh_pat = self.gh.token
|
||
remote_address = f"https://{gh_pat}:x-oauth-basic@github.com/{self.gh.owner}/{repo}.git"
|
||
with contextlib.suppress(Exception):
|
||
self.gitea.create_push_mirror(owner=org, repo=repo, remote_address=remote_address, username=None, password=None, sync_on_commit=True)
|
||
logger.debug(f"[Stage 6] Completed")
|
||
|
||
if gitea_repo.get("id"):
|
||
self.state.set_repo_map(int(gitea_repo["id"]), f"{org}/{repo}", f"{self.gh.owner}/{repo}")
|
||
|
||
logger.info(f"Mirror ensure complete: {org}/{repo} -> {github_repo.full_name}")
|
||
return RepoContext(org=org, repo=repo, gitea_repo=gitea_repo, github_repo=github_repo)
|
||
|
||
def enforce_settings_only(self, ctx: RepoContext):
|
||
logger.debug(f"Enforcing settings-only for {ctx.github_repo.full_name}")
|
||
topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
|
||
self.gh.enforce_repo_settings(ctx.github_repo, topics=topics)
|
||
wh = self.cfg["sync"]["github"]["webhook"]
|
||
if wh.get("mode") == "server":
|
||
self.gh.ensure_webhook(ctx.github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
|
||
default_branch = ctx.gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
|
||
bp = self.cfg["sync"]["github"]["branch_protection"]
|
||
restrict_users = bp["restrict_push"].get("users", [])
|
||
if "<PAT-username>" in restrict_users and self.gh.auth_user != "<unknown>":
|
||
restrict_users = [u for u in restrict_users if u != "<PAT-username>"] + [self.gh.auth_user]
|
||
self.gh.ensure_branch_protection(
|
||
repo=ctx.github_repo,
|
||
branch_name=default_branch,
|
||
include_admins=bp.get("include_admins", True),
|
||
allow_force_pushes=bp.get("allow_force_pushes", True),
|
||
allow_deletions=bp.get("allow_deletions", False),
|
||
restrict_users=restrict_users,
|
||
restrict_teams=bp["restrict_push"].get("teams", []),
|
||
restrict_apps=bp["restrict_push"].get("apps", [])
|
||
)
|
||
logger.info(f"Settings enforced on {ctx.github_repo.full_name}")
|
||
|
||
# --------------------------
|
||
# Webhook server (PR autocloser)
|
||
# --------------------------
|
||
|
||
class PRAutocloserServer:
    """FastAPI webhook receiver that converts GitHub PRs into Bugzilla bugs.

    On qualifying ``pull_request`` events it creates a Bugzilla bug, attaches
    the PR's patch, comments on the PR with the bug link, and closes the PR —
    the GitHub repo is a read-only mirror, so contributions are redirected to
    Bugzilla. Later ``synchronize`` events attach the updated patch.
    """

    def __init__(self, cfg: Dict[str, Any], state: StateStore, gh_client: GitHubClient, bz_client: BugzillaClient):
        self.cfg = cfg
        self.state = state
        self.ghc = gh_client
        self.bzc = bz_client
        self.app = FastAPI()
        self._setup_routes()
        logger.info("Webhook server initialized")

    def _verify_signature(self, secret: str, body: bytes, signature: str):
        """Validate the X-Hub-Signature-256 HMAC; raise HTTP 401 on mismatch."""
        expected = hmac_sha256(secret, body)
        # compare_digest: constant-time compare, avoids timing side channels.
        if not hmac.compare_digest(expected, signature):
            logger.warning("Webhook signature verification failed")
            raise HTTPException(status_code=401, detail="Invalid signature")

    def _attach_pr_patch(self, bug_id: int, owner: str, repo: str, pr_number: int, head_sha: str, summary: str):
        """Download the PR's patch from the GitHub API and attach it to *bug_id*.

        *summary* is the attachment description. Raises on HTTP or Bugzilla
        errors; callers decide whether the failure is fatal.
        """
        api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
        # The .patch media type returns the raw patch instead of JSON.
        headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
        r = httpx.get(api_patch_url, headers=headers, timeout=120)
        r.raise_for_status()
        self.bzc.add_attachment(
            bug_id=bug_id,
            file_name=f"PR-{pr_number}-{head_sha}.patch",
            content_type="text/x-patch",
            summary=summary,
            data_bytes=r.content
        )

    async def _handle_pr_event(self, payload: Dict[str, Any]):
        """Process one ``pull_request`` webhook payload.

        Returns a JSONResponse describing the outcome: "ignored" for
        uninteresting actions, "ok" on success, "bugzilla_failed" when bug
        creation fails, "attach_failed" when a sync-time attach fails,
        "noop" otherwise.
        """
        action = payload.get("action")
        if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
            logger.debug(f"Ignored PR action: {action}")
            return JSONResponse({"status": "ignored", "action": action})

        repo_full = payload["repository"]["full_name"]
        owner = payload["repository"]["owner"]["login"]
        repo = payload["repository"]["name"]
        logger.info(f"Handling PR event '{action}' for {repo_full}")

        pr = payload["pull_request"]
        pr_number = pr["number"]
        pr_url = pr["html_url"]
        pr_title = pr.get("title") or ""
        pr_body = pr.get("body") or ""
        github_user = pr["user"]["login"]
        github_user_url = pr["user"]["html_url"]
        base = pr["base"]
        head = pr["head"]
        target_branch = base["ref"]
        # Cross-fork heads render as "forkowner:branch"; same-repo heads as
        # the bare branch name (head.repo is null for deleted fork repos).
        source_branch = f"{head.get('repo', {}).get('owner', {}).get('login', '')}:{head['ref']}" if head.get("repo") else head["ref"]
        base_sha = base["sha"][:7]
        head_sha = head["sha"][:7]
        created_at = pr.get("created_at") or rfc3339_now()
        labels = [l["name"] for l in pr.get("labels", [])] if pr.get("labels") else []
        # Reuse pr_url (already required above) instead of a second lookup
        # that could yield None and break the concatenation.
        compare_url = pr_url + "/files"
        org = self.cfg["sync"]["gitea"]["org"]
        gitea_repo_url = f"{self.cfg['sync']['gitea']['base_url']}/{org}/{repo}"
        bugzilla_base_url = self.cfg["bugzilla"]["base_url"]

        # Template variables for all rendered texts; bug_id/bug_url are filled
        # in once the bug exists.
        variables = {
            "bugzilla_base_url": bugzilla_base_url,
            "bug_id": "",
            "bug_url": "",
            "org": org,
            "repo": repo,
            "gitea_repo_url": gitea_repo_url,
            "github_owner": owner,
            "pr_number": pr_number,
            "pr_title": pr_title,
            "pr_url": pr_url,
            "github_user": github_user,
            "github_user_url": github_user_url,
            "source_branch": source_branch,
            "target_branch": target_branch,
            "head_sha": head_sha,
            "base_sha": base_sha,
            "created_at": created_at,
            "pr_body": pr_body,
            "commit_count": pr.get("commits", ""),
            "compare_url": compare_url,
            "labels": ", ".join(labels)
        }

        repo_obj = self.ghc.gh.get_repo(repo_full)
        pr_key = f"{repo_full}#{pr_number}"
        bug_id = self.state.get_pr_bug(pr_key)

        if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
            component = self.cfg["bugzilla"]["component_template"].format(repo=repo)
            bug_summary = render_template(self.cfg["bugzilla"]["templates"]["bug_summary"], variables)
            bug_body = render_template(self.cfg["bugzilla"]["templates"]["bug_body"], variables)
            create_ok = True
            try:
                bug_id = self.bzc.create_bug(bug_summary, bug_body, component, visibility_groups=self.cfg["bugzilla"].get("groups", []))
                self.state.set_pr_bug(pr_key, bug_id)
                logger.info(f"Bugzilla bug {bug_id} created for PR {pr_key}")
            except Exception as e:
                logger.error(f"Bugzilla create bug failed for PR {pr_key}: {e}")
                create_ok = False

            if create_ok:
                if self.cfg["bugzilla"].get("attach_diff", True):
                    # Attachment failure is non-fatal: the bug exists, so we
                    # still close the PR and point the author at it.
                    try:
                        self._attach_pr_patch(bug_id, owner, repo, pr_number, head_sha, f"Patch for PR #{pr_number} ({head_sha})")
                        logger.debug(f"Attached patch to bug {bug_id} for PR {pr_key}")
                    except Exception as e:
                        logger.warning(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")

                variables["bug_id"] = str(bug_id)
                variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_success"], variables)
                self.ghc.close_pr_with_comment_and_label(repo_obj, pr_number, comment, label=None)
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            else:
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_failure"], variables)
                label = self.cfg["bugzilla"]["failure_policy"].get("label_on_bugzilla_failure") or None
                self.ghc.comment_on_pr(repo_obj, pr_number, comment, label=label)
                return JSONResponse({"status": "bugzilla_failed", "action": action})

        elif bug_id is not None and action == "synchronize":
            try:
                self._attach_pr_patch(bug_id, owner, repo, pr_number, head_sha, f"Updated patch for PR #{pr_number} ({head_sha})")
                short_tpl = self.cfg["bugzilla"]["templates"].get("pr_sync_short_comment")
                if short_tpl:
                    variables["bug_id"] = str(bug_id)
                    variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                    self.ghc.comment_on_pr(repo_obj, pr_number, render_template(short_tpl, variables), label=None)
                pr_obj = repo_obj.get_pull(pr_number)
                if pr_obj.state != "closed":
                    pr_obj.edit(state="closed")
                logger.info(f"Updated patch attached and PR ensured closed (PR #{pr_number})")
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            except Exception as e:
                logger.error(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
                return JSONResponse({"status": "attach_failed", "bug_id": bug_id, "action": action})

        return JSONResponse({"status": "noop", "action": action})

    def _setup_routes(self):
        """Register the health-check and GitHub webhook endpoints."""
        @self.app.get("/healthz")
        async def healthz():
            return PlainTextResponse("ok")

        @self.app.post("/webhook/github")
        async def github_webhook(request: Request, x_hub_signature_256: str = Header(None), x_github_event: str = Header(None)):
            body = await request.body()
            secret = self.cfg["sync"]["github"]["webhook"]["secret"]
            if not x_hub_signature_256:
                logger.warning("Webhook missing signature")
                raise HTTPException(status_code=401, detail="Missing signature")
            self._verify_signature(secret, body, x_hub_signature_256)
            try:
                payload = json.loads(body.decode("utf-8"))
            except Exception:
                logger.warning("Invalid JSON in webhook")
                raise HTTPException(status_code=400, detail="Invalid JSON")
            if x_github_event != "pull_request":
                logger.debug(f"Ignored event {x_github_event}")
                return JSONResponse({"status": "ignored", "event": x_github_event})
            return await self._handle_pr_event(payload)
# --------------------------
|
||
# CLI and workflows
|
||
# --------------------------
|
||
|
||
def load_config(path: Optional[str]) -> Dict[str, Any]:
    """Return DEFAULT_CONFIG deep-merged with the YAML file at *path* (if any).

    The defaults are deep-copied so callers can mutate the result safely.
    Raises OSError if the file cannot be read and yaml.YAMLError on bad YAML.
    """
    import copy  # local import: the module-level import block is unchanged
    cfg = copy.deepcopy(DEFAULT_CONFIG)
    if path:
        # Explicit encoding: config files are UTF-8 regardless of the
        # platform's locale default.
        with open(path, "r", encoding="utf-8") as f:
            user_cfg = yaml.safe_load(f) or {}
        cfg = merge_dicts(cfg, user_cfg)
    return cfg
|
||
def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge *b* onto *a* and return a new dict.

    When both sides hold a dict for the same key the dicts are merged
    recursively; otherwise the value from *b* wins. Neither input is mutated.
    """
    merged = dict(a)
    for key, value in b.items():
        current = merged.get(key)
        # Only recurse when both existing and incoming values are dicts.
        if isinstance(current, dict) and isinstance(value, dict):
            merged[key] = merge_dicts(current, value)
        else:
            merged[key] = value
    return merged
|
||
def single_shot(cfg: Dict[str, Any], org_repo: str):
    """Mirror one "org/repo" once and print the resulting repo pair as JSON."""
    logger.info(f"Single-shot sync starting for {org_repo}")
    org, repo = org_repo.split("/", 1)
    sync_cfg = cfg["sync"]
    engine = SyncEngine(
        cfg,
        StateStore(sync_cfg["state_path"]),
        GiteaClient(sync_cfg["gitea"]["base_url"], sync_cfg["gitea"]["token"]),
        GitHubClient(sync_cfg["github"]["owner"], sync_cfg["github"]["auth"]["token"]),
        GitMirror(sync_cfg["cache_path"]),
    )
    ctx = engine.ensure_repos_and_mirror(org, repo)
    logger.info(f"Single-shot sync completed for {org_repo}")
    summary = {
        "gitea_repo": ctx.gitea_repo.get("full_name") or f"{org}/{repo}",
        "github_repo": ctx.github_repo.full_name,
    }
    print(json.dumps(summary, indent=2))
|
||
def continuous_mode(cfg: Dict[str, Any], interval: int):
    """Poll the configured Gitea org every *interval* seconds and mirror each repo.

    Installs SIGINT/SIGTERM handlers so the loop exits promptly on shutdown.
    Per-repo failures are logged and do not abort the scan; a failed org scan
    is logged and retried on the next interval.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    mirror = GitMirror(cfg["sync"]["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    org = gitea_cfg["org"]
    logger.info(f"Starting continuous sync loop for Gitea org {org} every {interval}s")
    stop = threading.Event()

    def handle_sig(sig, frame):
        logger.info("Shutting down continuous loop...")
        stop.set()

    signal.signal(signal.SIGINT, handle_sig)
    signal.signal(signal.SIGTERM, handle_sig)

    while not stop.is_set():
        try:
            repos = gitea.list_org_repos(org)
            for r in repos:
                name = r["name"]
                if cfg["sync"].get("skip_forks", True) and r.get("fork", False):
                    logger.debug(f"Skipping fork {org}/{name}")
                    continue
                try:
                    logger.debug(f"Syncing repo {org}/{name}")
                    engine.ensure_repos_and_mirror(org, name)
                except Exception as e:
                    logger.error(f"Error syncing {org}/{name}: {e}")
        except Exception as e:
            logger.error(f"Scan error: {e}")
        # Event.wait() returns as soon as stop is set (or after the timeout),
        # replacing the original 1-second-granularity sleep loop.
        stop.wait(timeout=interval)
|
||
def validate_mode(cfg: Dict[str, Any], dry_run: bool):
    """Audit every repo in the Gitea org against the expected GitHub mirror state.

    Prints a JSON report covering presence, issue/merge settings, webhook and
    branch protection. When *dry_run* is False, also re-runs the mirroring and
    settings-enforcement pipeline on each repo to fix drift.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    org = gitea_cfg["org"]
    logger.info(f"Validation scan starting for Gitea org {org} (dry_run={dry_run})")
    engine = None
    if not dry_run:
        # Build the fixer once; the original recreated GitMirror/SyncEngine
        # inside the per-repo loop.
        mirror = GitMirror(cfg["sync"]["cache_path"])
        engine = SyncEngine(cfg, state, gitea, gh, mirror)
    wh_url = cfg["sync"]["github"]["webhook"]["url"]  # loop-invariant
    repos = gitea.list_org_repos(org)
    report = []
    for r in repos:
        name = r["name"]
        target_full = f"{gh.owner}/{name}"
        ok_presence = True
        try:
            repo_obj = gh.gh.get_repo(target_full)
        except GithubException:
            ok_presence = False
            repo_obj = None
        item = {
            "repo": f"{org}/{name}",
            "github_repo": target_full,
            "exists_on_github": ok_presence,
            "issues_disabled": None,
            "merge_methods_disabled": None,
            "webhook_present": None,
            "branch_protection": None
        }
        if ok_presence and repo_obj is not None:
            item["issues_disabled"] = (repo_obj.has_issues is False)
            item["merge_methods_disabled"] = (not repo_obj.allow_merge_commit and not repo_obj.allow_squash_merge and not repo_obj.allow_rebase_merge)
            item["webhook_present"] = any(h.config.get("url") == wh_url for h in repo_obj.get_hooks())
            default_branch = r.get("default_branch") or cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
            try:
                b = repo_obj.get_branch(default_branch)
                # NOTE(review): relies on PyGithub exposing Branch.protection;
                # verify against the installed PyGithub version (the documented
                # accessor is Branch.get_protection()).
                prot = b.protection
                item["branch_protection"] = bool(prot and prot.enabled)
            except GithubException:
                item["branch_protection"] = False
        report.append(item)
        if engine is not None:
            try:
                ctx = engine.ensure_repos_and_mirror(org, name)
                engine.enforce_settings_only(ctx)
            except Exception as e:
                logger.error(f"Validation fix failed for {org}/{name}: {e}")

    print(json.dumps({"report": report}, indent=2))
    logger.info("Validation scan complete")
|
||
def run_webhook_server(cfg: Dict[str, Any]):
    """Build the PR-autocloser FastAPI app and serve it with uvicorn.

    The listen port is taken from the configured webhook URL when it names
    one; otherwise the server binds 0.0.0.0:8080.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    bz = BugzillaClient(cfg["bugzilla"])
    server = PRAutocloserServer(cfg, state, gh, bz)
    webhook_url = cfg["sync"]["github"]["webhook"]["url"]
    host = "0.0.0.0"
    parsed_port = 8080
    # Best-effort port extraction: any parse failure keeps the 8080 default.
    with contextlib.suppress(Exception):
        from urllib.parse import urlparse
        port = urlparse(webhook_url).port
        if port:
            parsed_port = port
    logger.info(f"Starting webhook server on {host}:{parsed_port}")
    uvicorn.run(server.app, host=host, port=parsed_port)
|
||
def main():
    """Parse CLI arguments, configure logging, and dispatch to the chosen mode."""
    parser = argparse.ArgumentParser(description="Gitea→GitHub mirror and PR autocloser")
    parser.add_argument("--config", help="Path to YAML config", default=None)
    parser.add_argument("--mode", choices=["single", "continuous", "validate", "webhook"], required=True)
    parser.add_argument("--gitea-repo", help="org/repo for single mode")
    parser.add_argument("--interval", type=int, help="Polling interval seconds (continuous)", default=None)
    parser.add_argument("--dry-run", action="store_true", help="Validation dry run")
    parser.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"), help="DEBUG, INFO, WARNING, ERROR")
    args = parser.parse_args()

    configure_logging(args.log_level)
    cfg = load_config(args.config)

    mode = args.mode
    if mode == "single":
        # single mode needs an explicit org/repo target.
        if not args.gitea_repo:
            logger.error("--gitea-repo org/repo is required for single mode")
            sys.exit(2)
        single_shot(cfg, args.gitea_repo)
    elif mode == "continuous":
        # CLI flag overrides the configured interval.
        continuous_mode(cfg, args.interval or cfg["sync"]["interval"])
    elif mode == "validate":
        validate_mode(cfg, dry_run=args.dry_run)
    elif mode == "webhook":
        run_webhook_server(cfg)
|
||
# Script entry point: only dispatch when executed directly, not on import.
if __name__ == "__main__":
    main()