Add proper logging and fix deprecated time calls.

This commit is contained in:
2025-09-16 17:15:51 +02:00
parent 2147fdbb02
commit d2b8690c0a

View File

@@ -9,6 +9,7 @@ import hashlib
import hmac
import io
import json
import logging
import os
import signal
import sqlite3
@@ -19,7 +20,7 @@ import time
from typing import Any, Dict, List, Optional, Tuple
# Third-party deps:
# pip install pyyaml httpx PyGithub fastapi uvicorn
# pip install PyYAML httpx PyGithub fastapi uvicorn
import httpx
import yaml
from fastapi import FastAPI, Header, HTTPException, Request
@@ -31,15 +32,36 @@ import uvicorn
from urllib.parse import quote as urlquote
from datetime import timezone
# --------------------------
# Time helpers
# --------------------------
def utc_now() -> dt.datetime:
    """Return the current instant as a timezone-aware datetime in UTC."""
    return dt.datetime.now(tz=timezone.utc)
def rfc3339_now() -> str:
    """Current UTC time as an RFC 3339 string, e.g. 2025-09-16T12:10:03Z."""
    whole_second = utc_now().replace(microsecond=0)
    return whole_second.isoformat().replace("+00:00", "Z")
def log(msg: str):
    """Legacy stdout logger with an RFC 3339 timestamp; prefer `logger` for new code."""
    stamp = rfc3339_now()
    print(f"[{stamp}] {msg}", flush=True)
# --------------------------
# Logging setup
# --------------------------
class UTCFormatter(logging.Formatter):
    """Logging formatter that renders record timestamps as RFC 3339 UTC strings."""

    # Keep struct_time conversion in UTC for any Formatter code path that uses it.
    converter = time.gmtime

    def formatTime(self, record, datefmt=None):
        """Format record.created as e.g. 2025-09-16T12:10:03Z (seconds precision)."""
        aware = dt.datetime.fromtimestamp(record.created, tz=timezone.utc)
        rendered = aware.isoformat(timespec="seconds")
        return rendered.replace("+00:00", "Z")
def configure_logging(level_name: str = "INFO"):
    """Reset root logging to a single stream handler with UTC timestamps.

    Unknown or empty *level_name* values fall back to INFO.
    """
    resolved = getattr(logging, (level_name or "INFO").upper(), logging.INFO)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(UTCFormatter("[%(asctime)s] %(levelname)s %(message)s"))
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
    root_logger.setLevel(resolved)
logger = logging.getLogger(__name__)
# --------------------------
# Configuration and defaults
@@ -186,8 +208,13 @@ DEFAULT_CONFIG = {
# --------------------------
def run(cmd: List[str], cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, str, str]:
    """Run *cmd* as a subprocess (no shell) and capture its output.

    Args:
        cmd: Command and arguments as a list (never joined through a shell).
        cwd: Working directory for the child process, or None for ours.
        env: Environment for the child; inherits the current one when None.

    Returns:
        Tuple of (returncode, stdout, stderr); streams are text-decoded.
    """
    log_ = logging.getLogger(__name__)
    cmd_text = " ".join(cmd)
    # Lazy %-style args avoid formatting cost when DEBUG is disabled.
    log_.debug("Running command: %s (cwd=%s)", cmd_text, cwd)
    # subprocess.run wraps the Popen/communicate/wait dance in one call.
    proc = subprocess.run(cmd, cwd=cwd, env=env, capture_output=True, text=True)
    if proc.returncode != 0:
        log_.error("Command failed (%s): %s\nstderr: %s", proc.returncode, cmd_text, proc.stderr.strip())
    else:
        log_.debug("Command succeeded: %s", cmd_text)
    return proc.returncode, proc.stdout, proc.stderr
def ensure_dir(path: str):
@@ -210,6 +237,7 @@ class StateStore:
self.conn = sqlite3.connect(path, check_same_thread=False)
self.conn.execute("PRAGMA journal_mode=WAL;")
self._init()
logger.debug(f"StateStore initialized at {path}")
def _init(self):
cur = self.conn.cursor()
@@ -240,6 +268,7 @@ class StateStore:
ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
""", (pr_key, bug_id, now, now))
self.conn.commit()
logger.debug(f"PR→Bug mapping set {pr_key} -> {bug_id}")
def get_pr_bug(self, pr_key: str) -> Optional[int]:
cur = self.conn.cursor()
@@ -256,6 +285,7 @@ class StateStore:
ON CONFLICT(gitea_repo_id) DO UPDATE SET gitea_full_name=excluded.gitea_full_name, github_full_name=excluded.github_full_name, updated_at=excluded.updated_at
""", (gitea_repo_id, gitea_full_name, github_full_name, now))
self.conn.commit()
logger.debug(f"Repo mapping set {gitea_full_name} (id={gitea_repo_id}) -> {github_full_name}")
def get_repo_map_by_id(self, gitea_repo_id: int) -> Optional[Tuple[str, str]]:
cur = self.conn.cursor()
@@ -269,11 +299,10 @@ class StateStore:
class GiteaClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.token = (token or "").strip()
import re
self.base_url = base_url.rstrip("/")
self.token = re.sub(r'^(token|bearer)\s+', '', (token or ''), flags=re.I).strip()
log(f"Gitea token len={len(self.token)} prefix={self.token[:6]+'...' if self.token else '<empty>'}")
logger.debug(f"Gitea token len={len(self.token)} prefix={(self.token[:6] + '...') if self.token else '<empty>'}")
def _url(self, path: str) -> str:
return f"{self.base_url}{path}"
@@ -282,25 +311,31 @@ class GiteaClient:
url = self._url(path)
# 1) token scheme
headers = {"Authorization": f"token {self.token}"} if self.token else {}
r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout, trust_env=False)
logger.debug(f"Gitea {method} {path} auth=token -> {r.status_code}")
if r.status_code == 401 and self.token:
# 2) Bearer scheme
headers = {"Authorization": f"Bearer {self.token}"}
r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout)
r = httpx.request(method, url, headers=headers, json=json_body, params=params, timeout=timeout, trust_env=False)
logger.debug(f"Gitea {method} {path} auth=bearer -> {r.status_code}")
if r.status_code == 401:
# 3) Query param fallback (proxy may strip Authorization)
qp = dict(params or {})
qp["access_token"] = self.token
r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout)
r = httpx.request(method, url, json=json_body, params=qp, timeout=timeout, trust_env=False)
logger.debug(f"Gitea {method} {path} auth=query -> {r.status_code}")
return r
def get_repo(self, owner: str, repo: str):
    """Fetch a Gitea repository via the REST API.

    Returns the decoded JSON repo object, or None when the repository
    does not exist (HTTP 404). Raises RuntimeError on 401; any other
    non-success status is surfaced by raise_for_status().
    """
    r = self._request("GET", f"/api/v1/repos/{owner}/{repo}")
    if r.status_code == 404:
        # A missing repo is an expected outcome for callers, not an error.
        logger.info(f"Gitea repo {owner}/{repo} not found (404)")
        return None
    if r.status_code == 401:
        # 401 here usually means a bad/limited token or a proxy dropping the header.
        logger.error(f"Gitea 401 for GET /repos/{owner}/{repo}. Token may lack access or proxy strips Authorization.")
        raise RuntimeError(f"Gitea 401 for GET /repos/{owner}/{repo}. Token user may lack access to org '{owner}', or proxy is stripping Authorization.")
    r.raise_for_status()
    logger.debug(f"Gitea repo {owner}/{repo} fetched OK")
    return r.json()
