import argparse import time import feedparser import requests import sqlite3 import logging import os import xml.etree.ElementTree as ET import socket # Ensure you import socket for gethostbyname # Common chat URL for the feed COMMON_CHAT_URL = "https://chat.koozali.org/hooks/677e97a73ddf8049989dbc8c/r9uiYpTRAXo3mkFKxHnoTwGCdtKpYaDemCpHArgz89knkwLo" # Constants: Mapping Bugzilla RSS feeds to specific Rocket.Chat URLs with filtering criteria FEED_TO_CHAT_MAP = { "Bugzilla Feed 1": { "rss_feed": None, # To be set later with the resolved IP address "chat_url": COMMON_CHAT_URL, "filter_field": "status", "filter_value": "open", "bypass_filter": True, } } def get_ip_address(domain, retries=10, delay=1): """ Resolves the IP address of a domain, retrying up to `retries` times if it fails. Args: domain (str): The domain name to resolve. retries (int): Number of retry attempts (default is 10). delay (int): Delay between retries in seconds (default is 1 second). Returns: str: The IP address if resolved successfully. Raises: RuntimeError: If unable to resolve the domain after all attempts. """ for attempt in range(1, retries + 1): try: ip_address = socket.gethostbyname(domain) logging.info(f"Successfully resolved {domain} to {ip_address}") return ip_address except socket.gaierror: logging.warning(f"Attempt {attempt} failed. Retrying...") time.sleep(delay) raise RuntimeError(f"Unable to resolve domain '{domain}' after {retries} attempts.") # Set up logging to the current directory log_file = "/var/log/BugzillaToRocket.log" logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Set to track which chat URLs have sent the startup message sent_chat_urls = set() # Database setup def setup_database(): db_path = 'sent_bugs.db' database_exists = os.path.isfile(db_path) conn = sqlite3.connect(db_path) cursor = conn.cursor() if database_exists: cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sent_bugs';") if cursor.fetchone() is not None: cursor.execute("PRAGMA table_info(sent_bugs);") columns = [column[1] for column in cursor.fetchall()] expected_columns = ['id', 'status'] if sorted(columns) != sorted(expected_columns): logging.warning("Database schema mismatch. Dropping existing table and recreating.") cursor.execute('DROP TABLE sent_bugs;') cursor.execute(''' CREATE TABLE sent_bugs ( id TEXT, status TEXT, PRIMARY KEY (id, status) ) ''') else: cursor.execute(''' CREATE TABLE sent_bugs ( id TEXT, status TEXT, PRIMARY KEY (id, status) ) ''') else: cursor.execute(''' CREATE TABLE sent_bugs ( id TEXT, status TEXT, PRIMARY KEY (id, status) ) ''') conn.commit() conn.close() # Function to check if a bug with a specific ID and status has been sent def has_bug_been_sent(bug_id, status): conn = sqlite3.connect('sent_bugs.db') cursor = conn.cursor() cursor.execute('SELECT * FROM sent_bugs WHERE id = ? AND status = ?', (bug_id, status)) exists = cursor.fetchone() is not None conn.close() return exists # Function to mark a bug with its status as sent def mark_bug_as_sent(bug_id, status): conn = sqlite3.connect('sent_bugs.db') cursor = conn.cursor() cursor.execute('INSERT OR IGNORE INTO sent_bugs (id, status) VALUES (?, ?)', (bug_id, status)) conn.commit() conn.close() # Function to send a message to Rocket.Chat def send_to_rocket_chat(bug_title, bug_link, bug_id, status, reported_by, last_changed, product, component, chat_url): payload = { "alias": "Bugzilla", "text": ( f"*Bug Report - ID: *{bug_id}, *Status: *{status} | " f"*Reported By: *{reported_by} | " f"*Last Changed: *{last_changed} | " f"*Product: *{product} | " f"*Component: *{component}" ), "attachments": [ { "title": bug_title, "title_link": bug_link, "color": "#764FA5" } ] } response = requests.post(chat_url, json=payload) if response.status_code == 200: logging.info(f"Bug notification sent successfully: {bug_title} (ID: {bug_id}, Status: {status})") else: logging.error(f"Failed to send bug notification: {response.status_code} - {response.text}") # Function to send a startup message to Rocket.Chat def send_startup_message(chat_url): global sent_chat_urls if chat_url not in sent_chat_urls: payload = { "alias": "Bugzilla", "text": "Bugzilla to Rocket.Chat integration started successfully.", } response = requests.post(chat_url, json=payload) if response.status_code == 200: logging.info(f"Startup message sent successfully to {chat_url}.") sent_chat_urls.add(chat_url) else: logging.error(f"Failed to send startup message to {chat_url}: {response.status_code} - {response.text}") # Function to extract fields from XML summary def parse_summary(summary): summary = summary.replace("<", "<").replace(">", ">").replace("&", "&") root = ET.fromstring(summary) status = '' reported_by = '' last_changed = '' product = '' component = '' for row in root.findall('.//tr'): field = row[0].text value = row[1].text if field == 'Status': status = value elif field == 'ReportedByName': reported_by = value elif field == 'Last changed date': last_changed = value elif field == 'Product': product = value elif field == 'Component': component = value return status, reported_by, last_changed, product, component # Function to fetch and parse Bugzilla RSS feeds def fetch_bugzilla_feed(feed_url, original_domain): headers = {'Host': original_domain} # Set Host header logging.info(f"Fetching feed: {feed_url}") feed = feedparser.parse(feed_url, request_headers=headers) entries = [] for entry in feed.entries: summary = entry.summary status, reported_by, last_changed, product, component = parse_summary(summary) # Add the relevant fields to the entry entry.status = status entry.reported_by = reported_by entry.last_changed = last_changed entry.product = product entry.component = component entries.append(entry) return entries # Function to parse command-line arguments def parse_arguments(): parser = argparse.ArgumentParser(description="Bugzilla to Rocket.Chat notifier.") parser.add_argument( '--sleep', type=int, default=1, help='Number of minutes to sleep between polls (default: 1 minute)' ) parser.add_argument( '--one-off', action='store_true', help='Run once without sleeping' ) return parser.parse_args() # Main polling loop def main(): args = parse_arguments() sleep_duration = args.sleep * 60 setup_database() # Preload IP address for Bugzilla instance bugzilla_ip = get_ip_address("bugs.koozali.org") # Update the RSS feed URL with the resolved IP address FEED_TO_CHAT_MAP["Bugzilla Feed 1"]["rss_feed"] = f"http://{bugzilla_ip}/buglist.cgi?chfield=%5BBug%20creation%5D&chfieldfrom=7d&ctype=atom&title=Bugs%20reported%20in%20the%20last%207%20days" # Send startup message for the feed's chat URL send_startup_message(FEED_TO_CHAT_MAP["Bugzilla Feed 1"]["chat_url"]) while True: for feed_name, chat_info in FEED_TO_CHAT_MAP.items(): # Use the resolved IP address in fetch_bugzilla_feed entries = fetch_bugzilla_feed(chat_info['rss_feed'], "bugs.koozali.org") filter_field = chat_info.get('filter_field', '') filter_value = chat_info.get('filter_value', '') bypass_filter = chat_info.get('bypass_filter', False) for entry in entries: bug_id = entry.id.split('=')[-1] status = entry.status.lower() if hasattr(entry, 'status') else "unknown" if bypass_filter or (getattr(entry, filter_field, "").lower() == filter_value): if not has_bug_been_sent(bug_id, status): title = entry.title.strip() link = entry.link reported_by = entry.reported_by last_changed = entry.last_changed product = entry.product component = entry.component send_to_rocket_chat(title, link, bug_id, status, reported_by, last_changed, product, component, chat_info['chat_url']) mark_bug_as_sent(bug_id, status) # Sleep for the specified duration unless --one-off flag is set if not args.one_off: time.sleep(sleep_duration) else: break # Exit loop after one iteration if --one-off is set if __name__ == "__main__": main()