#!/usr/bin/env python3
"""Gitea -> GitHub mirror/sync service.

Mirrors repositories from a canonical Gitea instance to read-only GitHub
mirrors, enforces mirror settings/branch protection, closes incoming GitHub
pull requests with a pointer to Bugzilla, and files Bugzilla bugs (with
patch attachments) for those PRs.

Reconstructed from patch 761045bd ("Initial upload of gitea to github sync
python3 program", Brian Read, 2025-09-16).
"""
import argparse
import base64
import contextlib
import dataclasses
import datetime as dt
import functools
import hashlib
import hmac
import io
import json
import os
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from typing import Any, Dict, List, Optional, Tuple

# Third-party deps:
# pip install pyyaml httpx PyGithub fastapi uvicorn
import httpx
import yaml
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Github, GithubException, Auth
from github.Branch import Branch
from github.Repository import Repository
import uvicorn
from urllib.parse import quote as urlquote
from datetime import timezone


def utc_now() -> dt.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return dt.datetime.now(timezone.utc)


def rfc3339_now() -> str:
    """Current UTC time in RFC 3339 form, e.g. 2025-09-16T12:10:03Z."""
    return utc_now().replace(microsecond=0).isoformat().replace("+00:00", "Z")


def log(msg: str):
    """Timestamped line logger; flushes so output survives crashes/pipes."""
    print(f"[{rfc3339_now()}] {msg}", flush=True)

# --------------------------
# Configuration and defaults
# --------------------------

DEFAULT_CONFIG = {
    "sync": {
        "source_of_truth": "gitea",
        "gitea": {
            "base_url": "https://src.koozali.org",
            "org": "smeserver",
            "token": os.environ.get("GITEA_TOKEN", ""),
            "create_if_missing": True,
            "new_repo_defaults": {
                "visibility": "public",
                "default_branch": "main",
                "seed_new_repo": True,
                "seed_commit": {
                    "author_name": "Koozali Mirror Bot",
                    "author_email": "noreply@koozali.org",
                    "message": "Initialize repository (mirror seed)",
                    "readme_content": "This repository is initialized by the Koozali mirror service.\nCanonical development happens on Gitea:\n- https://src.koozali.org/{org}/{repo}\nPlease file issues and patches via Bugzilla:\n- https://bugs.koozali.org\n",
                },
                "description_template": "Canonical repository for {org}/{repo}",
                "homepage_template": "https://src.koozali.org/{org}/{repo}",
            },
            "push_mirror": {
                "enable": True
            }
        },
        "github": {
            "owner": "Koozali-SME-Server",  # set your GitHub org/user
            "auth": {
                "mode": "pat",
                "token": os.environ.get("GITHUB_TOKEN", ""),
            },
            "repo_defaults": {
                "mirror_visibility": "mirror_source",
                "topics": ["mirror", "read-only"],
                "description_template": "Read-only mirror of {gitea_repo_url}. Submit patches via Bugzilla: https://bugs.koozali.org",
                "homepage_template": "https://bugs.koozali.org",
            },
            "branch_protection": {
                "apply_to": "default_branch",
                "include_admins": True,
                "allow_force_pushes": True,
                "allow_deletions": False,
                "restrict_push": {
                    "users": [""],  # "" placeholder: replaced at runtime with the authenticated user
                    "teams": [],
                    "apps": []
                },
                "required_checks": [],
                "required_reviews": {
                    "required_approving_review_count": 0
                }
            },
            "webhook": {
                "mode": "server",
                "url": os.environ.get("GITHUB_WEBHOOK_URL", "http://koozali.bjsystems.co.uk/webhook/github"),
                "secret": os.environ.get("WEBHOOK_SECRET", "change-me"),
                "events": ["pull_request"]
            }
        },
        "cache_path": os.environ.get("SYNC_CACHE_PATH", "./.sync-cache"),
        "state_path": os.environ.get("SYNC_STATE_PATH", "./sync_state.sqlite"),
        "concurrency": 4,
        "interval": 1800,
        "include": [],
        "exclude": [],
        "skip_forks": True
    },
    "bugzilla": {
        "base_url": "https://bugs.koozali.org",
        "auth": {
            "mode": "basic",  # or "api_key"
            "username": "brianr",
            "password": os.environ.get("BUGZILLA_PASSWORD", ""),
        },
        "product": "SME11",
        "component_template": "{repo}",
        "component_fallback": "General",
        "auto_create_component": False,
        "groups": [],
        "attach_diff": True,
        "failure_policy": {
            "close_pr_on_bugzilla_failure": False,
            "label_on_bugzilla_failure": "bugzilla-needed"
        },
        "templates": {
            "pr_comment_success":
                "Thanks for the contribution!\n\n"
                "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                "- Canonical: {gitea_repo_url}\n"
                "- Please file and discuss changes in Bugzilla: {bugzilla_base_url}\n\n"
                "We created Bug {bug_id} to track this proposal:\n"
                "- {bug_url}\n\n"
                "This pull request will be closed here. Please follow up on the Bugzilla ticket for review and next steps. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n",
            "pr_comment_failure":
                "Thanks for the contribution!\n\n"
                "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                "- Canonical: {gitea_repo_url}\n\n"
                "We were unable to create a Bugzilla ticket automatically at this time.\n"
                "Please open a bug at {bugzilla_base_url} (Product: SME11, Component: {repo}) and include:\n"
                "- GitHub PR: {pr_url}\n"
                "- Target branch: {target_branch}\n"
                "- Summary and rationale for the change\n\n"
                "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n",
            "bug_summary": "[GH PR #{pr_number}] {org}/{repo}: {pr_title}",
            "bug_body":
                "Source\n"
                "- Canonical repo (Gitea): {gitea_repo_url}\n"
                "- GitHub mirror PR: {pr_url}\n"
                "- Submitted by: {github_user} ({github_user_url})\n"
                "- Opened: {created_at}\n\n"
                "Target and branch info\n"
                "- Base branch: {target_branch} ({base_sha})\n"
                "- Head branch: {source_branch} ({head_sha})\n"
                "- Commits in PR: {commit_count}\n"
                "- Compare: {compare_url}\n\n"
                "Project policy\n"
                "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla. Any accepted changes will be applied to the Gitea repository and then mirrored back to GitHub.\n\n"
                "Submitter’s notes\n"
                "{pr_body}\n\n"
                "Attachments\n"
                "- The PR diff/patch is attached automatically by the mirror service. Subsequent updates to this PR will add new attachments to this bug.\n",
            "bug_update_comment":
                "Update from GitHub PR #{pr_number} for {org}/{repo}\n\n"
                "- New head: {head_sha} (base: {base_sha})\n"
                "- Commits in update: {commit_count}\n"
                "- Compare: {compare_url}\n"
                "- PR: {pr_url}\n\n"
                "A refreshed patch set has been attached to this bug. Original PR description follows (for context):\n"
                "{pr_body}\n",
            "pr_sync_short_comment":
                "Thanks for the update. We’ve attached a refreshed patch set to Bug {bug_id}: {bug_url}\n"
                "This PR remains closed; please continue discussion in Bugzilla.\n"
        }
    }
}