def create_org_repo(self, org: str, name: str, private: bool, description: str, homepage: str, default_branch: str, auto_init: bool):
@@ -312,15 +347,20 @@ class GiteaClient:
"default_branch": default_branch,
"auto_init": auto_init
}
logger.info(f"Creating Gitea repo {org}/{name} (private={private}, auto_init={auto_init}, default_branch={default_branch})")
r = self._request("POST", f"/api/v1/orgs/{org}/repos", json_body=payload, timeout=60)
if r.status_code == 401:
raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user is org member with create permissions and token scopes include write:organization.")
logger.error(f"Gitea 401 creating repo in org '{org}'")
raise RuntimeError(f"Gitea 401 creating repo in org '{org}'. Ensure token user has permissions and token scopes include write:organization.")
if r.status_code not in (200, 201):
logger.error(f"Gitea create repo failed: {r.status_code} {r.text}")
raise RuntimeError(f"Gitea create repo failed: {r.status_code} {r.text}")
logger.info(f"Gitea repo {org}/{name} created")
return r.json()
def list_org_repos(self, org: str):
out, page = [], 1
logger.debug(f"Listing Gitea repos for org {org}")
while True:
r = self._request("GET", f"/api/v1/orgs/{org}/repos", params={"limit": 50, "page": page})
r.raise_for_status()
@@ -329,31 +369,34 @@ class GiteaClient:
break
out.extend(items)
page += 1
logger.info(f"Found {len(out)} repos in Gitea org {org}")
return out
def get_push_mirrors(self, owner: str, repo: str):
    """List configured push mirrors for a Gitea repository.

    Returns the decoded JSON list, or [] on HTTP 404 (endpoint absent on
    older Gitea versions, or no mirrors configured). Other non-success
    statuses are surfaced by raise_for_status().
    """
    r = self._request("GET", f"/api/v1/repos/{owner}/{repo}/push_mirrors")
    if r.status_code == 404:
        logger.debug(f"Gitea push mirror list not available or none for {owner}/{repo}")
        return []
    r.raise_for_status()
    return r.json()
