# RSS-to-Rocket/koji rss to rocket json.py
#
# 178 lines
# 5.7 KiB
# Python

import time
import feedparser
import requests
import sqlite3
import logging
import os
import requests
from bs4 import BeautifulSoup
import socket
import sys
def join_lines_with_newline(lines):
    """Return the strings in *lines* joined by a single newline character.

    Args:
        lines: Iterable of strings.

    Returns:
        str: The joined text; an empty iterable yields "".
    """
    return "\n".join(lines)
def get_ip_address(domain, retries=10, delay=1):
    """
    Resolve the IP address of a domain, retrying up to `retries` times.

    Args:
        domain (str): The domain name to resolve.
        retries (int): Number of resolution attempts (default is 10).
        delay (int): Delay between attempts in seconds (default is 1 second).

    Returns:
        str: The IP address if resolved successfully.

    Raises:
        RuntimeError: If unable to resolve the domain after all attempts.
            The last ``socket.gaierror`` (if any) is attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            ip_address = socket.gethostbyname(domain)
        except socket.gaierror as exc:
            last_error = exc
            logging.warning(f"Attempt {attempt} failed. Retrying...")
            # Only wait when another attempt will actually follow; sleeping
            # after the final failure just delays the error.
            if attempt < retries:
                time.sleep(delay)
        else:
            logging.info(f"Successfully resolved {domain} to {ip_address}")
            return ip_address
    raise RuntimeError(
        f"Unable to resolve domain '{domain}' after {retries} attempts."
    ) from last_error
def extract_changelog_top(koji_link, host_header, timeout=30):
    """
    Fetch a Koji build page and return the first changelog entry on it.

    The page is searched for ``<td class="changelog">``. From its text the
    first non-blank line is kept, followed by every later line that starts
    with '-', up to the first blank line after the entry has begun.
    Duplicate lines are ignored.

    Args:
        koji_link (str): URL of the build page to download.
        host_header (str): Value sent as the HTTP ``Host`` header.
        timeout (float): Seconds to wait for the server before giving up
            (default 30), so a stalled server cannot hang the caller.

    Returns:
        str: The selected lines joined with newlines, or "" when the request
        fails or the page has no changelog cell. (Always a string, so the
        result can be embedded in a message without rendering "[]".)
    """
    try:
        response = requests.get(
            koji_link, headers={'Host': host_header}, timeout=timeout
        )
    except Exception as e:
        # Best effort: a missing changelog must not stop the notification
        logging.error(f"Koji link failed: {str(e)}")
        return ""
    soup = BeautifulSoup(response.text, 'html.parser')
    changelog_td = soup.find('td', class_='changelog')
    if not changelog_td:
        return ""
    result = []
    seen = set()
    for line in changelog_td.get_text(strip=False).split('\n'):
        stripped_line = line.strip()
        if not stripped_line:
            # A blank line after we have collected something ends the entry;
            # leading blank lines are simply skipped.
            if result:
                break
            continue
        if stripped_line in seen:
            continue
        seen.add(stripped_line)
        # First line is the entry header; afterwards only '-' bullets count
        if not result or stripped_line.startswith('-'):
            result.append(stripped_line)
    return join_lines_with_newline(result)
# Rocket.Chat incoming-webhook URL.
# NOTE(review): the URL embeds the webhook token, i.e. a secret stored in the
# source file. The ROCKET_CHAT_URL environment variable overrides it; the
# literal below is kept only as a backward-compatible fallback.
ROCKET_CHAT_URL = os.environ.get(
    "ROCKET_CHAT_URL",
    "https://chat.koozali.org/hooks/66d24441effca216c2ca309f/KJLaNwc5vyHwqz5MhRDpBkKWnQuAfsCX3xZMHxPhpuqmFgBn",
)
# Set up logging
log_file = "/var/log/Koji2Rocket.log"
# The directory has to exist before basicConfig opens the log file
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Database setup
def setup_database():
    """Create the ``sent_builds`` table in ``sent_builds.db`` if it is missing.

    The database file is opened relative to the current working directory.
    The table has a single ``id TEXT PRIMARY KEY`` column. Calling this
    function repeatedly is harmless because of ``IF NOT EXISTS``.
    """
    conn = sqlite3.connect('sent_builds.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS sent_builds (
                id TEXT PRIMARY KEY
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even when the statement fails
        conn.close()
def has_build_been_sent(build_id):
    """Return True if *build_id* is recorded in the ``sent_builds`` table.

    Args:
        build_id (str): Identifier to look up in ``sent_builds.db``.

    Returns:
        bool: True when a row with this id exists, otherwise False.
    """
    conn = sqlite3.connect('sent_builds.db')
    try:
        cursor = conn.execute(
            'SELECT 1 FROM sent_builds WHERE id = ? LIMIT 1', (build_id,)
        )
        return cursor.fetchone() is not None
    finally:
        # Close the connection even when the query fails
        conn.close()
def mark_build_as_sent(build_id):
    """Record *build_id* in the ``sent_builds`` table.

    Uses ``INSERT OR IGNORE``, so marking the same id twice is a no-op.

    Args:
        build_id (str): Identifier to store in ``sent_builds.db``.
    """
    conn = sqlite3.connect('sent_builds.db')
    try:
        conn.execute(
            'INSERT OR IGNORE INTO sent_builds (id) VALUES (?)', (build_id,)
        )
        conn.commit()
    finally:
        # Close the connection even when the insert fails
        conn.close()
def send_to_rocket_chat(build_title, build_link, build_id, changelog, timeout=30):
    """
    Post a build notification to the Rocket.Chat webhook.

    Args:
        build_title (str): Text used as the attachment title.
        build_link (str): URL the attachment title links to.
        build_id (str): Build identifier shown in the message text.
        changelog: Changelog text appended below the ID; any falsy value
            (empty string, empty list, None) is omitted from the message.
        timeout (float): Seconds to wait for the webhook (default 30).

    Returns:
        bool: True if the webhook answered with HTTP 200, False on any other
        status or on a network error. Failures are logged, never raised, so
        a chat outage cannot crash the caller.
    """
    text = f"Build Notification - ID: {build_id}"
    if changelog:
        text = f"{text}\n{changelog}"
    payload = {
        "alias": "Koji",
        "text": text,
        "attachments": [
            {
                "title": build_title,
                "title_link": build_link,
                "color": "#764FA5",
            }
        ],
    }
    try:
        response = requests.post(ROCKET_CHAT_URL, json=payload, timeout=timeout)
    except requests.RequestException as e:
        logging.error(f"Failed to send build notification: {str(e)}")
        return False
    if response.status_code == 200:
        logging.info(f"Build notification sent successfully: {build_title} (ID: {build_id})")
        return True
    logging.error(f"Failed to send build notification: {response.status_code} - {response.text}")
    return False
def send_startup_message(timeout=30):
    """
    Post a fixed "integration started" message to the Rocket.Chat webhook.

    Args:
        timeout (float): Seconds to wait for the webhook (default 30).

    Returns:
        bool: True if the webhook answered with HTTP 200, False on any other
        status or on a network error. Failures are logged, never raised, so
        an unreachable chat server does not stop the service from starting.
    """
    payload = {
        "alias": "Koji",
        "text": "Koji to Rocket.Chat integration started successfully.",
    }
    try:
        response = requests.post(ROCKET_CHAT_URL, json=payload, timeout=timeout)
    except requests.RequestException as e:
        logging.error(f"Failed to send startup message: {str(e)}")
        return False
    if response.status_code == 200:
        logging.info("Startup message sent successfully to Rocket.Chat.")
        return True
    logging.error(f"Failed to send startup message: {response.status_code} - {response.text}")
    return False
def fetch_koji_feed(koji_rss_url, host_header, timeout=30):
    """
    Download an RSS feed and return its parsed entries.

    Args:
        koji_rss_url (str): URL of the RSS feed.
        host_header (str): Value sent as the HTTP ``Host`` header.
        timeout (float): Seconds to wait for the server (default 30), so a
            stalled server cannot block the caller forever.

    Returns:
        list: The ``entries`` of the feed as parsed by ``feedparser``, or an
        empty list when the request fails, the server answers with an HTTP
        error status, or parsing raises. Errors are logged.
    """
    try:
        response = requests.get(
            koji_rss_url, headers={'Host': host_header}, timeout=timeout
        )
        # Report HTTP errors instead of silently parsing an error page
        response.raise_for_status()
        feed = feedparser.parse(response.content)
        return feed.entries
    except Exception as e:
        # Best effort: the caller simply sees "no entries" for this attempt
        logging.error(f"RSS request to koji failed: {str(e)}")
        return []
def main():
    """
    Run the Koji-to-Rocket.Chat notifier until the process is stopped.

    Prepares the database, posts the startup message, resolves
    ``koji.koozali.org`` once, then polls the recent-builds RSS feed every
    60 seconds and sends one notification per build ID not yet recorded as
    sent. Exits with status 1 if the domain cannot be resolved.
    """
    setup_database()
    send_startup_message()
    original_domain = "koji.koozali.org"
    try:
        koji_ip = get_ip_address(original_domain)
    except RuntimeError as e:
        logging.error(str(e))
        sys.exit(1)
    # Create IP-based URL and preserve original domain for headers
    koji_rss_url = f"http://{koji_ip}/koji/recentbuilds?feed=rss"
    logging.info(f"Using Koji RSS URL: {koji_rss_url}")
    while True:
        entries = fetch_koji_feed(koji_rss_url, original_domain)
        for entry in entries:
            build_title = entry.title.strip()
            original_link = entry.link
            # NOTE(review): this IP-based link is also what gets posted to
            # chat as the clickable title link - confirm that is intended.
            build_link = original_link.replace(original_domain, koji_ip)
            # The ID is the text between the first ':' and the next ','
            try:
                build_id = build_title.split(':')[1].split(',')[0].strip()
            except IndexError:
                # A title without ':' must not take the whole service down
                logging.error(f"Could not parse build ID from title: {build_title}")
                continue
            if has_build_been_sent(build_id):
                continue
            # Fetch the changelog only for builds that still need announcing,
            # instead of downloading every build page on every poll.
            changelog = extract_changelog_top(build_link, original_domain)
            # Only an explicit False counts as a delivery failure (a sender
            # that returns None is treated as success); failed builds stay
            # unmarked and are retried on the next poll.
            if send_to_rocket_chat(build_title, build_link, build_id, changelog) is not False:
                mark_build_as_sent(build_id)
        time.sleep(60)


if __name__ == "__main__":
    main()