#!/usr/bin/env python3
import argparse
import base64
import contextlib
import dataclasses
import datetime as dt
import functools
import hashlib
import hmac
import io
import json
import os
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from typing import Any, Dict, List, Optional, Tuple

# Third-party deps:
# pip install pyyaml httpx PyGithub fastapi uvicorn
import httpx
import yaml
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Github, GithubException, Auth
from github.Branch import Branch
from github.Repository import Repository
import uvicorn
from urllib.parse import quote as urlquote
from datetime import timezone


def utc_now() -> dt.datetime:
    return dt.datetime.now(timezone.utc)


def rfc3339_now() -> str:
    # Example: 2025-09-16T12:10:03Z
    return utc_now().replace(microsecond=0).isoformat().replace("+00:00", "Z")


def log(msg: str):
    print(f"[{rfc3339_now()}] {msg}", flush=True)


# --------------------------
# Configuration and defaults
# --------------------------

DEFAULT_CONFIG = {
    "sync": {
        "source_of_truth": "gitea",
        "gitea": {
            "base_url": "https://src.koozali.org",
            "org": "smeserver",
            "token": os.environ.get("GITEA_TOKEN", ""),
            "create_if_missing": True,
            "new_repo_defaults": {
                "visibility": "public",
                "default_branch": "main",
                "seed_new_repo": True,
                "seed_commit": {
                    "author_name": "Koozali Mirror Bot",
                    "author_email": "noreply@koozali.org",
                    "message": "Initialize repository (mirror seed)",
                    "readme_content": "This repository is initialized by the Koozali mirror service.\nCanonical development happens on Gitea:\n- https://src.koozali.org/{org}/{repo}\nPlease file issues and patches via Bugzilla:\n- https://bugs.koozali.org\n",
                },
                "description_template": "Canonical repository for {org}/{repo}",
                "homepage_template": "https://src.koozali.org/{org}/{repo}",
            },
            "push_mirror": {
                "enable": True
            }
        },
        "github": {
            "owner": "Koozali-SME-Server",  # set your GitHub org/user
            "auth": {
                "mode": "pat",
                "token": os.environ.get("GITHUB_TOKEN", ""),
            },
            "repo_defaults": {
                "mirror_visibility": "mirror_source",
                "topics": ["mirror", "read-only"],
                "description_template": "Read-only mirror of {gitea_repo_url}. Submit patches via Bugzilla: https://bugs.koozali.org",
                "homepage_template": "https://bugs.koozali.org",
            },
            "branch_protection": {
                "apply_to": "default_branch",
                "include_admins": True,
                "allow_force_pushes": True,
                "allow_deletions": False,
                "restrict_push": {
                    "users": [""],  # replaced at runtime
                    "teams": [],
                    "apps": []
                },
                "required_checks": [],
                "required_reviews": {
                    "required_approving_review_count": 0
                }
            },
            "webhook": {
                "mode": "server",
                "url": os.environ.get("GITHUB_WEBHOOK_URL", "http://koozali.bjsystems.co.uk/webhook/github"),
                "secret": os.environ.get("WEBHOOK_SECRET", "change-me"),
                "events": ["pull_request"]
            }
        },
        "cache_path": os.environ.get("SYNC_CACHE_PATH", "./.sync-cache"),
        "state_path": os.environ.get("SYNC_STATE_PATH", "./sync_state.sqlite"),
        "concurrency": 4,
        "interval": 1800,
        "include": [],
        "exclude": [],
        "skip_forks": True
    },
    "bugzilla": {
        "base_url": "https://bugs.koozali.org",
        "auth": {
            "mode": "basic",  # or "api_key"
            "username": "brianr",
            "password": os.environ.get("BUGZILLA_PASSWORD", ""),
        },
        "product": "SME11",
        "component_template": "{repo}",
        "component_fallback": "General",
        "auto_create_component": False,
        "groups": [],
        "attach_diff": True,
        "failure_policy": {
            "close_pr_on_bugzilla_failure": False,
            "label_on_bugzilla_failure": "bugzilla-needed"
        },
        "templates": {
            "pr_comment_success": "Thanks for the contribution!\n\n"
                                  "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                                  "- Canonical: {gitea_repo_url}\n"
                                  "- Please file and discuss changes in Bugzilla: {bugzilla_base_url}\n\n"
                                  "We created Bug {bug_id} to track this proposal:\n"
                                  "- {bug_url}\n\n"
                                  "This pull request will be closed here. Please follow up on the Bugzilla ticket for review and next steps. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n",
            "pr_comment_failure": "Thanks for the contribution!\n\n"
                                  "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                                  "- Canonical: {gitea_repo_url}\n\n"
                                  "We were unable to create a Bugzilla ticket automatically at this time.\n"
                                  "Please open a bug at {bugzilla_base_url} (Product: SME11, Component: {repo}) and include:\n"
                                  "- GitHub PR: {pr_url}\n"
                                  "- Target branch: {target_branch}\n"
                                  "- Summary and rationale for the change\n\n"
                                  "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n",
            "bug_summary": "[GH PR #{pr_number}] {org}/{repo}: {pr_title}",
            "bug_body": "Source\n"
                        "- Canonical repo (Gitea): {gitea_repo_url}\n"
                        "- GitHub mirror PR: {pr_url}\n"
                        "- Submitted by: {github_user} ({github_user_url})\n"
                        "- Opened: {created_at}\n\n"
                        "Target and branch info\n"
                        "- Base branch: {target_branch} ({base_sha})\n"
                        "- Head branch: {source_branch} ({head_sha})\n"
                        "- Commits in PR: {commit_count}\n"
                        "- Compare: {compare_url}\n\n"
                        "Project policy\n"
                        "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla. Any accepted changes will be applied to the Gitea repository and then mirrored back to GitHub.\n\n"
                        "Submitter’s notes\n"
                        "{pr_body}\n\n"
                        "Attachments\n"
                        "- The PR diff/patch is attached automatically by the mirror service. Subsequent updates to this PR will add new attachments to this bug.\n",
            "bug_update_comment": "Update from GitHub PR #{pr_number} for {org}/{repo}\n\n"
                                  "- New head: {head_sha} (base: {base_sha})\n"
                                  "- Commits in update: {commit_count}\n"
                                  "- Compare: {compare_url}\n"
                                  "- PR: {pr_url}\n\n"
                                  "A refreshed patch set has been attached to this bug. Original PR description follows (for context):\n"
                                  "{pr_body}\n",
            "pr_sync_short_comment": "Thanks for the update. We’ve attached a refreshed patch set to Bug {bug_id}: {bug_url}\n"
                                     "This PR remains closed; please continue discussion in Bugzilla.\n"
        }
    }
}
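# --------------------------------------------------------------------------
# Illustrative override file (a sketch, not shipped with this script): any
# subset of the keys above can be placed in a YAML file and passed via
# --config; load_config()/merge_dicts() deep-merge it over DEFAULT_CONFIG.
# The file name and all values below are placeholders.
#
#   # config.yaml
#   sync:
#     gitea:
#       org: smeserver
#       token: "<gitea token>"
#     github:
#       owner: Koozali-SME-Server
#       auth:
#         token: "<github pat>"
#       webhook:
#         url: "https://example.org/webhook/github"
#         secret: "<webhook secret>"
#   bugzilla:
#     auth:
#       username: "<bugzilla user>"
#       password: "<bugzilla password>"
# --------------------------------------------------------------------------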
# --------------------------
# Utilities
# --------------------------

def run(cmd: List[str], cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
    p = subprocess.Popen(cmd, cwd=cwd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    out, err = p.communicate()
    return p.returncode, out, err


def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)


def safe_join(*parts) -> str:
    return os.path.normpath(os.path.join(*parts))


def hmac_sha256(secret: str, data: bytes) -> str:
    return "sha256=" + hmac.new(secret.encode("utf-8"), data, hashlib.sha256).hexdigest()
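# Illustrative check (assuming the default "change-me" secret): hmac_sha256()
# produces the same value GitHub sends in the X-Hub-Signature-256 header, so a
# delivery can be verified locally, e.g.:
#
#   body = b'{"action": "opened"}'                 # raw request body (placeholder)
#   expected = hmac_sha256("change-me", body)      # -> "sha256=<hex digest>"
#   hmac.compare_digest(expected, received_header)
#
# PRAutocloserServer._verify_signature() below performs exactly this comparison.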
# --------------------------
# State (SQLite)
# --------------------------

class StateStore:
    def __init__(self, path: str):
        self.path = path
        ensure_dir(os.path.dirname(os.path.abspath(path)) or ".")
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self._init()

    def _init(self):
        cur = self.conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS pr_map (
                pr_key TEXT PRIMARY KEY,
                bug_id INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS repo_map (
                gitea_repo_id INTEGER PRIMARY KEY,
                gitea_full_name TEXT NOT NULL,
                github_full_name TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        self.conn.commit()

    def set_pr_bug(self, pr_key: str, bug_id: int):
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
        """, (pr_key, bug_id, now, now))
        self.conn.commit()

    def get_pr_bug(self, pr_key: str) -> Optional[int]:
        cur = self.conn.cursor()
        cur.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,))
        row = cur.fetchone()
        return row[0] if row else None

    def set_repo_map(self, gitea_repo_id: int, gitea_full_name: str, github_full_name: str):
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO repo_map (gitea_repo_id, gitea_full_name, github_full_name, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(gitea_repo_id) DO UPDATE SET
                gitea_full_name=excluded.gitea_full_name,
                github_full_name=excluded.github_full_name,
                updated_at=excluded.updated_at
        """, (gitea_repo_id, gitea_full_name, github_full_name, now))
        self.conn.commit()

    def get_repo_map_by_id(self, gitea_repo_id: int) -> Optional[Tuple[str, str]]:
        cur = self.conn.cursor()
        cur.execute("SELECT gitea_full_name, github_full_name FROM repo_map WHERE gitea_repo_id=?", (gitea_repo_id,))
        row = cur.fetchone()
        return (row[0], row[1]) if row else None


# --------------------------
# Clients
# --------------------------

class GiteaClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        # Accept tokens pasted with a leading "token " or "Bearer " prefix and normalise them.
        import re
        self.token = re.sub(r"^(token|bearer)\s+", "", (token or ""), flags=re.I).strip()
        log(f"Gitea token len={len(self.token)} prefix={self.token[:6] + '...' if self.token else ''}")

    def _url(self, path: str) -> str:
        return f"{self.base_url}{path}"

    def _request(self, method: str, path: str, *, json_body=None, params=None, timeout=30) -> httpx.Response:
        url = self._url(path)
        # 1) "token" scheme
        headers = {"Authorization": f"token {self.token}"} if self.token else {}
        r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
        if r.status_code == 401 and self.token:
            # 2) "Bearer" scheme
            headers = {"Authorization": f"Bearer {self.token}"}
            r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
            if r.status_code == 401:
                # 3) Query-param fallback (a proxy may strip the Authorization header)
                qp = dict(params or {})
                qp["access_token"] = self.token
                r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout)
        return r

    def get_repo(self, owner: str, repo: str):
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}")
        if r.status_code == 404:
            return None
        if r.status_code == 401:
            raise RuntimeError(f"Gitea 401 for GET /repos/{owner}/{repo}. Token user may lack access to org '{owner}', or proxy is stripping Authorization.")
        r.raise_for_status()
        return r.json()

    def create_org_repo(self, org: str, name: str, private: bool, description: str, homepage: str, default_branch: str, auto_init: bool):
        payload = {
            "name": name,
            "private": private,
            "description": description,
            "website": homepage,
            "default_branch": default_branch,
            "auto_init": auto_init
        }
        r = self._request("POST", f"/api/v1/orgs/{org}/repos", json_body=payload, timeout=60)
        if r.status_code == 401:
            raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user is org member with create permissions and token scopes include write:organization.")
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Gitea create repo failed: {r.status_code} {r.text}")
        return r.json()

    def list_org_repos(self, org: str):
        out, page = [], 1
        while True:
            r = self._request("GET", f"/api/v1/orgs/{org}/repos", params={"limit": 50, "page": page})
            r.raise_for_status()
            items = r.json()
            if not items:
                break
            out.extend(items)
            page += 1
        return out

    def get_push_mirrors(self, owner: str, repo: str):
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}/push_mirrors")
        if r.status_code == 404:
            return []
        r.raise_for_status()
        return r.json()

    def create_push_mirror(self, owner: str, repo: str, remote_address: str, username: Optional[str], password: Optional[str], sync_on_commit: bool = True, interval: str = "24h0m0s") -> bool:
        payload = {
            "remote_address": remote_address,
            "remote_username": username or "",
            "remote_password": password or "",
            "sync_on_commit": sync_on_commit,
            "interval": interval,  # required by Gitea 1.24
        }
        # Go through _request so the same auth fallbacks apply as for the other endpoints.
        r = self._request("POST", f"/api/v1/repos/{owner}/{repo}/push_mirrors", json_body=payload, timeout=30)
        if r.status_code in (200, 201):
            return True
        if r.status_code == 404:
            log("Gitea push mirror API not available on this server/version.")
            return False
        log(f"Failed to create push mirror on Gitea: {r.status_code} {r.text}")
        return False


class GitHubClient:
    def __init__(self, owner: str, token: str):
        self.owner = owner
        self.gh = Github(auth=Auth.Token(token), per_page=100)
        self.token = token
        # Log only the token length, never the value itself.
        log(f"GitHub token len={len(token) if token else 0}")
        try:
            self.auth_user = self.gh.get_user().login
            log(f"login to github {self.auth_user} {self.owner}")
        except Exception:
            self.auth_user = ""

    def make_push_url(self, repo_name: str) -> str:
        user = self.auth_user or "git"
        return f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git"
f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git" def get_or_create_repo(self, name: str, visibility: str, description: str, homepage: str) -> Repository: try: org = self.gh.get_organization(self.owner) repo = None with contextlib.suppress(GithubException): repo = org.get_repo(name) if repo is None: repo = org.create_repo( name=name, description=description or "", homepage=homepage or "", private=(visibility == "private"), has_issues=False, has_projects=False, has_wiki=False, auto_init=False ) else: repo.edit( name=name, description=description or "", homepage=homepage or "", private=(visibility == "private"), has_issues=False ) return repo except GithubException as e: # If owner is a user, not an org if e.status == 404: user = self.gh.get_user(self.owner) repo = None with contextlib.suppress(GithubException): repo = user.get_repo(name) if repo is None: repo = user.create_repo( name=name, description=description or "", homepage=homepage or "", private=(visibility == "private"), has_issues=False, auto_init=False ) else: repo.edit( name=name, description=description or "", homepage=homepage or "", private=(visibility == "private"), has_issues=False ) return repo raise def enforce_repo_settings(self, repo: Repository, topics: List[str]): # Ensure at least one merge method is enabled (GitHub requirement) try: repo.edit( allow_merge_commit=True, # keep one enabled allow_squash_merge=False, allow_rebase_merge=False, allow_auto_merge=False, # optional explicit has_issues=False, ) except GithubException as e: # Fallback in case older API fields differ, or we hit the 422 anyway if getattr(e, "status", None) == 422: # Retry with a different single merge method just in case repo.edit( allow_merge_commit=False, allow_squash_merge=True, allow_rebase_merge=False, has_issues=False, ) else: raise # Topics if topics: with contextlib.suppress(GithubException): repo.replace_topics(topics) def ensure_webhook(self, repo, url: str, secret: str, events: list[str]): desired_cfg = { "url": url, "content_type": "json", "secret": secret, # always set; GitHub won’t echo it back "insecure_ssl": "0", # "0" or "1" as strings } hooks = list(repo.get_hooks()) for h in hooks: cfg = h.config or {} if cfg.get("url") == url: # Update in place. Must pass name="web" as first arg. 
                try:
                    h.edit("web", config=desired_cfg, events=events, active=True)
                except Exception as e:
                    # Editing an existing hook can fail on some PyGithub versions; log and
                    # move on rather than silently retrying the identical call.
                    log(f"Failed to update webhook on {repo.full_name}: {e}")
                return
        # Create if missing
        repo.create_hook(
            name="web",
            config=desired_cfg,
            events=events,
            active=True,
        )

    def ensure_branch_protection(
        self,
        repo,
        branch_name: str,
        include_admins: bool,
        allow_force_pushes: bool,
        allow_deletions: bool,
        restrict_users: list[str],
        restrict_teams: list[str],
        restrict_apps: list[str],
    ):
        # Ensure branch exists
        try:
            repo.get_branch(branch_name)
        except GithubException as e:
            log(f"Branch {branch_name} not found on {repo.full_name}: {e}")
            return

        owner = repo.owner.login
        name = repo.name
        base = f"https://api.github.com/repos/{owner}/{name}/branches/{branch_name}"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/vnd.github+json, application/vnd.github.luke-cage-preview+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        # 1) Base protection
        r = httpx.put(
            f"{base}/protection",
            headers=headers,
            json={
                "required_status_checks": None,
                "enforce_admins": bool(include_admins),
                "required_pull_request_reviews": None,
                "restrictions": None,
            },
            timeout=30,
        )
        if r.status_code not in (200, 201):
            log(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} {r.text}")
            return

        # 2) Allow force pushes
        r_fp = httpx.put(
            f"{base}/protection/allow_force_pushes",
            headers=headers,
            json={"enabled": bool(allow_force_pushes)},
            timeout=15,
        )
        if r_fp.status_code not in (200, 201):
            log(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}")

        # 3) Allow deletions
        r_del = httpx.put(
            f"{base}/protection/allow_deletions",
            headers=headers,
            json={"enabled": bool(allow_deletions)},
            timeout=15,
        )
        if r_del.status_code not in (200, 201):
            log(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}")

        # If we couldn’t enable force pushes, remove protection to avoid blocking the mirror
        if allow_force_pushes and r_fp.status_code == 404:
            httpx.delete(f"{base}/protection", headers=headers, timeout=15)
            log(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).")
            return

        # 4) Restrictions (org repos only)
        is_org = getattr(repo.owner, "type", None) == "Organization"
        if is_org and (restrict_users or restrict_teams or restrict_apps):
            r_res = httpx.put(
                f"{base}/protection/restrictions",
                headers=headers,
                json={"users": restrict_users or [], "teams": restrict_teams or [], "apps": restrict_apps or []},
                timeout=30,
            )
            if r_res.status_code not in (200, 201):
                log(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}")
        log(f"Repo owner type for {repo.full_name} is {getattr(repo.owner, 'type', None)}")

    def close_pr_with_comment_and_label(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
        pr = repo.get_pull(pr_number)
        pr.create_issue_comment(comment)
        if label:
            with contextlib.suppress(GithubException):
                repo.create_label(name=label, color="ededed")
            with contextlib.suppress(GithubException):
                pr.as_issue().add_to_labels(label)
        if pr.state != "closed":
            pr.edit(state="closed")

    def comment_on_pr(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
        pr = repo.get_pull(pr_number)
        pr.create_issue_comment(comment)
        if label:
            with contextlib.suppress(GithubException):
                repo.create_label(name=label, color="ededed")
color="ededed") with contextlib.suppress(GithubException): pr.as_issue().add_to_labels(label) # -------------------------- # Bugzilla integration # -------------------------- class BugzillaClient: def __init__(self, cfg: Dict[str, Any]): self.base_url = cfg["base_url"].rstrip("/") self.auth_mode = cfg["auth"]["mode"] self.username = cfg["auth"].get("username", "") self.password = cfg["auth"].get("password", "") self.api_key = cfg["auth"].get("api_key", "") self.product = cfg["product"] self.component_template = cfg["component_template"] self.component_fallback = cfg["component_fallback"] self.auto_create_component = bool(cfg.get("auto_create_component", False)) self.groups = cfg.get("groups", []) self.attach_diff = bool(cfg.get("attach_diff", True)) self.templates = cfg["templates"] def _headers(self) -> Dict[str, str]: headers = {"Accept": "application/json"} if self.auth_mode == "api_key" and self.api_key: headers["X-BUGZILLA-API-KEY"] = self.api_key return headers def _auth(self) -> Optional[Tuple[str, str]]: if self.auth_mode == "basic": return (self.username, self.password) return None def create_bug(self, summary: str, description: str, component: str, visibility_groups: Optional[List[str]] = None) -> int: url = f"{self.base_url}/rest/bug" payload = { "product": self.product, "component": component, "summary": summary, "description": description, } if visibility_groups: payload["groups"] = visibility_groups r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60) if r.status_code not in (200, 201): raise RuntimeError(f"Bugzilla create bug failed: {r.status_code} {r.text}") data = r.json() bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"]) if not bug_id: raise RuntimeError(f"Bugzilla response missing bug id: {data}") return int(bug_id) def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes): url = f"{self.base_url}/rest/bug/{bug_id}/attachment" payload = { "ids": [bug_id], "data": base64.b64encode(data_bytes).decode("ascii"), "file_name": file_name, "summary": summary, "content_type": content_type, "is_patch": True } r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=120) if r.status_code not in (200, 201): raise RuntimeError(f"Bugzilla add attachment failed: {r.status_code} {r.text}") def add_comment(self, bug_id: int, comment: str): url = f"{self.base_url}/rest/bug/{bug_id}/comment" payload = {"comment": comment} r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60) if r.status_code not in (200, 201): raise RuntimeError(f"Bugzilla add comment failed: {r.status_code} {r.text}") # -------------------------- # Git mirror operations # -------------------------- class GitMirror: def __init__(self, cache_dir: str): self.cache_dir = cache_dir ensure_dir(cache_dir) def local_path(self, org: str, repo: str) -> str: return safe_join(self.cache_dir, f"{org}--{repo}.git") def ensure_local_mirror(self, org: str, repo: str, gitea_clone_url: str): path = self.local_path(org, repo) if not os.path.isdir(path): ensure_dir(os.path.dirname(path)) rc, out, err = run(["git", "clone", "--mirror", gitea_clone_url, path]) if rc != 0: raise RuntimeError(f"git clone --mirror failed: {err.strip()}") else: # Ensure origin URL is correct and fetch rc, out, err = run(["git", "remote", "set-url", "origin", gitea_clone_url], cwd=path) if rc != 0: log(f"Warning: could not set origin URL: {err.strip()}") # Fetch rc, out, err = run(["git", 
"fetch", "--prune", "--tags", "--force"], cwd=path) if rc != 0: raise RuntimeError(f"git fetch failed: {err.strip()}") def push_to_github(self, org: str, repo: str, github_url: str): path = self.local_path(org, repo) # Add or update 'github' remote remotes_rc, remotes_out, _ = run(["git", "remote"], cwd=path) if remotes_rc != 0: raise RuntimeError("git remote list failed") remotes = set(remotes_out.strip().splitlines()) if "github" not in remotes: rc, out, err = run(["git", "remote", "add", "github", github_url], cwd=path) if rc != 0: raise RuntimeError(f"git remote add github failed: {err.strip()}") else: rc, out, err = run(["git", "remote", "set-url", "github", github_url], cwd=path) if rc != 0: raise RuntimeError(f"git remote set-url github failed: {err.strip()}") # Push refspecs for refspec in ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]: rc, out, err = run(["git", "push", "--prune", "github", refspec], cwd=path) if rc != 0: raise RuntimeError(f"git push failed for {refspec}: {err.strip()}") # -------------------------- # Template rendering # -------------------------- def render_template(tpl: str, vars: Dict[str, Any]) -> str: try: return tpl.format(**vars) except Exception: # Fallback: leave placeholders as-is return tpl # -------------------------- # Sync engine # -------------------------- @dataclasses.dataclass class RepoContext: org: str repo: str gitea_repo: Dict[str, Any] github_repo: Repository class SyncEngine: def __init__(self, cfg: Dict[str, Any], state: StateStore, gitea: GiteaClient, gh: GitHubClient, mirror: GitMirror): self.cfg = cfg self.state = state self.gitea = gitea self.gh = gh self.mirror = mirror def _derive_texts(self, org: str, repo: str, gitea_http_url: str) -> Tuple[str, str, str, str]: gcfg = self.cfg["sync"]["gitea"]["new_repo_defaults"] hcfg = self.cfg["sync"]["github"]["repo_defaults"] g_desc = render_template(gcfg.get("description_template", "") or "", {"org": org, "repo": repo}) g_home = render_template(gcfg.get("homepage_template", "") or "", {"org": org, "repo": repo}) gh_desc = render_template(hcfg.get("description_template", "") or "", {"gitea_repo_url": gitea_http_url}) gh_home = render_template(hcfg.get("homepage_template", "") or "", {"gitea_repo_url": gitea_http_url}) return g_desc, g_home, gh_desc, gh_home def ensure_repos_and_mirror(self, org: str, repo: str) -> RepoContext: # 1) Gitea: get or create gitea_repo = self.gitea.get_repo(org, repo) gcfg = self.cfg["sync"]["gitea"] if gitea_repo is None: if not gcfg.get("create_if_missing", False): raise RuntimeError(f"Gitea repo {org}/{repo} not found and auto-creation disabled") vis = gcfg["new_repo_defaults"]["visibility"] default_branch = gcfg["new_repo_defaults"]["default_branch"] auto_init = bool(gcfg["new_repo_defaults"]["seed_new_repo"]) g_desc, g_home, _, _ = self._derive_texts(org, repo, f"{self.gitea.base_url}/{org}/{repo}") log(f"Creating Gitea repo {org}/{repo} (visibility={vis}, auto_init={auto_init})") gitea_repo = self.gitea.create_org_repo( org=org, name=repo, private=(vis == "private"), description=g_desc, homepage=g_home, default_branch=default_branch, auto_init=auto_init ) # Build Gitea clone URLs gitea_http_clone = gitea_repo.get("clone_url") or f"{self.gitea.base_url}/{org}/{repo}.git" gitea_ssh_clone = gitea_repo.get("ssh_url") or gitea_http_clone gitea_html_url = gitea_repo.get("html_url") or f"{self.gitea.base_url}/{org}/{repo}" # 2) GitHub: ensure repo mirror_vis_policy = self.cfg["sync"]["github"]["repo_defaults"]["mirror_visibility"] visibility = "private" 
if (gitea_repo.get("private") and mirror_vis_policy == "mirror_source") else "public" g_desc, g_home, gh_desc, gh_home = self._derive_texts(org, repo, gitea_html_url) github_repo = self.gh.get_or_create_repo(name=repo, visibility=visibility, description=gh_desc, homepage=gh_home) topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", []) self.gh.enforce_repo_settings(github_repo, topics=topics) # 3) Apply webhook wh = self.cfg["sync"]["github"]["webhook"] if wh.get("mode") == "server": self.gh.ensure_webhook(github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"])) # 4) Mirror content # Prefer HTTPS clone on Gitea with token for private repos; but for fetch we can use anonymous if public gitea_clone = gitea_http_clone if gitea_repo.get("private"): # embed token for read (not ideal; better to use SSH deploy key) token = self.cfg["sync"]["gitea"]["token"] if token: gitea_clone = gitea_http_clone.replace("://", f"://{token}@") self.mirror.ensure_local_mirror(org, repo, gitea_clone) # GitHub push URL with PAT gh_push = self.gh.make_push_url(repo) #f"https://{self.auth_user}:{self.token}@github.com/{self.owner}/{repo}.git" self.mirror.push_to_github(org, repo, gh_push) # 5) Default branch and protection default_branch = gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"] bp = self.cfg["sync"]["github"]["branch_protection"] # Replace placeholder in restrict list restrict_users = bp["restrict_push"].get("users", []) restrict_teams = bp["restrict_push"].get("teams", []) restrict_apps = bp["restrict_push"].get("apps", []) if "" in restrict_users and self.gh.auth_user != "": restrict_users = [u for u in restrict_users if u != ""] + [self.gh.auth_user] self.gh.ensure_branch_protection( repo=github_repo, branch_name=default_branch, include_admins=bp.get("include_admins", True), allow_force_pushes=bp.get("allow_force_pushes", True), allow_deletions=bp.get("allow_deletions", False), restrict_users=restrict_users, restrict_teams=restrict_teams, restrict_apps=restrict_apps ) # 6) Gitea push mirror (optional) if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True): # Use HTTPS with PAT to GitHub. 
        # 6) Gitea push mirror (optional)
        if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True):
            # Use HTTPS with PAT to GitHub (PAT user/token as credentials)
            gh_pat = self.gh.token
            remote_address = f"https://{gh_pat}:x-oauth-basic@github.com/{self.gh.owner}/{repo}.git"
            with contextlib.suppress(Exception):
                self.gitea.create_push_mirror(owner=org, repo=repo, remote_address=remote_address, username=None, password=None, sync_on_commit=True)

        # Map repo IDs for rename detection later
        if gitea_repo.get("id"):
            self.state.set_repo_map(int(gitea_repo["id"]), f"{org}/{repo}", f"{self.gh.owner}/{repo}")

        return RepoContext(org=org, repo=repo, gitea_repo=gitea_repo, github_repo=github_repo)

    def enforce_settings_only(self, ctx: RepoContext):
        topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
        self.gh.enforce_repo_settings(ctx.github_repo, topics=topics)
        wh = self.cfg["sync"]["github"]["webhook"]
        if wh.get("mode") == "server":
            self.gh.ensure_webhook(ctx.github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
        default_branch = ctx.gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
        bp = self.cfg["sync"]["github"]["branch_protection"]
        restrict_users = bp["restrict_push"].get("users", [])
        if "" in restrict_users and self.gh.auth_user != "":
            restrict_users = [u for u in restrict_users if u != ""] + [self.gh.auth_user]
        self.gh.ensure_branch_protection(
            repo=ctx.github_repo,
            branch_name=default_branch,
            include_admins=bp.get("include_admins", True),
            allow_force_pushes=bp.get("allow_force_pushes", True),
            allow_deletions=bp.get("allow_deletions", False),
            restrict_users=restrict_users,
            restrict_teams=bp["restrict_push"].get("teams", []),
            restrict_apps=bp["restrict_push"].get("apps", [])
        )


# --------------------------
# Webhook server (PR autocloser)
# --------------------------
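# The handler below reads only a small subset of GitHub's pull_request webhook
# payload. Abridged illustration (values are placeholders):
#
#   {
#     "action": "opened",
#     "repository": {"full_name": "owner/repo", "name": "repo",
#                    "owner": {"login": "owner"}},
#     "pull_request": {"number": 1, "state": "open", "title": "...",
#                      "body": "...", "html_url": "...", "commits": 1,
#                      "created_at": "...", "labels": [],
#                      "user": {"login": "...", "html_url": "..."},
#                      "base": {"ref": "main", "sha": "..."},
#                      "head": {"ref": "topic", "sha": "...",
#                               "repo": {"owner": {"login": "..."}}}}
#   }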
class PRAutocloserServer:
    def __init__(self, cfg: Dict[str, Any], state: StateStore, gh_client: GitHubClient, bz_client: BugzillaClient):
        self.cfg = cfg
        self.state = state
        self.ghc = gh_client
        self.bzc = bz_client
        self.app = FastAPI()
        self._setup_routes()

    def _verify_signature(self, secret: str, body: bytes, signature: str):
        expected = hmac_sha256(secret, body)
        if not hmac.compare_digest(expected, signature):
            raise HTTPException(status_code=401, detail="Invalid signature")

    async def _handle_pr_event(self, payload: Dict[str, Any]):
        action = payload.get("action")
        if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
            return JSONResponse({"status": "ignored", "action": action})

        repo_full = payload["repository"]["full_name"]  # owner/repo on GitHub
        owner = payload["repository"]["owner"]["login"]
        repo = payload["repository"]["name"]
        pr = payload["pull_request"]
        pr_number = pr["number"]
        pr_state = pr["state"]
        pr_url = pr["html_url"]
        pr_title = pr.get("title") or ""
        pr_body = pr.get("body") or ""
        github_user = pr["user"]["login"]
        github_user_url = pr["user"]["html_url"]
        base = pr["base"]
        head = pr["head"]
        target_branch = base["ref"]
        source_branch = f"{head.get('repo', {}).get('owner', {}).get('login', '')}:{head['ref']}" if head.get("repo") else head["ref"]
        base_sha = base["sha"][:7]
        head_sha = head["sha"][:7]
        created_at = pr.get("created_at") or rfc3339_now()
        labels = [l["name"] for l in pr.get("labels", [])] if pr.get("labels") else []
        compare_url = pr.get("html_url") + "/files"

        org = self.cfg["sync"]["gitea"]["org"]
        gitea_repo_url = f"{self.cfg['sync']['gitea']['base_url']}/{org}/{repo}"
        bugzilla_base_url = self.cfg["bugzilla"]["base_url"]

        variables = {
            "bugzilla_base_url": bugzilla_base_url,
            "bug_id": "",
            "bug_url": "",
            "org": org,
            "repo": repo,
            "gitea_repo_url": gitea_repo_url,
            "github_owner": owner,
            "pr_number": pr_number,
            "pr_title": pr_title,
            "pr_url": pr_url,
            "github_user": github_user,
            "github_user_url": github_user_url,
            "source_branch": source_branch,
            "target_branch": target_branch,
            "head_sha": head_sha,
            "base_sha": base_sha,
            "created_at": created_at,
            "pr_body": pr_body,
            "commit_count": pr.get("commits", ""),
            "compare_url": compare_url,
            "labels": ", ".join(labels)
        }

        # Fetch repo via PyGithub
        repo_obj = self.ghc.gh.get_repo(repo_full)

        # PR key
        pr_key = f"{repo_full}#{pr_number}"

        # Ensure bug mapping or create bug (on opened/reopened/ready_for_review; on synchronize attach)
        bug_id = self.state.get_pr_bug(pr_key)

        if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
            # Attempt to create bug
            component = self.cfg["bugzilla"]["component_template"].format(repo=repo)
            bug_summary = render_template(self.cfg["bugzilla"]["templates"]["bug_summary"], variables)
            bug_body = render_template(self.cfg["bugzilla"]["templates"]["bug_body"], variables)
            create_ok = True
            try:
                bug_id = self.bzc.create_bug(bug_summary, bug_body, component, visibility_groups=self.cfg["bugzilla"].get("groups", []))
                self.state.set_pr_bug(pr_key, bug_id)
                log(f"Created Bugzilla bug {bug_id} for PR {pr_key}")
            except Exception as e:
                log(f"Bugzilla create bug failed for PR {pr_key}: {e}")
                create_ok = False

            if create_ok:
                # Attach diff/patch if enabled
                if self.cfg["bugzilla"].get("attach_diff", True):
                    try:
                        # Pull .patch from GitHub API
                        # Using GitHub’s patch endpoint requires auth if private; use token from gh client
                        api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
                        headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
                        r = httpx.get(api_patch_url, headers=headers, timeout=120)
                        r.raise_for_status()
                        patch_bytes = r.content
                        self.bzc.add_attachment(
                            bug_id=bug_id,
                            file_name=f"PR-{pr_number}-{head_sha}.patch",
                            content_type="text/x-patch",
                            summary=f"Patch for PR #{pr_number} ({head_sha})",
                            data_bytes=patch_bytes
                        )
                    except Exception as e:
                        log(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")

                # Post success comment and close PR
                variables["bug_id"] = str(bug_id)
                variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_success"], variables)
                self.ghc.close_pr_with_comment_and_label(repo_obj, pr_number, comment, label=None)
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            else:
                # Post failure comment and keep PR open (policy)
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_failure"], variables)
                label = self.cfg["bugzilla"]["failure_policy"].get("label_on_bugzilla_failure") or None
                self.ghc.comment_on_pr(repo_obj, pr_number, comment, label=label)
                return JSONResponse({"status": "bugzilla_failed", "action": action})

        elif bug_id is not None and action == "synchronize":
            # Attach updated diff and keep PR closed with optional short comment
            try:
                api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
                headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
                r = httpx.get(api_patch_url, headers=headers, timeout=120)
                r.raise_for_status()
                patch_bytes = r.content
                self.bzc.add_attachment(
                    bug_id=bug_id,
                    file_name=f"PR-{pr_number}-{head_sha}.patch",
                    content_type="text/x-patch",
                    summary=f"Updated patch for PR #{pr_number} ({head_sha})",
                    data_bytes=patch_bytes
                )
                # Optional brief PR comment
                short_tpl = self.cfg["bugzilla"]["templates"].get("pr_sync_short_comment")
                if short_tpl:
                    variables["bug_id"] = str(bug_id)
                    variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                    self.ghc.comment_on_pr(repo_obj, pr_number, render_template(short_tpl, variables), label=None)
                # Ensure PR is closed (in case re-opened)
                pr_obj = repo_obj.get_pull(pr_number)
                if pr_obj.state != "closed":
                    pr_obj.edit(state="closed")
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            except Exception as e:
                log(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
                return JSONResponse({"status": "attach_failed", "bug_id": bug_id, "action": action})

        return JSONResponse({"status": "noop", "action": action})

    def _setup_routes(self):
        @self.app.get("/healthz")
        async def healthz():
            return PlainTextResponse("ok")

        @self.app.post("/webhook/github")
        async def github_webhook(request: Request, x_hub_signature_256: str = Header(None), x_github_event: str = Header(None)):
            body = await request.body()
            secret = self.cfg["sync"]["github"]["webhook"]["secret"]
            if not x_hub_signature_256:
                raise HTTPException(status_code=401, detail="Missing signature")
            self._verify_signature(secret, body, x_hub_signature_256)
            try:
                payload = json.loads(body.decode("utf-8"))
            except Exception:
                raise HTTPException(status_code=400, detail="Invalid JSON")
            if x_github_event != "pull_request":
                return JSONResponse({"status": "ignored", "event": x_github_event})
            return await self._handle_pr_event(payload)


# --------------------------
# CLI and workflows
# --------------------------

def load_config(path: Optional[str]) -> Dict[str, Any]:
    cfg = json.loads(json.dumps(DEFAULT_CONFIG))  # deep copy
    if path:
        with open(path, "r") as f:
            user_cfg = yaml.safe_load(f) or {}
        cfg = merge_dicts(cfg, user_cfg)
    # Ensure derived defaults
    return cfg


def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    out = dict(a)
    for k, v in b.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = merge_dicts(out[k], v)
        else:
            out[k] = v
    return out
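# Example of the deep merge performed by load_config()/merge_dicts(): nested
# dicts are merged key by key, everything else is overwritten by the override.
#
#   merge_dicts({"sync": {"interval": 1800, "concurrency": 4}},
#               {"sync": {"interval": 600}})
#   # -> {"sync": {"interval": 600, "concurrency": 4}}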
def single_shot(cfg: Dict[str, Any], org_repo: str):
    org, repo = org_repo.split("/", 1)
    state = StateStore(cfg["sync"]["state_path"])
    gitea = GiteaClient(cfg["sync"]["gitea"]["base_url"], cfg["sync"]["gitea"]["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    mirror = GitMirror(cfg["sync"]["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    ctx = engine.ensure_repos_and_mirror(org, repo)
    # Print summary
    print(json.dumps({
        "gitea_repo": ctx.gitea_repo.get("full_name") or f"{org}/{repo}",
        "github_repo": ctx.github_repo.full_name
    }, indent=2))


def continuous_mode(cfg: Dict[str, Any], interval: int):
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    mirror = GitMirror(cfg["sync"]["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    org = gitea_cfg["org"]
    log(f"Starting continuous sync loop for Gitea org {org} every {interval}s")

    stop = threading.Event()

    def handle_sig(sig, frame):
        log("Shutting down continuous loop...")
        stop.set()

    signal.signal(signal.SIGINT, handle_sig)
    signal.signal(signal.SIGTERM, handle_sig)

    while not stop.is_set():
        try:
            repos = gitea.list_org_repos(org)
            for r in repos:
                name = r["name"]
                if cfg["sync"].get("skip_forks", True) and r.get("fork", False):
                    continue
                try:
                    engine.ensure_repos_and_mirror(org, name)
                except Exception as e:
                    log(f"Error syncing {org}/{name}: {e}")
        except Exception as e:
            log(f"Scan error: {e}")
        # Sleep with stop check
        for _ in range(interval):
            if stop.is_set():
                break
            time.sleep(1)


def validate_mode(cfg: Dict[str, Any], dry_run: bool):
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    org = gitea_cfg["org"]
    repos = gitea.list_org_repos(org)
    report = []
    for r in repos:
        name = r["name"]
        target_full = f"{gh.owner}/{name}"
        ok_presence = True
        try:
            repo_obj = gh.gh.get_repo(target_full)
        except GithubException:
            ok_presence = False
            repo_obj = None
        item = {
            "repo": f"{org}/{name}",
            "github_repo": target_full,
            "exists_on_github": ok_presence,
            "issues_disabled": None,
            "merge_methods_disabled": None,
            "webhook_present": None,
            "branch_protection": None
        }
        if ok_presence and repo_obj is not None:
            item["issues_disabled"] = (repo_obj.has_issues is False)
            item["merge_methods_disabled"] = (
                not repo_obj.allow_merge_commit
                and not repo_obj.allow_squash_merge
                and not repo_obj.allow_rebase_merge
            )
            # Webhook presence
            wh_url = cfg["sync"]["github"]["webhook"]["url"]
            item["webhook_present"] = any(h.config.get("url") == wh_url for h in repo_obj.get_hooks())
            # Branch protection check
            default_branch = r.get("default_branch") or cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
            try:
                b = repo_obj.get_branch(default_branch)
                prot = b.protection
                item["branch_protection"] = bool(prot and prot.enabled)
            except GithubException:
                item["branch_protection"] = False
        report.append(item)
        # Optionally fix drift (not fully exhaustive to keep concise)
        if not dry_run:
            try:
                mirror = GitMirror(cfg["sync"]["cache_path"])
                engine = SyncEngine(cfg, state, gitea, gh, mirror)
                ctx = engine.ensure_repos_and_mirror(org, name)
                engine.enforce_settings_only(ctx)
            except Exception as e:
                log(f"Validation fix failed for {org}/{name}: {e}")
    print(json.dumps({"report": report}, indent=2))


def run_webhook_server(cfg: Dict[str, Any]):
    state = StateStore(cfg["sync"]["state_path"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    bz = BugzillaClient(cfg["bugzilla"])
    server = PRAutocloserServer(cfg, state, gh, bz)
    url = cfg["sync"]["github"]["webhook"]["url"]
    parsed_port = 8080
    host = "0.0.0.0"
    # Infer port from URL if provided (basic parsing)
    with contextlib.suppress(Exception):
        from urllib.parse import urlparse
        u = urlparse(url)
        if u.port:
            parsed_port = u.port
    log(f"Starting webhook server on port {parsed_port}")
    uvicorn.run(server.app, host=host, port=parsed_port)


def main():
    ap = argparse.ArgumentParser(description="Gitea→GitHub mirror and PR autocloser")
    ap.add_argument("--config", help="Path to YAML config", default=None)
    ap.add_argument("--mode", choices=["single", "continuous", "validate", "webhook"], required=True)
    ap.add_argument("--gitea-repo", help="org/repo for single mode")
    ap.add_argument("--interval", type=int, help="Polling interval seconds (continuous)", default=None)
    ap.add_argument("--dry-run", action="store_true", help="Validation dry run")
    args = ap.parse_args()

    cfg = load_config(args.config)

    if args.mode == "single":
        if not args.gitea_repo:
            print("--gitea-repo org/repo is required for single mode", file=sys.stderr)
            sys.exit(2)
        single_shot(cfg, args.gitea_repo)
    elif args.mode == "continuous":
        interval = args.interval or cfg["sync"]["interval"]
        continuous_mode(cfg, interval)
    elif args.mode == "validate":
        validate_mode(cfg, dry_run=args.dry_run)
    elif args.mode == "webhook":
        run_webhook_server(cfg)


if __name__ == "__main__":
    main()
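# --------------------------
# Example invocations (illustrative; "mirror_service.py" is a placeholder for
# whatever name this file is saved under):
#
#   python3 mirror_service.py --mode single --gitea-repo smeserver/<repo>
#   python3 mirror_service.py --mode continuous --config config.yaml --interval 900
#   python3 mirror_service.py --mode validate --config config.yaml --dry-run
#   python3 mirror_service.py --mode webhook --config config.yaml
# --------------------------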