Files
smeserver-gitutils/gitea_to_github_sync.py

1254 lines
57 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import base64
import contextlib
import dataclasses
import datetime as dt
import functools
import hashlib
import hmac
import io
import json
import logging
import os
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from typing import Any, Dict, List, Optional, Tuple
# Third-party deps:
# pip install PyYAML httpx PyGithub fastapi uvicorn
import httpx
import yaml
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from github import Github, GithubException, Auth
from github.Branch import Branch
from github.Repository import Repository
import uvicorn
from urllib.parse import quote as urlquote
from datetime import timezone
# --------------------------
# Time helpers
# --------------------------
def utc_now() -> dt.datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return dt.datetime.now(tz=timezone.utc)
def rfc3339_now() -> str:
    """Current UTC time as an RFC 3339 string with a trailing 'Z' (second precision)."""
    stamp = dt.datetime.now(timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
# --------------------------
# Logging setup
# --------------------------
class UTCFormatter(logging.Formatter):
    """Logging formatter whose timestamps are always UTC, RFC 3339 style."""

    # Keep %(asctime)s conversions in UTC as well.
    converter = time.gmtime

    def formatTime(self, record, datefmt=None):
        """Render the record's creation time as 'YYYY-MM-DDTHH:MM:SSZ'."""
        when = dt.datetime.fromtimestamp(record.created, tz=timezone.utc)
        return when.isoformat(timespec="seconds").replace("+00:00", "Z")
def configure_logging(level_name: str = "INFO"):
    """Install a single UTC-stamped stream handler on the root logger.

    Unknown or empty level names fall back to INFO. Existing handlers are
    removed so repeated calls do not duplicate output.
    """
    resolved = getattr(logging, (level_name or "INFO").upper(), logging.INFO)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s"))
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
    root_logger.setLevel(resolved)
logger = logging.getLogger(__name__)
# --------------------------
# Configuration and defaults
# --------------------------
# Default configuration tree. A YAML config file may override any of these
# values; several entries also honour environment variables. Fix: two
# user-facing template strings had mangled apostrophes ("Weve attached",
# "Submitters notes") — restored to "We've" / "Submitter's".
DEFAULT_CONFIG = {
    "sync": {
        "source_of_truth": "gitea",
        # Source side: canonical repositories live on Gitea.
        "gitea": {
            "base_url": "https://src.koozali.org",
            "org": "smeserver",
            "token": os.environ.get("GITEA_TOKEN", ""),
            "create_if_missing": True,
            "new_repo_defaults": {
                "visibility": "public",
                "default_branch": "main",
                "seed_new_repo": True,
                "seed_commit": {
                    "author_name": "Koozali Mirror Bot",
                    "author_email": "noreply@koozali.org",
                    "message": "Initialize repository (mirror seed)",
                    "readme_content": "This repository is initialized by the Koozali mirror service.\nCanonical development happens on Gitea:\n- https://src.koozali.org/{org}/{repo}\nPlease file issues and patches via Bugzilla:\n- https://bugs.koozali.org\n",
                },
                "description_template": "Canonical repository for {org}/{repo}",
                "homepage_template": "https://src.koozali.org/{org}/{repo}",
            },
            "push_mirror": {
                "enable": True
            }
        },
        # Target side: read-only mirrors on GitHub.
        "github": {
            "owner": "Koozali-SME-Server",  # set your GitHub org/user
            "auth": {
                "mode": "pat",
                "token": os.environ.get("GITHUB_TOKEN", ""),
            },
            "repo_defaults": {
                "mirror_visibility": "mirror_source",
                "topics": ["mirror", "read-only"],
                "description_template": "Read-only mirror of {gitea_repo_url}. Submit patches via Bugzilla: https://bugs.koozali.org",
                "homepage_template": "https://bugs.koozali.org",
            },
            "branch_protection": {
                "apply_to": "default_branch",
                "include_admins": True,
                "allow_force_pushes": True,
                "allow_deletions": False,
                "restrict_push": {
                    "users": ["<PAT-username>"],  # replaced at runtime
                    "teams": [],
                    "apps": []
                },
                "required_checks": [],
                "required_reviews": {
                    "required_approving_review_count": 0
                }
            },
            "webhook": {
                "mode": "server",
                "url": os.environ.get("GITHUB_WEBHOOK_URL", "http://koozali.bjsystems.co.uk/webhook/github"),
                "secret": os.environ.get("WEBHOOK_SECRET", "change-me"),
                "events": ["pull_request"]
            }
        },
        "cache_path": os.environ.get("SYNC_CACHE_PATH", "./.sync-cache"),
        "state_path": os.environ.get("SYNC_STATE_PATH", "./sync_state.sqlite"),
        "concurrency": 4,
        "interval": 1800,  # seconds between periodic sync runs
        "include": [],
        "exclude": [],
        "skip_forks": True
    },
    # Bugzilla integration: PRs on the GitHub mirror become Bugzilla tickets.
    "bugzilla": {
        "base_url": "https://bugs.koozali.org",
        "auth": {
            "mode": "basic",  # or "api_key"
            "username": "brianr",
            "password": os.environ.get("BUGZILLA_PASSWORD", ""),
        },
        "product": "SME11",
        "component_template": "{repo}",
        "component_fallback": "General",
        "auto_create_component": False,
        "groups": [],
        "attach_diff": True,
        "failure_policy": {
            "close_pr_on_bugzilla_failure": False,
            "label_on_bugzilla_failure": "bugzilla-needed"
        },
        # Text templates rendered with str.format via render_template().
        "templates": {
            "pr_comment_success":
                "Thanks for the contribution!\n\n"
                "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                "- Canonical: {gitea_repo_url}\n"
                "- Please file and discuss changes in Bugzilla: {bugzilla_base_url}\n\n"
                "We created Bug {bug_id} to track this proposal:\n"
                "- {bug_url}\n\n"
                "This pull request will be closed here. Please follow up on the Bugzilla ticket for review and next steps. Further pushes to this PR branch will be mirrored as updated attachments on the Bug.\n",
            "pr_comment_failure":
                "Thanks for the contribution!\n\n"
                "This repository is a read-only mirror of the canonical repo on Gitea:\n"
                "- Canonical: {gitea_repo_url}\n\n"
                "We were unable to create a Bugzilla ticket automatically at this time.\n"
                "Please open a bug at {bugzilla_base_url} (Product: SME11, Component: {repo}) and include:\n"
                "- GitHub PR: {pr_url}\n"
                "- Target branch: {target_branch}\n"
                "- Summary and rationale for the change\n\n"
                "This pull request will remain open for now. Once a Bugzilla ticket exists, our maintainers will reference it here and proceed with review on Bugzilla.\n",
            "bug_summary": "[GH PR #{pr_number}] {org}/{repo}: {pr_title}",
            "bug_body":
                "Source\n"
                "- Canonical repo (Gitea): {gitea_repo_url}\n"
                "- GitHub mirror PR: {pr_url}\n"
                "- Submitted by: {github_user} ({github_user_url})\n"
                "- Opened: {created_at}\n\n"
                "Target and branch info\n"
                "- Base branch: {target_branch} ({base_sha})\n"
                "- Head branch: {source_branch} ({head_sha})\n"
                "- Commits in PR: {commit_count}\n"
                "- Compare: {compare_url}\n\n"
                "Project policy\n"
                "This GitHub repository is a read-only mirror. Reviews and decisions happen in Bugzilla. Any accepted changes will be applied to the Gitea repository and then mirrored back to GitHub.\n\n"
                "Submitter's notes\n"
                "{pr_body}\n\n"
                "Attachments\n"
                "- The PR diff/patch is attached automatically by the mirror service. Subsequent updates to this PR will add new attachments to this bug.\n",
            "bug_update_comment":
                "Update from GitHub PR #{pr_number} for {org}/{repo}\n\n"
                "- New head: {head_sha} (base: {base_sha})\n"
                "- Commits in update: {commit_count}\n"
                "- Compare: {compare_url}\n"
                "- PR: {pr_url}\n\n"
                "A refreshed patch set has been attached to this bug. Original PR description follows (for context):\n"
                "{pr_body}\n",
            "pr_sync_short_comment":
                "Thanks for the update. We've attached a refreshed patch set to Bug {bug_id}: {bug_url}\n"
                "This PR remains closed; please continue discussion in Bugzilla.\n"
        }
    }
}
# --------------------------
# Utilities
# --------------------------
def run(cmd: List[str], cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
    """Execute *cmd* without a shell, capturing stdout/stderr as text.

    Returns (returncode, stdout, stderr); failures are logged but not raised.
    """
    logger.debug(f"Running command: {' '.join(cmd)} (cwd={cwd})")
    proc = subprocess.run(cmd, cwd=cwd, env=env, capture_output=True, text=True)
    if proc.returncode != 0:
        logger.error(f"Command failed ({proc.returncode}): {' '.join(cmd)}\nstderr: {proc.stderr.strip()}")
    else:
        logger.debug(f"Command succeeded: {' '.join(cmd)}")
    return proc.returncode, proc.stdout, proc.stderr
def ensure_dir(path: str):
    """Create *path* (including parents) if it does not already exist; idempotent."""
    os.makedirs(path, exist_ok=True)
def safe_join(*parts) -> str:
    """Join path components and collapse '..'/'.' segments via normpath."""
    joined = os.path.join(*parts)
    return os.path.normpath(joined)
def hmac_sha256(secret: str, data: bytes) -> str:
    """Return a GitHub-webhook-style 'sha256=<hexdigest>' HMAC signature for *data*."""
    digest = hmac.new(secret.encode("utf-8"), data, hashlib.sha256)
    return "sha256=" + digest.hexdigest()
# --------------------------
# State (SQLite)
# --------------------------
class StateStore:
    """SQLite-backed persistence for two mappings:

    - pr_map:   GitHub PR key -> Bugzilla bug id
    - repo_map: Gitea repo id -> Gitea/GitHub full names
    """

    def __init__(self, path: str):
        self.path = path
        # Make sure the containing directory exists before sqlite opens the file.
        ensure_dir(os.path.dirname(os.path.abspath(path)) or ".")
        # check_same_thread=False: the connection is shared across threads
        # (webhook server + sync loop). WAL allows concurrent readers.
        # NOTE(review): there is no explicit lock around this connection —
        # presumably sqlite's internal serialization is relied upon; verify.
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self._init()
        logger.debug(f"StateStore initialized at {path}")

    def _init(self):
        """Create the mapping tables if they do not exist yet."""
        cur = self.conn.cursor()
        cur.execute("""
        CREATE TABLE IF NOT EXISTS pr_map (
        pr_key TEXT PRIMARY KEY,
        bug_id INTEGER NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
        )
        """)
        cur.execute("""
        CREATE TABLE IF NOT EXISTS repo_map (
        gitea_repo_id INTEGER PRIMARY KEY,
        gitea_full_name TEXT NOT NULL,
        github_full_name TEXT NOT NULL,
        updated_at TEXT NOT NULL
        )
        """)
        self.conn.commit()

    def set_pr_bug(self, pr_key: str, bug_id: int):
        """Insert or update the bug id associated with a PR key (upsert)."""
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
        INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
        """, (pr_key, bug_id, now, now))
        self.conn.commit()
        logger.debug(f"PR→Bug mapping set {pr_key} -> {bug_id}")

    def get_pr_bug(self, pr_key: str) -> Optional[int]:
        """Return the bug id mapped to *pr_key*, or None when unmapped."""
        cur = self.conn.cursor()
        cur.execute("SELECT bug_id FROM pr_map WHERE pr_key=?", (pr_key,))
        row = cur.fetchone()
        return row[0] if row else None

    def set_repo_map(self, gitea_repo_id: int, gitea_full_name: str, github_full_name: str):
        """Insert or update the Gitea→GitHub repo mapping for one repo id (upsert)."""
        now = rfc3339_now()
        cur = self.conn.cursor()
        cur.execute("""
        INSERT INTO repo_map (gitea_repo_id, gitea_full_name, github_full_name, updated_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(gitea_repo_id) DO UPDATE SET gitea_full_name=excluded.gitea_full_name, github_full_name=excluded.github_full_name, updated_at=excluded.updated_at
        """, (gitea_repo_id, gitea_full_name, github_full_name, now))
        self.conn.commit()
        logger.debug(f"Repo mapping set {gitea_full_name} (id={gitea_repo_id}) -> {github_full_name}")

    def get_repo_map_by_id(self, gitea_repo_id: int) -> Optional[Tuple[str, str]]:
        """Return (gitea_full_name, github_full_name) for a repo id, or None."""
        cur = self.conn.cursor()
        cur.execute("SELECT gitea_full_name, github_full_name FROM repo_map WHERE gitea_repo_id=?", (gitea_repo_id,))
        row = cur.fetchone()
        return (row[0], row[1]) if row else None
# --------------------------
# Clients
# --------------------------
class GiteaClient:
    """Thin wrapper over the Gitea REST API (v1) with layered auth fallback."""

    def __init__(self, base_url: str, token: str):
        import re  # local import; only needed to normalize the token string
        self.base_url = base_url.rstrip("/")
        # Strip an accidental "token "/"bearer " prefix so we can add our own scheme.
        self.token = re.sub(r'^(token|bearer)\s+', '', (token or ''), flags=re.I).strip()
        logger.debug(f"Gitea token len={len(self.token)} prefix={(self.token[:6] + '...') if self.token else '<empty>'}")

    def _url(self, path: str) -> str:
        """Build a full URL from an API path beginning with '/'."""
        return f"{self.base_url}{path}"

    def _request(self, method: str, path: str, *, json_body=None, params=None, timeout=30) -> httpx.Response:
        """Issue one API request, retrying 401s with alternate auth schemes.

        Order matters: 'token' scheme first, then 'Bearer', finally the
        access_token query parameter (for proxies that strip Authorization).
        """
        url = self._url(path)
        # 1) token scheme
        headers = {"Authorization": f"token {self.token}"} if self.token else {}
        r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout, trust_env=False)
        logger.debug(f"Gitea {method} {path} auth=token -> {r.status_code}")
        if r.status_code == 401 and self.token:
            # 2) Bearer scheme
            headers = {"Authorization": f"Bearer {self.token}"}
            r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout, trust_env=False)
            logger.debug(f"Gitea {method} {path} auth=bearer -> {r.status_code}")
            if r.status_code == 401:
                # 3) Query param fallback (proxy may strip Authorization)
                qp = dict(params or {})
                qp["access_token"] = self.token
                r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout, trust_env=False)
                logger.debug(f"Gitea {method} {path} auth=query -> {r.status_code}")
        return r

    def get_repo(self, owner: str, repo: str):
        """Fetch a repo's JSON payload; return None on 404, raise on 401/other errors."""
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}")
        if r.status_code == 404:
            logger.info(f"Gitea repo {owner}/{repo} not found (404)")
            return None
        if r.status_code == 401:
            logger.error(f"Gitea 401 for GET /repos/{owner}/{repo}. Token may lack access or proxy strips Authorization.")
            raise RuntimeError(f"Gitea 401 for GET /repos/{owner}/{repo}. Token user may lack access to org '{owner}', or proxy is stripping Authorization.")
        r.raise_for_status()
        logger.debug(f"Gitea repo {owner}/{repo} fetched OK")
        return r.json()

    def create_org_repo(self, org: str, name: str, private: bool, description: str, homepage: str, default_branch: str, auto_init: bool):
        """Create a repository under an organization; returns the repo JSON.

        Raises RuntimeError on 401 or any non-200/201 response.
        """
        payload = {
            "name": name,
            "private": private,
            "description": description,
            "website": homepage,
            "default_branch": default_branch,
            "auto_init": auto_init
        }
        logger.info(f"Creating Gitea repo {org}/{name} (private={private}, auto_init={auto_init}, default_branch={default_branch})")
        r = self._request("POST", f"/api/v1/orgs/{org}/repos", json_body=payload, timeout=60)
        if r.status_code == 401:
            logger.error(f"Gitea 401 creating repo in org '{org}'")
            raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user has permissions and token scopes include write:organization.")
        if r.status_code not in (200, 201):
            logger.error(f"Gitea create repo failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Gitea create repo failed: {r.status_code} {r.text}")
        logger.info(f"Gitea repo {org}/{name} created")
        return r.json()

    def list_org_repos(self, org: str):
        """Return all repos in an org, following pagination (50 per page)."""
        out, page = [], 1
        logger.debug(f"Listing Gitea repos for org {org}")
        while True:
            r = self._request("GET", f"/api/v1/orgs/{org}/repos", params={"limit": 50, "page": page})
            r.raise_for_status()
            items = r.json()
            if not items:
                break
            out.extend(items)
            page += 1
        logger.info(f"Found {len(out)} repos in Gitea org {org}")
        return out

    def get_push_mirrors(self, owner: str, repo: str):
        """List configured push mirrors; [] when none exist or the API is absent (404)."""
        r = self._request("GET", f"/api/v1/repos/{owner}/{repo}/push_mirrors")
        if r.status_code == 404:
            logger.debug(f"Gitea push mirror list not available or none for {owner}/{repo}")
            return []
        r.raise_for_status()
        return r.json()

    def create_push_mirror(self, owner: str, repo: str, remote_address: str, username: Optional[str], password: Optional[str], sync_on_commit: bool = True, interval: str = "24h0m0s") -> bool:
        """Configure a push mirror on the Gitea repo; best-effort, returns success flag.

        A 404 is treated as "feature unavailable on this Gitea version", not an error.
        """
        payload = {
            "remote_address": remote_address,
            "remote_username": username or "",
            "remote_password": password or "",
            "sync_on_commit": sync_on_commit,
            "interval": interval,
        }
        logger.info(f"Configuring Gitea push mirror for {owner}/{repo} -> GitHub (interval={interval}, sync_on_commit={sync_on_commit})")
        r = self._request("POST", f"/api/v1/repos/{owner}/{repo}/push_mirrors", json_body=payload, timeout=30)
        if r.status_code in (200, 201):
            logger.info(f"Gitea push mirror created for {owner}/{repo}")
            return True
        if r.status_code == 404:
            logger.warning("Gitea push mirror API not available on this server/version.")
            return False
        logger.error(f"Failed to create push mirror on Gitea {owner}/{repo}: {r.status_code} {r.text}")
        return False
class GitHubClient:
    """GitHub operations for mirror repos: create/update repos, webhooks,
    branch protection (raw REST), and PR comment/close helpers."""

    def __init__(self, owner: str, token: str):
        self.owner = owner
        self.gh = Github(auth=Auth.Token(token), per_page=100)
        self.token = token
        # Redacted token logging
        red_prefix = (token[:4] + '...') if token else '<empty>'
        logger.debug(f"GitHub token prefix={red_prefix} len={len(token) if token else 0}")
        try:
            self.auth_user = self.gh.get_user().login
            logger.info(f"Authenticated to GitHub as {self.auth_user}, target owner={self.owner}")
        except Exception as e:
            # Keep going with a sentinel user; later steps degrade gracefully.
            logger.error(f"Failed to authenticate to GitHub: {e}")
            self.auth_user = "<unknown>"

    def make_push_url(self, repo_name: str) -> str:
        """HTTPS push URL with embedded credentials for git on the command line.

        NOTE(review): the PAT ends up in the remote URL (visible in `git remote -v`
        and process listings) — acceptable for a private cache dir, but worth confirming.
        """
        user = self.auth_user or "git"
        return f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git"

    def get_or_create_repo(self, name: str, visibility: str, description: str, homepage: str) -> Repository:
        """Return the mirror repo under self.owner, creating or updating it.

        Tries the owner as an organization first; on 404 falls back to the
        user namespace. Issues/projects/wiki are disabled on creation.
        """
        logger.debug(f"Ensuring GitHub repo {self.owner}/{name} (visibility={visibility})")
        try:
            org = self.gh.get_organization(self.owner)
            repo = None
            with contextlib.suppress(GithubException):
                repo = org.get_repo(name)
            if repo is None:
                logger.info(f"Creating GitHub repo {self.owner}/{name} under organization")
                repo = org.create_repo(
                    name=name,
                    description=description or "",
                    homepage=homepage or "",
                    private=(visibility == "private"),
                    has_issues=False,
                    has_projects=False,
                    has_wiki=False,
                    auto_init=False
                )
            else:
                # Repo exists: converge description/homepage/visibility.
                repo.edit(
                    name=name,
                    description=description or "",
                    homepage=homepage or "",
                    private=(visibility == "private"),
                    has_issues=False
                )
            return repo
        except GithubException as e:
            if e.status == 404:
                # Owner is not an organization (or not visible as one).
                logger.info(f"Owner {self.owner} not an org or not visible; trying user namespace")
                user = self.gh.get_user(self.owner)
                repo = None
                with contextlib.suppress(GithubException):
                    repo = user.get_repo(name)
                if repo is None:
                    logger.info(f"Creating GitHub repo {self.owner}/{name} under user")
                    repo = user.create_repo(
                        name=name,
                        description=description or "",
                        homepage=homepage or "",
                        private=(visibility == "private"),
                        has_issues=False,
                        auto_init=False
                    )
                else:
                    repo.edit(
                        name=name,
                        description=description or "",
                        homepage=homepage or "",
                        private=(visibility == "private"),
                        has_issues=False
                    )
                return repo
            logger.error(f"GitHub get_or_create_repo error: {e}")
            raise

    def enforce_repo_settings(self, repo: Repository, topics: List[str]):
        """Disable issues and constrain merge methods; replace topics (best effort)."""
        logger.debug(f"Enforcing repo settings on {repo.full_name} (issues off, merge methods config)")
        try:
            repo.edit(
                allow_merge_commit=True,  # keep one enabled
                allow_squash_merge=False,
                allow_rebase_merge=False,
                allow_auto_merge=False,
                has_issues=False,
            )
        except GithubException as e:
            if getattr(e, "status", None) == 422:
                # GitHub rejects disabling every merge method; retry with squash only.
                logger.warning(f"{repo.full_name}: 422 no_merge_method; retrying with squash merge method")
                repo.edit(
                    allow_merge_commit=False,
                    allow_squash_merge=True,
                    allow_rebase_merge=False,
                    has_issues=False,
                )
            else:
                logger.error(f"Failed to edit repo settings for {repo.full_name}: {e}")
                raise
        if topics:
            with contextlib.suppress(GithubException):
                repo.replace_topics(topics)
                logger.debug(f"Set topics on {repo.full_name}: {topics}")

    def ensure_webhook(self, repo, url: str, secret: str, events: List[str]):
        """Create, or update in place, the webhook pointing at *url* (matched by URL)."""
        desired_cfg = {
            "url": url,
            "content_type": "json",
            "secret": secret,
            "insecure_ssl": "0",
        }
        hooks = list(repo.get_hooks())
        for h in hooks:
            cfg = h.config or {}
            if cfg.get("url") == url:
                logger.info(f"Updating existing webhook on {repo.full_name}")
                h.edit("web", config=desired_cfg, events=events, active=True)
                return
        logger.info(f"Creating webhook on {repo.full_name}")
        repo.create_hook(name="web", config=desired_cfg, events=events, active=True)

    def ensure_branch_protection(
        self,
        repo,
        branch_name: str,
        include_admins: bool,
        allow_force_pushes: bool,
        allow_deletions: bool,
        restrict_users: list[str],
        restrict_teams: list[str],
        restrict_apps: list[str],
    ):
        """Apply branch protection via raw REST calls (best effort, never raises).

        Uses httpx directly because PyGithub does not expose all of these
        toggle endpoints. Each step logs and continues on failure.
        """
        try:
            repo.get_branch(branch_name)
        except GithubException as e:
            # Nothing to protect on an empty/missing branch.
            logger.warning(f"Branch {branch_name} not found on {repo.full_name}: {e}")
            return
        owner = repo.owner.login
        name = repo.name
        base = f"https://api.github.com/repos/{owner}/{name}/branches/{branch_name}"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/vnd.github+json, application/vnd.github.luke-cage-preview+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        # Base protection
        logger.debug(f"Applying base protection on {repo.full_name}@{branch_name} (enforce_admins={include_admins})")
        r = httpx.put(
            f"{base}/protection",
            headers=headers,
            json={
                "required_status_checks": None,
                "enforce_admins": bool(include_admins),
                "required_pull_request_reviews": None,
                "restrictions": None,
            },
            timeout=30,
        )
        if r.status_code not in (200, 201):
            logger.error(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} {r.text}")
            return
        # Allow force pushes
        r_fp = httpx.put(
            f"{base}/protection/allow_force_pushes",
            headers=headers,
            json={"enabled": bool(allow_force_pushes)},
            timeout=15,
        )
        if r_fp.status_code not in (200, 201):
            logger.warning(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}")
        # Allow deletions
        r_del = httpx.put(
            f"{base}/protection/allow_deletions",
            headers=headers,
            json={"enabled": bool(allow_deletions)},
            timeout=15,
        )
        if r_del.status_code not in (200, 201):
            logger.warning(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}")
        # If enabling force pushes failed, remove protection to avoid blocking mirror updates
        if allow_force_pushes and r_fp.status_code == 404:
            httpx.delete(f"{base}/protection", headers=headers, timeout=15)
            logger.warning(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).")
            return
        # Push restrictions (org repos only)
        is_org = getattr(repo.owner, "type", None) == "Organization"
        if is_org and (restrict_users or restrict_teams or restrict_apps):
            logger.debug(f"Applying push restrictions on {repo.full_name}@{branch_name} (users={restrict_users}, teams={restrict_teams}, apps={restrict_apps})")
            r_res = httpx.put(
                f"{base}/protection/restrictions",
                headers=headers,
                json={"users": restrict_users or [], "teams": restrict_teams or [], "apps": restrict_apps or []},
                timeout=30,
            )
            if r_res.status_code not in (200, 201):
                logger.warning(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}")
        logger.info(f"Branch protection applied (best effort) on {repo.full_name}@{branch_name}. Owner type: {getattr(repo.owner,'type',None)}")

    def close_pr_with_comment_and_label(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
        """Comment on a PR, optionally label it, then close it if still open."""
        pr = repo.get_pull(pr_number)
        pr.create_issue_comment(comment)
        if label:
            # Label creation may 422 if it already exists; both steps are best effort.
            with contextlib.suppress(GithubException):
                repo.create_label(name=label, color="ededed")
            with contextlib.suppress(GithubException):
                pr.as_issue().add_to_labels(label)
        if pr.state != "closed":
            pr.edit(state="closed")
        logger.info(f"PR #{pr_number} closed with comment on {repo.full_name}")

    def comment_on_pr(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
        """Comment on a PR and optionally label it, without changing its state."""
        pr = repo.get_pull(pr_number)
        pr.create_issue_comment(comment)
        if label:
            with contextlib.suppress(GithubException):
                repo.create_label(name=label, color="ededed")
            with contextlib.suppress(GithubException):
                pr.as_issue().add_to_labels(label)
        logger.info(f"Commented on PR #{pr_number} on {repo.full_name}")
# --------------------------
# Bugzilla integration
# --------------------------
class BugzillaClient:
    """Minimal Bugzilla REST client: create bugs, add comments and attachments.

    Auth is either HTTP basic (username/password) or an API key sent in the
    X-BUGZILLA-API-KEY header, selected by cfg["auth"]["mode"].
    """

    def __init__(self, cfg: Dict[str, Any]):
        self.base_url = cfg["base_url"].rstrip("/")
        self.auth_mode = cfg["auth"]["mode"]
        self.username = cfg["auth"].get("username", "")
        self.password = cfg["auth"].get("password", "")
        self.api_key = cfg["auth"].get("api_key", "")
        self.product = cfg["product"]
        self.component_template = cfg["component_template"]
        self.component_fallback = cfg["component_fallback"]
        self.auto_create_component = bool(cfg.get("auto_create_component", False))
        self.groups = cfg.get("groups", [])
        self.attach_diff = bool(cfg.get("attach_diff", True))
        self.templates = cfg["templates"]
        logger.debug("Bugzilla client initialized")

    def _headers(self) -> Dict[str, str]:
        """Common request headers; adds the API-key header in api_key mode."""
        headers = {"Accept": "application/json"}
        if self.auth_mode == "api_key" and self.api_key:
            headers["X-BUGZILLA-API-KEY"] = self.api_key
        return headers

    def _auth(self) -> Optional[Tuple[str, str]]:
        """Return basic-auth credentials, or None when not in basic mode."""
        if self.auth_mode == "basic":
            return (self.username, self.password)
        return None

    def create_bug(self, summary: str, description: str, component: str, visibility_groups: Optional[List[str]] = None) -> int:
        """Create a bug under self.product and return its id; raises on failure."""
        url = f"{self.base_url}/rest/bug"
        payload = {
            "product": self.product,
            "component": component,
            "summary": summary,
            "description": description,
        }
        if visibility_groups:
            payload["groups"] = visibility_groups
        logger.info(f"Creating Bugzilla bug (product={self.product}, component={component})")
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
        if r.status_code not in (200, 201):
            logger.error(f"Bugzilla create bug failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Bugzilla create bug failed: {r.status_code} {r.text}")
        data = r.json()
        # Some Bugzilla versions return {"id": N}, others {"bugs": [{"id": N}]}.
        bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
        if not bug_id:
            logger.error(f"Bugzilla response missing bug id: {data}")
            raise RuntimeError(f"Bugzilla response missing bug id: {data}")
        logger.info(f"Bugzilla bug created: {bug_id}")
        return int(bug_id)

    def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes):
        """Attach *data_bytes* to a bug (base64-encoded, flagged as a patch); raises on failure."""
        url = f"{self.base_url}/rest/bug/{bug_id}/attachment"
        payload = {
            "ids": [bug_id],
            "data": base64.b64encode(data_bytes).decode("ascii"),
            "file_name": file_name,
            "summary": summary,
            "content_type": content_type,
            "is_patch": True
        }
        logger.debug(f"Adding attachment to Bugzilla bug {bug_id}: {file_name}")
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=120)
        if r.status_code not in (200, 201):
            logger.error(f"Bugzilla add attachment failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Bugzilla add attachment failed: {r.status_code} {r.text}")

    def add_comment(self, bug_id: int, comment: str):
        """Append a comment to an existing bug; raises on failure."""
        url = f"{self.base_url}/rest/bug/{bug_id}/comment"
        payload = {"comment": comment}
        logger.debug(f"Adding comment to Bugzilla bug {bug_id}")
        r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
        if r.status_code not in (200, 201):
            logger.error(f"Bugzilla add comment failed: {r.status_code} {r.text}")
            raise RuntimeError(f"Bugzilla add comment failed: {r.status_code} {r.text}")
# --------------------------
# Git mirror operations
# --------------------------
class GitMirror:
    """Maintains bare local mirror clones of Gitea repos and pushes them to GitHub."""

    def __init__(self, cache_dir: str):
        self.cache_dir = cache_dir
        ensure_dir(cache_dir)

    def local_path(self, org: str, repo: str) -> str:
        """Path of the bare mirror for org/repo, e.g. '<cache>/org--repo.git'."""
        return safe_join(self.cache_dir, f"{org}--{repo}.git")

    def ensure_local_mirror(self, org: str, repo: str, gitea_clone_url: str):
        """Clone the bare mirror if absent, otherwise re-point origin and fetch.

        Raises RuntimeError when clone or fetch fails.
        """
        path = self.local_path(org, repo)
        if not os.path.isdir(path):
            ensure_dir(os.path.dirname(path))
            logger.info(f"Cloning bare mirror for {org}/{repo}")
            rc, out, err = run(["git", "clone", "--mirror", gitea_clone_url, path])
            if rc != 0:
                raise RuntimeError(f"git clone --mirror failed: {err.strip()}")
        else:
            # Ensure origin URL is correct and fetch
            logger.debug(f"Updating origin URL for {org}/{repo} mirror")
            rc, out, err = run(["git", "remote", "set-url", "origin", gitea_clone_url], cwd=path)
            if rc != 0:
                logger.warning(f"Could not set origin URL for {org}/{repo}: {err.strip()}")
            logger.debug(f"Fetching updates for {org}/{repo}")
            rc, out, err = run(["git", "fetch", "--prune", "--tags", "--force"], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git fetch failed: {err.strip()}")

    def push_to_github(self, org: str, repo: str, github_url: str):
        """Force-push all branches and tags from the local mirror to GitHub.

        Adds (or re-points) a 'github' remote, then pushes the head and tag
        refspecs with --prune so deleted refs disappear on the mirror too.
        Raises RuntimeError on any git failure.
        """
        path = self.local_path(org, repo)
        remotes_rc, remotes_out, _ = run(["git", "remote"], cwd=path)
        if remotes_rc != 0:
            raise RuntimeError("git remote list failed")
        remotes = set(remotes_out.strip().splitlines())
        if "github" not in remotes:
            logger.debug(f"Adding github remote for {org}/{repo}")
            rc, out, err = run(["git", "remote", "add", "github", github_url], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git remote add github failed: {err.strip()}")
        else:
            logger.debug(f"Updating github remote URL for {org}/{repo}")
            rc, out, err = run(["git", "remote", "set-url", "github", github_url], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git remote set-url github failed: {err.strip()}")
        # Leading '+' forces non-fast-forward updates (mirror semantics).
        for refspec in ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]:
            logger.info(f"Pushing {org}/{repo} refspec {refspec} to GitHub")
            rc, out, err = run(["git", "push", "--prune", "github", refspec], cwd=path)
            if rc != 0:
                raise RuntimeError(f"git push failed for {refspec}: {err.strip()}")
# --------------------------
# Template rendering
# --------------------------
def render_template(tpl: str, vars: Dict[str, Any]) -> str:
    """Format *tpl* with str.format(**vars); on any error return the raw template.

    The broad except is deliberate: a missing placeholder key must never
    break the sync flow, so the unrendered template is used as a fallback.
    """
    try:
        rendered = tpl.format(**vars)
    except Exception:
        return tpl
    return rendered
# --------------------------
# Sync engine
# --------------------------
@dataclasses.dataclass
class RepoContext:
    """Pairing of a Gitea source repo with its GitHub mirror for one sync run."""
    org: str  # Gitea organization name
    repo: str  # repository name (same on both sides)
    gitea_repo: Dict[str, Any]  # raw repo payload returned by the Gitea API
    github_repo: Repository  # PyGithub handle for the mirror repository
class SyncEngine:
    """Orchestrates the six-stage mirror pipeline for a single org/repo:

    1. ensure the Gitea source repo exists (optionally creating it)
    2. ensure/update the GitHub mirror repo
    3. ensure the PR webhook
    4. mirror git content Gitea -> GitHub
    5. apply branch protection
    6. configure a Gitea push mirror (optional, best effort)
    """

    def __init__(self, cfg: Dict[str, Any], state: StateStore, gitea: GiteaClient, gh: GitHubClient, mirror: GitMirror):
        self.cfg = cfg
        self.state = state
        self.gitea = gitea
        self.gh = gh
        self.mirror = mirror

    def _derive_texts(self, org: str, repo: str, gitea_http_url: str) -> Tuple[str, str, str, str]:
        """Render (gitea_desc, gitea_homepage, github_desc, github_homepage) from templates."""
        gcfg = self.cfg["sync"]["gitea"]["new_repo_defaults"]
        hcfg = self.cfg["sync"]["github"]["repo_defaults"]
        g_desc = render_template(gcfg.get("description_template", "") or "", {"org": org, "repo": repo})
        g_home = render_template(gcfg.get("homepage_template", "") or "", {"org": org, "repo": repo})
        gh_desc = render_template(hcfg.get("description_template", "") or "", {"gitea_repo_url": gitea_http_url})
        gh_home = render_template(hcfg.get("homepage_template", "") or "", {"gitea_repo_url": gitea_http_url})
        return g_desc, g_home, gh_desc, gh_home

    def ensure_repos_and_mirror(self, org: str, repo: str) -> RepoContext:
        """Run the full pipeline for one repo and return the resulting context.

        Raises RuntimeError when the source repo is missing and auto-creation
        is disabled, or when git mirroring fails.
        """
        logger.debug(f"[Stage 1] Checking source repo on Gitea: {org}/{repo}")
        gitea_repo = self.gitea.get_repo(org, repo)
        gcfg = self.cfg["sync"]["gitea"]
        if gitea_repo is None:
            if not gcfg.get("create_if_missing", False):
                raise RuntimeError(f"Gitea repo {org}/{repo} not found and auto-creation disabled")
            vis = gcfg["new_repo_defaults"]["visibility"]
            default_branch = gcfg["new_repo_defaults"]["default_branch"]
            auto_init = bool(gcfg["new_repo_defaults"]["seed_new_repo"])
            g_desc, g_home, _, _ = self._derive_texts(org, repo, f"{self.gitea.base_url}/{org}/{repo}")
            logger.info(f"Creating missing Gitea repo {org}/{repo}")
            gitea_repo = self.gitea.create_org_repo(
                org=org,
                name=repo,
                private=(vis == "private"),
                description=g_desc,
                homepage=g_home,
                default_branch=default_branch,
                auto_init=auto_init
            )
        logger.debug(f"[Stage 1] Completed for {org}/{repo}")
        # Fall back to conventional URLs when the API payload omits them.
        gitea_http_clone = gitea_repo.get("clone_url") or f"{self.gitea.base_url}/{org}/{repo}.git"
        gitea_html_url = gitea_repo.get("html_url") or f"{self.gitea.base_url}/{org}/{repo}"
        logger.debug(f"[Stage 2] Ensuring GitHub repo for {org}/{repo}")
        mirror_vis_policy = self.cfg["sync"]["github"]["repo_defaults"]["mirror_visibility"]
        # "mirror_source" policy: private on Gitea -> private mirror; else public.
        visibility = "private" if (gitea_repo.get("private") and mirror_vis_policy == "mirror_source") else "public"
        g_desc, g_home, gh_desc, gh_home = self._derive_texts(org, repo, gitea_html_url)
        github_repo = self.gh.get_or_create_repo(name=repo, visibility=visibility, description=gh_desc, homepage=gh_home)
        topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
        self.gh.enforce_repo_settings(github_repo, topics=topics)
        logger.debug(f"[Stage 2] Completed for {github_repo.full_name}")
        logger.debug(f"[Stage 3] Ensuring webhook on {github_repo.full_name}")
        wh = self.cfg["sync"]["github"]["webhook"]
        if wh.get("mode") == "server":
            self.gh.ensure_webhook(github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
        logger.debug(f"[Stage 3] Completed")
        logger.debug(f"[Stage 4] Mirroring content {org}/{repo} -> {github_repo.full_name}")
        gitea_clone = gitea_http_clone
        if gitea_repo.get("private"):
            # Embed the token for private clones.
            # NOTE(review): token ends up in the remote URL on disk — confirm acceptable.
            token = self.cfg["sync"]["gitea"]["token"]
            if token:
                gitea_clone = gitea_http_clone.replace("://", f"://{token}@")
        self.mirror.ensure_local_mirror(org, repo, gitea_clone)
        gh_push = self.gh.make_push_url(repo)
        self.mirror.push_to_github(org, repo, gh_push)
        logger.debug(f"[Stage 4] Completed")
        logger.debug(f"[Stage 5] Applying branch protection on {github_repo.full_name}")
        default_branch = gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
        bp = self.cfg["sync"]["github"]["branch_protection"]
        restrict_users = bp["restrict_push"].get("users", [])
        restrict_teams = bp["restrict_push"].get("teams", [])
        restrict_apps = bp["restrict_push"].get("apps", [])
        # Substitute the config placeholder with the authenticated PAT user.
        if "<PAT-username>" in restrict_users and self.gh.auth_user != "<unknown>":
            restrict_users = [u for u in restrict_users if u != "<PAT-username>"] + [self.gh.auth_user]
        self.gh.ensure_branch_protection(
            repo=github_repo,
            branch_name=default_branch,
            include_admins=bp.get("include_admins", True),
            allow_force_pushes=bp.get("allow_force_pushes", True),
            allow_deletions=bp.get("allow_deletions", False),
            restrict_users=restrict_users,
            restrict_teams=restrict_teams,
            restrict_apps=restrict_apps
        )
        logger.debug(f"[Stage 5] Completed")
        logger.debug(f"[Stage 6] Configuring Gitea push mirror (if enabled)")
        if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True):
            gh_pat = self.gh.token
            remote_address = f"https://{gh_pat}:x-oauth-basic@github.com/{self.gh.owner}/{repo}.git"
            # Best effort: older Gitea versions lack the push-mirror API.
            with contextlib.suppress(Exception):
                self.gitea.create_push_mirror(owner=org, repo=repo, remote_address=remote_address, username=None, password=None, sync_on_commit=True)
        logger.debug(f"[Stage 6] Completed")
        if gitea_repo.get("id"):
            self.state.set_repo_map(int(gitea_repo["id"]), f"{org}/{repo}", f"{self.gh.owner}/{repo}")
        logger.info(f"Mirror ensure complete: {org}/{repo} -> {github_repo.full_name}")
        return RepoContext(org=org, repo=repo, gitea_repo=gitea_repo, github_repo=github_repo)

    def enforce_settings_only(self, ctx: RepoContext):
        """Re-apply settings/webhook/protection on an existing mirror without re-pushing content."""
        logger.debug(f"Enforcing settings-only for {ctx.github_repo.full_name}")
        topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
        self.gh.enforce_repo_settings(ctx.github_repo, topics=topics)
        wh = self.cfg["sync"]["github"]["webhook"]
        if wh.get("mode") == "server":
            self.gh.ensure_webhook(ctx.github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
        default_branch = ctx.gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
        bp = self.cfg["sync"]["github"]["branch_protection"]
        restrict_users = bp["restrict_push"].get("users", [])
        # Same placeholder substitution as in ensure_repos_and_mirror.
        if "<PAT-username>" in restrict_users and self.gh.auth_user != "<unknown>":
            restrict_users = [u for u in restrict_users if u != "<PAT-username>"] + [self.gh.auth_user]
        self.gh.ensure_branch_protection(
            repo=ctx.github_repo,
            branch_name=default_branch,
            include_admins=bp.get("include_admins", True),
            allow_force_pushes=bp.get("allow_force_pushes", True),
            allow_deletions=bp.get("allow_deletions", False),
            restrict_users=restrict_users,
            restrict_teams=bp["restrict_push"].get("teams", []),
            restrict_apps=bp["restrict_push"].get("apps", [])
        )
        logger.info(f"Settings enforced on {ctx.github_repo.full_name}")
# --------------------------
# Webhook server (PR autocloser)
# --------------------------
class PRAutocloserServer:
    """FastAPI webhook server that auto-closes GitHub PRs into Bugzilla bugs.

    Receives GitHub ``pull_request`` webhooks, verifies the HMAC signature,
    files (or updates) a Bugzilla bug with the PR's patch attached, and then
    closes the PR with an explanatory comment rendered from configured
    templates.
    """

    def __init__(self, cfg: Dict[str, Any], state: StateStore, gh_client: GitHubClient, bz_client: BugzillaClient):
        self.cfg = cfg
        self.state = state  # persistent store, used for the PR-key -> bug-id mapping
        self.ghc = gh_client
        self.bzc = bz_client
        self.app = FastAPI()
        self._setup_routes()
        logger.info("Webhook server initialized")

    def _verify_signature(self, secret: str, body: bytes, signature: str):
        """Check the X-Hub-Signature-256 header; raise HTTP 401 on mismatch.

        NOTE(review): `hmac_sha256` is a helper defined elsewhere in this
        module — presumably it returns GitHub's "sha256=<hex>" format; verify.
        """
        expected = hmac_sha256(secret, body)
        # compare_digest gives a constant-time comparison (timing-safe).
        if not hmac.compare_digest(expected, signature):
            logger.warning("Webhook signature verification failed")
            raise HTTPException(status_code=401, detail="Invalid signature")

    async def _handle_pr_event(self, payload: Dict[str, Any]):
        """Process a ``pull_request`` webhook payload.

        Behavior by state:
        - No bug yet + opened/reopened/ready_for_review/synchronize:
          create a Bugzilla bug, attach the PR patch, close the PR.
        - Bug exists + synchronize: attach the updated patch and ensure
          the PR stays closed.
        Returns a JSONResponse describing the outcome.
        """
        action = payload.get("action")
        # Only act on actions that represent new or updated PR content.
        if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
            logger.debug(f"Ignored PR action: {action}")
            return JSONResponse({"status": "ignored", "action": action})
        repo_full = payload["repository"]["full_name"]
        owner = payload["repository"]["owner"]["login"]
        repo = payload["repository"]["name"]
        logger.info(f"Handling PR event '{action}' for {repo_full}")
        pr = payload["pull_request"]
        pr_number = pr["number"]
        pr_url = pr["html_url"]
        pr_title = pr.get("title") or ""
        pr_body = pr.get("body") or ""
        github_user = pr["user"]["login"]
        github_user_url = pr["user"]["html_url"]
        base = pr["base"]
        head = pr["head"]
        target_branch = base["ref"]
        # Cross-fork PRs are rendered as "owner:branch"; same-repo PRs (or a
        # deleted head repo) fall back to the bare branch name.
        source_branch = f"{head.get('repo', {}).get('owner', {}).get('login', '')}:{head['ref']}" if head.get("repo") else head["ref"]
        # Short (7-char) SHAs for human-readable summaries.
        base_sha = base["sha"][:7]
        head_sha = head["sha"][:7]
        created_at = pr.get("created_at") or rfc3339_now()
        labels = [l["name"] for l in pr.get("labels", [])] if pr.get("labels") else []
        # NOTE(review): pr.get("html_url") could be None, which would make
        # this concatenation raise TypeError — confirm GitHub always sends it.
        compare_url = pr.get("html_url") + "/files"
        org = self.cfg["sync"]["gitea"]["org"]
        gitea_repo_url = f"{self.cfg['sync']['gitea']['base_url']}/{org}/{repo}"
        bugzilla_base_url = self.cfg["bugzilla"]["base_url"]
        # Template substitution variables; bug_id/bug_url are filled in after
        # the Bugzilla bug is created.
        variables = {
            "bugzilla_base_url": bugzilla_base_url,
            "bug_id": "",
            "bug_url": "",
            "org": org,
            "repo": repo,
            "gitea_repo_url": gitea_repo_url,
            "github_owner": owner,
            "pr_number": pr_number,
            "pr_title": pr_title,
            "pr_url": pr_url,
            "github_user": github_user,
            "github_user_url": github_user_url,
            "source_branch": source_branch,
            "target_branch": target_branch,
            "head_sha": head_sha,
            "base_sha": base_sha,
            "created_at": created_at,
            "pr_body": pr_body,
            "commit_count": pr.get("commits", ""),
            "compare_url": compare_url,
            "labels": ", ".join(labels)
        }
        repo_obj = self.ghc.gh.get_repo(repo_full)
        pr_key = f"{repo_full}#{pr_number}"
        # Look up an existing bug for this PR so repeated events don't refile.
        bug_id = self.state.get_pr_bug(pr_key)
        if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
            # First sighting of this PR: file a new Bugzilla bug.
            component = self.cfg["bugzilla"]["component_template"].format(repo=repo)
            bug_summary = render_template(self.cfg["bugzilla"]["templates"]["bug_summary"], variables)
            bug_body = render_template(self.cfg["bugzilla"]["templates"]["bug_body"], variables)
            create_ok = True
            try:
                bug_id = self.bzc.create_bug(bug_summary, bug_body, component, visibility_groups=self.cfg["bugzilla"].get("groups", []))
                self.state.set_pr_bug(pr_key, bug_id)
                logger.info(f"Bugzilla bug {bug_id} created for PR {pr_key}")
            except Exception as e:
                logger.error(f"Bugzilla create bug failed for PR {pr_key}: {e}")
                create_ok = False
            if create_ok:
                if self.cfg["bugzilla"].get("attach_diff", True):
                    # Fetch the PR's patch from the GitHub API and attach it
                    # to the bug. Attachment failure is non-fatal (warning only).
                    try:
                        api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
                        headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
                        # NOTE(review): httpx.get is synchronous and blocks the
                        # event loop inside this async handler — consider
                        # httpx.AsyncClient.
                        r = httpx.get(api_patch_url, headers=headers, timeout=120)
                        r.raise_for_status()
                        patch_bytes = r.content
                        self.bzc.add_attachment(
                            bug_id=bug_id,
                            file_name=f"PR-{pr_number}-{head_sha}.patch",
                            content_type="text/x-patch",
                            summary=f"Patch for PR #{pr_number} ({head_sha})",
                            data_bytes=patch_bytes
                        )
                        logger.debug(f"Attached patch to bug {bug_id} for PR {pr_key}")
                    except Exception as e:
                        logger.warning(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")
                variables["bug_id"] = str(bug_id)
                variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                # Close the PR, pointing the author at the new bug.
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_success"], variables)
                self.ghc.close_pr_with_comment_and_label(repo_obj, pr_number, comment, label=None)
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            else:
                # Bug creation failed: leave the PR open, comment, and
                # optionally apply a failure label per config.
                comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_failure"], variables)
                label = self.cfg["bugzilla"]["failure_policy"].get("label_on_bugzilla_failure") or None
                self.ghc.comment_on_pr(repo_obj, pr_number, comment, label=label)
                return JSONResponse({"status": "bugzilla_failed", "action": action})
        elif bug_id is not None and action == "synchronize":
            # PR already has a bug: attach the refreshed patch and keep the
            # PR closed.
            try:
                api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
                headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
                # NOTE(review): synchronous httpx.get in an async handler — see above.
                r = httpx.get(api_patch_url, headers=headers, timeout=120)
                r.raise_for_status()
                patch_bytes = r.content
                self.bzc.add_attachment(
                    bug_id=bug_id,
                    file_name=f"PR-{pr_number}-{head_sha}.patch",
                    content_type="text/x-patch",
                    summary=f"Updated patch for PR #{pr_number} ({head_sha})",
                    data_bytes=patch_bytes
                )
                # Optionally post a short "synced" comment on the PR.
                short_tpl = self.cfg["bugzilla"]["templates"].get("pr_sync_short_comment")
                if short_tpl:
                    variables["bug_id"] = str(bug_id)
                    variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
                    self.ghc.comment_on_pr(repo_obj, pr_number, render_template(short_tpl, variables), label=None)
                # Re-close the PR if it was reopened.
                pr_obj = repo_obj.get_pull(pr_number)
                if pr_obj.state != "closed":
                    pr_obj.edit(state="closed")
                logger.info(f"Updated patch attached and PR ensured closed (PR #{pr_number})")
                return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
            except Exception as e:
                logger.error(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
                return JSONResponse({"status": "attach_failed", "bug_id": bug_id, "action": action})
        return JSONResponse({"status": "noop", "action": action})

    def _setup_routes(self):
        """Register the health-check and GitHub webhook routes on the app."""
        @self.app.get("/healthz")
        async def healthz():
            # Liveness probe endpoint.
            return PlainTextResponse("ok")

        @self.app.post("/webhook/github")
        async def github_webhook(request: Request, x_hub_signature_256: str = Header(None), x_github_event: str = Header(None)):
            body = await request.body()
            secret = self.cfg["sync"]["github"]["webhook"]["secret"]
            # Reject unsigned deliveries before doing any parsing.
            if not x_hub_signature_256:
                logger.warning("Webhook missing signature")
                raise HTTPException(status_code=401, detail="Missing signature")
            self._verify_signature(secret, body, x_hub_signature_256)
            try:
                payload = json.loads(body.decode("utf-8"))
            except Exception:
                logger.warning("Invalid JSON in webhook")
                raise HTTPException(status_code=400, detail="Invalid JSON")
            # Only pull_request events are handled; everything else is a no-op.
            if x_github_event != "pull_request":
                logger.debug(f"Ignored event {x_github_event}")
                return JSONResponse({"status": "ignored", "event": x_github_event})
            return await self._handle_pr_event(payload)
# --------------------------
# CLI and workflows
# --------------------------
def load_config(path: Optional[str]) -> Dict[str, Any]:
    """Return DEFAULT_CONFIG deep-merged with an optional YAML config file.

    Args:
        path: Path to a YAML file, or None to use defaults only.

    Returns:
        A new config dict; DEFAULT_CONFIG itself is never mutated.
    """
    # Local import keeps the module's top-level import block untouched.
    import copy

    # copy.deepcopy is the proper deep copy: unlike the previous JSON
    # round-trip it does not break on non-JSON-serializable default values
    # and avoids a needless serialize/parse cycle.
    cfg = copy.deepcopy(DEFAULT_CONFIG)
    if path:
        # safe_load returns None for an empty file; treat that as "no overrides".
        with open(path, "r", encoding="utf-8") as f:
            user_cfg = yaml.safe_load(f) or {}
        cfg = merge_dicts(cfg, user_cfg)
    return cfg
def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge *b* over *a*, returning a new dict.

    Where both sides hold a dict for the same key the two are merged
    key-by-key; otherwise the value from *b* wins. Neither input is mutated.
    """
    merged = dict(a)
    for key, value in b.items():
        existing = merged.get(key)
        if isinstance(existing, dict) and isinstance(value, dict):
            merged[key] = merge_dicts(existing, value)
        else:
            merged[key] = value
    return merged
def single_shot(cfg: Dict[str, Any], org_repo: str):
    """Mirror a single Gitea repository once and print a JSON summary.

    Args:
        cfg: Full merged configuration.
        org_repo: Target repository as "org/repo".
    """
    logger.info(f"Single-shot sync starting for {org_repo}")
    org, repo = org_repo.split("/", 1)
    sync_cfg = cfg["sync"]
    # Wire up all collaborators for one pass of the sync engine.
    state = StateStore(sync_cfg["state_path"])
    gitea = GiteaClient(sync_cfg["gitea"]["base_url"], sync_cfg["gitea"]["token"])
    gh = GitHubClient(sync_cfg["github"]["owner"], sync_cfg["github"]["auth"]["token"])
    mirror = GitMirror(sync_cfg["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    ctx = engine.ensure_repos_and_mirror(org, repo)
    logger.info(f"Single-shot sync completed for {org_repo}")
    summary = {
        "gitea_repo": ctx.gitea_repo.get("full_name") or f"{org}/{repo}",
        "github_repo": ctx.github_repo.full_name,
    }
    print(json.dumps(summary, indent=2))
def continuous_mode(cfg: Dict[str, Any], interval: int):
    """Continuously mirror every repo of the configured Gitea org to GitHub.

    Scans the org, syncs each (non-fork, by default) repo, sleeps *interval*
    seconds, and repeats until SIGINT/SIGTERM. Per-repo errors are logged
    and never abort the scan.

    Args:
        cfg: Full merged configuration.
        interval: Seconds to wait between scans.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    mirror = GitMirror(cfg["sync"]["cache_path"])
    engine = SyncEngine(cfg, state, gitea, gh, mirror)
    org = gitea_cfg["org"]
    logger.info(f"Starting continuous sync loop for Gitea org {org} every {interval}s")
    stop = threading.Event()

    def handle_sig(sig, frame):
        # Signal handler: request a graceful exit of the main loop.
        logger.info("Shutting down continuous loop...")
        stop.set()

    signal.signal(signal.SIGINT, handle_sig)
    signal.signal(signal.SIGTERM, handle_sig)
    while not stop.is_set():
        try:
            repos = gitea.list_org_repos(org)
            for r in repos:
                name = r["name"]
                # Forks are skipped unless explicitly enabled in config.
                if cfg["sync"].get("skip_forks", True) and r.get("fork", False):
                    logger.debug(f"Skipping fork {org}/{name}")
                    continue
                try:
                    logger.debug(f"Syncing repo {org}/{name}")
                    engine.ensure_repos_and_mirror(org, name)
                except Exception as e:
                    logger.error(f"Error syncing {org}/{name}: {e}")
        except Exception as e:
            logger.error(f"Scan error: {e}")
        # Event.wait replaces the previous 1-second sleep-poll loop: it
        # blocks up to `interval` seconds but returns immediately when the
        # stop event is set, so shutdown is prompt.
        stop.wait(interval)
def validate_mode(cfg: Dict[str, Any], dry_run: bool):
    """Audit (and optionally repair) GitHub mirrors for every Gitea org repo.

    For each repo in the configured Gitea org, reports whether its GitHub
    mirror exists and matches policy (issues disabled, merge methods
    disabled, webhook present, default-branch protection enabled). When
    *dry_run* is False, also re-runs the full mirror + settings enforcement
    for each repo. Prints a JSON report to stdout.
    """
    state = StateStore(cfg["sync"]["state_path"])
    gitea_cfg = cfg["sync"]["gitea"]
    gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    org = gitea_cfg["org"]
    logger.info(f"Validation scan starting for Gitea org {org} (dry_run={dry_run})")
    repos = gitea.list_org_repos(org)
    report = []
    # Built lazily on first repair and reused for every repo; the previous
    # code constructed a fresh GitMirror/SyncEngine inside the loop per repo.
    engine = None
    for r in repos:
        name = r["name"]
        target_full = f"{gh.owner}/{name}"
        ok_presence = True
        try:
            repo_obj = gh.gh.get_repo(target_full)
        except GithubException:
            ok_presence = False
            repo_obj = None
        item = {
            "repo": f"{org}/{name}",
            "github_repo": target_full,
            "exists_on_github": ok_presence,
            "issues_disabled": None,
            "merge_methods_disabled": None,
            "webhook_present": None,
            "branch_protection": None
        }
        if ok_presence and repo_obj is not None:
            item["issues_disabled"] = (repo_obj.has_issues is False)
            # Policy requires all three merge methods to be off.
            item["merge_methods_disabled"] = (not repo_obj.allow_merge_commit and not repo_obj.allow_squash_merge and not repo_obj.allow_rebase_merge)
            wh_url = cfg["sync"]["github"]["webhook"]["url"]
            item["webhook_present"] = any(h.config.get("url") == wh_url for h in repo_obj.get_hooks())
            default_branch = r.get("default_branch") or cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
            try:
                b = repo_obj.get_branch(default_branch)
                prot = b.protection
                item["branch_protection"] = bool(prot and prot.enabled)
            except GithubException:
                # No protection configured (or branch missing) -> not protected.
                item["branch_protection"] = False
        report.append(item)
        if not dry_run:
            # Repair pass: re-run the full mirror and settings enforcement.
            try:
                if engine is None:
                    mirror = GitMirror(cfg["sync"]["cache_path"])
                    engine = SyncEngine(cfg, state, gitea, gh, mirror)
                ctx = engine.ensure_repos_and_mirror(org, name)
                engine.enforce_settings_only(ctx)
            except Exception as e:
                logger.error(f"Validation fix failed for {org}/{name}: {e}")
    print(json.dumps({"report": report}, indent=2))
    logger.info("Validation scan complete")
def run_webhook_server(cfg: Dict[str, Any]):
    """Start the FastAPI PR-autocloser webhook server with uvicorn.

    The listen port is taken from the configured webhook URL when present,
    otherwise defaults to 8080. Binds on all interfaces.
    """
    # Import moved out of the old contextlib.suppress(Exception) block: a
    # blanket suppress around an import hides genuine failures.
    from urllib.parse import urlparse

    state = StateStore(cfg["sync"]["state_path"])
    gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
    bz = BugzillaClient(cfg["bugzilla"])
    server = PRAutocloserServer(cfg, state, gh, bz)
    url = cfg["sync"]["github"]["webhook"]["url"]
    host = "0.0.0.0"
    port = 8080
    try:
        # ParseResult.port raises ValueError for a malformed port component.
        parsed_port = urlparse(url).port
        if parsed_port:
            port = parsed_port
    except ValueError:
        logger.warning(f"Could not parse port from webhook URL {url!r}; using {port}")
    logger.info(f"Starting webhook server on {host}:{port}")
    uvicorn.run(server.app, host=host, port=port)
def main():
    """CLI entry point: parse arguments and dispatch to the selected mode."""
    parser = argparse.ArgumentParser(description="Gitea→GitHub mirror and PR autocloser")
    parser.add_argument("--config", help="Path to YAML config", default=None)
    parser.add_argument("--mode", choices=["single", "continuous", "validate", "webhook"], required=True)
    parser.add_argument("--gitea-repo", help="org/repo for single mode")
    parser.add_argument("--interval", type=int, help="Polling interval seconds (continuous)", default=None)
    parser.add_argument("--dry-run", action="store_true", help="Validation dry run")
    parser.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"), help="DEBUG, INFO, WARNING, ERROR")
    args = parser.parse_args()
    configure_logging(args.log_level)
    cfg = load_config(args.config)
    mode = args.mode
    if mode == "single":
        # single mode needs an explicit target repo.
        if not args.gitea_repo:
            logger.error("--gitea-repo org/repo is required for single mode")
            sys.exit(2)
        single_shot(cfg, args.gitea_repo)
    elif mode == "continuous":
        # CLI flag takes precedence over the configured interval.
        continuous_mode(cfg, args.interval or cfg["sync"]["interval"])
    elif mode == "validate":
        validate_mode(cfg, dry_run=args.dry_run)
    elif mode == "webhook":
        run_webhook_server(cfg)
# Standard script entry guard: run the CLI only when executed directly.
if __name__ == "__main__":
    main()