# RSS-to-Rocket/bugzilla rss to rocket json.py
#
# Repository listing metadata: 277 lines, 9.5 KiB, Python

import argparse
import logging
import os
import socket
import sqlite3
import time
import xml.etree.ElementTree as ET

import feedparser
import requests
# Rocket.Chat incoming-webhook URL shared by every feed configured below.
# NOTE(review): the webhook token is embedded in source here; anyone who can
# read this file can post to the channel.
COMMON_CHAT_URL = "https://chat.koozali.org/hooks/677e97a73ddf8049989dbc8c/r9uiYpTRAXo3mkFKxHnoTwGCdtKpYaDemCpHArgz89knkwLo"

# Per-feed configuration, keyed by a human-readable feed name.
#   rss_feed      - URL of the Bugzilla Atom feed to poll
#   chat_url      - Rocket.Chat webhook that receives the notifications
#   filter_field  - entry attribute compared against filter_value
#   filter_value  - lower-case value the attribute must equal
#   bypass_filter - when True the filter is skipped and every entry is eligible
FEED_TO_CHAT_MAP = {
    "Bugzilla Feed 1": dict(
        rss_feed="https://bugs.koozali.org/buglist.cgi?chfield=%5BBug%20creation%5D&chfieldfrom=7d&ctype=atom&title=Bugs%20reported%20in%20the%20last%207%20days",
        chat_url=COMMON_CHAT_URL,
        filter_field="status",
        filter_value="open",
        bypass_filter=True,
    ),
    "Bugzilla Feed 2": dict(
        rss_feed="https://another.bugzilla.instance/buglist.cgi?chfield=%5BBug%20creation%5D&chfieldfrom=7d&ctype=atom&title=Another%20set%20of%20Bugs",
        chat_url=COMMON_CHAT_URL,
        filter_field="priority",
        filter_value="high",
        bypass_filter=True,
    ),
}
def get_ip_address(domain, retries=10, delay=1):
    """
    Resolve the IP address of a domain, retrying when resolution fails.

    Args:
        domain (str): The domain name to resolve.
        retries (int): Maximum number of resolution attempts (default is 10).
        delay (int): Delay between attempts in seconds (default is 1 second).

    Returns:
        str: The IP address returned by socket.gethostbyname.

    Raises:
        RuntimeError: If the domain could not be resolved in `retries`
            attempts (including when `retries` is less than 1). The last
            resolver error, if any, is attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            ip_address = socket.gethostbyname(domain)
        except socket.gaierror as exc:
            last_error = exc
            logging.warning(f"Attempt {attempt} failed. Retrying...")
            # Sleeping after the final attempt would only delay the error.
            if attempt < retries:
                time.sleep(delay)
        else:
            logging.info(f"Successfully resolved {domain} to {ip_address}")
            return ip_address
    raise RuntimeError(
        f"Unable to resolve domain '{domain}' after {retries} attempts."
    ) from last_error
# Log to a fixed file under /var/log; records carry a timestamp and level.
log_file = "/var/log/BugzillaToRocket.log"
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Chat URLs that have already received the startup message in this run.
sent_chat_urls = set()
# Database setup
def setup_database():
    """
    Ensure 'sent_bugs.db' contains a `sent_bugs` table with the expected schema.

    The table has two TEXT columns, `id` and `status`, which together form the
    primary key. If a `sent_bugs` table already exists with a different set of
    columns it is dropped (losing its rows) and recreated; a table that already
    matches is left untouched.
    """
    db_path = 'sent_bugs.db'
    create_table_sql = '''
        CREATE TABLE IF NOT EXISTS sent_bugs (
            id TEXT,
            status TEXT,
            PRIMARY KEY (id, status)
        )
    '''
    expected_columns = ['id', 'status']

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # table_info yields no rows when the table does not exist, so an empty
        # list means "nothing to validate" rather than "schema mismatch".
        cursor.execute("PRAGMA table_info(sent_bugs);")
        columns = [column[1] for column in cursor.fetchall()]
        if columns and sorted(columns) != sorted(expected_columns):
            logging.warning("Database schema mismatch. Dropping existing table and recreating.")
            cursor.execute('DROP TABLE sent_bugs;')
        cursor.execute(create_table_sql)
        conn.commit()
    finally:
        # Close the connection even if one of the statements above raised.
        conn.close()
# Function to check if a bug with a specific ID and status has been sent
def has_bug_been_sent(bug_id, status):
    """
    Report whether a notification for this bug in this status was recorded.

    Args:
        bug_id (str): Bug identifier stored in the `id` column.
        status (str): Bug status stored in the `status` column.

    Returns:
        bool: True if a matching row exists in `sent_bugs`, otherwise False.
    """
    conn = sqlite3.connect('sent_bugs.db')
    try:
        # Only existence matters, so fetch at most one constant row.
        cursor = conn.execute(
            'SELECT 1 FROM sent_bugs WHERE id = ? AND status = ? LIMIT 1',
            (bug_id, status),
        )
        return cursor.fetchone() is not None
    finally:
        # Close the connection even if the query raised.
        conn.close()
# Function to mark a bug with its status as sent
def mark_bug_as_sent(bug_id, status):
    """
    Record that a notification was sent for this bug in this status.

    Recording the same (bug_id, status) pair again is a no-op because the
    insert uses INSERT OR IGNORE.

    Args:
        bug_id (str): Bug identifier to store in the `id` column.
        status (str): Bug status to store in the `status` column.
    """
    conn = sqlite3.connect('sent_bugs.db')
    try:
        conn.execute(
            'INSERT OR IGNORE INTO sent_bugs (id, status) VALUES (?, ?)',
            (bug_id, status),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raised.
        conn.close()
# Function to send a message to Rocket.Chat
def send_to_rocket_chat(bug_title, bug_link, bug_id, status, reported_by, last_changed, product, component, chat_url, timeout=30):
    """
    Post a bug notification to a Rocket.Chat webhook.

    Args:
        bug_title (str): Title shown on the message attachment.
        bug_link (str): URL the attachment title links to.
        bug_id (str): Bug identifier shown in the message text.
        status (str): Bug status shown in the message text.
        reported_by (str): Reporter shown in the message text.
        last_changed (str): Last-changed value shown in the message text.
        product (str): Product shown in the message text.
        component (str): Component shown in the message text.
        chat_url (str): Webhook URL the JSON payload is POSTed to.
        timeout (float): Seconds to wait for the webhook before giving up
            (default is 30).

    Returns:
        bool: True if the webhook answered with HTTP 200, False if the request
        failed or any other status code was returned.
    """
    payload = {
        "alias": "Bugzilla",
        "text": (
            f"*Bug Report - ID: *{bug_id}, *Status: *{status} | "
            f"*Reported By: *{reported_by} | "
            f"*Last Changed: *{last_changed} | "
            f"*Product: *{product} | "
            f"*Component: *{component}"
        ),
        "attachments": [
            {
                "title": bug_title,
                "title_link": bug_link,
                "color": "#764FA5"
            }
        ]
    }
    try:
        # Without a timeout an unresponsive server would block the caller forever.
        response = requests.post(chat_url, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        # Connection problems are reported like any other delivery failure
        # instead of propagating out of the polling loop.
        logging.error(f"Failed to send bug notification: {exc}")
        return False
    if response.status_code == 200:
        logging.info(f"Bug notification sent successfully: {bug_title} (ID: {bug_id}, Status: {status})")
        return True
    logging.error(f"Failed to send bug notification: {response.status_code} - {response.text}")
    return False
# Function to send a startup message to Rocket.Chat
def send_startup_message(chat_url, timeout=30):
    """
    Send a one-time startup message to a Rocket.Chat webhook.

    Each chat URL receives the message at most once per run: URLs that were
    notified successfully are remembered in `sent_chat_urls` and skipped on
    later calls. A failed attempt is logged and not remembered, so a later
    call will try again.

    Args:
        chat_url (str): Webhook URL the JSON payload is POSTed to.
        timeout (float): Seconds to wait for the webhook before giving up
            (default is 30).
    """
    if chat_url in sent_chat_urls:
        return
    payload = {
        "alias": "Bugzilla",
        "text": "Bugzilla to Rocket.Chat integration started successfully.",
    }
    try:
        # Without a timeout an unresponsive server would block startup forever.
        response = requests.post(chat_url, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        logging.error(f"Failed to send startup message to {chat_url}: {exc}")
        return
    if response.status_code == 200:
        logging.info(f"Startup message sent successfully to {chat_url}.")
        sent_chat_urls.add(chat_url)
    else:
        logging.error(f"Failed to send startup message to {chat_url}: {response.status_code} - {response.text}")
# Function to extract fields from XML summary
def parse_summary(summary):
    """
    Extract bug details from the table markup in a feed entry summary.

    Every `<tr>` row is read as a (field, value) pair taken from its first two
    cells. Rows with fewer than two cells are ignored, and when a field occurs
    more than once the last occurrence wins.

    Args:
        summary (str): Summary markup, either as plain tags or with `<`, `>`
            and `&` escaped as `&lt;`, `&gt;` and `&amp;`.

    Returns:
        tuple: (status, reported_by, last_changed, product, component). A field
        that is absent or has an empty cell is returned as ''.

    Raises:
        xml.etree.ElementTree.ParseError: If the summary cannot be parsed as XML.
    """
    unescaped = summary.replace("&lt;", "<").replace("&gt;", ">").replace("&amp;", "&")
    try:
        root = ET.fromstring(unescaped)
    except ET.ParseError:
        if unescaped == summary:
            raise
        # Un-escaping "&amp;" inside markup that was already plain tags leaves
        # a bare "&", which is not well-formed XML; retry with the text as given.
        root = ET.fromstring(summary)

    # Row label in the summary table -> name of the value it provides.
    wanted = {
        'Status': 'status',
        'ReportedByName': 'reported_by',
        'Last changed date': 'last_changed',
        'Product': 'product',
        'Component': 'component',
    }
    values = dict.fromkeys(wanted.values(), '')
    for row in root.findall('.//tr'):
        if len(row) < 2:
            # Not a (field, value) pair; nothing to read from this row.
            continue
        name = wanted.get(row[0].text)
        if name is not None:
            # An empty cell has text None; normalise it to ''.
            values[name] = row[1].text or ''
    return (
        values['status'],
        values['reported_by'],
        values['last_changed'],
        values['product'],
        values['component'],
    )
# Function to fetch and parse Bugzilla RSS feeds
def fetch_bugzilla_feed(feed_url):
    """
    Fetch a Bugzilla feed and attach the parsed summary fields to each entry.

    Every returned entry gains the attributes `status`, `reported_by`,
    `last_changed`, `product` and `component`, taken from parse_summary. An
    entry whose summary is missing or cannot be parsed is still returned, with
    those attributes set to '', so one bad entry does not abort the whole poll.

    Args:
        feed_url (str): URL of the feed to fetch.

    Returns:
        list: The feed's entries, in feed order, with the extra attributes set.
    """
    logging.info(f"Fetching feed: {feed_url}")
    feed = feedparser.parse(feed_url)
    # feedparser reports fetch/parse problems through the bozo flag rather
    # than by raising, so surface them in the log.
    if getattr(feed, 'bozo', False):
        problem = getattr(feed, 'bozo_exception', 'unknown error')
        logging.warning(f"Problem reading feed {feed_url}: {problem}")
    entries = []
    for entry in feed.entries:
        summary = getattr(entry, 'summary', '') or ''
        try:
            fields = parse_summary(summary)
        except ET.ParseError as exc:
            logging.warning(f"Could not parse summary of entry {getattr(entry, 'id', '?')}: {exc}")
            fields = ('', '', '', '', '')
        # Add the relevant fields to the entry
        (
            entry.status,
            entry.reported_by,
            entry.last_changed,
            entry.product,
            entry.component,
        ) = fields
        entries.append(entry)
    return entries
# Function to parse command-line arguments
def parse_arguments(argv=None):
    """
    Parse the notifier's command-line options.

    Args:
        argv (list[str] | None): Argument strings to parse. When None
            (the default) argparse reads them from sys.argv.

    Returns:
        argparse.Namespace: `sleep` (int, minutes between polls, default 1)
        and `one_off` (bool, True when --one-off was given).
    """
    parser = argparse.ArgumentParser(description="Bugzilla to Rocket.Chat notifier.")
    parser.add_argument(
        '--sleep',
        type=_non_negative_int,
        default=1,
        help='Number of minutes to sleep between polls (default: 1 minute)'
    )
    parser.add_argument(
        '--one-off',
        action='store_true',
        help='Run once without sleeping'
    )
    return parser.parse_args(argv)


def _non_negative_int(text):
    """argparse type: parse `text` as an int and reject negative values."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 0:
        # A negative duration would make time.sleep() raise after the first poll.
        raise argparse.ArgumentTypeError(f"must be zero or greater, got {value}")
    return value
