Add in Wiki to rocket and edit log path for bugzilla

This commit is contained in:
root 2025-02-27 09:05:33 +00:00
parent d7f2b71533
commit 712c80d113
3 changed files with 443 additions and 55 deletions

View File

@ -1,56 +1,114 @@
import argparse
import time
import feedparser
import requests
import sqlite3
import logging
import os
import xml.etree.ElementTree as ET
# Bugzilla RSS feed URL
BUGZILLA_RSS_URL = "https://bugs.koozali.org/buglist.cgi?chfield=%5BBug%20creation%5D&chfieldfrom=7d&ctype=atom&title=Bugs%20reported%20in%20the%20last%207%20days"
# Updated Rocket.Chat webhook URL
ROCKET_CHAT_URL = "https://chat.koozali.org/hooks/677e97a73ddf8049989dbc8c/r9uiYpTRAXo3mkFKxHnoTwGCdtKpYaDemCpHArgz89knkwLo"
# Common chat URL for both feeds
COMMON_CHAT_URL = "https://chat.koozali.org/hooks/677e97a73ddf8049989dbc8c/r9uiYpTRAXo3mkFKxHnoTwGCdtKpYaDemCpHArgz89knkwLo"
# Set up logging
# Constants: Mapping Bugzilla RSS feeds to specific Rocket.Chat URLs with filtering criteria
FEED_TO_CHAT_MAP = {
"Bugzilla Feed 1": {
"rss_feed": "https://bugs.koozali.org/buglist.cgi?chfield=%5BBug%20creation%5D&chfieldfrom=7d&ctype=atom&title=Bugs%20reported%20in%20the%20last%207%20days",
"chat_url": COMMON_CHAT_URL, # Use common chat URL
"filter_field": "status",
"filter_value": "open",
"bypass_filter": True,
},
"Bugzilla Feed 2": {
"rss_feed": "https://another.bugzilla.instance/buglist.cgi?chfield=%5BBug%20creation%5D&chfieldfrom=7d&ctype=atom&title=Another%20set%20of%20Bugs",
"chat_url": COMMON_CHAT_URL, # Use common chat URL
"filter_field": "priority",
"filter_value": "high",
"bypass_filter": True,
}
}
# Set up logging to the current directory
log_file = "/var/log/BugzillaToRocket.log"
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Ensure the log directory exists
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# Set to track which chat URLs have sent the startup message
sent_chat_urls = set()
# Database setup
def setup_database():
conn = sqlite3.connect('sent_bugs.db')
db_path = 'sent_bugs.db'
database_exists = os.path.isfile(db_path)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS sent_bugs (
id TEXT PRIMARY KEY
)
''')
if database_exists:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sent_bugs';")
if cursor.fetchone() is not None:
cursor.execute("PRAGMA table_info(sent_bugs);")
columns = [column[1] for column in cursor.fetchall()]
expected_columns = ['id', 'status']
if sorted(columns) != sorted(expected_columns):
logging.warning("Database schema mismatch. Dropping existing table and recreating.")
cursor.execute('DROP TABLE sent_bugs;')
cursor.execute('''
CREATE TABLE sent_bugs (
id TEXT,
status TEXT,
PRIMARY KEY (id, status)
)
''')
else:
cursor.execute('''
CREATE TABLE sent_bugs (
id TEXT,
status TEXT,
PRIMARY KEY (id, status)
)
''')
else:
cursor.execute('''
CREATE TABLE sent_bugs (
id TEXT,
status TEXT,
PRIMARY KEY (id, status)
)
''')
conn.commit()
conn.close()
# Function to check if a bug ID has been sent
def has_bug_been_sent(bug_id):
# Function to check if a bug with a specific ID and status has been sent
def has_bug_been_sent(bug_id, status):
conn = sqlite3.connect('sent_bugs.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM sent_bugs WHERE id = ?', (bug_id,))
cursor.execute('SELECT * FROM sent_bugs WHERE id = ? AND status = ?', (bug_id, status))
exists = cursor.fetchone() is not None
conn.close()
return exists
# Function to mark a bug ID as sent
def mark_bug_as_sent(bug_id):
# Function to mark a bug with its status as sent
def mark_bug_as_sent(bug_id, status):
conn = sqlite3.connect('sent_bugs.db')
cursor = conn.cursor()
cursor.execute('INSERT OR IGNORE INTO sent_bugs (id) VALUES (?)', (bug_id,))
cursor.execute('INSERT OR IGNORE INTO sent_bugs (id, status) VALUES (?, ?)', (bug_id, status))
conn.commit()
conn.close()
# Function to send a message to Rocket.Chat
def send_to_rocket_chat(bug_title, bug_link, bug_id):
def send_to_rocket_chat(bug_title, bug_link, bug_id, status, reported_by, last_changed, product, component, chat_url):
payload = {
"alias": "Bugzilla",
"text": f"Bug Report - ID: {bug_id}",
"text": (
f"*Bug Report - ID: *{bug_id}, *Status: *{status} | "
f"*Reported By: *{reported_by} | "
f"*Last Changed: *{last_changed} | "
f"*Product: *{product} | "
f"*Component: *{component}"
),
"attachments": [
{
"title": bug_title,
@ -59,48 +117,134 @@ def send_to_rocket_chat(bug_title, bug_link, bug_id):
}
]
}
response = requests.post(ROCKET_CHAT_URL, json=payload)
response = requests.post(chat_url, json=payload)
if response.status_code == 200:
logging.info(f"Bug notification sent successfully: {bug_title} (ID: {bug_id})")
logging.info(f"Bug notification sent successfully: {bug_title} (ID: {bug_id}, Status: {status})")
else:
logging.error(f"Failed to send bug notification: {response.status_code} - {response.text}")
# Function to send a startup message to Rocket.Chat
def send_startup_message():
payload = {
"alias": "Bugzilla",
"text": "Bugzilla to Rocket.Chat integration started successfully.",
}
response = requests.post(ROCKET_CHAT_URL, json=payload)
if response.status_code == 200:
logging.info("Startup message sent successfully to Rocket.Chat.")
else:
logging.error(f"Failed to send startup message: {response.status_code} - {response.text}")
def send_startup_message(chat_url):
global sent_chat_urls
if chat_url not in sent_chat_urls:
payload = {
"alias": "Bugzilla",
"text": "Bugzilla to Rocket.Chat integration started successfully.",
}
response = requests.post(chat_url, json=payload)
if response.status_code == 200:
logging.info(f"Startup message sent successfully to {chat_url}.")
sent_chat_urls.add(chat_url)
else:
logging.error(f"Failed to send startup message to {chat_url}: {response.status_code} - {response.text}")
# Function to fetch and parse the Bugzilla RSS feed
def fetch_bugzilla_feed():
feed = feedparser.parse(BUGZILLA_RSS_URL)
return feed.entries
# Function to extract fields from XML summary
def parse_summary(summary):
summary = summary.replace("&lt;", "<").replace("&gt;", ">").replace("&amp;", "&")
root = ET.fromstring(summary)
status = ''
reported_by = ''
last_changed = ''
product = ''
component = ''
for row in root.findall('.//tr'):
field = row[0].text
value = row[1].text
if field == 'Status':
status = value
elif field == 'ReportedByName':
reported_by = value
elif field == 'Last changed date':
last_changed = value
elif field == 'Product':
product = value
elif field == 'Component':
component = value
return status, reported_by, last_changed, product, component
# Function to fetch and parse Bugzilla RSS feeds
def fetch_bugzilla_feed(feed_url):
logging.info(f"Fetching feed: {feed_url}")
feed = feedparser.parse(feed_url)
entries = []
for entry in feed.entries:
summary = entry.summary
status, reported_by, last_changed, product, component = parse_summary(summary)
# Add the relevant fields to the entry
entry.status = status
entry.reported_by = reported_by
entry.last_changed = last_changed
entry.product = product
entry.component = component
entries.append(entry)
return entries
# Function to parse command-line arguments
def parse_arguments():
parser = argparse.ArgumentParser(description="Bugzilla to Rocket.Chat notifier.")
parser.add_argument(
'--sleep',
type=int,
default=1,
help='Number of minutes to sleep between polls (default: 1 minute)'
)
parser.add_argument(
'--one-off',
action='store_true',
help='Run once without sleeping'
)
return parser.parse_args()
# Main polling loop
def main():
setup_database() # Initialize the database
send_startup_message() # Send the startup message
args = parse_arguments()
sleep_duration = args.sleep * 60
setup_database()
# Send startup messages for each feed's chat URL
for feed_info in FEED_TO_CHAT_MAP.values():
send_startup_message(feed_info['chat_url'])
while True:
entries = fetch_bugzilla_feed()
for entry in entries:
bug_id = entry.id.split('=')[-1] # Extract the bug number from the URL
if not has_bug_been_sent(bug_id): # Check if the bug has been sent
title = entry.title.strip()
link = entry.link
# Send the bug title, link, and ID
send_to_rocket_chat(title, link, bug_id)
mark_bug_as_sent(bug_id) # Mark the bug ID as sent
# Wait for 1 minute before polling again
time.sleep(60)
for feed_name, chat_info in FEED_TO_CHAT_MAP.items():
entries = fetch_bugzilla_feed(chat_info['rss_feed'])
filter_field = chat_info.get('filter_field', '')
filter_value = chat_info.get('filter_value', '')
bypass_filter = chat_info.get('bypass_filter', False)
for entry in entries:
bug_id = entry.id.split('=')[-1]
status = entry.status.lower() if hasattr(entry, 'status') else "unknown"
if bypass_filter or (getattr(entry, filter_field, "").lower() == filter_value):
if not has_bug_been_sent(bug_id, status):
title = entry.title.strip()
link = entry.link
reported_by = entry.reported_by
last_changed = entry.last_changed
product = entry.product
component = entry.component
send_to_rocket_chat(title, link, bug_id, status, reported_by, last_changed, product, component, chat_info['chat_url'])
mark_bug_as_sent(bug_id, status)
# Sleep for the specified duration unless --one-off flag is set
if not args.one_off:
time.sleep(sleep_duration)
else:
break # Exit loop after one iteration if --one-off is set
if __name__ == "__main__":
main()

View File

@ -4,6 +4,37 @@ import requests
import sqlite3
import logging
import os
import requests
from bs4 import BeautifulSoup
def join_lines_with_newline(lines):
return "\n".join(lines)
def extract_changelog_top(koji_link):
response = requests.get(koji_link)
soup = BeautifulSoup(response.text, 'html.parser')
changelog_td = soup.find('td', class_='changelog')
if changelog_td:
changelog_text = changelog_td.get_text(strip=False)
lines = changelog_text.split('\n')
result = []
seen = set()
for line in lines:
stripped_line = line.strip()
if not stripped_line:
if result: # Stop at the first blank line after content
break
continue
if stripped_line not in seen:
seen.add(stripped_line)
if not result or stripped_line.startswith('-'):
result.append(stripped_line)
return join_lines_with_newline(result)
else:
return []
# Koji RSS feed URL
KOJI_RSS_URL = "http://koji.koozali.org/koji/recentbuilds?feed=rss"
@ -47,10 +78,10 @@ def mark_build_as_sent(build_id):
conn.close()
# Function to send message to Rocket.Chat
def send_to_rocket_chat(build_title, build_link, build_id):
def send_to_rocket_chat(build_title, build_link, build_id,changelog):
payload = {
"alias": "Koji",
"text": f"Build Notification - ID: {build_id}",
"text": f"Build Notification - ID: {build_id}\n{changelog}",
"attachments": [
{
"title": build_title,
@ -95,9 +126,12 @@ def main():
build_link = entry.link
# Extracting the build ID from the title
build_id = build_title.split(':')[1].split(',')[0].strip() # Extract the relevant part from the title
changelog = extract_changelog_top(entry.link)
# print(f"{entry.link}\n{changelog}")
# quit(1)
if not has_build_been_sent(build_id): # Check if the build has been sent
send_to_rocket_chat(build_title, build_link, build_id)
send_to_rocket_chat(build_title, build_link, build_id,changelog)
mark_build_as_sent(build_id) # Mark the build ID as sent
# Wait for 1 minute before polling again

210
wiki rss to rocket chat.py Normal file
View File

@ -0,0 +1,210 @@
import time
import feedparser
import requests
import sqlite3
import logging
import os
from bs4 import BeautifulSoup
import json
import re
import argparse
# Example structure from Wiki
# {
# "title": "Koozali SME Server Debugging (French)",
# "recent_changes": {
# "last_updated": "February 16, 2025, at 20:26 GMT",
# "author": "Gieres"
# },
# "summary_of_changes": {
# "description": "The recent update to the page discusses using Perl with Visual Studio.",
# "server_commands": {
# "removed_commands": "yum --enablerepo=* install gcc gcc-c++ perl-App-cpanminus perl-AnyEvent-AIO perl-Coro",
# "updated_commands": "yum --enablerepo=* install gcc gcc-c++"
# },
# "package_changes": {
# "changed_from": "cpanm Class::Refresh",
# "changed_to": "cpan App::cpanminus",
# "additional_commands": [
# "cpanm Compiler::Lexer",
# "cpanm Perl::LanguageServer"
# ]
# }
# },
# "links": {
# "full_revision": "https://wiki.koozali.org/index.php?title=Koozali_SME_Server_Debugging/fr&diff=43309&oldid=43107",
# "comments": "https://wiki.koozali.org/Talk:Koozali_SME_Server_Debugging/fr"
# }
# }
def join_lines_with_newline(lines):
return "\n".join(lines)
def extract_changelog_top(Wiki_link):
response = requests.get(Wiki_link)
soup = BeautifulSoup(response.text, 'html.parser')
changelog_td = soup.find('td', class_='changelog')
if changelog_td:
changelog_text = changelog_td.get_text(strip=False)
lines = changelog_text.split('\n')
result = []
seen = set()
for line in lines:
stripped_line = line.strip()
if not stripped_line:
if result: # Stop at the first blank line after content
break
continue
if stripped_line not in seen:
seen.add(stripped_line)
if not result or stripped_line.startswith('-'):
result.append(stripped_line)
return join_lines_with_newline(result)
else:
return []
# Wiki RSS feed URL
Wiki_RSS_URL = "https://wiki.koozali.org/api.php?hidebots=1&urlversion=2&days=7&limit=50&action=feedrecentchanges&feedformat=rss"
# Updated Rocket.Chat webhook URL
ROCKET_CHAT_URL = "https://chat.koozali.org/hooks/67b8a0fc1f56a124fa47dbec/Dee9hFcASKvvBsW9uRyy7Xjg2m3NTxDprt2HQKTRNuWv8SFr"
# Set up logging
log_file = "/var/log/Wiki2Rocket.log"
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Database setup
def setup_database():
conn = sqlite3.connect('sent_wikis.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS sent_wikis (
id TEXT PRIMARY KEY
)
''')
conn.commit()
conn.close()
# Function to check if a wiki ID has been sent
def has_wiki_been_sent(wiki_id):
conn = sqlite3.connect('sent_wikis.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM sent_wikis WHERE id = ?', (wiki_id,))
exists = cursor.fetchone() is not None
conn.close()
return exists
# Function to mark a wiki ID as sent
def mark_wiki_as_sent(wiki_id):
conn = sqlite3.connect('sent_wikis.db')
cursor = conn.cursor()
cursor.execute('INSERT OR IGNORE INTO sent_wikis (id) VALUES (?)', (wiki_id,))
conn.commit()
conn.close()
# Function to send message to Rocket.Chat
def send_to_rocket_chat(wiki_title, wiki_link, wiki_id, description):
#logging.info(f"Logging link:{wiki_link}")
payload = {
"alias": "Wiki",
"text": f"{description}",
"attachments": [
{
"title": wiki_link,
"title_link": wiki_link,
"color": "#764FA5"
}
]
}
response = requests.post(ROCKET_CHAT_URL, json=payload)
if response.status_code == 200:
logging.info(f"Wiki notification sent successfully: {wiki_title} (ID: {wiki_id})")
else:
logging.error(f"Failed to send wiki notification: {response.status_code} - {response.text}")
# Function to send startup message to Rocket.Chat
def send_startup_message():
payload = {
"alias": "Wiki",
"text": "Wiki to Rocket.Chat integration started successfully.",
}
response = requests.post(ROCKET_CHAT_URL, json=payload)
if response.status_code == 200:
logging.info("Startup message sent successfully to Rocket.Chat.")
else:
logging.error(f"Failed to send startup message: {response.status_code} - {response.text}")
# Function to fetch and parse the Wiki RSS feed
def fetch_Wiki_feed():
feed = feedparser.parse(Wiki_RSS_URL)
return feed.entries
# Function to clear the sent wiki database
def clear_sent_wiki_database():
conn = sqlite3.connect('sent_wikis.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM sent_wikis')
conn.commit()
conn.close()
logging.info("Cleared the sent database.")
# Main polling loop
def main(one_shot=False, empty_db=False):
setup_database() # Initialize the database
if empty_db:
clear_sent_wiki_database()
send_startup_message() # Send startup message
entries = fetch_Wiki_feed()
for entry in entries:
json_data = json.dumps(entry, indent=2)
data = json.loads(json_data)
title = data["title"]
published_date = data["published"]
author = data["author"]
revision_link = data["link"]
formatted_string = (
f"Title: {title}\n"
f"Published Date: {published_date}\n"
f"Author: {author}\n"
f"Link to Comparison (old vs new): {revision_link}"
)
match = re.search(r'(.*?title=.*?)(.*)', data["id"])
if match:
wiki_link = match.group(1)+title # Everything up to and including "title=<whatever>"
wiki_id = match.group(2) # Everything from (and including) <whatever> to the end
else:
wiki_id = "Unexpected link"
wiki_link = "Unexpected link"
#logging.info(f"{wiki_link} {wiki_id} {title}")
if not has_wiki_been_sent(wiki_id): # Check if the wiki has been sent
send_to_rocket_chat(title, wiki_link, wiki_id, formatted_string)
mark_wiki_as_sent(wiki_id) # Mark the wiki ID as sent
if one_shot:
logging.info("One-shot mode activated; exiting after sending one message.")
return # Exit after sending the first message
if not one_shot:
# Wait for 1 minute before polling again
time.sleep(60)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Wiki to Rocket.Chat integration.")
parser.add_argument('--one-shot', action='store_true', help='Exit after sending one message.')
parser.add_argument('--empty-db', action='store_true', help='Clear the database of sent entries.')
args = parser.parse_args()
main(one_shot=args.one_shot, empty_db=args.empty_db)