def create_push_mirror(self, owner: str, repo: str, remote_address: str, username: Optional[str], password: Optional[str], sync_on_commit: bool = True, interval: str = "24h0m0s") -> bool:
url = self._url(f"/api/v1/repos/{owner}/{repo}/push_mirrors")
payload = {
"remote_address": remote_address,
"remote_username": username or "",
"remote_password": password or "",
"sync_on_commit": sync_on_commit,
"interval": interval, # required by Gitea 1.24
"interval": interval,
}
r = httpx.post(url, headers=self._headers_token_first(), json=payload, timeout=30)
logger.info(f"Configuring Gitea push mirror for {owner}/{repo} -> GitHub (interval={interval}, sync_on_commit={sync_on_commit})")
r = self._request("POST", f"/api/v1/repos/{owner}/{repo}/push_mirrors", json_body=payload, timeout=30)
if r.status_code in (200, 201):
logger.info(f"Gitea push mirror created for {owner}/{repo}")
return True
if r.status_code == 404:
log("Gitea push mirror API not available on this server/version.")
logger.warning("Gitea push mirror API not available on this server/version.")
return False
log(f"Failed to create push mirror on Gitea: {r.status_code} {r.text}")
logger.error(f"Failed to create push mirror on Gitea {owner}/{repo}: {r.status_code} {r.text}")
return False
class GitHubClient:
@@ -361,11 +404,14 @@ class GitHubClient:
self.owner = owner
self.gh = Github(auth=Auth.Token(token), per_page=100)
self.token = token
log(f"Github token {token} ")
# Redacted token logging
red_prefix = (token[:4] + '...') if token else '<empty>'
logger.debug(f"GitHub token prefix={red_prefix} len={len(token) if token else 0}")
try:
self.auth_user = self.gh.get_user().login
log(f"login to github {self.auth_user} {self.owner}")
except Exception:
logger.info(f"Authenticated to GitHub as {self.auth_user}, target owner={self.owner}")
except Exception as e:
logger.error(f"Failed to authenticate to GitHub: {e}")
self.auth_user = "<unknown>"
def make_push_url(self, repo_name: str) -> str:
@@ -373,12 +419,14 @@ class GitHubClient:
return f"https://{urlquote(user)}:{urlquote(self.token)}@github.com/{self.owner}/{repo_name}.git"
def get_or_create_repo(self, name: str, visibility: str, description: str, homepage: str) -> Repository:
logger.debug(f"Ensuring GitHub repo {self.owner}/{name} (visibility={visibility})")
try:
org = self.gh.get_organization(self.owner)
repo = None
with contextlib.suppress(GithubException):
repo = org.get_repo(name)
if repo is None:
logger.info(f"Creating GitHub repo {self.owner}/{name} under organization")
repo = org.create_repo(
name=name,
description=description or "",
@@ -399,13 +447,14 @@ class GitHubClient:
)
return repo
except GithubException as e:
# If owner is a user, not an org
if e.status == 404:
logger.info(f"Owner {self.owner} not an org or not visible; trying user namespace")
user = self.gh.get_user(self.owner)
repo = None
with contextlib.suppress(GithubException):
repo = user.get_repo(name)
if repo is None:
logger.info(f"Creating GitHub repo {self.owner}/{name} under user")
repo = user.create_repo(
name=name,
description=description or "",
@@ -423,22 +472,22 @@ class GitHubClient:
has_issues=False
)
return repo
logger.error(f"GitHub get_or_create_repo error: {e}")
raise
def enforce_repo_settings(self, repo: Repository, topics: List[str]):
# Ensure at least one merge method is enabled (GitHub requirement)
logger.debug(f"Enforcing repo settings on {repo.full_name} (issues off, merge methods config)")
try:
repo.edit(
allow_merge_commit=True, # keep one enabled
allow_squash_merge=False,
allow_rebase_merge=False,
allow_auto_merge=False, # optional explicit
allow_auto_merge=False,
has_issues=False,
)
except GithubException as e:
# Fallback in case older API fields differ, or we hit the 422 anyway
if getattr(e, "status", None) == 422:
# Retry with a different single merge method just in case
logger.warning(f"{repo.full_name}: 422 no_merge_method; retrying with squash merge method")
repo.edit(
allow_merge_commit=False,
allow_squash_merge=True,
@@ -446,37 +495,29 @@ class GitHubClient:
has_issues=False,
)
else:
logger.error(f"Failed to edit repo settings for {repo.full_name}: {e}")
raise
# Topics
if topics:
with contextlib.suppress(GithubException):
repo.replace_topics(topics)
logger.debug(f"Set topics on {repo.full_name}: {topics}")
def ensure_webhook(self, repo, url: str, secret: str, events: list[str]):
def ensure_webhook(self, repo, url: str, secret: str, events: List[str]):
desired_cfg = {
"url": url,
"content_type": "json",
"secret": secret, # always set; GitHub wont echo it back
"insecure_ssl": "0", # "0" or "1" as strings
"secret": secret,
"insecure_ssl": "0",
}
hooks = list(repo.get_hooks())
for h in hooks:
cfg = h.config or {}
if cfg.get("url") == url:
# Update in place. Must pass name="web" as first arg.
try:
h.edit("web", config=desired_cfg, events=events, active=True)
except Exception as e:
# Some PyGithub versions need add/remove events instead; fallback to setting full list
logger.info(f"Updating existing webhook on {repo.full_name}")
h.edit("web", config=desired_cfg, events=events, active=True)
return
# Create if missing
repo.create_hook(
name="web",
config=desired_cfg,
events=events,
active=True,
)
logger.info(f"Creating webhook on {repo.full_name}")
repo.create_hook(name="web", config=desired_cfg, events=events, active=True)
def ensure_branch_protection(
self,
@@ -489,11 +530,10 @@ class GitHubClient:
restrict_teams: list[str],
restrict_apps: list[str],
):
# Ensure branch exists
try:
repo.get_branch(branch_name)
except GithubException as e:
log(f"Branch {branch_name} not found on {repo.full_name}: {e}")
logger.warning(f"Branch {branch_name} not found on {repo.full_name}: {e}")
return
owner = repo.owner.login
@@ -505,7 +545,8 @@ class GitHubClient:
"X-GitHub-Api-Version": "2022-11-28",
}
# 1) Base protection
# Base protection
logger.debug(f"Applying base protection on {repo.full_name}@{branch_name} (enforce_admins={include_admins})")
r = httpx.put(
f"{base}/protection",
headers=headers,
@@ -518,10 +559,10 @@ class GitHubClient:
timeout=30,
)
if r.status_code not in (200, 201):
log(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} {r.text}")
logger.error(f"Failed to set base protection on {repo.full_name}@{branch_name}: {r.status_code} {r.text}")
return
# 2) Allow force pushes
# Allow force pushes
r_fp = httpx.put(
f"{base}/protection/allow_force_pushes",
headers=headers,
@@ -529,9 +570,9 @@ class GitHubClient:
timeout=15,
)
if r_fp.status_code not in (200, 201):
log(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}")
logger.warning(f"Failed to set allow_force_pushes on {repo.full_name}@{branch_name}: {r_fp.status_code} {r_fp.text}")
# 3) Allow deletions
# Allow deletions
r_del = httpx.put(
f"{base}/protection/allow_deletions",
headers=headers,
@@ -539,17 +580,18 @@ class GitHubClient:
timeout=15,
)
if r_del.status_code not in (200, 201):
log(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}")
logger.warning(f"Failed to set allow_deletions on {repo.full_name}@{branch_name}: {r_del.status_code} {r_del.text}")
# If we couldnt enable force pushes, remove protection to avoid blocking the mirror
# If enabling force pushes failed, remove protection to avoid blocking mirror updates
if allow_force_pushes and r_fp.status_code == 404:
httpx.delete(f"{base}/protection", headers=headers, timeout=15)
log(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).")
logger.warning(f"Removed protection on {repo.full_name}@{branch_name} to prevent blocking mirror (force pushes endpoint 404).")
return
# 4) Restrictions (org repos only)
# Push restrictions (org repos only)
is_org = getattr(repo.owner, "type", None) == "Organization"
if is_org and (restrict_users or restrict_teams or restrict_apps):
logger.debug(f"Applying push restrictions on {repo.full_name}@{branch_name} (users={restrict_users}, teams={restrict_teams}, apps={restrict_apps})")
r_res = httpx.put(
f"{base}/protection/restrictions",
headers=headers,
@@ -557,8 +599,8 @@ class GitHubClient:
timeout=30,
)
if r_res.status_code not in (200, 201):
log(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}")
log(f"Repo owner type for {repo.full_name} is {getattr(repo.owner,'type',None)}")
logger.warning(f"Failed to set push restrictions on {repo.full_name}@{branch_name}: {r_res.status_code} {r_res.text}")
logger.info(f"Branch protection applied (best effort) on {repo.full_name}@{branch_name}. Owner type: {getattr(repo.owner,'type',None)}")
def close_pr_with_comment_and_label(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
pr = repo.get_pull(pr_number)
@@ -570,6 +612,7 @@ class GitHubClient:
pr.as_issue().add_to_labels(label)
if pr.state != "closed":
pr.edit(state="closed")
logger.info(f"PR #{pr_number} closed with comment on {repo.full_name}")
def comment_on_pr(self, repo: Repository, pr_number: int, comment: str, label: Optional[str] = None):
pr = repo.get_pull(pr_number)
@@ -579,6 +622,7 @@ class GitHubClient:
repo.create_label(name=label, color="ededed")
with contextlib.suppress(GithubException):
pr.as_issue().add_to_labels(label)
logger.info(f"Commented on PR #{pr_number} on {repo.full_name}")
# --------------------------
# Bugzilla integration
@@ -598,6 +642,7 @@ class BugzillaClient:
self.groups = cfg.get("groups", [])
self.attach_diff = bool(cfg.get("attach_diff", True))
self.templates = cfg["templates"]
logger.debug("Bugzilla client initialized")
def _headers(self) -> Dict[str, str]:
headers = {"Accept": "application/json"}
@@ -620,13 +665,17 @@ class BugzillaClient:
}
if visibility_groups:
payload["groups"] = visibility_groups
logger.info(f"Creating Bugzilla bug (product={self.product}, component={component})")
r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=60)
if r.status_code not in (200, 201):
logger.error(f"Bugzilla create bug failed: {r.status_code} {r.text}")
raise RuntimeError(f"Bugzilla create bug failed: {r.status_code} {r.text}")
data = r.json()
bug_id = data.get("id") or (data.get("bugs") and data["bugs"][0]["id"])
if not bug_id:
logger.error(f"Bugzilla response missing bug id: {data}")
raise RuntimeError(f"Bugzilla response missing bug id: {data}")
logger.info(f"Bugzilla bug created: {bug_id}")
return int(bug_id)
def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes):
@@ -639,15 +688,19 @@ class BugzillaClient:
"content_type": content_type,
"is_patch": True
}
logger.debug(f"Adding attachment to Bugzilla bug {bug_id}: {file_name}")
r = httpx.post(url, headers=self._headers(), auth=self._auth(), json=payload, timeout=120)
if r.status_code not in (200, 201):
logger.error(f"Bugzilla add attachment failed: {r.status_code} {r.text}")
raise RuntimeError(f"Bugzilla add attachment failed: {r.status_code} {r.text}")
def add_comment(self, bug_id: int, comment: str):
    """POST *comment* to the Bugzilla bug's comment endpoint; raise RuntimeError on non-2xx."""
    endpoint = f"{self.base_url}/rest/bug/{bug_id}/comment"
    logger.debug(f"Adding comment to Bugzilla bug {bug_id}")
    resp = httpx.post(
        endpoint,
        headers=self._headers(),
        auth=self._auth(),
        json={"comment": comment},
        timeout=60,
    )
    if resp.status_code in (200, 201):
        return
    failure = f"Bugzilla add comment failed: {resp.status_code} {resp.text}"
    logger.error(failure)
    raise RuntimeError(failure)