# --------------------------
# Utilities
# --------------------------

def run(cmd: List[str], cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
    """Run *cmd* (no shell) and return (returncode, stdout, stderr) as text.

    Uses subprocess.run instead of the original Popen/communicate pair —
    same semantics, less boilerplate.
    """
    p = subprocess.run(cmd, cwd=cwd, env=env, capture_output=True, text=True)
    return p.returncode, p.stdout, p.stderr

def ensure_dir(path: str):
    """Create *path* (and parents) if missing; no-op when it already exists."""
    os.makedirs(path, exist_ok=True)

def safe_join(*parts) -> str:
    """os.path.join + normpath, collapsing any '..'/'.' segments."""
    return os.path.normpath(os.path.join(*parts))

def hmac_sha256(secret: str, data: bytes) -> str:
    """GitHub-style webhook signature: 'sha256=' + HMAC-SHA256 hex digest."""
    return "sha256=" + hmac.new(secret.encode("utf-8"), data, hashlib.sha256).hexdigest()

# --------------------------
# State (SQLite)
# --------------------------

class StateStore:
    """Persistent PR->Bug and Gitea->GitHub repo mappings, backed by SQLite.

    The connection is created with check_same_thread=False because the
    webhook server and sync loop may touch it from different threads;
    WAL mode keeps concurrent reads cheap.
    """

    def __init__(self, path: str):
        self.path = path
        ensure_dir(os.path.dirname(os.path.abspath(path)) or ".")
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self._init()

    def _init(self):
        """Create the mapping tables on first use (idempotent)."""
        cur = self.conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS pr_map (
                pr_key TEXT PRIMARY KEY,
                bug_id INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS repo_map (
                gitea_repo_id INTEGER PRIMARY KEY,
                gitea_full_name TEXT NOT NULL,
                github_full_name TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        self.conn.commit()

    def set_pr_bug(self, pr_key: str, bug_id: int):
        """Upsert the Bugzilla bug id for a PR key ('owner/repo#N')."""
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
        """, (pr_key, bug_id, now, now))
        self.conn.commit()

    def get_pr_bug(self, pr_key: str) -> Optional[int]:
        """Return the mapped bug id for *pr_key*, or None if unmapped."""
        cur = self.conn.cursor()
        cur.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,))
        row = cur.fetchone()
        return row[0] if row else None

    def set_repo_map(self, gitea_repo_id: int, gitea_full_name: str, github_full_name: str):
        """Upsert the Gitea-id -> (gitea name, github name) mapping (rename detection)."""
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO repo_map (gitea_repo_id, gitea_full_name, github_full_name, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(gitea_repo_id) DO UPDATE SET gitea_full_name=excluded.gitea_full_name, github_full_name=excluded.github_full_name, updated_at=excluded.updated_at
        """, (gitea_repo_id, gitea_full_name, github_full_name, now))
        self.conn.commit()

    def get_repo_map_by_id(self, gitea_repo_id: int) -> Optional[Tuple[str, str]]:
        """Return (gitea_full_name, github_full_name) for *gitea_repo_id*, or None."""
        cur = self.conn.cursor()
        cur.execute("SELECT gitea_full_name, github_full_name FROM repo_map WHERE gitea_repo_id=?", (gitea_repo_id,))
        row = cur.fetchone()
        return (row[0], row[1]) if row else None
# --------------------------
# Clients
# --------------------------

class GiteaClient:
    """Small REST client for the Gitea API.

    Authentication is attempted with the "token" scheme first, then
    "Bearer", then an access_token query parameter, because some reverse
    proxies strip the Authorization header.
    """

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        import re
        # Accept tokens pasted with a scheme prefix ("token XXX" / "Bearer XXX").
        # (The original assigned self.token twice; the first assignment was dead.)
        self.token = re.sub(r'^(token|bearer)\s+', '', (token or ''), flags=re.I).strip()
        # SECURITY: log only the token length — the original logged a prefix
        # of the secret, which does not belong in logs.
        log(f"Gitea token len={len(self.token)}")

    def _url(self, path: str) -> str:
        """Join an API *path* onto the configured base URL."""
        return f"{self.base_url}{path}"

    def _request(self, method: str, path: str, *, json_body=None, params=None, timeout=30) -> "httpx.Response":
        """Issue a request, falling back through three auth mechanisms on 401."""
        url = self._url(path)
        # 1) token scheme
        headers = {"Authorization": f"token {self.token}"} if self.token else {}
        r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
        if r.status_code == 401 and self.token:
            # 2) Bearer scheme
            headers = {"Authorization": f"Bearer {self.token}"}
            r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
        if r.status_code == 401:
            # 3) Query param fallback (proxy may strip Authorization)
            qp = dict(params or {})
            qp["access_token"] = self.token
            r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout)
        return r

    def get_repo(self, owner: str, repo: str):
        """Return the repo JSON, or None when it does not exist (404)."""
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}")
        if r.status_code == 404:
            return None
        if r.status_code == 401:
            raise RuntimeError(f"Gitea 401 for GET /repos/{owner}/{repo}. Token user may lack access to org '{owner}', or proxy is stripping Authorization.")
        r.raise_for_status()
        return r.json()

    def create_org_repo(self, org: str, name: str, private: bool, description: str, homepage: str, default_branch: str, auto_init: bool):
        """Create a repository inside *org* and return its JSON.

        Raises RuntimeError with a diagnostic message on 401 or any other
        non-2xx response.
        """
        payload = {
            "name": name,
            "private": private,
            "description": description,
            "website": homepage,
            "default_branch": default_branch,
            "auto_init": auto_init
        }
        r = self._request("POST", f"/api/v1/orgs/{org}/repos", json_body=payload, timeout=60)
        if r.status_code == 401:
            raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user is org member with create permissions and token scopes include write:organization.")
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Gitea create repo failed: {r.status_code} {r.text}")
        return r.json()

    def list_org_repos(self, org: str):
        """Return all repos of *org*, following pagination (50 per page)."""
        out, page = [], 1
        while True:
            r = self._request("GET", f"/api/v1/orgs/{org}/repos", params={"limit": 50, "page": page})
            r.raise_for_status()
            items = r.json()
            if not items:
                break
            out.extend(items)
            page += 1
        return out

    def get_push_mirrors(self, owner: str, repo: str):
        """List configured push mirrors; [] when the endpoint is unavailable (404)."""
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}/push_mirrors")
        if r.status_code == 404:
            return []
        r.raise_for_status()
        return r.json()

    def create_push_mirror(self, owner: str, repo: str, remote_address: str, username: Optional[str], password: Optional[str], sync_on_commit: bool = True, interval: str = "24h0m0s") -> bool:
        """Configure a Gitea push mirror to *remote_address*.

        Returns True on success, False when the API is missing (404) or the
        call fails — callers treat this as best-effort.
        """
        payload = {
            "remote_address": remote_address,
            "remote_username": username or "",
            "remote_password": password or "",
            "sync_on_commit": sync_on_commit,
            "interval": interval,  # required by Gitea 1.24
        }
        # BUGFIX: the original called self._headers_token_first(), a method
        # that is defined nowhere (guaranteed AttributeError). Route through
        # _request so the standard auth fallbacks apply as well.
        r = self._request("POST", f"/api/v1/repos/{owner}/{repo}/push_mirrors", json_body=payload, timeout=30)
        if r.status_code in (200, 201):
            return True
        if r.status_code == 404:
            log("Gitea push mirror API not available on this server/version.")
            return False
        log(f"Failed to create push mirror on Gitea: {r.status_code} {r.text}")
        return False