# Main polling loop
def main():
    """
    Run the notifier: set up the database, announce startup, then poll.

    Every feed in FEED_TO_CHAT_MAP is processed once per cycle. Without
    --one-off the loop sleeps for the configured number of minutes between
    cycles and keeps running even if a feed fails; with --one-off a single
    cycle is run and any error is propagated to the caller.
    """
    args = parse_arguments()
    sleep_duration = args.sleep * 60
    setup_database()
    # Send startup messages for each feed's chat URL
    for feed_info in FEED_TO_CHAT_MAP.values():
        send_startup_message(feed_info['chat_url'])
    while True:
        for feed_name, chat_info in FEED_TO_CHAT_MAP.items():
            try:
                _process_feed(chat_info)
            except Exception:
                logging.exception(f"Error while processing feed '{feed_name}'")
                if args.one_off:
                    # A single run should still fail visibly.
                    raise
                # In polling mode one broken feed must not stop the others.
        # Exit after one iteration if --one-off is set, otherwise wait.
        if args.one_off:
            break
        time.sleep(sleep_duration)


def _process_feed(chat_info):
    """Fetch one configured feed and notify its chat about unreported bugs."""
    entries = fetch_bugzilla_feed(chat_info['rss_feed'])
    filter_field = chat_info.get('filter_field', '')
    filter_value = chat_info.get('filter_value', '')
    bypass_filter = chat_info.get('bypass_filter', False)
    for entry in entries:
        bug_id = entry.id.split('=')[-1]
        # A missing status becomes "unknown"; a None status becomes ''.
        status = (getattr(entry, 'status', "unknown") or '').lower()
        if not bypass_filter:
            field_value = getattr(entry, filter_field, "") or ""
            if field_value.lower() != filter_value:
                continue
        if has_bug_been_sent(bug_id, status):
            continue
        delivered = send_to_rocket_chat(
            entry.title.strip(),
            entry.link,
            bug_id,
            status,
            entry.reported_by,
            entry.last_changed,
            entry.product,
            entry.component,
            chat_info['chat_url'],
        )
        # Only an explicit False counts as a failed delivery, so a sender that
        # returns nothing is still treated as successful. A failed delivery is
        # left unrecorded so it is retried on the next poll.
        if delivered is False:
            continue
        mark_bug_as_sent(bug_id, status)


if __name__ == "__main__":
    main()