# --------------------------
@@ -666,36 +719,39 @@ class GitMirror:
path = self.local_path(org, repo)
if not os.path.isdir(path):
ensure_dir(os.path.dirname(path))
logger.info(f"Cloning bare mirror for {org}/{repo}")
rc, out, err = run(["git", "clone", "--mirror", gitea_clone_url, path])
if rc != 0:
raise RuntimeError(f"git clone --mirror failed: {err.strip()}")
else:
# Ensure origin URL is correct and fetch
logger.debug(f"Updating origin URL for {org}/{repo} mirror")
rc, out, err = run(["git", "remote", "set-url", "origin", gitea_clone_url], cwd=path)
if rc != 0:
log(f"Warning: could not set origin URL: {err.strip()}")
# Fetch
logger.warning(f"Could not set origin URL for {org}/{repo}: {err.strip()}")
logger.debug(f"Fetching updates for {org}/{repo}")
rc, out, err = run(["git", "fetch", "--prune", "--tags", "--force"], cwd=path)
if rc != 0:
raise RuntimeError(f"git fetch failed: {err.strip()}")
def push_to_github(self, org: str, repo: str, github_url: str):
path = self.local_path(org, repo)
# Add or update 'github' remote
remotes_rc, remotes_out, _ = run(["git", "remote"], cwd=path)
if remotes_rc != 0:
raise RuntimeError("git remote list failed")
remotes = set(remotes_out.strip().splitlines())
if "github" not in remotes:
logger.debug(f"Adding github remote for {org}/{repo}")
rc, out, err = run(["git", "remote", "add", "github", github_url], cwd=path)
if rc != 0:
raise RuntimeError(f"git remote add github failed: {err.strip()}")
else:
logger.debug(f"Updating github remote URL for {org}/{repo}")
rc, out, err = run(["git", "remote", "set-url", "github", github_url], cwd=path)
if rc != 0:
raise RuntimeError(f"git remote set-url github failed: {err.strip()}")
# Push refspecs
for refspec in ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]:
logger.info(f"Pushing {org}/{repo} refspec {refspec} to GitHub")
rc, out, err = run(["git", "push", "--prune", "github", refspec], cwd=path)
if rc != 0:
raise RuntimeError(f"git push failed for {refspec}: {err.strip()}")
@@ -708,7 +764,6 @@ def render_template(tpl: str, vars: Dict[str, Any]) -> str:
try:
return tpl.format(**vars)
except Exception:
# Fallback: leave placeholders as-is
return tpl
# --------------------------
@@ -740,7 +795,7 @@ class SyncEngine:
return g_desc, g_home, gh_desc, gh_home
def ensure_repos_and_mirror(self, org: str, repo: str) -> RepoContext:
# 1) Gitea: get or create
logger.debug(f"[Stage 1] Checking source repo on Gitea: {org}/{repo}")
gitea_repo = self.gitea.get_repo(org, repo)
gcfg = self.cfg["sync"]["gitea"]
if gitea_repo is None:
@@ -750,7 +805,7 @@ class SyncEngine:
default_branch = gcfg["new_repo_defaults"]["default_branch"]
auto_init = bool(gcfg["new_repo_defaults"]["seed_new_repo"])
g_desc, g_home, _, _ = self._derive_texts(org, repo, f"{self.gitea.base_url}/{org}/{repo}")
log(f"Creating Gitea repo {org}/{repo} (visibility={vis}, auto_init={auto_init})")
logger.info(f"Creating missing Gitea repo {org}/{repo}")
gitea_repo = self.gitea.create_org_repo(
org=org,
name=repo,
@@ -760,43 +815,40 @@ class SyncEngine:
default_branch=default_branch,
auto_init=auto_init
)
logger.debug(f"[Stage 1] Completed for {org}/{repo}")
# Build Gitea clone URLs
gitea_http_clone = gitea_repo.get("clone_url") or f"{self.gitea.base_url}/{org}/{repo}.git"
gitea_ssh_clone = gitea_repo.get("ssh_url") or gitea_http_clone
gitea_html_url = gitea_repo.get("html_url") or f"{self.gitea.base_url}/{org}/{repo}"
# 2) GitHub: ensure repo
logger.debug(f"[Stage 2] Ensuring GitHub repo for {org}/{repo}")
mirror_vis_policy = self.cfg["sync"]["github"]["repo_defaults"]["mirror_visibility"]
visibility = "private" if (gitea_repo.get("private") and mirror_vis_policy == "mirror_source") else "public"
g_desc, g_home, gh_desc, gh_home = self._derive_texts(org, repo, gitea_html_url)
github_repo = self.gh.get_or_create_repo(name=repo, visibility=visibility, description=gh_desc, homepage=gh_home)
topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
self.gh.enforce_repo_settings(github_repo, topics=topics)
logger.debug(f"[Stage 2] Completed for {github_repo.full_name}")
# 3) Apply webhook
logger.debug(f"[Stage 3] Ensuring webhook on {github_repo.full_name}")
wh = self.cfg["sync"]["github"]["webhook"]
if wh.get("mode") == "server":
self.gh.ensure_webhook(github_repo, wh["url"], wh["secret"], wh.get("events", ["pull_request"]))
logger.debug(f"[Stage 3] Completed")
# 4) Mirror content
# Prefer HTTPS clone on Gitea with token for private repos; but for fetch we can use anonymous if public
logger.debug(f"[Stage 4] Mirroring content {org}/{repo} -> {github_repo.full_name}")
gitea_clone = gitea_http_clone
if gitea_repo.get("private"):
# embed token for read (not ideal; better to use SSH deploy key)
token = self.cfg["sync"]["gitea"]["token"]
if token:
gitea_clone = gitea_http_clone.replace("://", f"://{token}@")
self.mirror.ensure_local_mirror(org, repo, gitea_clone)
# GitHub push URL with PAT
gh_push = self.gh.make_push_url(repo) #f"https://{self.auth_user}:{self.token}@github.com/{self.owner}/{repo}.git"
gh_push = self.gh.make_push_url(repo)
self.mirror.push_to_github(org, repo, gh_push)
logger.debug(f"[Stage 4] Completed")
# 5) Default branch and protection
logger.debug(f"[Stage 5] Applying branch protection on {github_repo.full_name}")
default_branch = gitea_repo.get("default_branch") or self.cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
bp = self.cfg["sync"]["github"]["branch_protection"]
# Replace placeholder in restrict list
restrict_users = bp["restrict_push"].get("users", [])
restrict_teams = bp["restrict_push"].get("teams", [])
restrict_apps = bp["restrict_push"].get("apps", [])
@@ -812,22 +864,24 @@ class SyncEngine:
restrict_teams=restrict_teams,
restrict_apps=restrict_apps
)
logger.debug(f"[Stage 5] Completed")
# 6) Gitea push mirror (optional)
logger.debug(f"[Stage 6] Configuring Gitea push mirror (if enabled)")
if self.cfg["sync"]["gitea"].get("push_mirror", {}).get("enable", True):
# Use HTTPS with PAT to GitHub. Use PAT user/token
gh_pat = self.gh.token
remote_address = f"https://{gh_pat}:x-oauth-basic@github.com/{self.gh.owner}/{repo}.git"
with contextlib.suppress(Exception):
self.gitea.create_push_mirror(owner=org, repo=repo, remote_address=remote_address, username=None, password=None, sync_on_commit=True)
logger.debug(f"[Stage 6] Completed")
# Map repo IDs for rename detection later
if gitea_repo.get("id"):
self.state.set_repo_map(int(gitea_repo["id"]), f"{org}/{repo}", f"{self.gh.owner}/{repo}")
logger.info(f"Mirror ensure complete: {org}/{repo} -> {github_repo.full_name}")
return RepoContext(org=org, repo=repo, gitea_repo=gitea_repo, github_repo=github_repo)
def enforce_settings_only(self, ctx: RepoContext):
logger.debug(f"Enforcing settings-only for {ctx.github_repo.full_name}")
topics = self.cfg["sync"]["github"]["repo_defaults"].get("topics", [])
self.gh.enforce_repo_settings(ctx.github_repo, topics=topics)
wh = self.cfg["sync"]["github"]["webhook"]
@@ -848,6 +902,7 @@ class SyncEngine:
restrict_teams=bp["restrict_push"].get("teams", []),
restrict_apps=bp["restrict_push"].get("apps", [])
)
logger.info(f"Settings enforced on {ctx.github_repo.full_name}")
# --------------------------
# Webhook server (PR autocloser)
@@ -861,23 +916,27 @@ class PRAutocloserServer:
self.bzc = bz_client
self.app = FastAPI()
self._setup_routes()
logger.info("Webhook server initialized")
def _verify_signature(self, secret: str, body: bytes, signature: str):
    """Reject the request (HTTP 401) unless *signature* matches the HMAC of *body*."""
    computed = hmac_sha256(secret, body)
    # Constant-time comparison to avoid timing side channels.
    if hmac.compare_digest(computed, signature):
        return
    logger.warning("Webhook signature verification failed")
    raise HTTPException(status_code=401, detail="Invalid signature")