f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git" + + def get_or_create_repo(self, name: str, visibility: str, description: str, homepage: str) -> Repository: + try: + org = self.gh.get_organization(self.owner) + repo = None + with contextlib.suppress(GithubException): + repo = org.get_repo(name) + if repo is None: + repo = org.create_repo( + name=name, + description=description or "", + homepage=homepage or "", + private=(visibility == "private"), + has_issues=False, + has_projects=False, + has_wiki=False, + auto_init=False + ) + else: + repo.edit( + name=name, + description=description or "", + homepage=homepage or "", + private=(visibility == "private"), + has_issues=False + ) + return repo + except GithubException as e: + # If owner is a user, not an org + if e.status == 404: + user = self.gh.get_user(self.owner) + repo = None + with contextlib.suppress(GithubException): + repo = user.get_repo(name) + if repo is None: + repo = user.create_repo( + name=name, + description=description or "", + homepage=homepage or "", + private=(visibility == "private"), + has_issues=False, + auto_init=False + ) + else: + repo.edit( + name=name, + description=description or "", + homepage=homepage or "", + private=(visibility == "private"), + has_issues=False + ) + return repo + raise + + def enforce_repo_settings(self, repo: Repository, topics: List[str]): + # Ensure at least one merge method is enabled (GitHub requirement) + try: + repo.edit( + allow_merge_commit=True, # keep one enabled + allow_squash_merge=False, + allow_rebase_merge=False, + allow_auto_merge=False, # optional explicit + has_issues=False, + ) + except GithubException as e: + # Fallback in case older API fields differ, or we hit the 422 anyway + if getattr(e, "status", None) == 422: + # Retry with a different single merge method just in case + repo.edit( + allow_merge_commit=False, + allow_squash_merge=True, + allow_rebase_merge=False, + has_issues=False, + ) + else: + 
raise + # Topics + if topics: + with contextlib.suppress(GithubException): + repo.replace_topics(topics) + + def ensure_webhook(self, repo, url: str, secret: str, events: list[str]): + desired_cfg = { + "url": url, + "content_type": "json", + "secret": secret, # always set; GitHub won’t echo it back + "insecure_ssl": "0", # "0" or "1" as strings + } + hooks = list(repo.get_hooks()) + for h in hooks: + cfg = h.config or {} + if cfg.get("url") == url: + # Update in place. Must pass name="web" as first arg. + try: + h.edit("web", config=desired_cfg, events=events, active=True) + except Exception as e: + # Some PyGithub versions need add/remove events instead; fallback to setting full list + h.edit("web", config=desired_cfg, events=events, active=True) + return + # Create if missing + repo.create_hook( + name="web", + config=desired_cfg, + events=events, + active=True, + ) + + def ensure_branch_protection( + self, + repo, + branch_name: str, + include_admins: bool, + allow_force_pushes: bool, + allow_deletions: bool, + restrict_users: list[str], + restrict_teams: list[str], + restrict_apps: list[str], + ): + # Ensure branch exists + try: + repo.get_branch(branch_name) + except GithubException as e: + log(f"Branch {branch_name} not found on {repo.full_name}: {e}") + return + + owner = repo.owner.login + name = repo.name + base = f"https://api.github.com/repos/{owner}/{name}/branches/{branch_name}" + headers = { + "Authorization": f"Bearer {self.token}", + "Accept": "application/vnd.github+json, application/vnd.github.luke-cage-preview+json", + "X-GitHub-Api-Version": "2022-11-28", + } + + # 1) Base protection + r = httpx.put( + f"{base}/protection", + headers=headers, + json={ + "required_status_checks": None, + "enforce_admins": bool(include_admins), + "required_pull_request_reviews": None, + "restrictions": None, + }, + timeout=30, + ) + if r.status_code not in (200, 201): + log(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} 
{r.text}") + return + + # 2) Allow force pushes + r_fp = httpx.put( + f"{base}/protection/allow_force_pushes", + headers=headers, + json={"enabled": bool(allow_force_pushes)}, + timeout=15, + ) + if r_fp.status_code not in (200, 201): + log(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}") + + # 3) Allow deletions + r_del = httpx.put( + f"{base}/protection/allow_deletions", + headers=headers, + json={"enabled": bool(allow_deletions)}, + timeout=15, + ) + if r_del.status_code not in (200, 201): + log(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}") + + # If we couldn’t enable force pushes, remove protection to avoid blocking the mirror + if allow_force_pushes and r_fp.status_code == 404: + httpx.delete(f"{base}/protection", headers=headers, timeout=15) + log(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).") + return + + # 4) Restrictions (org repos only) + is_org = getattr(repo.owner, "type", None) == "Organization" + if is_org and (restrict_users or restrict_teams or restrict_apps): + r_res = httpx.put( + f"{base}/protection/restrictions", + headers=headers, + json={"users": restrict_users or [], "teams": restrict_teams or [], "apps": restrict_apps or []}, + timeout=30, + ) + if r_res.status_code not in (200, 201): + log(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}") + log(f"Repo owner type for {repo.full_name} is {getattr(repo.owner,'type',None)}") + + def close_pr_with_comment_and_label(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None): + pr = repo.get_pull(pr_number) + pr.create_issue_comment(comment) + if label: + with contextlib.suppress(GithubException): + repo.create_label(name=label, color="ededed") + with contextlib.suppress(GithubException): + pr.as_issue().add_to_labels(label) + if pr.state != 
"closed": + pr.edit(state="closed") + + def comment_on_pr(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None): + pr = repo.get_pull(pr_number) + pr.create_issue_comment(comment) + if label: + with contextlib.suppress(GithubException): + repo.create_label(name=label, color="ededed") + with contextlib.suppress(GithubException): + pr.as_issue().add_to_labels(label) + +# -------------------------- +# Bugzilla integration +# -------------------------- + +class BugzillaClient: + def __init__(self, cfg: Dict[str, Any]): + self.base_url = cfg["base_url"].rstrip("/") + self.auth_mode = cfg["auth"]["mode"] + self.username = cfg["auth"].get("username", "") + self.password = cfg["auth"].get("password", "") + self.api_key = cfg["auth"].get("api_key", "") + self.product = cfg["product"] + self.component_template = cfg["component_template"] + self.component_fallback = cfg["component_fallback"] + self.auto_create_component = bool(cfg.get("auto_create_component", False)) + self.groups = cfg.get("groups", []) + self.attach_diff = bool(cfg.get("attach_diff", True)) + self.templates = cfg["templates"] + + def _headers(self) -> Dict[str, str]: + headers = {"Accept": "application/json"} + if self.auth_mode == "api_key" and self.api_key: + headers["X-BUGZILLA-API-KEY"] = self.api_key + return headers + + def _auth(self) -> Optional[Tuple[str, str]]: + if self.auth_mode == "basic": + return (self.username, self.password) + return None + + def create_bug(self, summary: str, description: str, component: str, visibility_groups: Optional[List[str]] = None) -> int: + url = f"{self.base_url}/rest/bug" + payload = { + "product": self.product, + "component": component, + "summary": summary, + "description": description, + } + if visibility_groups: + payload["groups"] = visibility_groups + r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60) + if r.status_code not in (200, 201): + raise RuntimeError(f"Bugzilla create bug failed: 
# --------------------------
# Bugzilla integration
# --------------------------

