# Extracted from a web source listing (178 lines, 5.7 KiB, Python).
import argparse
import json
import logging
import os
import re
import socket
import sqlite3
import time
from contextlib import closing

import feedparser
import requests
 | 
						|
 | 
						|
def join_lines_with_newline(lines):
    """Return the strings in *lines* joined by single newline characters.

    Args:
        lines: An iterable of strings.

    Returns:
        One string with "\\n" between consecutive items; an empty string
        when *lines* is empty.
    """
    return "\n".join(lines)
 | 
						|
 | 
						|
def get_ip_address(domain, retries=10, delay=1):
    """Resolve *domain* to an IPv4 address, retrying on resolver failures.

    Args:
        domain: Host name (or IP literal) to resolve.
        retries: Maximum number of resolution attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        The address returned by ``socket.gethostbyname``.

    Raises:
        RuntimeError: If every attempt failed (or ``retries`` is below 1).
            The last resolver error, if any, is attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            ip_address = socket.gethostbyname(domain)
        except socket.gaierror as exc:
            last_error = exc
            if attempt < retries:
                logging.warning(f"Attempt {attempt} failed. Retrying...")
                time.sleep(delay)
            else:
                # No further attempt follows, so neither announce a retry
                # nor waste `delay` seconds before reporting the failure.
                logging.warning(f"Attempt {attempt} failed.")
        else:
            logging.info(f"Successfully resolved {domain} to {ip_address}")
            return ip_address

    raise RuntimeError(
        f"Unable to resolve domain '{domain}' after {retries} attempts."
    ) from last_error
 | 
						|
 | 
						|
# Host name of the wiki whose recent-changes feed is polled.
WIKI_DOMAIN = "wiki.koozali.org"
# Feed URL; stays None until main() has resolved WIKI_DOMAIN to an IP address.
Wiki_RSS_URL = None

# Rocket.Chat incoming-webhook URL. The path embeds the hook's secret token,
# so prefer supplying it through the ROCKET_CHAT_URL environment variable
# instead of relying on the literal below (kept as a backward-compatible
# default).
ROCKET_CHAT_URL = os.environ.get(
    "ROCKET_CHAT_URL",
    "https://chat.koozali.org/hooks/67b8a0fc1f56a124fa47dbec/Dee9hFcASKvvBsW9uRyy7Xjg2m3NTxDprt2HQKTRNuWv8SFr",
)

# All log records of INFO level and above are appended to this file.
log_file = "/var/log/Wiki2Rocket.log"
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 | 
						|
 | 
						|
def setup_database():
    """Create the ``sent_wikis`` table in ``sent_wikis.db`` if it is missing.

    The table has a single ``id TEXT PRIMARY KEY`` column. Calling this
    function again when the table already exists is a no-op.
    """
    # closing() guarantees the connection is released even when the
    # statement raises; the inner `with conn` commits on success and rolls
    # back on error.
    with closing(sqlite3.connect('sent_wikis.db')) as conn:
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sent_wikis (
                    id TEXT PRIMARY KEY
                )
            ''')
 | 
						|
 | 
						|
def has_wiki_been_sent(wiki_id):
    """Report whether *wiki_id* is already recorded in ``sent_wikis.db``.

    Args:
        wiki_id: Identifier to look up in the ``sent_wikis`` table.

    Returns:
        True if a row with this id exists, otherwise False.
    """
    # closing() releases the connection even if the query raises (for
    # example when the table has not been created yet).
    with closing(sqlite3.connect('sent_wikis.db')) as conn:
        row = conn.execute(
            'SELECT 1 FROM sent_wikis WHERE id = ? LIMIT 1', (wiki_id,)
        ).fetchone()
    return row is not None
 | 
						|
 | 
						|
def mark_wiki_as_sent(wiki_id):
    """Record *wiki_id* in the ``sent_wikis`` table of ``sent_wikis.db``.

    Inserting an id that is already present is silently ignored.

    Args:
        wiki_id: Identifier to store.
    """
    # closing() releases the connection on every path; `with conn` commits
    # the insert on success and rolls it back if the statement raises.
    with closing(sqlite3.connect('sent_wikis.db')) as conn:
        with conn:
            conn.execute(
                'INSERT OR IGNORE INTO sent_wikis (id) VALUES (?)', (wiki_id,)
            )
 | 
						|
 | 
						|
def send_to_rocket_chat(wiki_title, wiki_link, wiki_id, description, timeout=30):
    """Post one wiki-change notification to the Rocket.Chat webhook.

    Args:
        wiki_title: Page title; used only in the success log message.
        wiki_link: URL shown as the attachment title and used as its link.
        wiki_id: Identifier of the change; used only in the success log
            message.
        description: Message text of the notification.
        timeout: Seconds to wait for the webhook before giving up.

    Returns:
        True if the webhook answered with HTTP 200, otherwise False.
        Failures are logged, never raised.
    """
    payload = {
        "alias": "Wiki",
        "text": f"{description}",
        "attachments": [
            {
                "title": wiki_link,
                "title_link": wiki_link,
                "color": "#764FA5",
            }
        ],
    }
    try:
        # Without a timeout a stalled server would block the caller forever.
        response = requests.post(ROCKET_CHAT_URL, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        # A transient network problem must not bring down the polling loop.
        logging.error(f"Failed to send wiki notification: {exc}")
        return False

    if response.status_code == 200:
        logging.info(f"Wiki notification sent successfully: {wiki_title} (ID: {wiki_id})")
        return True

    logging.error(f"Failed to send wiki notification: {response.status_code} - {response.text}")
    return False
 | 
						|
 | 
						|
def send_startup_message(timeout=30):
    """Announce on Rocket.Chat that the integration has started.

    Args:
        timeout: Seconds to wait for the webhook before giving up.

    Returns:
        True if the webhook answered with HTTP 200, otherwise False.
        Failures are logged, never raised.
    """
    payload = {
        "alias": "Wiki",
        "text": "Wiki to Rocket.Chat integration started successfully.",
    }
    try:
        # Without a timeout a stalled server would block start-up forever.
        response = requests.post(ROCKET_CHAT_URL, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        # The announcement is best-effort: log the failure and carry on.
        logging.error(f"Failed to send startup message: {exc}")
        return False

    if response.status_code == 200:
        logging.info("Startup message sent successfully to Rocket.Chat.")
        return True

    logging.error(f"Failed to send startup message: {response.status_code} - {response.text}")
    return False
 | 
						|
 | 
						|
def fetch_Wiki_feed(original_domain, timeout=30):
    """Download the feed at ``Wiki_RSS_URL`` and return its parsed entries.

    Args:
        original_domain: Value sent as the HTTP ``Host`` header.
        timeout: Seconds to wait for the wiki before giving up.

    Returns:
        The entries produced by ``feedparser``, or an empty list when the
        request or the parsing fails (the failure is logged).
    """
    try:
        response = requests.get(
            Wiki_RSS_URL,
            headers={'Host': original_domain},
            verify=True,
            # Without a timeout a stalled server would block polling forever.
            timeout=timeout,
        )
        # Treat HTTP error responses as failures instead of feeding an
        # error page to the feed parser.
        response.raise_for_status()
        feed = feedparser.parse(response.content)
        return feed.entries
    except Exception as e:
        # Deliberately broad: any failure in one poll only skips that poll.
        logging.error(f"RSS request to wiki failed: {str(e)}")
        return []
 | 
						|
 | 
						|
def clear_sent_wiki_database():
    """Delete every row from the ``sent_wikis`` table of ``sent_wikis.db``.

    The table itself is kept. A log message is written after the deletion
    has been committed.
    """
    # closing() releases the connection on every path; `with conn` commits
    # the delete on success and rolls it back if the statement raises.
    with closing(sqlite3.connect('sent_wikis.db')) as conn:
        with conn:
            conn.execute('DELETE FROM sent_wikis')
    logging.info("Cleared the sent database.")
 | 
						|
 | 
						|
# Splits a feed entry id at its first "title=": group 1 is everything up to
# and including "title=", group 2 is the remainder of the id.
_WIKI_ID_PATTERN = re.compile(r'(.*?title=.*?)(.*)')


def _build_notification(entry):
    """Return ``(title, wiki_link, wiki_id, description)`` for a feed entry."""
    # Missing fields fall back to a placeholder instead of raising KeyError,
    # so a single incomplete entry cannot stop the polling loop.
    title = entry.get("title", "Unknown")
    published_date = entry.get("published", "Unknown")
    author = entry.get("author", "Unknown")
    revision_link = entry.get("link", "Unknown")
    entry_id = entry.get("id", revision_link)

    description = (
        f"Title: {title}\n"
        f"Published Date: {published_date}\n"
        f"Author: {author}\n"
        f"Link to Comparison (old vs new): {revision_link}"
    )

    match = _WIKI_ID_PATTERN.search(entry_id)
    if match:
        # Page link: the id prefix up to "title=" plus the title with
        # spaces replaced by underscores.
        wiki_link = match.group(1) + title.replace(' ', '_')
        wiki_id = match.group(2)
    else:
        wiki_link = "Unexpected link"
        # Keep the raw id as the dedup key; a shared placeholder would make
        # every unparsable entry after the first look already sent.
        wiki_id = entry_id

    return title, wiki_link, wiki_id, description


def main(one_shot=False, empty_db=False):
    """Poll the wiki's recent-changes feed and forward new entries to Rocket.Chat.

    Resolves ``WIKI_DOMAIN`` once, stores the resulting feed URL in the
    module-level ``Wiki_RSS_URL``, prepares the database, sends a start-up
    message and then polls the feed.

    Args:
        one_shot: If True, perform a single poll and return after the first
            notification has been sent, or after the poll if nothing new
            was found. If False, poll every 60 seconds indefinitely.
        empty_db: If True, clear the record of already-sent entries before
            polling.

    Returns:
        None. Returns early (after logging the error) if the wiki host
        cannot be resolved.
    """
    global Wiki_RSS_URL

    try:
        wiki_ip = get_ip_address(WIKI_DOMAIN)
    except RuntimeError as e:
        logging.error(str(e))
        return

    Wiki_RSS_URL = f"http://{wiki_ip}/api.php?hidebots=1&urlversion=2&days=7&limit=50&action=feedrecentchanges&feedformat=rss"
    logging.info(f"Resolved Wiki URL to: {Wiki_RSS_URL}")

    setup_database()

    if empty_db:
        clear_sent_wiki_database()

    send_startup_message()

    while True:
        for entry in fetch_Wiki_feed(WIKI_DOMAIN):
            title, wiki_link, wiki_id, description = _build_notification(entry)

            if has_wiki_been_sent(wiki_id):
                continue

            # NOTE(review): the entry is recorded as sent regardless of
            # whether the notification was actually delivered.
            send_to_rocket_chat(title, wiki_link, wiki_id, description)
            mark_wiki_as_sent(wiki_id)

            if one_shot:
                logging.info("One-shot mode activated; exiting after sending one message.")
                return

        if one_shot:
            # Leave after one pass even when nothing was new; looping again
            # without a pause would hammer the wiki in a tight busy loop.
            logging.info("One-shot mode: no new wiki changes to send; exiting.")
            return

        time.sleep(60)
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Command-line entry point: translate the flags into main()'s keyword
    # arguments.
    parser = argparse.ArgumentParser(description="Wiki to Rocket.Chat integration.")
    parser.add_argument('--one-shot', action='store_true', help='Exit after sending one message.')
    parser.add_argument('--empty-db', action='store_true', help='Clear the database of sent entries.')

    args = parser.parse_args()

    main(one_shot=args.one_shot, empty_db=args.empty_db)