async def _handle_pr_event(self, payload: Dict[str, Any]):
action = payload.get("action")
if action not in ("opened", "reopened", "synchronize", "ready_for_review"):
logger.debug(f"Ignored PR action: {action}")
return JSONResponse({"status": "ignored", "action": action})
repo_full = payload["repository"]["full_name"] # owner/repo on GitHub
repo_full = payload["repository"]["full_name"]
owner = payload["repository"]["owner"]["login"]
repo = payload["repository"]["name"]
logger.info(f"Handling PR event '{action}' for {repo_full}")
pr = payload["pull_request"]
pr_number = pr["number"]
pr_state = pr["state"]
pr_url = pr["html_url"]
pr_title = pr.get("title") or ""
pr_body = pr.get("body") or ""
@@ -920,17 +979,11 @@ class PRAutocloserServer:
"labels": ", ".join(labels)
}
# Fetch repo via PyGithub
repo_obj = self.ghc.gh.get_repo(repo_full)
# PR key
pr_key = f"{repo_full}#{pr_number}"
# Ensure bug mapping or create bug (on opened/reopened/ready_for_review; on synchronize attach)
bug_id = self.state.get_pr_bug(pr_key)
if bug_id is None and action in ("opened", "reopened", "ready_for_review", "synchronize"):
# Attempt to create bug
component = self.cfg["bugzilla"]["component_template"].format(repo=repo)
bug_summary = render_template(self.cfg["bugzilla"]["templates"]["bug_summary"], variables)
bug_body = render_template(self.cfg["bugzilla"]["templates"]["bug_body"], variables)
@@ -938,17 +991,14 @@ class PRAutocloserServer:
try:
bug_id = self.bzc.create_bug(bug_summary, bug_body, component, visibility_groups=self.cfg["bugzilla"].get("groups", []))
self.state.set_pr_bug(pr_key, bug_id)
log(f"Created Bugzilla bug {bug_id} for PR {pr_key}")
logger.info(f"Bugzilla bug {bug_id} created for PR {pr_key}")
except Exception as e:
log(f"Bugzilla create bug failed for PR {pr_key}: {e}")
logger.error(f"Bugzilla create bug failed for PR {pr_key}: {e}")
create_ok = False
if create_ok:
# Attach diff/patch if enabled
if self.cfg["bugzilla"].get("attach_diff", True):
try:
# Pull .patch from GitHub API
# Using GitHubs patch endpoint requires auth if private; use token from gh client
api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
r = httpx.get(api_patch_url, headers=headers, timeout=120)
@@ -961,24 +1011,22 @@ class PRAutocloserServer:
summary=f"Patch for PR #{pr_number} ({head_sha})",
data_bytes=patch_bytes
)
logger.debug(f"Attached patch to bug {bug_id} for PR {pr_key}")
except Exception as e:
log(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")
logger.warning(f"Failed to attach patch for PR {pr_key} to bug {bug_id}: {e}")
# Post success comment and close PR
variables["bug_id"] = str(bug_id)
variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_success"], variables)
self.ghc.close_pr_with_comment_and_label(repo_obj, pr_number, comment, label=None)
return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
else:
# Post failure comment and keep PR open (policy)
comment = render_template(self.cfg["bugzilla"]["templates"]["pr_comment_failure"], variables)
label = self.cfg["bugzilla"]["failure_policy"].get("label_on_bugzilla_failure") or None
self.ghc.comment_on_pr(repo_obj, pr_number, comment, label=label)
return JSONResponse({"status": "bugzilla_failed", "action": action})
elif bug_id is not None and action == "synchronize":
# Attach updated diff and keep PR closed with optional short comment
try:
api_patch_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
headers = {"Accept": "application/vnd.github.v3.patch", "Authorization": f"token {self.ghc.token}"}
@@ -992,19 +1040,18 @@ class PRAutocloserServer:
summary=f"Updated patch for PR #{pr_number} ({head_sha})",
data_bytes=patch_bytes
)
# Optional brief PR comment
short_tpl = self.cfg["bugzilla"]["templates"].get("pr_sync_short_comment")
if short_tpl:
variables["bug_id"] = str(bug_id)
variables["bug_url"] = f"{bugzilla_base_url}/show_bug.cgi?id={bug_id}"
self.ghc.comment_on_pr(repo_obj, pr_number, render_template(short_tpl, variables), label=None)
# Ensure PR is closed (in case re-opened)
pr_obj = repo_obj.get_pull(pr_number)
if pr_obj.state != "closed":
pr_obj.edit(state="closed")
logger.info(f"Updated patch attached and PR ensured closed (PR #{pr_number})")
return JSONResponse({"status": "ok", "bug_id": bug_id, "action": action})
except Exception as e:
log(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
logger.error(f"Failed to attach updated patch for PR {pr_key} to bug {bug_id}: {e}")
return JSONResponse({"status": "attach_failed", "bug_id": bug_id, "action": action})
return JSONResponse({"status": "noop", "action": action})
@@ -1019,13 +1066,16 @@ class PRAutocloserServer:
body = await request.body()
secret = self.cfg["sync"]["github"]["webhook"]["secret"]
if not x_hub_signature_256:
logger.warning("Webhook missing signature")
raise HTTPException(status_code=401, detail="Missing signature")
self._verify_signature(secret, body, x_hub_signature_256)
try:
payload = json.loads(body.decode("utf-8"))
except Exception:
logger.warning("Invalid JSON in webhook")
raise HTTPException(status_code=400, detail="Invalid JSON")
if x_github_event != "pull_request":
logger.debug(f"Ignored event {x_github_event}")
return JSONResponse({"status": "ignored", "event": x_github_event})
return await self._handle_pr_event(payload)
@@ -1039,7 +1089,6 @@ def load_config(path: Optional[str]) -> Dict[str, Any]:
with open(path, "r") as f:
user_cfg = yaml.safe_load(f) or {}
cfg = merge_dicts(cfg, user_cfg)
# Ensure derived defaults
return cfg
def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
@@ -1052,15 +1101,15 @@ def merge_dicts(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
return out
def single_shot(cfg: Dict[str, Any], org_repo: str):
logger.info(f"Single-shot sync starting for {org_repo}")
org, repo = org_repo.split("/", 1)
state = StateStore(cfg["sync"]["state_path"])
gitea = GiteaClient(cfg["sync"]["gitea"]["base_url"], cfg["sync"]["gitea"]["token"])
gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
mirror = GitMirror(cfg["sync"]["cache_path"])
engine = SyncEngine(cfg, state, gitea, gh, mirror)
ctx = engine.ensure_repos_and_mirror(org, repo)
# Print summary
logger.info(f"Single-shot sync completed for {org_repo}")
print(json.dumps({
"gitea_repo": ctx.gitea_repo.get("full_name") or f"{org}/{repo}",
"github_repo": ctx.github_repo.full_name
@@ -1074,11 +1123,11 @@ def continuous_mode(cfg: Dict[str, Any], interval: int):
mirror = GitMirror(cfg["sync"]["cache_path"])
engine = SyncEngine(cfg, state, gitea, gh, mirror)
org = gitea_cfg["org"]
log(f"Starting continuous sync loop for Gitea org {org} every {interval}s")
logger.info(f"Starting continuous sync loop for Gitea org {org} every {interval}s")
stop = threading.Event()
def handle_sig(sig, frame):
log("Shutting down continuous loop...")
logger.info("Shutting down continuous loop...")
stop.set()
signal.signal(signal.SIGINT, handle_sig)
signal.signal(signal.SIGTERM, handle_sig)
@@ -1089,14 +1138,15 @@ def continuous_mode(cfg: Dict[str, Any], interval: int):
for r in repos:
name = r["name"]
if cfg["sync"].get("skip_forks", True) and r.get("fork", False):
logger.debug(f"Skipping fork {org}/{name}")
continue
try:
logger.debug(f"Syncing repo {org}/{name}")
engine.ensure_repos_and_mirror(org, name)
except Exception as e:
log(f"Error syncing {org}/{name}: {e}")
logger.error(f"Error syncing {org}/{name}: {e}")
except Exception as e:
log(f"Scan error: {e}")
# Sleep with stop check
logger.error(f"Scan error: {e}")
for _ in range(interval):
if stop.is_set():
break
@@ -1108,6 +1158,7 @@ def validate_mode(cfg: Dict[str, Any], dry_run: bool):
gitea = GiteaClient(gitea_cfg["base_url"], gitea_cfg["token"])
gh = GitHubClient(cfg["sync"]["github"]["owner"], cfg["sync"]["github"]["auth"]["token"])
org = gitea_cfg["org"]
logger.info(f"Validation scan starting for Gitea org {org} (dry_run={dry_run})")
repos = gitea.list_org_repos(org)
report = []
for r in repos:
@@ -1131,10 +1182,8 @@ def validate_mode(cfg: Dict[str, Any], dry_run: bool):
if ok_presence and repo_obj is not None:
item["issues_disabled"] = (repo_obj.has_issues is False)
item["merge_methods_disabled"] = (not repo_obj.allow_merge_commit and not repo_obj.allow_squash_merge and not repo_obj.allow_rebase_merge)
# Webhook presence
wh_url = cfg["sync"]["github"]["webhook"]["url"]
item["webhook_present"] = any(h.config.get("url") == wh_url for h in repo_obj.get_hooks())
# Branch protection check
default_branch = r.get("default_branch") or cfg["sync"]["gitea"]["new_repo_defaults"]["default_branch"]
try:
b = repo_obj.get_branch(default_branch)
@@ -1143,7 +1192,6 @@ def validate_mode(cfg: Dict[str, Any], dry_run: bool):
except GithubException:
item["branch_protection"] = False
report.append(item)
# Optionally fix drift (not fully exhaustive to keep concise)
if not dry_run:
try:
mirror = GitMirror(cfg["sync"]["cache_path"])
@@ -1151,9 +1199,10 @@ def validate_mode(cfg: Dict[str, Any], dry_run: bool):
ctx = engine.ensure_repos_and_mirror(org, name)
engine.enforce_settings_only(ctx)
except Exception as e:
log(f"Validation fix failed for {org}/{name}: {e}")
logger.error(f"Validation fix failed for {org}/{name}: {e}")
print(json.dumps({"report": report}, indent=2))
logger.info("Validation scan complete")
def run_webhook_server(cfg: Dict[str, Any]):
state = StateStore(cfg["sync"]["state_path"])
@@ -1163,13 +1212,12 @@ def run_webhook_server(cfg: Dict[str, Any]):
url = cfg["sync"]["github"]["webhook"]["url"]
parsed_port = 8080
host = "0.0.0.0"
# Infer port from URL if provided (basic parsing)
with contextlib.suppress(Exception):
from urllib.parse import urlparse
u = urlparse(url)
if u.port:
parsed_port = u.port
log(f"Starting webhook server on port {parsed_port}")
logger.info(f"Starting webhook server on {host}:{parsed_port}")
uvicorn.run(server.app, host=host, port=parsed_port)
def main():
@@ -1179,13 +1227,16 @@ def main():
ap.add_argument("--gitea-repo", help="org/repo for single mode")
ap.add_argument("--interval", type=int, help="Polling interval seconds (continuous)", default=None)
ap.add_argument("--dry-run", action="store_true", help="Validation dry run")
ap.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"), help="DEBUG, INFO, WARNING, ERROR")
args = ap.parse_args()
configure_logging(args.log_level)
cfg = load_config(args.config)
if args.mode == "single":
if not args.gitea_repo:
print("--gitea-repo org/repo is required for single mode", file=sys.stderr)
logger.error("--gitea-repo org/repo is required for single mode")
sys.exit(2)
single_shot(cfg, args.gitea_repo)