class BugzillaClient:
    """Minimal Bugzilla REST client (create bug, attach patch, comment).

    Supports HTTP basic auth or an API key header, chosen by
    cfg["auth"]["mode"] ("basic" or "api_key").
    """

    def __init__(self, cfg: Dict[str, Any]):
        self.base_url = cfg["base_url"].rstrip("/")
        self.auth_mode = cfg["auth"]["mode"]
        self.username = cfg["auth"].get("username", "")
        self.password = cfg["auth"].get("password", "")
        self.api_key = cfg["auth"].get("api_key", "")
        self.product = cfg["product"]
        self.component_template = cfg["component_template"]
        self.component_fallback = cfg["component_fallback"]
        self.auto_create_component = bool(cfg.get("auto_create_component", False))
        self.groups = cfg.get("groups", [])
        self.attach_diff = bool(cfg.get("attach_diff", True))
        self.templates = cfg["templates"]

    def _headers(self) -> Dict[str, str]:
        """Request headers; adds the API-key header in api_key mode."""
        headers = {"Accept": "application/json"}
        if self.auth_mode == "api_key" and self.api_key:
            headers["X-BUGZILLA-API-KEY"] = self.api_key
        return headers

    def _auth(self) -> Optional[Tuple[str, str]]:
        """(username, password) tuple for basic auth, else None."""
        if self.auth_mode == "basic":
            return (self.username, self.password)
        return None

    def create_bug(self, summary: str, description: str, component: str, visibility_groups: Optional[List[str]] = None) -> int:
        """File a new bug and return its integer id.

        Raises RuntimeError on a non-2xx response or a response without an id.
        """
        url = f"{self.base_url}/rest/bug"
        payload = {
            "product": self.product,
            "component": component,
            "summary": summary,
            "description": description,
        }
        if visibility_groups:
            payload["groups"] = visibility_groups
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Bugzilla create bug failed: {r.status_code} {r.text}")
        data = r.json()
        # Bugzilla versions differ: some return {"id": N}, others {"bugs": [{"id": N}]}.
        bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
        if not bug_id:
            raise RuntimeError(f"Bugzilla response missing bug id: {data}")
        return int(bug_id)

    def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes):
        """Attach *data_bytes* (base64-encoded, flagged as a patch) to a bug."""
        url = f"{self.base_url}/rest/bug/{bug_id}/attachment"
        payload = {
            "ids": [bug_id],
            "data": base64.b64encode(data_bytes).decode("ascii"),
            "file_name": file_name,
            "summary": summary,
            "content_type": content_type,
            "is_patch": True
        }
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=120)
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Bugzilla add attachment failed: {r.status_code} {r.text}")

    def add_comment(self, bug_id: int, comment: str):
        """Append a comment to an existing bug."""
        url = f"{self.base_url}/rest/bug/{bug_id}/comment"
        payload = {"comment": comment}
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Bugzilla add comment failed: {r.status_code} {r.text}")

