#!/usr/bin/env python3
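"""gitea_to_github_sync.py

Mirror repositories from a Gitea organization to GitHub, enforce read-only
mirror settings on the GitHub side (issues disabled, topics, webhook, branch
protection), and route GitHub pull requests into Bugzilla: a webhook server
files a Bugzilla bug for each PR, attaches its patch, comments on the PR, and
closes it so that review continues in Bugzilla.

Modes (see main()): single, continuous, validate, webhook.
"""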
import argparse
import base64
import contextlib
import dataclasses
import datetime as dt
import functools
import hashlib
import hmac
import io
import json
import os
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from typing import Any, Dict, List, Optional, Tuple
# Third-party deps:
# pip install pyyaml httpx PyGithub fastapi uvicorn
import httpx
import yaml
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Github, GithubException, Auth
from github.Branch import Branch
from github.Repository import Repository
import uvicorn
from urllib.parse import quote as urlquote
from datetime import timezone
def utc_now() -> dt.datetime:
return dt.datetime.now(timezone.utc)
def rfc3339_now() -> str:
# Example: 2025-09-16T12:10:03Z
return utc_now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
def log(msg: str):
print(f"[{rfc3339_now()}] {msg}", flush=True)
# --------------------------
# Configuration and defaults
# --------------------------
DEFAULT_CONFIG = {
"sync": {
"source_of_truth": "gitea",
"gitea": {
"base_url": "https://src.koozali.org",
"org": "smeserver",
"token": os.environ.get("GITEA_TOKEN", ""),
"create_if_missing": True,
"new_repo_defaults": {
"visibility": "public",
"default_branch": "main",
"seed_new_repo": True,
"seed_commit": {
"author_name": "Koozali Mirror Bot",
"author_email": "noreply@koozali.org",
"message": "Initialize repository (mirror seed)",
"readme_content": "This repository is initialized by the Koozali mirror service.\nCanonical development happens on Gitea:\n- https://src.koozali.org/{org}/{repo}\nPlease file issues and patches via Bugzilla:\n- https://bugs.koozali.org\n",
},
"description_template": "Canonical repository for {org}/{repo}",
"homepage_template": "https://src.koozali.org/{org}/{repo}",
},
"push_mirror": {
"enable": True
}
},
"github": {
"owner": "Koozali-SME-Server", # set your GitHub org/user
"auth": {
"mode": "pat",
"token": os.environ.get("GITHUB_TOKEN", ""),
},
"repo_defaults": {
"mirror_visibility": "mirror_source",
"topics": ["mirror", "read-only"],
"description_template": "Read-only mirror of {gitea_repo_url}. Submit patches via Bugzilla: https://bugs.koozali.org",
"homepage_template": "https://bugs.koozali.org",
},
"branch_protection": {
"apply_to": "default_branch",
"include_admins": True,
"allow_force_pushes": True,
"allow_deletions": False,
"restrict_push": {
"users": ["<PAT-username>"], # replaced at runtime
"teams": [],
"apps": []
},
"required_checks": [],
"required_reviews": {
"required_approving_review_count": 0
}
},
"webhook": {
"mode": "server",
"url": os.environ.get("GITHUB_WEBHOOK_URL", "http://koozali.bjsystems.co.uk/webhook/github"),
"secret": os.environ.get("WEBHOOK_SECRET", "change-me"),
"events": ["pull_request"]
}
},
"cache_path": os.environ.get("SYNC_CACHE_PATH", "./.sync-cache"),
"state_path": os.environ.get("SYNC_STATE_PATH", "./sync_state.sqlite"),
"concurrency": 4,
"interval": 1800,
"include": [],
"exclude": [],
"skip_forks": True
},
"bugzilla": {
"base_url": "https://bugs.koozali.org",
"auth": {
"mode": "basic", # or "api_key"
"username": "brianr",
"password": os.environ.get("BUGZILLA_PASSWORD", ""),
},
"product": "SME11",
"component_template": "{repo}",
"component_fallback": "General",
"auto_create_component": False,
"groups": [],
"attach_diff": True,
"failure_policy": {
"close_pr_on_bugzilla_failure": False,
"label_on_bugzilla_failure": "bugzilla-needed"
},
"templates": {
"pr_comment_success":
"Thanks for the contribution!\n\n"
"This repository is a read-only mirror of the canonical repo on Gitea:\n"
"- Canonical: {gitea_repo_url}\n"
"- Please file and discuss changes in Bugzilla: {bugzilla_base_url}\n\n"
"We created Bug {bug_id} to track this proposal:\n"
"- {bug_url}\n\n"
"This pull request will be closed here. Please follow up on the Bugzilla ticket for review and next steps. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n",
"pr_comment_failure":
"Thanks for the contribution!\n\n"
"This repository is a read-only mirror of the canonical repo on Gitea:\n"
"- Canonical: {gitea_repo_url}\n\n"
"We were unable to create a Bugzilla ticket automatically at this time.\n"
"Please open a bug at {bugzilla_base_url} (Product: SME11, Component: {repo}) and include:\n"
"- GitHub PR: {pr_url}\n"
"- Target branch: {target_branch}\n"
"- Summary and rationale for the change\n\n"
"This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n",
"bug_summary": "[GH PR #{pr_number}] {org}/{repo}: {pr_title}",
"bug_body":
"Source\n"
"- Canonical repo (Gitea): {gitea_repo_url}\n"
"- GitHub mirror PR: {pr_url}\n"
"- Submitted by: {github_user} ({github_user_url})\n"
"- Opened: {created_at}\n\n"
"Target and branch info\n"
"- Base branch: {target_branch} ({base_sha})\n"
"- Head branch: {source_branch} ({head_sha})\n"
"- Commits in PR: {commit_count}\n"
"- Compare: {compare_url}\n\n"
"Project policy\n"
"This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla. Any accepted changes will be applied to the Gitea repository and then mirrored back to GitHub.\n\n"
"Submitters notes\n"
"{pr_body}\n\n"
"Attachments\n"
"- The PR diff/patch is attached automatically by the mirror service. Subsequent updates to this PR will add new attachments to this bug.\n",
"bug_update_comment":
"Update from GitHub PR #{pr_number} for {org}/{repo}\n\n"
"- New head: {head_sha} (base: {base_sha})\n"
"- Commits in update: {commit_count}\n"
"- Compare: {compare_url}\n"
"- PR: {pr_url}\n\n"
"A refreshed patch set has been attached to this bug. Original PR description follows (for context):\n"
"{pr_body}\n",
"pr_sync_short_comment":
"Thanks for the update. Weve attached a refreshed patch set to Bug {bug_id}: {bug_url}\n"
"This PR remains closed; please continue discussion in Bugzilla.\n"
}
}
}
# --------------------------
# Utilities
# --------------------------
def run(cmd: List[str], cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
p = subprocess.Popen(cmd, cwd=cwd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
out, err = p.communicate()
return p.returncode, out, err
def ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
def safe_join(*parts) -> str:
return os.path.normpath(os.path.join(*parts))
def hmac_sha256(secret: str, data: bytes) -> str:
return "sha256=" + hmac.new(secret.encode("utf-8"), data, hashlib.sha256).hexdigest()
# --------------------------
# State (SQLite)
# --------------------------
class StateStore:
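    """SQLite-backed persistence for PR-to-Bugzilla bug and Gitea-to-GitHub repo mappings."""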
def __init__(self, path: str):
self.path = path
ensure_dir(os.path.dirname(os.path.abspath(path)) or ".")
self.conn = sqlite3.connect(path, check_same_thread=False)
self.conn.execute("PRAGMA journal_mode=WAL;")
self._init()
def _init(self):
cur = self.conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS pr_map (
pr_key TEXT PRIMARY KEY,
bug_id INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS repo_map (
gitea_repo_id INTEGER PRIMARY KEY,
gitea_full_name TEXT NOT NULL,
github_full_name TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
self.conn.commit()
def set_pr_bug(self, pr_key: str, bug_id: int):
now = rfc3339_now()
cur = self.conn.cursor()
cur.execute("""
INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
""", (pr_key, bug_id, now, now))
self.conn.commit()
def get_pr_bug(self, pr_key: str) -> Optional[int]:
cur = self.conn.cursor()
cur.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,))
row = cur.fetchone()
return row[0] if row else None
def set_repo_map(self, gitea_repo_id: int, gitea_full_name: str, github_full_name: str):
now = rfc3339_now()
cur = self.conn.cursor()
cur.execute("""
INSERT INTO repo_map (gitea_repo_id, gitea_full_name, github_full_name, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(gitea_repo_id) DO UPDATE SET gitea_full_name=excluded.gitea_full_name, github_full_name=excluded.github_full_name, updated_at=excluded.updated_at
""", (gitea_repo_id, gitea_full_name, github_full_name, now))
self.conn.commit()
def get_repo_map_by_id(self, gitea_repo_id: int) -> Optional[Tuple[str, str]]:
cur = self.conn.cursor()
cur.execute("SELECT gitea_full_name, github_full_name FROM repo_map WHERE gitea_repo_id=?", (gitea_repo_id,))
row = cur.fetchone()
return (row[0], row[1]) if row else None
# --------------------------
# Clients
# --------------------------
class GiteaClient:
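    """Thin wrapper around the Gitea REST API (repos, org listing, push mirrors) with auth-scheme fallbacks."""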
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.token = (token or "").strip()
import re
self.token = re.sub(r'^(token|bearer)\s+', '', (token or ''), flags=re.I).strip()
log(f"Gitea token len={len(self.token)} prefix={self.token[:6]+'...' if self.token else '<empty>'}")
def _url(self, path: str) -> str:
return f"{self.base_url}{path}"
def _request(self, method: str, path: str, *, json_body=None, params=None, timeout=30) -> httpx.Response:
url = self._url(path)
# 1) token scheme
headers = {"Authorization": f"token {self.token}"} if self.token else {}
r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
if r.status_code == 401 and self.token:
# 2) Bearer scheme
headers = {"Authorization": f"Bearer {self.token}"}
r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
if r.status_code == 401:
# 3) Query param fallback (proxy may strip Authorization)
qp = dict(params or {})
qp["access_token"] = self.token
r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout)
return r
def get_repo(self, owner: str, repo: str):
r = self._request("GET", f"/api/v1/repos/{owner}/{repo}")
if r.status_code == 404:
return None
if r.status_code == 401:
raise RuntimeError(f"Gitea 401 for GET /repos/{owner}/{repo}. Token user may lack access to org '{owner}', or proxy is stripping Authorization.")
r.raise_for_status()
return r.json()
def create_org_repo(self, org: str, name: str, private: bool, description: str, homepage: str, default_branch: str, auto_init: bool):
payload = {
"name": name,
"private": private,
"description": description,
"website": homepage,
"default_branch": default_branch,
"auto_init": auto_init
}
r = self._request("POST", f"/api/v1/orgs/{org}/repos", json_body=payload, timeout=60)
if r.status_code == 401:
raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user is org member with create permissions and token scopes include write:organization.")
if r.status_code not in (200, 201):
raise RuntimeError(f"Gitea create repo failed: {r.status_code} {r.text}")
return r.json()
def list_org_repos(self, org: str):
out, page = [], 1
while True:
r = self._request("GET", f"/api/v1/orgs/{org}/repos", params={"limit": 50, "page": page})
r.raise_for_status()
items = r.json()
if not items:
break
out.extend(items)
page += 1
return out
def get_push_mirrors(self, owner: str, repo: str):
r = self._request("GET", f"/api/v1/repos/{owner}/{repo}/push_mirrors")
if r.status_code == 404:
return []
r.raise_for_status()
return r.json()
    def create_push_mirror(self, owner: str, repo: str, remote_address: str, username: Optional[str], password: Optional[str], sync_on_commit: bool = True, interval: str = "24h0m0s") -> bool:
        payload = {
            "remote_address": remote_address,
            "remote_username": username or "",
            "remote_password": password or "",
            "sync_on_commit": sync_on_commit,
            "interval": interval,  # required by Gitea 1.24
        }
        # Use the shared _request helper so the same auth fallbacks apply here as elsewhere
        r = self._request("POST", f"/api/v1/repos/{owner}/{repo}/push_mirrors", json_body=payload, timeout=30)
if r.status_code in (200, 201):
return True
if r.status_code == 404:
log("Gitea push mirror API not available on this server/version.")
return False
log(f"Failed to create push mirror on Gitea: {r.status_code} {r.text}")
return False
class GitHubClient:
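    """PyGithub-based helper: create/configure mirror repos, webhooks, branch protection, and close PRs."""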
def __init__(self, owner: str, token: str):
self.owner = owner
self.gh = Github(auth=Auth.Token(token), per_page=100)
self.token = token
log(f"Github token {token} ")
try:
self.auth_user = self.gh.get_user().login
log(f"login to github {self.auth_user} {self.owner}")
except Exception:
self.auth_user = "<unknown>"
def make_push_url(self, repo_name: str) -> str:
user = self.auth_user or "git"
return f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git"
def get_or_create_repo(self, name: str, visibility: str, description: str, homepage: str) -> Repository:
try:
org = self.gh.get_organization(self.owner)
repo = None
with contextlib.suppress(GithubException):
repo = org.get_repo(name)
if repo is None:
repo = org.create_repo(
name=name,
description=description or "",
homepage=homepage or "",
private=(visibility == "private"),
has_issues=False,
has_projects=False,
has_wiki=False,
auto_init=False
)
else:
repo.edit(
name=name,
description=description or "",
homepage=homepage or "",
private=(visibility == "private"),
has_issues=False
)
return repo
except GithubException as e:
# If owner is a user, not an org
if e.status == 404:
user = self.gh.get_user(self.owner)
repo = None
with contextlib.suppress(GithubException):
repo = user.get_repo(name)
if repo is None:
repo = user.create_repo(
name=name,
description=description or "",
homepage=homepage or "",
private=(visibility == "private"),
has_issues=False,
auto_init=False
)
else:
repo.edit(
name=name,
description=description or "",
homepage=homepage or "",
private=(visibility == "private"),
has_issues=False
)
return repo
raise
def enforce_repo_settings(self, repo: Repository, topics: List[str]):
# Ensure at least one merge method is enabled (GitHub requirement)
try:
repo.edit(
allow_merge_commit=True, # keep one enabled
allow_squash_merge=False,
allow_rebase_merge=False,
                allow_auto_merge=False,  # explicitly disable auto-merge
has_issues=False,
)
except GithubException as e:
# Fallback in case older API fields differ, or we hit the 422 anyway
if getattr(e, "status", None) == 422:
# Retry with a different single merge method just in case
repo.edit(
allow_merge_commit=False,
allow_squash_merge=True,
allow_rebase_merge=False,
has_issues=False,
)
else:
raise
# Topics
if topics:
with contextlib.suppress(GithubException):
repo.replace_topics(topics)
def ensure_webhook(self, repo, url: str, secret: str, events: list[str]):
desired_cfg = {
"url": url,
"content_type": "json",
"secret": secret, # always set; GitHub wont echo it back
"insecure_ssl": "0", # "0" or "1" as strings
}
hooks = list(repo.get_hooks())
for h in hooks:
cfg = h.config or {}
if cfg.get("url") == url:
# Update in place. Must pass name="web" as first arg.
try:
h.edit("web", config=desired_cfg, events=events, active=True)
except Exception as e:
# Some PyGithub versions need add/remove events instead; fallback to setting full list
h.edit("web", config=desired_cfg, events=events, active=True)
return
# Create if missing
repo.create_hook(
name="web",
config=desired_cfg,
events=events,
active=True,
)
def ensure_branch_protection(
self,
repo,
branch_name: str,
include_admins: bool,
allow_force_pushes: bool,
allow_deletions: bool,
restrict_users: list[str],
restrict_teams: list[str],
restrict_apps: list[str],
):
# Ensure branch exists
try:
repo.get_branch(branch_name)
except GithubException as e:
log(f"Branch {branch_name} not found on {repo.full_name}: {e}")
return
owner = repo.owner.login
name = repo.name
base = f"https://api.github.com/repos/{owner}/{name}/branches/{branch_name}"
headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github+json, application/vnd.github.luke-cage-preview+json",
"X-GitHub-Api-Version": "2022-11-28",
}
# 1) Base protection
r = httpx.put(
f"{base}/protection",
headers=headers,
json={
"required_status_checks": None,
"enforce_admins": bool(include_admins),
"required_pull_request_reviews": None,
"restrictions": None,
},
timeout=30,
)
if r.status_code not in (200, 201):
log(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} {r.text}")
return
# 2) Allow force pushes
r_fp = httpx.put(
f"{base}/protection/allow_force_pushes",
headers=headers,
json={"enabled": bool(allow_force_pushes)},
timeout=15,
)
if r_fp.status_code not in (200, 201):
log(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}")
# 3) Allow deletions
r_del = httpx.put(
f"{base}/protection/allow_deletions",
headers=headers,
json={"enabled": bool(allow_deletions)},
timeout=15,
)
if r_del.status_code not in (200, 201):
log(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}")
        # If we couldn't enable force pushes, remove protection to avoid blocking the mirror
if allow_force_pushes and r_fp.status_code == 404:
httpx.delete(f"{base}/protection", headers=headers, timeout=15)
log(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).")
return
# 4) Restrictions (org repos only)
is_org = getattr(repo.owner, "type", None) == "Organization"
if is_org and (restrict_users or restrict_teams or restrict_apps):
r_res = httpx.put(
f"{base}/protection/restrictions",
headers=headers,
json={"users": restrict_users or [], "teams": restrict_teams or [], "apps": restrict_apps or []},
timeout=30,
)
if r_res.status_code not in (200, 201):
log(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}")
log(f"Repo owner type for {repo.full_name} is {getattr(repo.owner,'type',None)}")
def close_pr_with_comment_and_label(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
pr = repo.get_pull(pr_number)
pr.create_issue_comment(comment)
if label:
with contextlib.suppress(GithubException):
repo.create_label(name=label, color="ededed")
with contextlib.suppress(GithubException):
pr.as_issue().add_to_labels(label)
if pr.state != "closed":
pr.edit(state="closed")
def comment_on_pr(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
pr = repo.get_pull(pr_number)
pr.create_issue_comment(comment)
if label:
with contextlib.suppress(GithubException):
repo.create_label(name=label, color="ededed")
with contextlib.suppress(GithubException):
pr.as_issue().add_to_labels(label)
# --------------------------
# Bugzilla integration
# --------------------------
class BugzillaClient:
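    """Minimal Bugzilla REST client: create bugs, add comments, and attach patch files."""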
def __init__(self, cfg: Dict[str, Any]):
self.base_url = cfg["base_url"].rstrip("/")
self.auth_mode = cfg["auth"]["mode"]
self.username = cfg["auth"].get("username", "")
self.password = cfg["auth"].get("password", "")
self.api_key = cfg["auth"].get("api_key", "")
self.product = cfg["product"]
self.component_template = cfg["component_template"]
self.component_fallback = cfg["component_fallback"]
self.auto_create_component = bool(cfg.get("auto_create_component", False))
self.groups = cfg.get("groups", [])
self.attach_diff = bool(cfg.get("attach_diff", True))
self.templates = cfg["templates"]
def _headers(self) -> Dict[str, str]:
headers = {"Accept": "application/json"}
if self.auth_mode == "api_key" and self.api_key:
headers["X-BUGZILLA-API-KEY"] = self.api_key
return headers
def _auth(self) -> Optional[Tuple[str, str]]:
if self.auth_mode == "basic":
return (self.username, self.password)
return None
def create_bug(self, summary: str, description: str, component: str, visibility_groups: Optional[List[str]] = None) -> int:
url = f"{self.base_url}/rest/bug"
payload = {
"product": self.product,
"component": component,
"summary": summary,
"description": description,
}
if visibility_groups:
payload["groups"] = visibility_groups
r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
if r.status_code not in (200, 201):
raise RuntimeError(f"Bugzilla create bug failed: {r.status_code} {r.text}")
data = r.json()
bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
if not bug_id:
raise RuntimeError(f"Bugzilla response missing bug id: {data}")
return int(bug_id)
def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes):
url = f"{self.base_url}/rest/bug/{bug_id}/attachment"
payload = {
"ids": [bug_id],
"data": base64.b64encode(data_bytes).decode("ascii"),
"file_name": file_name,
"summary": summary,
"content_type": content_type,
"is_patch": True
}
r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=120)
if r.status_code not in (200, 201):
raise RuntimeError(f"Bugzilla add attachment failed: {r.status_code} {r.text}")
def add_comment(self, bug_id: int, comment: str):
url = f"{self.base_url}/rest/bug/{bug_id}/comment"
payload = {"comment": comment}
r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
if r.status_code not in (200, 201):
raise RuntimeError(f"Bugzilla add comment failed: {r.status_code} {r.text}")
# --------------------------
# Git mirror operations
# --------------------------
class GitMirror:
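    """Maintains local bare mirror clones of Gitea repos and force-pushes their branches and tags to GitHub."""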
def __init__(self, cache_dir: str):
self.cache_dir = cache_dir
ensure_dir(cache_dir)
def local_path(self, org: str, repo: str) -> str:
return safe_join(self.cache_dir, f"{org}--{repo}.git")
def ensure_local_mirror(self, org: str, repo: str, gitea_clone_url: str):
path = self.local_path(org, repo)
if not os.path.isdir(path):
ensure_dir(os.path.dirname(path))
rc, out, err = run(["git", "clone", "--mirror", gitea_clone_url, path])
if rc != 0:
raise RuntimeError(f"git clone --mirror failed: {err.strip()}")
else:
# Ensure origin URL is correct and fetch
rc, out, err = run(["git", "remote", "set-url", "origin", gitea_clone_url], cwd=path)
if rc != 0:
log(f"Warning: could not set origin URL: {err.strip()}")
# Fetch
rc, out, err = run(["git", "fetch", "--prune", "--tags", "--force"], cwd=path)
if rc != 0:
raise RuntimeError(f"git fetch failed: {err.strip()}")
def push_to_github(self, org: str, repo: str, github_url: str):
path = self.local_path(org, repo)
# Add or update 'github' remote
remotes_rc, remotes_out, _ = run(["git", "remote"], cwd=path)
if remotes_rc != 0:
raise RuntimeError("git remote list failed")
remotes = set(remotes_out.strip().splitlines())
if "github" not in remotes:
rc, out, err = run(["git", "remote", "add", "github", github_url], cwd=path)
if rc != 0:
raise RuntimeError(f"git remote add github failed: {err.strip()}")
else:
rc, out, err = run(["git", "remote", "set-url", "github", github_url], cwd=path)
if rc != 0:
raise RuntimeError(f"git remote set-url github failed: {err.strip()}")
# Push refspecs
for refspec in ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]:
rc, out, err = run(["git", "push", "--prune", "github", refspec], cwd=path)
if rc != 0:
raise RuntimeError(f"git push failed for {refspec}: {err.strip()}")
# --------------------------
# Template rendering
# --------------------------
def render_template(tpl: str, vars: Dict[str, Any]) -> str:
try:
return tpl.format(**vars)
except Exception:
# Fallback: leave placeholders as-is
return tpl
# --------------------------
# Sync engine
# --------------------------
@dataclasses.dataclass
class RepoContext:
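    """Bundles a synced repo's Gitea metadata and its GitHub Repository object."""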
org: str
repo: str
gitea_repo: Dict[str, Any]
github_repo: Repository
class SyncEngine:
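    """Orchestrates repo creation, content mirroring, and settings enforcement for one Gitea-to-GitHub pair."""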
def __init__(self, cfg: Dict[str, Any], state: StateStore, gitea: GiteaClient, gh: GitHubClient, mirror: GitMirror):
self.cfg = cfg
self.state = state
self.gitea = gitea
self.gh = gh
self.mirror = mirror
def _derive_texts(self, org: str, repo: str, gitea_http_url: str) -> Tuple[str, str, str, str]:
gcfg = self.cfg["sync"]["gitea"]["new_repo_defaults"]
hcfg = self.cfg["sync"]["github"]["repo_defaults"]
g_desc = render_template(gcfg.get("description_template", "") or "", {"org": org, "repo": repo})
g_home = render_template(gcfg.get("homepage_template", "") or "", {"org": org, "repo": repo})
gh_desc = render_template(hcfg.get("description_template", "") or "", {"gitea_repo_url": gitea_http_url})
gh_home = render_template(hcfg.get("homepage_template", "") or "", {"gitea_repo_url": gitea_http_url})
return g_desc, g_home, gh_desc, gh_home
def ensure_repos_and_mirror(self, org: str, repo: str) -> RepoContext:
# 1) Gitea: get or create
gitea_repo = self.gitea.get_repo(org, repo)
gcfg = self.cfg["sync"]["gitea"]
if gitea_repo is None:
if not gcfg.get("create_if_missing", False):
raise RuntimeError(f"Gitea repo {org}/{repo} not found and auto-creation disabled")
vis = gcfg["new_repo_defaults"]["visibility"]
default_branch = gcfg["new_repo_defaults"]["default_branch"]
auto_init = bool(gcfg["new_repo_defaults"]["seed_new_repo"])
g_desc, g_home, _, _ = self._derive_texts(org, repo, f"{self.gitea.base_url}/{org}/{repo}")
log(f"Creating Gitea repo {org}/{repo} (visibility={vis}, auto_init={auto_init})")
gitea_repo = self.gitea.create_org_repo(
org=org,
name=repo,
private=(vis == "private"),
description=g_desc,
homepage=g_home,
default_branch=default_branch,
auto_init=auto_init
)
# Build Gitea clone URLs
gitea_http_clone = gitea_repo.get("clone_url") or f"{self.gitea.base_url}/{org}/{repo}.git"
gitea_ssh_clone = gitea_repo.get("ssh_url") or gitea_http_clone
gitea_html_url = gitea_repo.get("html_url") or f"{self.gitea.base_url}/{org}/{repo}"
# 2) GitHub: ensure repo
mirror_vis_policy = self.cfg["sync"]["github"]["repo_defaults"]["mirror_visibility"]
visibility = "private" if (gitea_repo.get("private") and mirror_vis_policy == "mirror_source") else "public"
g_desc, g_home, gh_desc, gh_home = self._derive_texts(org, repo, gitea_html_url)
github_repo = self.gh.get_or_create_repo(name=repo, visibility=visibility, description=gh_desc, homepage=gh_home)
topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
self.gh.enforce_repo_settings(github_repo, topics=topics)
# 3) Apply webhook
wh = self.cfg["sync"]["github"]["webhook"]
if wh.get("mode") == "server":
self.gh.ensure_webhook(github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
# 4) Mirror content
# Prefer HTTPS clone on Gitea with token for private repos; but for fetch we can use anonymous if public
gitea_clone = gitea_http_clone
if gitea_repo.get("private"):
# embed token for read (not ideal; better to use SSH deploy key)
token = self.cfg["sync"]["gitea"]["token"]
if token:
gitea_clone = gitea_http_clone.replace("://", f"://{token}@")
self.mirror.ensure_local_mirror(org, repo, gitea_clone)
# GitHub push URL with PAT
        gh_push = self.gh.make_push_url(repo)
self.mirror.push_to_github(org, repo, gh_push)
# 5) Default branch and protection
default_branch = gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
bp = self.cfg["sync"]["github"]["branch_protection"]
# Replace placeholder in restrict list
restrict_users = bp["restrict_push"].get("users", [])
restrict_teams = bp["restrict_push"].get("teams", [])
restrict_apps = bp["restrict_push"].get("apps", [])
if "<PAT-username>" in restrict_users and self.gh.auth_user != "<unknown>":
restrict_users = [u for u in restrict_users if u != "<PAT-username>"] + [self.gh.auth_user]
self.gh.ensure_branch_protection(
repo=github_repo,
branch_name=default_branch,
include_admins=bp.get("include_admins", True),
allow_force_pushes=bp.get("allow_force_pushes", True),
allow_deletions=bp.get("allow_deletions", False),
restrict_users=restrict_users,
restrict_teams=restrict_teams,
restrict_apps=restrict_apps
)
# 6) Gitea push mirror (optional)
if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True):
# Use HTTPS with PAT to GitHub. Use PAT user/token
gh_pat = self.gh.token
remote_address = f"https://{gh_pat}:x-oauth-basic@github.com/{self.gh.owner}/{repo}.git"
with contextlib.suppress(Exception):
self.gitea.create_push_mirror(owner=org, repo=repo, remote_address=remote_address, username=None, password=None, sync_on_commit=True)
# Map repo IDs for rename detection later
if gitea_repo.get("id"):
self.state.set_repo_map(int(gitea_repo["id"]), f"{org}/{repo}", f"{self.gh.owner}/{repo}")
return RepoContext(org=org, repo=repo, gitea_repo=gitea_repo, github_repo=github_repo)
def enforce_settings_only(self, ctx: RepoContext):
topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
self.gh.enforce_repo_settings(ctx.github_repo, topics=topics)
wh = self.cfg["sync"]["github"]["webhook"]
if wh.get("mode") == "server":
self.gh.ensure_webhook(ctx.github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
default_branch = ctx.gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
bp = self.cfg["sync"]["github"]["branch_protection"]
restrict_users = bp["restrict_push"].get("users", [])
if "<PAT-username>" in restrict_users and self.gh.auth_user != "<unknown>":
restrict_users = [u for u in restrict_users if u != "<PAT-username>"] + [self.gh.auth_user]
self.gh.ensure_branch_protection(
repo=ctx.github_repo,
branch_name=default_branch,
include_admins=bp.get("include_admins", True),
allow_force_pushes=bp.get("allow_force_pushes", True),
allow_deletions=bp.get("allow_deletions", False),
restrict_users=restrict_users,
restrict_teams=bp["restrict_push"].get("teams", []),
restrict_apps=bp["restrict_push"].get("apps", [])
)
# --------------------------
# Webhook server (PR autocloser)
# --------------------------
class PRAutocloserServer:
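    """FastAPI webhook receiver that turns GitHub pull requests into Bugzilla bugs (with patch attachments) and closes the PRs."""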
def __init__(self, cfg: Dict[str, Any], state: StateStore, gh_client: GitHubClient, bz_client: BugzillaClient):
self.cfg = cfg
self.state = state
self.ghc = gh_client
self.bzc = bz_client
self.app = FastAPI()
self._setup_routes()
def _verify_signature(self, secret: str, body: bytes, signature: str):
expected = hmac_sha256(secret, body)
if not hmac.compare_digest(expected, signature):
raise HTTPException(status_code=401, detail="Invalid signature")
async def _handle_pr_event(self, payload: Dict[str, Any]):
action = payload.get("action")
if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
return JSONResponse({"status": "ignored", "action": action})
repo_full = payload["repository"]["full_name"] # owner/repo on GitHub
owner = payload["repository"]["owner"]["login"]
repo = payload["repository"]["name"]
pr = payload["pull_request"]
pr_number = pr["number"]
pr_state = pr["state"]
pr_url = pr["html_url"]
pr_title = pr.get("title") or ""
pr_body = pr.get("body") or ""
github_user = pr["user"]["login"]
github_user_url = pr["user"]["html_url"]
base = pr["base"]
head = pr["head"]
target_branch = base["ref"]
source_branch = f"{head.get('repo', {}).get('owner', {}).get('login', '')}:{head['ref']}" if head.get("repo") else head["ref"]
base_sha = base["sha"][:7]
head_sha = head["sha"][:7]
created_at = pr.get("created_at") or rfc3339_now()
labels = [l["name"] for l in pr.get("labels", [])] if pr.get("labels") else []
compare_url = pr.get("html_url") + "/files"
org = self.cfg["sync"]["gitea"]["org"]
gitea_repo_url = f"{self.cfg['sync']['gitea']['base_url']}/{org}/{repo}"
bugzilla_base_url = self.cfg["bugzilla"]["base_url"]
variables = {
"bugzilla_base_url": bugzilla_base_url,
"bug_id": "",
"bug_url": "",
"org": org,
"repo": repo,
"gitea_repo_url": gitea_repo_url,
"github_owner": owner,
"pr_number": pr_number,
"pr_title": pr_title,
"pr_url": pr_url,
"github_user": github_user,
"github_user_url": github_user_url,
"source_branch": source_branch,
"target_branch": target_branch,
"head_sha": head_sha,
"base_sha": base_sha,
"created_at": created_at,
"pr_body": pr_body,
"commit_count": pr.get("commits", ""),
"compare_url": compare_url,
"labels": ", ".join(labels)
}
# Fetch repo via PyGithub
repo_obj = self.ghc.gh.get_repo(repo_full)
# PR key
pr_key = f"{repo_full}#{pr_number}"
# Ensure bug mapping or create bug (on opened/reopened/ready_for_review; on synchronize attach)
bug_id = self.state.get_pr_bug(pr_key)
if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
# Attempt to create bug
component = self.cfg["bugzilla"]["component_template"].format(repo=repo)
bug_summary = render_template(self.cfg["bugzilla"]["templates"]["bug_summary"], variables)
bug_body = render_template(self.cfg["bugzilla"]["templates"]["bug_body"], variables)
create_ok = True
try:
bug_id = self.bzc.create_bug(bug_summary, bug_body, component, visibility_groups=self.cfg["bugzilla"].get("groups", []))
self.state.set_pr_bug(pr_key, bug_id)
log(f"Created Bugzilla bug {bug_id} for PR {pr_key}")
except Exception as e:
log(f"Bugzilla create bug failed for PR {pr_key}: {e}")
create_ok = False
if create_ok:
# Attach diff/patch if enabled
if self.cfg["bugzilla"].get("attach_diff", True):
try:
# Pull .patch from GitHub API
                        # Using GitHub's patch endpoint requires auth if private; use token from gh client
api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
r = httpx.get(api_patch_url, headers=headers, timeout=120)
r.raise_for_status()
patch_bytes = r.content
self.bzc.add_attachment(
bug_id=bug_id,
file_name=f"PR-{pr_number}-{head_sha}.patch",
content_type="text/x-patch",
summary=f"Patch for PR #{pr_number} ({head_sha})",
data_bytes=patch_bytes
)
except Exception as e:
log(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")
# Post success comment and close PR
variables["bug_id"] = str(bug_id)
variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_success"], variables)
self.ghc.close_pr_with_comment_and_label(repo_obj, pr_number, comment, label=None)
return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
else:
# Post failure comment and keep PR open (policy)
comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_failure"], variables)
label = self.cfg["bugzilla"]["failure_policy"].get("label_on_bugzilla_failure") or None
self.ghc.comment_on_pr(repo_obj, pr_number, comment, label=label)
return JSONResponse({"status": "bugzilla_failed", "action": action})
elif bug_id is not None and action == "synchronize":
# Attach updated diff and keep PR closed with optional short comment
try:
api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
r = httpx.get(api_patch_url, headers=headers, timeout=120)
r.raise_for_status()
patch_bytes = r.content
self.bzc.add_attachment(
bug_id=bug_id,
file_name=f"PR-{pr_number}-{head_sha}.patch",
content_type="text/x-patch",
summary=f"Updated patch for PR #{pr_number} ({head_sha})",
data_bytes=patch_bytes
)
# Optional brief PR comment
short_tpl = self.cfg["bugzilla"]["templates"].get("pr_sync_short_comment")
if short_tpl:
variables["bug_id"] = str(bug_id)
variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
self.ghc.comment_on_pr(repo_obj, pr_number, render_template(short_tpl, variables), label=None)
# Ensure PR is closed (in case re-opened)
pr_obj = repo_obj.get_pull(pr_number)
if pr_obj.state != "closed":
pr_obj.edit(state="closed")
return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
except Exception as e:
log(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
return JSONResponse({"status": "attach_failed", "bug_id": bug_id, "action": action})
return JSONResponse({"status": "noop", "action": action})
def _setup_routes(self):
@self.app.get("/healthz")
async def healthz():
return PlainTextResponse("ok")
@self.app.post("/webhook/github")
async def github_webhook(request: Request, x_hub_signature_256: str = Header(None), x_github_event: str = Header(None)):
body = await request.body()
secret = self.cfg["sync"]["github"]["webhook"]["secret"]
if not x_hub_signature_256:
raise HTTPException(status_code=401, detail="Missing signature")
self._verify_signature(secret, body, x_hub_signature_256)
try:
payload = json.loads(body.decode("utf-8"))
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON")
if x_github_event != "pull_request":
return JSONResponse({"status": "ignored", "event": x_github_event})
return await self._handle_pr_event(payload)
# --------------------------
# CLI and workflows
# --------------------------
def load_config(path: Optional[str]) -> Dict[str, Any]:
cfg = json.loads(json.dumps(DEFAULT_CONFIG)) # deep copy
if path:
with open(path, "r") as f:
user_cfg = yaml.safe_load(f) or {}
cfg = merge_dicts(cfg, user_cfg)
# Ensure derived defaults
return cfg
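# Illustrative (not authoritative) minimal YAML override for --config; any key
# omitted here falls back to DEFAULT_CONFIG via merge_dicts below. The owner,
# org, and product values are placeholders taken from the defaults above.
#
#   sync:
#     gitea:
#       base_url: "https://src.koozali.org"
#       org: "smeserver"
#     github:
#       owner: "Koozali-SME-Server"
#     interval: 1800
#   bugzilla:
#     product: "SME11"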
def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
out = dict(a)
for k, v in b.items():
if k in out and isinstance(out[k], dict) and isinstance(v, dict):
out[k] = merge_dicts(out[k], v)
else:
out[k] = v
return out
def single_shot(cfg: Dict[str, Any], org_repo: str):
org, repo = org_repo.split("/", 1)
state = StateStore(cfg["sync"]["state_path"])
gitea = GiteaClient(cfg["sync"]["gitea"]["base_url"], cfg["sync"]["gitea"]["token"])
gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
mirror = GitMirror(cfg["sync"]["cache_path"])
engine = SyncEngine(cfg, state, gitea, gh, mirror)
ctx = engine.ensure_repos_and_mirror(org, repo)
# Print summary
print(json.dumps({
"gitea_repo": ctx.gitea_repo.get("full_name") or f"{org}/{repo}",
"github_repo": ctx.github_repo.full_name
}, indent=2))
def continuous_mode(cfg: Dict[str, Any], interval: int):
state = StateStore(cfg["sync"]["state_path"])
gitea_cfg = cfg["sync"]["gitea"]
gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
mirror = GitMirror(cfg["sync"]["cache_path"])
engine = SyncEngine(cfg, state, gitea, gh, mirror)
org = gitea_cfg["org"]
log(f"Starting continuous sync loop for Gitea org {org} every {interval}s")
stop = threading.Event()
def handle_sig(sig, frame):
log("Shutting down continuous loop...")
stop.set()
signal.signal(signal.SIGINT, handle_sig)
signal.signal(signal.SIGTERM, handle_sig)
while not stop.is_set():
try:
repos = gitea.list_org_repos(org)
for r in repos:
name = r["name"]
if cfg["sync"].get("skip_forks", True) and r.get("fork", False):
continue
try:
engine.ensure_repos_and_mirror(org, name)
except Exception as e:
log(f"Error syncing {org}/{name}: {e}")
except Exception as e:
log(f"Scan error: {e}")
# Sleep with stop check
for _ in range(interval):
if stop.is_set():
break
time.sleep(1)
def validate_mode(cfg: Dict[str, Any], dry_run: bool):
state = StateStore(cfg["sync"]["state_path"])
gitea_cfg = cfg["sync"]["gitea"]
gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
org = gitea_cfg["org"]
repos = gitea.list_org_repos(org)
report = []
for r in repos:
name = r["name"]
target_full = f"{gh.owner}/{name}"
ok_presence = True
try:
repo_obj = gh.gh.get_repo(target_full)
except GithubException:
ok_presence = False
repo_obj = None
item = {
"repo": f"{org}/{name}",
"github_repo": target_full,
"exists_on_github": ok_presence,
"issues_disabled": None,
"merge_methods_disabled": None,
"webhook_present": None,
"branch_protection": None
}
if ok_presence and repo_obj is not None:
item["issues_disabled"] = (repo_obj.has_issues is False)
item["merge_methods_disabled"] = (not repo_obj.allow_merge_commit and not repo_obj.allow_squash_merge and not repo_obj.allow_rebase_merge)
# Webhook presence
wh_url = cfg["sync"]["github"]["webhook"]["url"]
item["webhook_present"] = any(h.config.get("url") == wh_url for h in repo_obj.get_hooks())
# Branch protection check
default_branch = r.get("default_branch") or cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
try:
                b = repo_obj.get_branch(default_branch)
                # Branch.protected reflects whether any protection rules are enabled
                item["branch_protection"] = bool(b.protected)
except GithubException:
item["branch_protection"] = False
report.append(item)
# Optionally fix drift (not fully exhaustive to keep concise)
if not dry_run:
try:
mirror = GitMirror(cfg["sync"]["cache_path"])
engine = SyncEngine(cfg, state, gitea, gh, mirror)
ctx = engine.ensure_repos_and_mirror(org, name)
engine.enforce_settings_only(ctx)
except Exception as e:
log(f"Validation fix failed for {org}/{name}: {e}")
print(json.dumps({"report": report}, indent=2))
def run_webhook_server(cfg: Dict[str, Any]):
state = StateStore(cfg["sync"]["state_path"])
gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
bz = BugzillaClient(cfg["bugzilla"])
server = PRAutocloserServer(cfg, state, gh, bz)
url = cfg["sync"]["github"]["webhook"]["url"]
parsed_port = 8080
host = "0.0.0.0"
# Infer port from URL if provided (basic parsing)
with contextlib.suppress(Exception):
from urllib.parse import urlparse
u = urlparse(url)
if u.port:
parsed_port = u.port
log(f"Starting webhook server on port {parsed_port}")
uvicorn.run(server.app, host=host, port=parsed_port)
def main():
ap = argparse.ArgumentParser(description="Gitea→GitHub mirror and PR autocloser")
ap.add_argument("--config", help="Path to YAML config", default=None)
ap.add_argument("--mode", choices=["single", "continuous", "validate", "webhook"], required=True)
ap.add_argument("--gitea-repo", help="org/repo for single mode")
ap.add_argument("--interval", type=int, help="Polling interval seconds (continuous)", default=None)
ap.add_argument("--dry-run", action="store_true", help="Validation dry run")
args = ap.parse_args()
cfg = load_config(args.config)
if args.mode == "single":
if not args.gitea_repo:
print("--gitea-repo org/repo is required for single mode", file=sys.stderr)
sys.exit(2)
single_shot(cfg, args.gitea_repo)
elif args.mode == "continuous":
interval = args.interval or cfg["sync"]["interval"]
continuous_mode(cfg, interval)
elif args.mode == "validate":
validate_mode(cfg, dry_run=args.dry_run)
elif args.mode == "webhook":
run_webhook_server(cfg)
if __name__ == "__main__":
main()
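# Illustrative invocations (paths, repo names, and config file are examples only):
#   ./gitea_to_github_sync.py --mode single --gitea-repo smeserver/smeserver-gitutils --config sync.yaml
#   ./gitea_to_github_sync.py --mode continuous --interval 1800
#   ./gitea_to_github_sync.py --mode validate --dry-run
#   ./gitea_to_github_sync.py --mode webhook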