Add webhook endpoint test programs
This commit is contained in:
101
TestPrograms/test-pr-creation.py
Normal file
101
TestPrograms/test-pr-creation.py
Normal file
@@ -0,0 +1,101 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import hmac
|
||||
import hashlib
|
||||
from fastapi.testclient import TestClient
|
||||
import types
|
||||
|
||||
# Import your server module
|
||||
import webhook_endpoint as srv
|
||||
|
||||
# Ensure safe test config toggles
|
||||
srv.GITHUB_DRY_RUN = True
|
||||
srv.BUGZILLA_ATTACH_DIFF = False
|
||||
# Enable mirroring flag and replace pr_mirror with a fake
|
||||
srv.GITEA_MIRROR_PR_ENABLE = True
|
||||
|
||||
# Fake PR mirror
|
||||
class FakePRMirror:
|
||||
def __init__(self):
|
||||
self.calls = []
|
||||
|
||||
def mirror(self, gh_owner: str, gh_repo: str, pr_number: int, pr_title: str, pr_body: str):
|
||||
self.calls.append({
|
||||
"owner": gh_owner,
|
||||
"repo": gh_repo,
|
||||
"pr_number": pr_number,
|
||||
"title": pr_title,
|
||||
"body": pr_body,
|
||||
})
|
||||
# Pretend we created PR #101 on Gitea
|
||||
return 101
|
||||
|
||||
fake_mirror = FakePRMirror()
|
||||
srv.pr_mirror = fake_mirror # monkeypatch the orchestrator in the module
|
||||
|
||||
# Also monkeypatch Bugzilla client to avoid real writes
|
||||
class FakeBZ:
|
||||
def create_bug(self, summary, description, component):
|
||||
return 99999
|
||||
def update_bug_fields(self, bug_id, fields):
|
||||
pass
|
||||
def add_attachment(self, *a, **k):
|
||||
pass
|
||||
def add_cc(self, *a, **k):
|
||||
pass
|
||||
def add_or_update_author_email_note(self, *a, **k):
|
||||
pass
|
||||
|
||||
srv.bz = FakeBZ()
|
||||
|
||||
# Build app client
|
||||
client = TestClient(srv.app)
|
||||
|
||||
def sign(secret: str, body: bytes) -> str:
|
||||
return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
|
||||
|
||||
def payload(action="opened"):
|
||||
return {
|
||||
"action": action,
|
||||
"repository": {
|
||||
"full_name": "Koozali-SME-Server/smeserver-manager",
|
||||
"owner": {"login": "Koozali-SME-Server"},
|
||||
"name": "smeserver-manager",
|
||||
},
|
||||
"pull_request": {
|
||||
"number": 42,
|
||||
"html_url": "https://github.com/Koozali-SME-Server/smeserver-manager/pull/42",
|
||||
"title": "Improve logging",
|
||||
"body": "Please review.",
|
||||
"user": {"login": "octocat", "html_url": "https://github.com/octocat"},
|
||||
"base": {"ref": "master", "sha": "9f8e7d6cafebabe0000deadbeef0000000000000"},
|
||||
"head": {"ref": "feature/logs", "sha": "a1b2c3ddeeddbb0000deadbeef0000000000000", "repo": {"owner": {"login": "octocat"}}},
|
||||
"created_at": "2025-09-17T12:34:56Z",
|
||||
"labels": [{"name": "enhancement"}],
|
||||
},
|
||||
}
|
||||
|
||||
def run_test():
|
||||
body = json.dumps(payload("opened")).encode("utf-8")
|
||||
# Use the server's configured secret (or set one here)
|
||||
secret = srv.WEBHOOK_SECRET or "devsecret"
|
||||
headers = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(secret, body),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
# If you run this without setting WEBHOOK_SECRET in env, also set it here:
|
||||
if not srv.WEBHOOK_SECRET:
|
||||
# monkeypatch the verify function to use our secret
|
||||
def verify_signature(secret_arg, *a, **k): return srv.verify_signature(secret, *a, **k)
|
||||
srv.verify_signature = verify_signature
|
||||
|
||||
resp = client.post("/webhook/github", data=body, headers=headers)
|
||||
print("Response:", resp.status_code, resp.json())
|
||||
print("Mirror calls recorded:", fake_mirror.calls)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json().get("status") in ("ok", "bugzilla_failed") # ok path returns "ok"
|
||||
assert any(c["pr_number"] == 42 for c in fake_mirror.calls), "Mirror was not invoked"
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_test()
|
116
TestPrograms/test-pr-event-creation.py
Normal file
116
TestPrograms/test-pr-event-creation.py
Normal file
@@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env python3
|
||||
# Simple webhook test sender for webhook_endpoint.py
|
||||
# Edit the CONFIG section and Run in Thonny. No CLI args needed.
|
||||
|
||||
import json
|
||||
import hmac
|
||||
import hashlib
|
||||
import httpx
|
||||
import sqlite3
|
||||
import os
|
||||
import time
|
||||
|
||||
# -------------- CONFIG (edit these) --------------
|
||||
|
||||
# Your running server (uvicorn webhook_endpoint:app ...)
|
||||
SERVER_BASE = "http://127.0.0.1:8081"
|
||||
|
||||
# Must match WEBHOOK_SECRET used by the server
|
||||
WEBHOOK_SECRET = "123456gfdsaqwertyuio"
|
||||
|
||||
# Path to the server's STATE_PATH (so we can seed the PR mapping to skip Bugzilla)
|
||||
STATE_PATH = "/tmp/webhook_state.sqlite"
|
||||
|
||||
# The real GitHub repo and PR you want to mirror (PR must exist on GitHub)
|
||||
REPO_FULL = "Koozali-SME-Server/smeserver-manager" # owner/repo
|
||||
PR_NUMBER = 9 # existing PR number on GitHub
|
||||
|
||||
# Cosmetic fields (not used for mirroring logic; placeholders are fine)
|
||||
PR_TITLE = "Test PR from harness"
|
||||
PR_BODY = "Harness test body."
|
||||
BASE_BRANCH = "master"
|
||||
BASE_SHA = "9f8e7d6cafebabe0000deadbeef0000000000000"
|
||||
HEAD_BRANCH = "feature/test"
|
||||
HEAD_SHA = "a1b2c3ddeeddbb0000deadbeef0000000000000"
|
||||
PR_AUTHOR = "octocat" # GitHub login of PR author (cosmetic)
|
||||
|
||||
# Seed the state DB so the server skips Bugzilla and treats event as "synchronize"
|
||||
SEED_STATE = True
|
||||
|
||||
# -------------------------------------------------
|
||||
|
||||
def sign(secret: str, body: bytes) -> str:
|
||||
return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
|
||||
|
||||
def seed_state_db(state_path: str, pr_key: str, bug_id: int = 1):
|
||||
os.makedirs(os.path.dirname(state_path) or ".", exist_ok=True)
|
||||
conn = sqlite3.connect(state_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS pr_map (
|
||||
pr_key TEXT PRIMARY KEY,
|
||||
bug_id INTEGER NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
||||
cur.execute("""
|
||||
INSERT INTO pr_map (pr_key, bug_id, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(pr_key) DO UPDATE SET bug_id=excluded.bug_id, updated_at=excluded.updated_at
|
||||
""", (pr_key, bug_id, now, now))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def build_payload():
|
||||
owner, repo = REPO_FULL.split("/", 1)
|
||||
return {
|
||||
"action": "synchronize", # use synchronize to trigger mirroring without Bugzilla create
|
||||
"repository": {
|
||||
"full_name": REPO_FULL,
|
||||
"owner": {"login": owner},
|
||||
"name": repo,
|
||||
},
|
||||
"pull_request": {
|
||||
"number": PR_NUMBER,
|
||||
"html_url": f"https://github.com/{REPO_FULL}/pull/{PR_NUMBER}",
|
||||
"title": PR_TITLE,
|
||||
"body": PR_BODY,
|
||||
"user": {"login": PR_AUTHOR, "html_url": f"https://github.com/{PR_AUTHOR}"},
|
||||
"base": {"ref": BASE_BRANCH, "sha": BASE_SHA},
|
||||
"head": {"ref": HEAD_BRANCH, "sha": HEAD_SHA, "repo": {"owner": {"login": PR_AUTHOR}}},
|
||||
"created_at": "2025-09-20T12:34:56Z",
|
||||
"labels": [],
|
||||
},
|
||||
}
|
||||
|
||||
def main():
|
||||
pr_key = f"{REPO_FULL}#{PR_NUMBER}"
|
||||
|
||||
if SEED_STATE:
|
||||
print(f"Seeding state: {STATE_PATH} -> {pr_key} = bug_id 1 (skips Bugzilla)")
|
||||
seed_state_db(STATE_PATH, pr_key, bug_id=1)
|
||||
|
||||
# Optional health check (won't fail the run)
|
||||
try:
|
||||
r = httpx.get(f"{SERVER_BASE.rstrip('/')}/healthz", timeout=3)
|
||||
print("Healthz:", r.status_code, r.text)
|
||||
except Exception as e:
|
||||
print("Healthz check failed (continuing):", e)
|
||||
|
||||
payload = build_payload()
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
headers = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(WEBHOOK_SECRET, body),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
url = f"{SERVER_BASE.rstrip('/')}/webhook/github"
|
||||
print(f"POST {url} for {REPO_FULL} PR #{PR_NUMBER}")
|
||||
resp = httpx.post(url, data=body, headers=headers, timeout=30)
|
||||
print("Response:", resp.status_code, resp.text)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
262
TestPrograms/test_harness.py
Normal file
262
TestPrograms/test_harness.py
Normal file
@@ -0,0 +1,262 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import hmac
|
||||
import hashlib
|
||||
from typing import Any, Dict, Optional
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
|
||||
# FastAPI TestClient for local calls without a real web server
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
# Import your classes from the main code (ensure this file is next to gitea_to_github_sync.py)
|
||||
from gitea_to_github_sync import PRAutocloserServer
|
||||
|
||||
# ---------------------------
|
||||
# In-memory fakes and helpers
|
||||
# ---------------------------
|
||||
|
||||
class InMemoryState:
|
||||
def __init__(self):
|
||||
self.map: Dict[str, int] = {}
|
||||
|
||||
def set_pr_bug(self, pr_key: str, bug_id: int):
|
||||
self.map[pr_key] = bug_id
|
||||
|
||||
def get_pr_bug(self, pr_key: str) -> Optional[int]:
|
||||
return self.map.get(pr_key)
|
||||
|
||||
@dataclass
|
||||
class RecordedComment:
|
||||
pr_number: int
|
||||
text: str
|
||||
|
||||
@dataclass
|
||||
class RecordedLabel:
|
||||
pr_number: int
|
||||
label: str
|
||||
|
||||
class FakeIssue:
|
||||
def __init__(self, pr_number: int, tracker: "FakeRepo"):
|
||||
self.pr_number = pr_number
|
||||
self.tracker = tracker
|
||||
|
||||
def add_to_labels(self, label: str):
|
||||
self.tracker.labels.append(RecordedLabel(self.pr_number, label))
|
||||
|
||||
class FakePR:
|
||||
def __init__(self, number: int, tracker: "FakeRepo"):
|
||||
self.number = number
|
||||
self.state = "open"
|
||||
self.tracker = tracker
|
||||
|
||||
def create_issue_comment(self, text: str):
|
||||
self.tracker.comments.append(RecordedComment(self.number, text))
|
||||
|
||||
def edit(self, state: str):
|
||||
if state in ("open", "closed"):
|
||||
self.state = state
|
||||
|
||||
def as_issue(self):
|
||||
return FakeIssue(self.number, self.tracker)
|
||||
|
||||
class FakeRepo:
|
||||
def __init__(self, full_name: str):
|
||||
self.full_name = full_name
|
||||
self._prs: Dict[int, FakePR] = {}
|
||||
self.comments: list[RecordedComment] = []
|
||||
self.labels: list[RecordedLabel] = []
|
||||
self.created_labels: set[str] = set()
|
||||
|
||||
def get_pull(self, number: int) -> FakePR:
|
||||
if number not in self._prs:
|
||||
self._prs[number] = FakePR(number, self)
|
||||
return self._prs[number]
|
||||
|
||||
def get_issue(self, number: int) -> FakeIssue:
|
||||
return FakeIssue(number, self)
|
||||
|
||||
def create_label(self, name: str, color: str):
|
||||
self.created_labels.add(name)
|
||||
|
||||
class FakeGhApi:
|
||||
def __init__(self, repo_full_name: str):
|
||||
self.repo_full_name = repo_full_name
|
||||
self.repo = FakeRepo(repo_full_name)
|
||||
|
||||
def get_repo(self, full_name: str) -> FakeRepo:
|
||||
assert full_name == self.repo_full_name, f"Unexpected repo requested: {full_name}"
|
||||
return self.repo
|
||||
|
||||
class FakeGitHubClient:
|
||||
def __init__(self, repo_full_name: str, token: str = "gh_test_token"):
|
||||
self.token = token
|
||||
self.gh = FakeGhApi(repo_full_name)
|
||||
|
||||
# Match PRAutocloserServer usage
|
||||
def close_pr_with_comment_and_label(self, repo, pr_number: int, comment: str, label: Optional[str] = None):
|
||||
pr = repo.get_pull(pr_number)
|
||||
pr.create_issue_comment(comment)
|
||||
if label:
|
||||
repo.create_label(label, "ededed")
|
||||
repo.get_issue(pr_number).add_to_labels(label)
|
||||
if pr.state != "closed":
|
||||
pr.edit(state="closed")
|
||||
|
||||
def comment_on_pr(self, repo, pr_number: int, comment: str, label: Optional[str] = None):
|
||||
pr = repo.get_pull(pr_number)
|
||||
pr.create_issue_comment(comment)
|
||||
if label:
|
||||
repo.create_label(label, "ededed")
|
||||
repo.get_issue(pr_number).add_to_labels(label)
|
||||
|
||||
class FakeBugzillaClient:
|
||||
def __init__(self, fail_create: bool = False):
|
||||
self.fail_create = fail_create
|
||||
self.created_bugs: list[int] = []
|
||||
self.attachments: list[tuple[int, str]] = [] # (bug_id, filename)
|
||||
self.comments: list[tuple[int, str]] = [] # (bug_id, text)
|
||||
self._next_id = 1000
|
||||
|
||||
def create_bug(self, summary: str, description: str, component: str, visibility_groups=None) -> int:
|
||||
if self.fail_create:
|
||||
raise RuntimeError("Simulated Bugzilla create failure")
|
||||
bug_id = self._next_id
|
||||
self._next_id += 1
|
||||
self.created_bugs.append(bug_id)
|
||||
return bug_id
|
||||
|
||||
def add_attachment(self, bug_id: int, file_name: str, content_type: str, summary: str, data_bytes: bytes):
|
||||
# No-op, just record
|
||||
self.attachments.append((bug_id, file_name))
|
||||
|
||||
def add_comment(self, bug_id: int, comment: str):
|
||||
self.comments.append((bug_id, comment))
|
||||
|
||||
# HMAC signature helper
|
||||
def sign(secret: str, body: bytes) -> str:
|
||||
return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
|
||||
|
||||
# Sample GitHub PR payload (minimized to fields used by the endpoint)
|
||||
def sample_pr_payload(action="opened") -> Dict[str, Any]:
|
||||
return {
|
||||
"action": action,
|
||||
"repository": {
|
||||
"full_name": "Koozali-SME-Server/smeserver-manager",
|
||||
"owner": {"login": "Koozali-SME-Server"},
|
||||
"name": "smeserver-manager",
|
||||
},
|
||||
"pull_request": {
|
||||
"number": 42,
|
||||
"html_url": "https://github.com/Koozali-SME-Server/smeserver-manager/pull/42",
|
||||
"title": "Test PR: improve logging",
|
||||
"body": "Please review these changes.",
|
||||
"user": {"login": "octocat", "html_url": "https://github.com/octocat"},
|
||||
"base": {"ref": "master", "sha": "9f8e7d6cafebabe0000deadbeef0000000000000"},
|
||||
"head": {"ref": "feature/logs", "sha": "a1b2c3ddeeddbb0000deadbeef0000000000000", "repo": {"owner": {"login": "octocat"}}},
|
||||
"created_at": "2025-09-16T12:34:56Z",
|
||||
"labels": [{"name": "enhancement"}],
|
||||
},
|
||||
}
|
||||
|
||||
# ---------------------------
|
||||
# Test runner
|
||||
# ---------------------------
|
||||
|
||||
def run_tests():
|
||||
# Config for server (only the parts the endpoint uses)
|
||||
cfg = {
|
||||
"sync": {
|
||||
"gitea": {"base_url": "https://src.koozali.org", "org": "smeserver"},
|
||||
"github": {"webhook": {"secret": "test_secret"}},
|
||||
},
|
||||
"bugzilla": {
|
||||
"base_url": "https://bugs.koozali.org",
|
||||
"templates": {
|
||||
"pr_comment_success": "Bug {bug_id}: {bugzilla_base_url}/show_bug.cgi?id={bug_id}\nCanonical: {gitea_repo_url}\n",
|
||||
"pr_comment_failure": "Unable to create bug. Please file at {bugzilla_base_url} (Component: {repo})\nPR: {pr_url}\n",
|
||||
"bug_summary": "[GH PR #{pr_number}] {org}/{repo}: {pr_title}",
|
||||
"bug_body": "PR: {pr_url}\nNotes:\n{pr_body}\n",
|
||||
"bug_update_comment": "Update for PR #{pr_number}",
|
||||
"pr_sync_short_comment": "Updated bug {bug_id}: {bug_url}",
|
||||
},
|
||||
"attach_diff": False, # disable network to GitHub patch API during tests
|
||||
"failure_policy": {"close_pr_on_bugzilla_failure": False, "label_on_bugzilla_failure": "bugzilla-needed"},
|
||||
"component_template": "{repo}",
|
||||
},
|
||||
}
|
||||
|
||||
repo_full = "Koozali-SME-Server/smeserver-manager"
|
||||
|
||||
# Test 1: Success path (bug created, PR closed)
|
||||
print("\n=== Test 1: Success path ===")
|
||||
ghc = FakeGitHubClient(repo_full)
|
||||
bzc = FakeBugzillaClient(fail_create=False)
|
||||
state = InMemoryState()
|
||||
server = PRAutocloserServer(cfg, state, ghc, bzc)
|
||||
client = TestClient(server.app)
|
||||
|
||||
body = json.dumps(sample_pr_payload("opened")).encode("utf-8")
|
||||
headers = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(cfg["sync"]["github"]["webhook"]["secret"], body),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
resp = client.post("/webhook/github", data=body, headers=headers)
|
||||
print("Response:", resp.status_code, resp.json())
|
||||
# Inspect effects
|
||||
repo = ghc.gh.repo
|
||||
print("Created bugs:", bzc.created_bugs)
|
||||
print("PR comments:", [c.text.strip() for c in repo.comments])
|
||||
print("PR state:", repo.get_pull(42).state)
|
||||
assert repo.get_pull(42).state == "closed", "PR should be closed on success"
|
||||
|
||||
# Test 2: Failure path (bugzilla create fails, PR left open, guidance comment posted)
|
||||
print("\n=== Test 2: Failure path ===")
|
||||
ghc2 = FakeGitHubClient(repo_full)
|
||||
bzc2 = FakeBugzillaClient(fail_create=True)
|
||||
state2 = InMemoryState()
|
||||
server2 = PRAutocloserServer(cfg, state2, ghc2, bzc2)
|
||||
client2 = TestClient(server2.app)
|
||||
|
||||
body2 = json.dumps(sample_pr_payload("opened")).encode("utf-8")
|
||||
headers2 = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(cfg["sync"]["github"]["webhook"]["secret"], body2),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
resp2 = client2.post("/webhook/github", data=body2, headers=headers2)
|
||||
print("Response:", resp2.status_code, resp2.json())
|
||||
repo2 = ghc2.gh.repo
|
||||
print("Created bugs:", bzc2.created_bugs)
|
||||
print("PR comments:", [c.text.strip() for c in repo2.comments])
|
||||
print("PR labels:", [l.label for l in repo2.labels])
|
||||
print("PR state:", repo2.get_pull(42).state)
|
||||
assert repo2.get_pull(42).state == "open", "PR should remain open on bugzilla failure"
|
||||
|
||||
# Test 3: Synchronize event with existing bug (attachments disabled here)
|
||||
print("\n=== Test 3: Synchronize with existing bug ===")
|
||||
ghc3 = FakeGitHubClient(repo_full)
|
||||
bzc3 = FakeBugzillaClient(fail_create=False)
|
||||
state3 = InMemoryState()
|
||||
server3 = PRAutocloserServer(cfg, state3, ghc3, bzc3)
|
||||
client3 = TestClient(server3.app)
|
||||
|
||||
# Seed a bug mapping as if created earlier
|
||||
state3.set_pr_bug(f"{repo_full}#42", 2001)
|
||||
|
||||
body3 = json.dumps(sample_pr_payload("synchronize")).encode("utf-8")
|
||||
headers3 = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(cfg["sync"]["github"]["webhook"]["secret"], body3),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
resp3 = client3.post("/webhook/github", data=body3, headers=headers3)
|
||||
print("Response:", resp3.status_code, resp3.json())
|
||||
repo3 = ghc3.gh.repo
|
||||
print("PR comments:", [c.text.strip() for c in repo3.comments])
|
||||
print("Bug attachments recorded:", bzc3.attachments)
|
||||
print("PR state:", repo3.get_pull(42).state)
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_tests()
|
158
TestPrograms/test_webhook_endpoint_standalone.py
Normal file
158
TestPrograms/test_webhook_endpoint_standalone.py
Normal file
@@ -0,0 +1,158 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import hmac
|
||||
import hashlib
|
||||
from typing import Optional, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
import webhook_endpoint as ws # your standalone endpoint module
|
||||
|
||||
# --------- Fakes ---------
|
||||
|
||||
@dataclass
|
||||
class RecordedComment:
|
||||
pr_number: int
|
||||
text: str
|
||||
|
||||
@dataclass
|
||||
class RecordedLabel:
|
||||
pr_number: int
|
||||
label: str
|
||||
|
||||
class FakeIssue:
|
||||
def __init__(self, pr_number: int, repo: "FakeRepo"):
|
||||
self.pr_number = pr_number
|
||||
self.repo = repo
|
||||
def add_to_labels(self, label: str):
|
||||
self.repo.labels.append(RecordedLabel(self.pr_number, label))
|
||||
|
||||
class FakePR:
|
||||
def __init__(self, number: int, repo: "FakeRepo"):
|
||||
self.number = number
|
||||
self.repo = repo
|
||||
self.state = "open"
|
||||
def create_issue_comment(self, text: str):
|
||||
self.repo.comments.append(RecordedComment(self.number, text))
|
||||
def edit(self, state: str):
|
||||
if state in ("open", "closed"):
|
||||
self.state = state
|
||||
|
||||
class FakeRepo:
|
||||
def __init__(self, full_name: str):
|
||||
self.full_name = full_name
|
||||
self._prs: Dict[int, FakePR] = {}
|
||||
self.comments: list[RecordedComment] = []
|
||||
self.labels: list[RecordedLabel] = []
|
||||
self.created_labels: set[str] = set()
|
||||
def get_pull(self, number: int) -> FakePR:
|
||||
self._prs.setdefault(number, FakePR(number, self))
|
||||
return self._prs[number]
|
||||
def get_issue(self, number: int) -> FakeIssue:
|
||||
return FakeIssue(number, self)
|
||||
def create_label(self, name: str, color: str):
|
||||
self.created_labels.add(name)
|
||||
|
||||
class FakeGhApi:
|
||||
def __init__(self, repo_full_name: str):
|
||||
self.repo_full_name = repo_full_name
|
||||
self.repo = FakeRepo(repo_full_name)
|
||||
def get_repo(self, full_name: str) -> FakeRepo:
|
||||
assert full_name == self.repo_full_name
|
||||
return self.repo
|
||||
|
||||
class FakeGhSingleton:
|
||||
def __init__(self, repo_full_name: str):
|
||||
self._api = FakeGhApi(repo_full_name)
|
||||
def get_repo(self, full_name: str):
|
||||
return self._api.get_repo(full_name)
|
||||
|
||||
class FakeBugzilla:
|
||||
def __init__(self, fail_create: bool = False):
|
||||
self.fail_create = fail_create
|
||||
self.created_bugs: list[int] = []
|
||||
self.attachments: list[tuple[int, str]] = []
|
||||
self._next = 1000
|
||||
def create_bug(self, summary: str, description: str, component: str) -> int:
|
||||
if self.fail_create:
|
||||
raise RuntimeError("Simulated Bugzilla create failure")
|
||||
bug_id = self._next; self._next += 1
|
||||
self.created_bugs.append(bug_id)
|
||||
return bug_id
|
||||
def add_attachment(self, bug_id: int, filename: str, summary: str, data_bytes: bytes):
|
||||
self.attachments.append((bug_id, filename))
|
||||
def add_comment(self, bug_id: int, comment: str):
|
||||
pass
|
||||
|
||||
class InMemoryState:
|
||||
def __init__(self):
|
||||
self.map: Dict[str, int] = {}
|
||||
def get_bug(self, pr_key: str) -> Optional[int]:
|
||||
return self.map.get(pr_key)
|
||||
def set_bug(self, pr_key: str, bug_id: int):
|
||||
self.map[pr_key] = bug_id
|
||||
|
||||
# --------- Helpers ---------
|
||||
|
||||
def sign(secret: str, body: bytes) -> str:
|
||||
return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
|
||||
|
||||
def sample_payload(action="opened") -> Dict[str, Any]:
|
||||
return {
|
||||
"action": action,
|
||||
"repository": {
|
||||
"full_name": "Koozali-SME-Server/smeserver-manager",
|
||||
"owner": {"login": "Koozali-SME-Server"},
|
||||
"name": "smeserver-manager",
|
||||
},
|
||||
"pull_request": {
|
||||
"number": 42,
|
||||
"html_url": "https://github.com/Koozali-SME-Server/smeserver-manager/pull/42",
|
||||
"title": "Improve logging",
|
||||
"body": "Please review.",
|
||||
"user": {"login": "octocat", "html_url": "https://github.com/octocat"},
|
||||
"base": {"ref": "master", "sha": "9f8e7d6cafebabe0000deadbeef0000000000000"},
|
||||
"head": {"ref": "feature/logs", "sha": "a1b2c3ddeeddbb0000deadbeef0000000000000", "repo": {"owner": {"login": "octocat"}}},
|
||||
"created_at": "2025-09-16T12:34:56Z",
|
||||
"labels": [{"name": "enhancement"}],
|
||||
},
|
||||
}
|
||||
|
||||
def run_tests():
|
||||
# Inject fakes into the running module singletons
|
||||
ws.gh = FakeGhSingleton("Koozali-SME-Server/smeserver-manager")
|
||||
ws.bz = FakeBugzilla(fail_create=False)
|
||||
ws.state = InMemoryState()
|
||||
|
||||
client = TestClient(ws.app)
|
||||
secret = os.environ.get("WEBHOOK_SECRET", "test_secret") # must match your server config
|
||||
payload = sample_payload("opened")
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
headers = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(secret, body),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
resp = client.post("/webhook/github", data=body, headers=headers)
|
||||
print("Opened response:", resp.status_code, resp.json())
|
||||
repo = ws.gh.get_repo("Koozali-SME-Server/smeserver-manager")
|
||||
print("Comments:", [c.text.strip() for c in repo.comments])
|
||||
print("PR state:", repo.get_pull(42).state)
|
||||
print("Created bugs:", ws.bz.created_bugs)
|
||||
|
||||
# Test synchronize on same PR (existing bug id used)
|
||||
ws.state.set_bug("Koozali-SME-Server/smeserver-manager#42", ws.bz.created_bugs[0])
|
||||
payload_sync = sample_payload("synchronize")
|
||||
body_sync = json.dumps(payload_sync).encode("utf-8")
|
||||
headers_sync = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(secret, body_sync),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
resp2 = client.post("/webhook/github", data=body_sync, headers=headers_sync)
|
||||
print("Sync response:", resp2.status_code, resp2.json())
|
||||
print("Attachments recorded:", ws.bz.attachments)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
run_tests()
|
45
TestPrograms/test_webhook_http.py
Normal file
45
TestPrograms/test_webhook_http.py
Normal file
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env python3
|
||||
import os, json, hmac, hashlib, httpx
|
||||
|
||||
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "https://githubpr.bjsystems.co.uk/webhook/github")
|
||||
SECRET = os.environ.get("WEBHOOK_SECRET", "devsecret")
|
||||
|
||||
def sign(secret: str, body: bytes) -> str:
|
||||
return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
|
||||
|
||||
def payload(action="opened"):
|
||||
return {
|
||||
"action": action,
|
||||
"repository": {
|
||||
"full_name": "Koozali-SME-Server/smeserver-manager",
|
||||
"owner": {"login": "Koozali-SME-Server"},
|
||||
"name": "smeserver-manager",
|
||||
},
|
||||
"pull_request": {
|
||||
"number": 52,
|
||||
"html_url": "https://github.com/Koozali-SME-Server/smeserver-manager/pull/42",
|
||||
"title": "Test PR from harness",
|
||||
"body": "Harness test body.",
|
||||
"user": {"login": "octocat", "html_url": "https://github.com/octocat"},
|
||||
"base": {"ref": "master", "sha": "9f8e7d6cafebabe0000deadbeef0000000000000"},
|
||||
"head": {"ref": "feature/test", "sha": "a1b2c3ddeeddbb0000deadbeef0000000000000", "repo": {"owner": {"login": "octocat"}}},
|
||||
"created_at": "2025-09-17T12:34:56Z",
|
||||
"labels": [{"name": "enhancement"}],
|
||||
},
|
||||
}
|
||||
|
||||
def post_event(action="opened"):
|
||||
body = json.dumps(payload(action)).encode("utf-8")
|
||||
headers = {
|
||||
"X-GitHub-Event": "pull_request",
|
||||
"X-Hub-Signature-256": sign(SECRET, body),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
r = httpx.post(WEBHOOK_URL, data=body, headers=headers, timeout=30)
|
||||
print(f"{action} ->", r.status_code, r.text)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Send an "opened" event then a "synchronize" event
|
||||
post_event("opened")
|
||||
post_event("synchronize")
|
||||
|
Reference in New Issue
Block a user