# --------------------------
# Git mirror operations
# --------------------------

class GitMirror:
    """Local bare-mirror cache: clone/fetch from Gitea, push to GitHub."""

    def __init__(self, cache_dir: str):
        self.cache_dir = cache_dir
        ensure_dir(cache_dir)

    def local_path(self, org: str, repo: str) -> str:
        """Path of the bare mirror clone for org/repo inside the cache."""
        return safe_join(self.cache_dir, f"{org}--{repo}.git")

    def ensure_local_mirror(self, org: str, repo: str, gitea_clone_url: str):
        """Clone (--mirror) on first use, otherwise re-point origin and fetch.

        Raises RuntimeError when git clone/fetch fails.
        """
        path = self.local_path(org, repo)
        if not os.path.isdir(path):
            ensure_dir(os.path.dirname(path))
            rc, out, err = run(["git", "clone", "--mirror", gitea_clone_url, path])
            if rc != 0:
                raise RuntimeError(f"git clone --mirror failed: {err.strip()}")
        else:
            # Ensure origin URL is correct (token/URL may have changed) and fetch
            rc, out, err = run(["git", "remote", "set-url", "origin", gitea_clone_url], cwd=path)
            if rc != 0:
                log(f"Warning: could not set origin URL: {err.strip()}")
            rc, out, err = run(["git", "fetch", "--prune", "--tags", "--force"], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git fetch failed: {err.strip()}")

    def push_to_github(self, org: str, repo: str, github_url: str):
        """Force-push all branches and tags of the local mirror to *github_url*.

        Maintains a 'github' remote on the mirror clone; raises RuntimeError
        on any git failure.
        """
        path = self.local_path(org, repo)
        # Add or update 'github' remote
        remotes_rc, remotes_out, _ = run(["git", "remote"], cwd=path)
        if remotes_rc != 0:
            raise RuntimeError("git remote list failed")
        remotes = set(remotes_out.strip().splitlines())
        if "github" not in remotes:
            rc, out, err = run(["git", "remote", "add", "github", github_url], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git remote add github failed: {err.strip()}")
        else:
            rc, out, err = run(["git", "remote", "set-url", "github", github_url], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git remote set-url github failed: {err.strip()}")
        # Push refspecs: '+' forces, --prune removes refs deleted upstream
        for refspec in ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]:
            rc, out, err = run(["git", "push", "--prune", "github", refspec], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git push failed for {refspec}: {err.strip()}")

# --------------------------
# Template rendering
# --------------------------

def render_template(tpl: str, vars: Dict[str, Any]) -> str:
    """str.format the template; on any formatting error return it unchanged.

    Deliberately best-effort: a bad placeholder must never abort a sync or
    webhook handler, so the raw template is the fallback.
    """
    try:
        return tpl.format(**vars)
    except Exception:
        # Fallback: leave placeholders as-is
        return tpl

# --------------------------
# Sync engine
# --------------------------

@dataclasses.dataclass
class RepoContext:
    """Resolved pair of repositories for one sync unit."""
    org: str                    # Gitea organization
    repo: str                   # repository name (same on both sides)
    gitea_repo: Dict[str, Any]  # raw Gitea repo JSON
    github_repo: "Repository"   # PyGithub Repository (string ref keeps module importable without PyGithub)
self.cfg["sync"]["github"]["repo_defaults"] + g_desc = render_template(gcfg.get("description_template", "") or "", {"org": org, "repo": repo}) + g_home = render_template(gcfg.get("homepage_template", "") or "", {"org": org, "repo": repo}) + gh_desc = render_template(hcfg.get("description_template", "") or "", {"gitea_repo_url": gitea_http_url}) + gh_home = render_template(hcfg.get("homepage_template", "") or "", {"gitea_repo_url": gitea_http_url}) + return g_desc, g_home, gh_desc, gh_home + + def ensure_repos_and_mirror(self, org: str, repo: str) -> RepoContext: + # 1) Gitea: get or create + gitea_repo = self.gitea.get_repo(org, repo) + gcfg = self.cfg["sync"]["gitea"] + if gitea_repo is None: + if not gcfg.get("create_if_missing", False): + raise RuntimeError(f"Gitea repo {org}/{repo} not found and auto-creation disabled") + vis = gcfg["new_repo_defaults"]["visibility"] + default_branch = gcfg["new_repo_defaults"]["default_branch"] + auto_init = bool(gcfg["new_repo_defaults"]["seed_new_repo"]) + g_desc, g_home, _, _ = self._derive_texts(org, repo, f"{self.gitea.base_url}/{org}/{repo}") + log(f"Creating Gitea repo {org}/{repo} (visibility={vis}, auto_init={auto_init})") + gitea_repo = self.gitea.create_org_repo( + org=org, + name=repo, + private=(vis == "private"), + description=g_desc, + homepage=g_home, + default_branch=default_branch, + auto_init=auto_init + ) + + # Build Gitea clone URLs + gitea_http_clone = gitea_repo.get("clone_url") or f"{self.gitea.base_url}/{org}/{repo}.git" + gitea_ssh_clone = gitea_repo.get("ssh_url") or gitea_http_clone + gitea_html_url = gitea_repo.get("html_url") or f"{self.gitea.base_url}/{org}/{repo}" + + # 2) GitHub: ensure repo + mirror_vis_policy = self.cfg["sync"]["github"]["repo_defaults"]["mirror_visibility"] + visibility = "private" if (gitea_repo.get("private") and mirror_vis_policy == "mirror_source") else "public" + g_desc, g_home, gh_desc, gh_home = self._derive_texts(org, repo, gitea_html_url) + github_repo = 
self.gh.get_or_create_repo(name=repo, visibility=visibility, description=gh_desc, homepage=gh_home) + topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", []) + self.gh.enforce_repo_settings(github_repo, topics=topics) + + # 3) Apply webhook + wh = self.cfg["sync"]["github"]["webhook"] + if wh.get("mode") == "server": + self.gh.ensure_webhook(github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"])) + + # 4) Mirror content + # Prefer HTTPS clone on Gitea with token for private repos; but for fetch we can use anonymous if public + gitea_clone = gitea_http_clone + if gitea_repo.get("private"): + # embed token for read (not ideal; better to use SSH deploy key) + token = self.cfg["sync"]["gitea"]["token"] + if token: + gitea_clone = gitea_http_clone.replace("://", f"://{token}@") + self.mirror.ensure_local_mirror(org, repo, gitea_clone) + + # GitHub push URL with PAT + gh_push = self.gh.make_push_url(repo) #f"https://{self.auth_user}:{self.token}@github.com/{self.owner}/{repo}.git" + self.mirror.push_to_github(org, repo, gh_push) + + # 5) Default branch and protection + default_branch = gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"] + bp = self.cfg["sync"]["github"]["branch_protection"] + # Replace placeholder in restrict list + restrict_users = bp["restrict_push"].get("users", []) + restrict_teams = bp["restrict_push"].get("teams", []) + restrict_apps = bp["restrict_push"].get("apps", []) + if "" in restrict_users and self.gh.auth_user != "": + restrict_users = [u for u in restrict_users if u != ""] + [self.gh.auth_user] + self.gh.ensure_branch_protection( + repo=github_repo, + branch_name=default_branch, + include_admins=bp.get("include_admins", True), + allow_force_pushes=bp.get("allow_force_pushes", True), + allow_deletions=bp.get("allow_deletions", False), + restrict_users=restrict_users, + restrict_teams=restrict_teams, + restrict_apps=restrict_apps + ) + + # 6) Gitea push mirror 
(optional) + if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True): + # Use HTTPS with PAT to GitHub. Use PAT user/token + gh_pat = self.gh.token + remote_address = f"https://{gh_pat}:x-oauth-basic@github.com/{self.gh.owner}/{repo}.git" + with contextlib.suppress(Exception): + self.gitea.create_push_mirror(owner=org, repo=repo, remote_address=remote_address, username=None, password=None, sync_on_commit=True) + + # Map repo IDs for rename detection later + if gitea_repo.get("id"): + self.state.set_repo_map(int(gitea_repo["id"]), f"{org}/{repo}", f"{self.gh.owner}/{repo}") + + return RepoContext(org=org, repo=repo, gitea_repo=gitea_repo, github_repo=github_repo) + + def enforce_settings_only(self, ctx: RepoContext): + topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", []) + self.gh.enforce_repo_settings(ctx.github_repo, topics=topics) + wh = self.cfg["sync"]["github"]["webhook"] + if wh.get("mode") == "server": + self.gh.ensure_webhook(ctx.github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"])) + default_branch = ctx.gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"] + bp = self.cfg["sync"]["github"]["branch_protection"] + restrict_users = bp["restrict_push"].get("users", []) + if "" in restrict_users and self.gh.auth_user != "": + restrict_users = [u for u in restrict_users if u != ""] + [self.gh.auth_user] + self.gh.ensure_branch_protection( + repo=ctx.github_repo, + branch_name=default_branch, + include_admins=bp.get("include_admins", True), + allow_force_pushes=bp.get("allow_force_pushes", True), + allow_deletions=bp.get("allow_deletions", False), + restrict_users=restrict_users, + restrict_teams=bp["restrict_push"].get("teams", []), + restrict_apps=bp["restrict_push"].get("apps", []) + ) + +# -------------------------- +# Webhook server (PR autocloser) +# -------------------------- + +class PRAutocloserServer: + def __init__(self, cfg: Dict[str, Any], state: 
StateStore, gh_client: GitHubClient, bz_client: BugzillaClient): + self.cfg = cfg + self.state = state + self.ghc = gh_client + self.bzc = bz_client + self.app = FastAPI() + self._setup_routes() + + def _verify_signature(self, secret: str, body: bytes, signature: str): + expected = hmac_sha256(secret, body) + if not hmac.compare_digest(expected, signature): + raise HTTPException(status_code=401, detail="Invalid signature") + + async def _handle_pr_event(self, payload: Dict[str, Any]): + action = payload.get("action") + if action not in ("opened", "reopened", "synchronize", "ready_for_review"): + return JSONResponse({"status": "ignored", "action": action}) + + repo_full = payload["repository"]["full_name"] # owner/repo on GitHub + owner = payload["repository"]["owner"]["login"] + repo = payload["repository"]["name"] + pr = payload["pull_request"] + pr_number = pr["number"] + pr_state = pr["state"] + pr_url = pr["html_url"] + pr_title = pr.get("title") or "" + pr_body = pr.get("body") or "" + github_user = pr["user"]["login"] + github_user_url = pr["user"]["html_url"] + base = pr["base"] + head = pr["head"] + target_branch = base["ref"] + source_branch = f"{head.get('repo', {}).get('owner', {}).get('login', '')}:{head['ref']}" if head.get("repo") else head["ref"] + base_sha = base["sha"][:7] + head_sha = head["sha"][:7] + created_at = pr.get("created_at") or rfc3339_now() + labels = [l["name"] for l in pr.get("labels", [])] if pr.get("labels") else [] + compare_url = pr.get("html_url") + "/files" + org = self.cfg["sync"]["gitea"]["org"] + gitea_repo_url = f"{self.cfg['sync']['gitea']['base_url']}/{org}/{repo}" + bugzilla_base_url = self.cfg["bugzilla"]["base_url"] + + variables = { + "bugzilla_base_url": bugzilla_base_url, + "bug_id": "", + "bug_url": "", + "org": org, + "repo": repo, + "gitea_repo_url": gitea_repo_url, + "github_owner": owner, + "pr_number": pr_number, + "pr_title": pr_title, + "pr_url": pr_url, + "github_user": github_user, + "github_user_url": 
github_user_url,
            "source_branch": source_branch,
            "target_branch": target_branch,
            "head_sha": head_sha,
            "base_sha": base_sha,
            "created_at": created_at,
            "pr_body": pr_body,
            "commit_count": pr.get("commits", ""),
            "compare_url": compare_url,
            "labels": ", ".join(labels)
        }

        # Fetch repo via PyGithub
        repo_obj = self.ghc.gh.get_repo(repo_full)

        # PR key
        pr_key = f"{repo_full}#{pr_number}"

        # Ensure bug mapping or create bug (on opened/reopened/ready_for_review; on synchronize attach)
        bug_id = self.state.get_pr_bug(pr_key)

        if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
            # Attempt to create bug
            component = self.cfg["bugzilla"]["component_template"].format(repo=repo)
            bug_summary = render_template(self.cfg["bugzilla"]["templates"]["bug_summary"], variables)
            bug_body = render_template(self.cfg["bugzilla"]["templates"]["bug_body"], variables)
            create_ok = True
            try:
                bug_id = self.bzc.create_bug(bug_summary, bug_body, component, visibility_groups=self.cfg["bugzilla"].get("groups", []))
                self.state.set_pr_bug(pr_key, bug_id)
                log(f"Created Bugzilla bug {bug_id} for PR {pr_key}")
            except Exception as e:
                # Bug creation failed: remember it so we post the failure
                # comment below instead of closing the PR.
                log(f"Bugzilla create bug failed for PR {pr_key}: {e}")
                create_ok = False

            if create_ok:
                # Attach diff/patch if enabled
                if self.cfg["bugzilla"].get("attach_diff", True):
                    try:
                        # Pull .patch from GitHub API
                        # Using GitHub’s patch endpoint requires auth if private; use token from gh client
                        # NOTE(review): relies on self.ghc.token existing on
                        # GitHubClient — confirm against that class definition.
                        api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
                        headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
                        r = httpx.get(api_patch_url, headers=headers, timeout=120)
                        r.raise_for_status()
                        patch_bytes = r.content
                        self.bzc.add_attachment(
                            bug_id=bug_id,
                            file_name=f"PR-{pr_number}-{head_sha}.patch",
                            content_type="text/x-patch",
                            summary=f"Patch for PR #{pr_number} ({head_sha})",
data_bytes=patch_bytes
                        )
                    except Exception as e:
                        # Attachment failure is non-fatal: the bug exists, so
                        # we still close the PR below.
                        log(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")

                # Post success comment and close PR
                variables["bug_id"] = str(bug_id)
                variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_success"], variables)
                self.ghc.close_pr_with_comment_and_label(repo_obj, pr_number, comment, label=None)
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            else:
                # Post failure comment and keep PR open (policy)
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_failure"], variables)
                label = self.cfg["bugzilla"]["failure_policy"].get("label_on_bugzilla_failure") or None
                self.ghc.comment_on_pr(repo_obj, pr_number, comment, label=label)
                return JSONResponse({"status": "bugzilla_failed", "action": action})

        elif bug_id is not None and action == "synchronize":
            # Attach updated diff and keep PR closed with optional short comment
            try:
                api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
                headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
                r = httpx.get(api_patch_url, headers=headers, timeout=120)
                r.raise_for_status()
                patch_bytes = r.content
                self.bzc.add_attachment(
                    bug_id=bug_id,
                    file_name=f"PR-{pr_number}-{head_sha}.patch",
                    content_type="text/x-patch",
                    summary=f"Updated patch for PR #{pr_number} ({head_sha})",
                    data_bytes=patch_bytes
                )
                # Optional brief PR comment
                short_tpl = self.cfg["bugzilla"]["templates"].get("pr_sync_short_comment")
                if short_tpl:
                    variables["bug_id"] = str(bug_id)
                    variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                    self.ghc.comment_on_pr(repo_obj, pr_number, render_template(short_tpl, variables), label=None)
                # Ensure PR is closed (in case re-opened)
                pr_obj = repo_obj.get_pull(pr_number)
                if pr_obj.state != "closed":
                    pr_obj.edit(state="closed")
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            except Exception as e:
                log(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
                return JSONResponse({"status": "attach_failed", "bug_id": bug_id, "action": action})

        return JSONResponse({"status": "noop", "action": action})

    def _setup_routes(self):
        """Register the health-check and GitHub webhook endpoints on self.app."""
        @self.app.get("/healthz")
        async def healthz():
            return PlainTextResponse("ok")

        @self.app.post("/webhook/github")
        async def github_webhook(request: Request, x_hub_signature_256: str = Header(None), x_github_event: str = Header(None)):
            # Read the raw body first: signature verification must run over
            # the exact bytes GitHub signed, before any JSON parsing.
            body = await request.body()
            secret = self.cfg["sync"]["github"]["webhook"]["secret"]
            if not x_hub_signature_256:
                raise HTTPException(status_code=401, detail="Missing signature")
            self._verify_signature(secret, body, x_hub_signature_256)
            try:
                payload = json.loads(body.decode("utf-8"))
            except Exception:
                raise HTTPException(status_code=400, detail="Invalid JSON")
            # Only pull_request events reach the handler; others are acked.
            if x_github_event != "pull_request":
                return JSONResponse({"status": "ignored", "event": x_github_event})
            return await self._handle_pr_event(payload)

# --------------------------
# CLI and workflows
# --------------------------

def load_config(path: Optional[str]) -> Dict[str, Any]:
    """Load YAML config from *path* and overlay it onto DEFAULT_CONFIG.

    Returns the merged configuration dict; with no path, a deep copy of
    DEFAULT_CONFIG.
    """
    # JSON round-trip deep-copies DEFAULT_CONFIG (it is JSON-serializable).
    cfg = json.loads(json.dumps(DEFAULT_CONFIG))  # deep copy
    if path:
        with open(path, "r") as f:
            user_cfg = yaml.safe_load(f) or {}
        cfg = merge_dicts(cfg, user_cfg)
    # Ensure derived defaults
    return cfg

def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge *b* over *a* (values in b win); returns a new dict.

    Nested dicts are merged key-by-key; any other value type is replaced.
    """
    out = dict(a)
    for k, v in b.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = merge_dicts(out[k], v)
        else:
            out[k] = v
    return out

def single_shot(cfg: Dict[str, Any], org_repo: str):
    """Mirror a single "org/repo" once and print a small JSON summary."""
    org, repo = org_repo.split("/", 1)
    state = StateStore(cfg["sync"]["state_path"])
    gitea = GiteaClient(cfg["sync"]["gitea"]["base_url"],
cfg["sync"]["gitea"]["token"])

    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    mirror = GitMirror(cfg["sync"]["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    ctx = engine.ensure_repos_and_mirror(org, repo)
    # Print summary
    print(json.dumps({
        "gitea_repo": ctx.gitea_repo.get("full_name") or f"{org}/{repo}",
        "github_repo": ctx.github_repo.full_name
    }, indent=2))

def continuous_mode(cfg: Dict[str, Any], interval: int):
    """Poll the Gitea org every *interval* seconds and mirror every repo.

    Runs until SIGINT/SIGTERM; per-repo errors are logged and skipped so one
    bad repository cannot stop the loop.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    mirror = GitMirror(cfg["sync"]["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    org = gitea_cfg["org"]
    log(f"Starting continuous sync loop for Gitea org {org} every {interval}s")
    stop = threading.Event()

    def handle_sig(sig, frame):
        # Signal handler: request a graceful stop of the polling loop.
        log("Shutting down continuous loop...")
        stop.set()
    signal.signal(signal.SIGINT, handle_sig)
    signal.signal(signal.SIGTERM, handle_sig)

    while not stop.is_set():
        try:
            repos = gitea.list_org_repos(org)
            for r in repos:
                name = r["name"]
                # Forks are skipped by default (sync.skip_forks).
                if cfg["sync"].get("skip_forks", True) and r.get("fork", False):
                    continue
                try:
                    engine.ensure_repos_and_mirror(org, name)
                except Exception as e:
                    log(f"Error syncing {org}/{name}: {e}")
        except Exception as e:
            log(f"Scan error: {e}")
        # Sleep with stop check
        # 1-second granularity so shutdown is responsive during the wait.
        for _ in range(interval):
            if stop.is_set():
                break
            time.sleep(1)

def validate_mode(cfg: Dict[str, Any], dry_run: bool):
    """Report (and, unless dry_run, repair) drift between Gitea and GitHub.

    Prints a JSON report with one entry per Gitea repo covering presence,
    issues/merge settings, webhook, and branch protection on the mirror.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    org = gitea_cfg["org"]
    repos = gitea.list_org_repos(org)
    report = []
    for r in repos:
        name
= r["name"]
        target_full = f"{gh.owner}/{name}"
        ok_presence = True
        try:
            repo_obj = gh.gh.get_repo(target_full)
        except GithubException:
            # Missing on GitHub: record absence and skip the detail checks.
            ok_presence = False
            repo_obj = None
        item = {
            "repo": f"{org}/{name}",
            "github_repo": target_full,
            "exists_on_github": ok_presence,
            "issues_disabled": None,
            "merge_methods_disabled": None,
            "webhook_present": None,
            "branch_protection": None
        }
        if ok_presence and repo_obj is not None:
            item["issues_disabled"] = (repo_obj.has_issues is False)
            # A read-only mirror should have every merge method disabled.
            item["merge_methods_disabled"] = (not repo_obj.allow_merge_commit and not repo_obj.allow_squash_merge and not repo_obj.allow_rebase_merge)
            # Webhook presence
            wh_url = cfg["sync"]["github"]["webhook"]["url"]
            item["webhook_present"] = any(h.config.get("url") == wh_url for h in repo_obj.get_hooks())
            # Branch protection check
            default_branch = r.get("default_branch") or cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
            try:
                b = repo_obj.get_branch(default_branch)
                # NOTE(review): PyGithub exposes Branch.protected and
                # Branch.get_protection(); confirm `b.protection.enabled`
                # exists in the pinned PyGithub version.
                prot = b.protection
                item["branch_protection"] = bool(prot and prot.enabled)
            except GithubException:
                item["branch_protection"] = False
        report.append(item)
        # Optionally fix drift (not fully exhaustive to keep concise)
        if not dry_run:
            try:
                mirror = GitMirror(cfg["sync"]["cache_path"])
                engine = SyncEngine(cfg, state, gitea, gh, mirror)
                ctx = engine.ensure_repos_and_mirror(org, name)
                engine.enforce_settings_only(ctx)
            except Exception as e:
                log(f"Validation fix failed for {org}/{name}: {e}")

    print(json.dumps({"report": report}, indent=2))

def run_webhook_server(cfg: Dict[str, Any]):
    """Start the FastAPI PR-autocloser, inferring the port from the webhook URL."""
    state = StateStore(cfg["sync"]["state_path"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    bz = BugzillaClient(cfg["bugzilla"])
    server = PRAutocloserServer(cfg, state, gh, bz)
    url = cfg["sync"]["github"]["webhook"]["url"]
    # Defaults used when the URL carries no explicit port.
    parsed_port = 8080
    host = "0.0.0.0"
    # Infer port from URL if provided (basic parsing)
    with
contextlib.suppress(Exception):
        from urllib.parse import urlparse
        u = urlparse(url)
        if u.port:
            parsed_port = u.port
    log(f"Starting webhook server on port {parsed_port}")
    uvicorn.run(server.app, host=host, port=parsed_port)

def main():
    """CLI entry point: dispatch to single/continuous/validate/webhook modes.

    Exits with status 2 when single mode is requested without --gitea-repo.
    """
    ap = argparse.ArgumentParser(description="Gitea→GitHub mirror and PR autocloser")
    ap.add_argument("--config", help="Path to YAML config", default=None)
    ap.add_argument("--mode", choices=["single", "continuous", "validate", "webhook"], required=True)
    ap.add_argument("--gitea-repo", help="org/repo for single mode")
    ap.add_argument("--interval", type=int, help="Polling interval seconds (continuous)", default=None)
    ap.add_argument("--dry-run", action="store_true", help="Validation dry run")
    args = ap.parse_args()

    cfg = load_config(args.config)

    if args.mode == "single":
        if not args.gitea_repo:
            print("--gitea-repo org/repo is required for single mode", file=sys.stderr)
            sys.exit(2)
        single_shot(cfg, args.gitea_repo)

    elif args.mode == "continuous":
        # CLI flag wins; otherwise fall back to the configured interval.
        interval = args.interval or cfg["sync"]["interval"]
        continuous_mode(cfg, interval)

    elif args.mode == "validate":
        validate_mode(cfg, dry_run=args.dry_run)

    elif args.mode == "webhook":
        run_webhook_server(cfg)

if __name__ == "__main__":
    main()