161 lines
6.0 KiB
Python
161 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import json
|
|
import base64
|
|
import httpx
|
|
import datetime as dt
|
|
from datetime import timezone
|
|
|
|
BASE_URL = os.environ.get("BUGZILLA_BASE_URL", "https://bugs.koozali.org").rstrip("/")
|
|
RPC_URL = f"{BASE_URL}/jsonrpc.cgi"
|
|
|
|
# Auth: prefer API key. If not set, use username/password to obtain a session token.
|
|
API_KEY = os.environ.get("BUGZILLA_API_KEY", "").strip()
|
|
USER = os.environ.get("BUGZILLA_USER", "").strip() # usually your Bugzilla login (often an email)
|
|
PASSWORD = os.environ.get("BUGZILLA_PASSWORD", "").strip()
|
|
|
|
# Required fields (must be provided via ENV)
|
|
PRODUCT = os.environ.get("BUGZILLA_PRODUCT", "").strip()
|
|
VERSION_FIELD = os.environ.get("BUGZILLA_VERSION", "").strip()
|
|
|
|
# Optional fields
|
|
COMPONENT = os.environ.get("BUGZILLA_COMPONENT", "e-smith-*/smeserver-* packages").strip()
|
|
|
|
# Behavior toggles
|
|
ATTACH_DIFF = os.environ.get("BUGZILLA_ATTACH_DIFF", "true").lower() == "true"
|
|
ATTACH_FILE = os.environ.get("BUGZILLA_ATTACH_FILE", "").strip() # optional path to a local patch file
|
|
RESOLVE_AFTER = os.environ.get("BUGZILLA_RESOLVE_AFTER", "false").lower() == "true"
|
|
RESOLUTION = os.environ.get("BUGZILLA_RESOLUTION", "INVALID")
|
|
|
|
TIMEOUT = 60 # seconds
|
|
|
|
def require_env(name: str, value: str):
|
|
if not value:
|
|
print(f"FAILED: required environment variable {name} is not set.")
|
|
sys.exit(2)
|
|
|
|
def now_rfc3339():
|
|
return dt.datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
def rpc_call(method: str, params: dict, req_id: int = 1) -> dict:
|
|
body = {"method": method, "params": [params], "id": req_id}
|
|
r = httpx.post(RPC_URL, headers={"Content-Type": "application/json"}, json=body, timeout=TIMEOUT)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
if data.get("error"):
|
|
err = data["error"]
|
|
raise RuntimeError(f"RPC {method} error {err.get('code')}: {err.get('message')}")
|
|
return data.get("result", {})
|
|
|
|
def rpc(method: str, params: dict, req_id: int = 1, token: str | None = None) -> dict:
|
|
# Inject auth each call
|
|
if API_KEY:
|
|
params = dict(params, Bugzilla_api_key=API_KEY)
|
|
elif token:
|
|
params = dict(params, Bugzilla_token=token)
|
|
else:
|
|
raise RuntimeError("No API key or session token available for Bugzilla")
|
|
return rpc_call(method, params, req_id=req_id)
|
|
|
|
def login_get_token() -> str:
|
|
if not (USER and PASSWORD):
|
|
raise RuntimeError("BUGZILLA_USER and BUGZILLA_PASSWORD must be set if no API key is provided")
|
|
result = rpc_call("User.login", {"login": USER, "password": PASSWORD}, req_id=0)
|
|
token = result.get("token")
|
|
if not token:
|
|
raise RuntimeError("Login did not return a token")
|
|
return token
|
|
|
|
def create_bug(token: str | None) -> int:
|
|
summary = f"[Mirror smoke test] Confirm Bugzilla writes OK @ {now_rfc3339()}"
|
|
description = (
|
|
"Automated smoke test from mirror service.\n"
|
|
"Validates JSON-RPC create/comment/attach flows. This issue can be resolved after verification."
|
|
)
|
|
params = {
|
|
"product": PRODUCT,
|
|
"component": COMPONENT,
|
|
"version": VERSION_FIELD, # REQUIRED per your request
|
|
"summary": summary,
|
|
"description": description,
|
|
}
|
|
res = rpc("Bug.create", params, req_id=1, token=token)
|
|
bug_id = res.get("id") or (res.get("bugs") and res["bugs"][0]["id"])
|
|
if not bug_id:
|
|
raise RuntimeError(f"Unexpected Bug.create result: {res}")
|
|
return int(bug_id)
|
|
|
|
def add_comment(bug_id: int, token: str | None, text: str):
|
|
rpc("Bug.add_comment", {"id": bug_id, "comment": text}, req_id=2, token=token)
|
|
|
|
def add_attachment(bug_id: int, token: str | None, filename: str, summary: str, content: bytes):
|
|
b64 = base64.b64encode(content).decode("ascii")
|
|
params = {
|
|
"ids": [bug_id],
|
|
"data": b64,
|
|
"file_name": filename,
|
|
"summary": summary,
|
|
"content_type": "text/x-patch",
|
|
"is_patch": True,
|
|
}
|
|
rpc("Bug.add_attachment", params, req_id=3, token=token)
|
|
|
|
def resolve_bug(bug_id: int, token: str | None, resolution: str):
|
|
params = {"ids": [bug_id], "status": "RESOLVED", "resolution": resolution}
|
|
rpc("Bug.update", params, req_id=4, token=token)
|
|
|
|
def main():
|
|
# Require PRODUCT and VERSION from env
|
|
require_env("BUGZILLA_PRODUCT", PRODUCT)
|
|
require_env("BUGZILLA_VERSION", VERSION_FIELD)
|
|
|
|
print(f"Bugzilla JSON-RPC endpoint: {RPC_URL}")
|
|
# Optional version check
|
|
try:
|
|
ver = rpc_call("Bugzilla.version", {}, req_id=99).get("version", "<unknown>")
|
|
print("Bugzilla version:", ver)
|
|
except Exception as e:
|
|
print("Bugzilla.version call failed (continuing):", e)
|
|
|
|
token = None
|
|
if not API_KEY:
|
|
print("No API key provided; logging in to obtain session token...")
|
|
token = login_get_token()
|
|
print("Got session token.")
|
|
|
|
print(f"Creating bug in product={PRODUCT} component={COMPONENT} version={VERSION_FIELD} ...")
|
|
bug_id = create_bug(token)
|
|
bug_url = f"{BASE_URL}/show_bug.cgi?id={bug_id}"
|
|
print(f"Created bug {bug_id}: {bug_url}")
|
|
|
|
print("Adding a comment ...")
|
|
add_comment(bug_id, token, f"Automated comment at {now_rfc3339()} (smoke test).")
|
|
print("Comment added.")
|
|
|
|
if ATTACH_DIFF:
|
|
print("Adding attachment (patch) ...")
|
|
if ATTACH_FILE:
|
|
with open(ATTACH_FILE, "rb") as f:
|
|
content = f.read()
|
|
filename = os.path.basename(ATTACH_FILE)
|
|
else:
|
|
content = b"diff --git a/file.txt b/file.txt\n--- a/file.txt\n+++ b/file.txt\n@@\n+hello from JSON-RPC smoke test\n"
|
|
filename = "dummy.patch"
|
|
add_attachment(bug_id, token, filename, "Automated dummy patch", content)
|
|
print("Attachment added.")
|
|
|
|
if RESOLVE_AFTER:
|
|
print(f"Resolving bug as RESOLVED/{RESOLUTION} ...")
|
|
resolve_bug(bug_id, token, RESOLUTION)
|
|
print("Bug resolved.")
|
|
|
|
print("SUCCESS. Bugzilla JSON-RPC write path verified.")
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
sys.exit(main())
|
|
except Exception as e:
|
|
print("FAILED:", e)
|
|
sys.exit(1) |