From b72a4b62c272391c06df029eb86bfa14ce51b746 Mon Sep 17 00:00:00 2001 From: Brian Read Date: Thu, 25 Sep 2025 12:46:09 +0200 Subject: [PATCH] more test programs --- TestPrograms/bz_smoke_test.py | 161 ++++++++++++++++++++++++++++++++++ TestPrograms/post_pr_event.py | 107 ++++++++++++++++++++++ 2 files changed, 268 insertions(+) create mode 100644 TestPrograms/bz_smoke_test.py create mode 100644 TestPrograms/post_pr_event.py diff --git a/TestPrograms/bz_smoke_test.py b/TestPrograms/bz_smoke_test.py new file mode 100644 index 0000000..10e171a --- /dev/null +++ b/TestPrograms/bz_smoke_test.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +import os +import sys +import json +import base64 +import httpx +import datetime as dt +from datetime import timezone + +BASE_URL = os.environ.get("BUGZILLA_BASE_URL", "https://bugs.koozali.org").rstrip("/") +RPC_URL = f"{BASE_URL}/jsonrpc.cgi" + +# Auth: prefer API key. If not set, use username/password to obtain a session token. +API_KEY = os.environ.get("BUGZILLA_API_KEY", "").strip() +USER = os.environ.get("BUGZILLA_USER", "").strip() # usually your Bugzilla login (often an email) +PASSWORD = os.environ.get("BUGZILLA_PASSWORD", "").strip() + +# Required fields (must be provided via ENV) +PRODUCT = os.environ.get("BUGZILLA_PRODUCT", "").strip() +VERSION_FIELD = os.environ.get("BUGZILLA_VERSION", "").strip() + +# Optional fields +COMPONENT = os.environ.get("BUGZILLA_COMPONENT", "e-smith-*/smeserver-* packages").strip() + +# Behavior toggles +ATTACH_DIFF = os.environ.get("BUGZILLA_ATTACH_DIFF", "true").lower() == "true" +ATTACH_FILE = os.environ.get("BUGZILLA_ATTACH_FILE", "").strip() # optional path to a local patch file +RESOLVE_AFTER = os.environ.get("BUGZILLA_RESOLVE_AFTER", "false").lower() == "true" +RESOLUTION = os.environ.get("BUGZILLA_RESOLUTION", "INVALID") + +TIMEOUT = 60 # seconds + +def require_env(name: str, value: str): + if not value: + print(f"FAILED: required environment variable {name} is not set.") + sys.exit(2) + +def now_rfc3339(): + return dt.datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + +def rpc_call(method: str, params: dict, req_id: int = 1) -> dict: + body = {"method": method, "params": [params], "id": req_id} + r = httpx.post(RPC_URL, headers={"Content-Type": "application/json"}, json=body, timeout=TIMEOUT) + r.raise_for_status() + data = r.json() + if data.get("error"): + err = data["error"] + raise RuntimeError(f"RPC {method} error {err.get('code')}: {err.get('message')}") + return data.get("result", {}) + +def rpc(method: str, params: dict, req_id: int = 1, token: str | None = None) -> dict: + # Inject auth each call + if API_KEY: + params = dict(params, Bugzilla_api_key=API_KEY) + elif token: + params = dict(params, Bugzilla_token=token) + else: + raise RuntimeError("No API key or session token available for Bugzilla") + return rpc_call(method, params, req_id=req_id) + +def login_get_token() -> str: + if not (USER and PASSWORD): + raise RuntimeError("BUGZILLA_USER and BUGZILLA_PASSWORD must be set if no API key is provided") + result = rpc_call("User.login", {"login": USER, "password": PASSWORD}, req_id=0) + token = result.get("token") + if not token: + raise RuntimeError("Login did not return a token") + return token + +def create_bug(token: str | None) -> int: + summary = f"[Mirror smoke test] Confirm Bugzilla writes OK @ {now_rfc3339()}" + description = ( + "Automated smoke test from mirror service.\n" + "Validates JSON-RPC create/comment/attach flows. This issue can be resolved after verification." + ) + params = { + "product": PRODUCT, + "component": COMPONENT, + "version": VERSION_FIELD, # REQUIRED per your request + "summary": summary, + "description": description, + } + res = rpc("Bug.create", params, req_id=1, token=token) + bug_id = res.get("id") or (res.get("bugs") and res["bugs"][0]["id"]) + if not bug_id: + raise RuntimeError(f"Unexpected Bug.create result: {res}") + return int(bug_id) + +def add_comment(bug_id: int, token: str | None, text: str): + rpc("Bug.add_comment", {"id": bug_id, "comment": text}, req_id=2, token=token) + +def add_attachment(bug_id: int, token: str | None, filename: str, summary: str, content: bytes): + b64 = base64.b64encode(content).decode("ascii") + params = { + "ids": [bug_id], + "data": b64, + "file_name": filename, + "summary": summary, + "content_type": "text/x-patch", + "is_patch": True, + } + rpc("Bug.add_attachment", params, req_id=3, token=token) + +def resolve_bug(bug_id: int, token: str | None, resolution: str): + params = {"ids": [bug_id], "status": "RESOLVED", "resolution": resolution} + rpc("Bug.update", params, req_id=4, token=token) + +def main(): + # Require PRODUCT and VERSION from env + require_env("BUGZILLA_PRODUCT", PRODUCT) + require_env("BUGZILLA_VERSION", VERSION_FIELD) + + print(f"Bugzilla JSON-RPC endpoint: {RPC_URL}") + # Optional version check + try: + ver = rpc_call("Bugzilla.version", {}, req_id=99).get("version", "") + print("Bugzilla version:", ver) + except Exception as e: + print("Bugzilla.version call failed (continuing):", e) + + token = None + if not API_KEY: + print("No API key provided; logging in to obtain session token...") + token = login_get_token() + print("Got session token.") + + print(f"Creating bug in product={PRODUCT} component={COMPONENT} version={VERSION_FIELD} ...") + bug_id = create_bug(token) + bug_url = f"{BASE_URL}/show_bug.cgi?id={bug_id}" + print(f"Created bug {bug_id}: {bug_url}") + + print("Adding a comment ...") + add_comment(bug_id, token, f"Automated comment at {now_rfc3339()} (smoke test).") + print("Comment added.") + + if ATTACH_DIFF: + print("Adding attachment (patch) ...") + if ATTACH_FILE: + with open(ATTACH_FILE, "rb") as f: + content = f.read() + filename = os.path.basename(ATTACH_FILE) + else: + content = b"diff --git a/file.txt b/file.txt\n--- a/file.txt\n+++ b/file.txt\n@@\n+hello from JSON-RPC smoke test\n" + filename = "dummy.patch" + add_attachment(bug_id, token, filename, "Automated dummy patch", content) + print("Attachment added.") + + if RESOLVE_AFTER: + print(f"Resolving bug as RESOLVED/{RESOLUTION} ...") + resolve_bug(bug_id, token, RESOLUTION) + print("Bug resolved.") + + print("SUCCESS. Bugzilla JSON-RPC write path verified.") + return 0 + +if __name__ == "__main__": + try: + sys.exit(main()) + except Exception as e: + print("FAILED:", e) + sys.exit(1) \ No newline at end of file diff --git a/TestPrograms/post_pr_event.py b/TestPrograms/post_pr_event.py new file mode 100644 index 0000000..0ae0231 --- /dev/null +++ b/TestPrograms/post_pr_event.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +import argparse +import json +import hmac +import hashlib +import httpx +import sqlite3 +import os +import time + +def sign(secret: str, body: bytes) -> str: + return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest() + +def seed_state(state_path: str, pr_key: str, bug_id: int = 1): + # Create/seed the PR->Bug mapping so the server skips Bugzilla and allows synchronize to run mirroring. + os.makedirs(os.path.dirname(state_path) or ".", exist_ok=True) + conn = sqlite3.connect(state_path) + cur = conn.cursor() + cur.execute(""" + CREATE TABLE IF NOT EXISTS pr_map ( + pr_key TEXT PRIMARY KEY, + bug_id INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + cur.execute(""" + INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at + """, (pr_key, bug_id, now, now)) + conn.commit() + conn.close() + +def build_payload(owner_repo: str, pr_number: int, title: str, body: str, + base_branch: str, base_sha: str, head_branch: str, head_sha: str, + gh_user: str): + owner, repo = owner_repo.split("/", 1) + return { + "action": "synchronize", + "repository": { + "full_name": owner_repo, + "owner": {"login": owner}, + "name": repo, + }, + "pull_request": { + "number": pr_number, + "html_url": f"https://github.com/{owner_repo}/pull/{pr_number}", + "title": title, + "body": body, + "user": {"login": gh_user, "html_url": f"https://github.com/{gh_user}"}, + "base": {"ref": base_branch, "sha": base_sha}, + "head": {"ref": head_branch, "sha": head_sha, "repo": {"owner": {"login": gh_user}}}, + "created_at": "2025-09-20T12:34:56Z", + "labels": [], + }, + } + +def main(): + ap = argparse.ArgumentParser(description="Post a signed GitHub PR webhook to a running webhook_endpoint server") + ap.add_argument("--server", required=True, help="Base URL of running server, e.g. http://127.0.0.1:8081") + ap.add_argument("--secret", required=True, help="Webhook secret (must match server WEBHOOK_SECRET)") + ap.add_argument("--repo-full", required=True, help="GitHub owner/repo (must match real repo hosting the PR)") + ap.add_argument("--pr", type=int, required=True, help="GitHub PR number to mirror") + ap.add_argument("--title", default="Test PR from harness") + ap.add_argument("--body", default="Harness test body.") + ap.add_argument("--base-branch", default="master") + ap.add_argument("--base-sha", default="9f8e7d6cafebabe0000deadbeef0000000000000") + ap.add_argument("--head-branch", default="feature/test") + ap.add_argument("--head-sha", default="a1b2c3ddeeddbb0000deadbeef0000000000000") + ap.add_argument("--gh-user", default="octocat", help="GitHub login of PR author") + ap.add_argument("--state-path", required=True, help="Path to the server's STATE_PATH SQLite file") + ap.add_argument("--seed", action="store_true", help="Seed the state DB so Bugzilla is skipped") + args = ap.parse_args() + + pr_key = f"{args.repo_full}#{args.pr}" + if args.seed: + print(f"Seeding PR mapping in {args.state_path}: {pr_key} -> bug_id=1") + seed_state(args.state_path, pr_key, bug_id=1) + + payload = build_payload( + owner_repo=args.repo_full, + pr_number=args.pr, + title=args.title, + body=args.body, + base_branch=args.base_branch, + base_sha=args.base_sha, + head_branch=args.head_branch, + head_sha=args.head_sha, + gh_user=args.gh_user, + ) + body_bytes = json.dumps(payload).encode("utf-8") + sig = sign(args.secret, body_bytes) + headers = { + "X-GitHub-Event": "pull_request", + "X-Hub-Signature-256": sig, + "Content-Type": "application/json", + } + + url = args.server.rstrip("/") + "/webhook/github" + print(f"POST {url} for {args.repo_full} PR #{args.pr}") + r = httpx.post(url, data=body_bytes, headers=headers, timeout=30) + print("Response:", r.status_code, r.text) + +if __name__ == "__main__": + main() \ No newline at end of file