#
# Mailstats.py
#
#
# This script provides daily SpamFilter statistics.
#
# Mailstats
#
# Optional arguments:
#   -h, --help            show this help message and exit
#   -d DATE, --date DATE  Specify a valid date (yyyy-mm-dd) for the analysis
#   -ef EMAILFILE, --emailfile EMAILFILE
#                         Save an html file of the email sent (y/N)
#   -tf TEXTFILE, --textfile TEXTFILE
#                         Save a txt file of the html page (y/N)
#   -db DBSAVE, --dbsave DBSAVE
#                         Force save of summary logs in DB (y/N)
#   --version             show program's version number and exit
#
#
# (June 2024 - bjr) Re-written in Python from Mailstats.pl (Perl) to conform to SME11 / Postfix / qpsmtpd log formats
# and html output added
#
# Todo:
# 2 Other stats
# 3. Extra bits for sub tables - DONE
# 4. Percent char causes sort to fail - look at adding it in the template - DONE
# 5. Chase disparity in counts between old mailstats and this - Some of it DONE
# 6. Count emails delivered over ports 25/587/465 (SMTPS?)
# 7. Arrange that the spec file overwrites the date even if it has been overwritten before
# 8. Allow mailstats pages to be public or private (=> templating the fragment) - DONE
# 9. Update format of the summarylogs page
# 10. Add in links to summarylogs in web pages
#
# Future:
# 1. Write summary line for each transaction to DB and link to it through cell in main table - DONE (write to DB)
# 2. Make DB password something more obscure.
# 3. Prune the DB according to parameter
#
# Even more Future (if ever)
# 2. 
Link each summary line through DB to actual transaction lines # # Centos7: # yum install python3-chameleon --enablerepo=epel # yum install html2text --enablerepo=epel # yum install mysql-connector-python --enablerepo=epel (not sure if this is required as well the pip3)) # pip3 install mysql-connector # # Rocky8: (probably - not yet checked this) # # dnf install python3-chameleon --enablerepo=epel # dnf install html2text --enablerepo=epel # # from datetime import datetime, timedelta import sys from chameleon import PageTemplateFile,PageTemplate import pkg_resources import re import ipaddress import subprocess import os from collections import defaultdict import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import codecs import argparse import tempfile import mysql.connector Mailstats_version = '1.2' build_date_time = "2024-06-18 12:03:40OURCE" build_date_time = build_date_time[:19] #Take out crap that sneaks in. #if build_date_time == "2024-06-18 12:03:40OURCE": # build_date_time = "Unknown" script_dir = os.path.dirname(os.path.abspath(__file__)) data_file_path = script_dir+'/../..' 
# back to the top (this explains the '/../..' suffix given to data_file_path)
now = datetime.now()
yesterday = now - timedelta(days=1)
formatted_yesterday = yesterday.strftime("%Y-%m-%d")

#html_page_path = data_file_path+"/home/e-smith/files/ibays/mesdb/html/mailstats/"
html_page_dir = data_file_path+"/opt/mailstats/html/"
template_dir = data_file_path+"/opt/mailstats/templates/"
logs_dir = data_file_path+"/opt/mailstats/logs/"

# Column numbering (easy to renumber or add one in)
Hour = 0
WebMail = Hour + 1
Local = WebMail + 1
MailMan = Local + 1
Relay = MailMan + 1
DMARC = Relay + 1
Virus = DMARC + 1
RBLDNS = Virus + 1
Geoip = RBLDNS + 1
NonConf = Geoip + 1
RejLoad = NonConf + 1
Karma = RejLoad + 1
DelSpam = Karma + 1
QuedSpam = DelSpam + 1
Ham = QuedSpam + 1
TOTALS = Ham + 1
PERCENT = TOTALS + 1
# The main analysis loop refers to this column as KARMA; keep both spellings
# resolvable so that lookup does not raise NameError.
KARMA = Karma

# Row indices of the two summary rows that follow the hourly rows 0..23
ColTotals = 24
ColPercent = 25

# These imports sit mid-file in the original; they are kept here so this
# section still brings `json` (and mysql.connector) into scope by itself.
import mysql.connector
import json


def save_summaries_to_db(date_str, hour, parsed_data):
    """
    Store one parsed summary record in the SummaryLogs table.

    Uses the module-level ``cursor`` and ``conn`` objects, which are created
    in the main section when DB saving is enabled.

    :param date_str: Analysis date (yyyy-mm-dd) stored in the Date column
    :param hour: Hour of the day the record belongs to
    :param parsed_data: Dictionary as returned by parse_data(); stored as JSON
    """
    # Convert parsed_data to JSON string
    json_data = json.dumps(parsed_data)

    # Insert the record
    insert_query = """ INSERT INTO SummaryLogs (Date, Hour, logData) VALUES (%s, %s, %s) """

    try:
        cursor.execute(insert_query, (date_str, hour, json_data))
        conn.commit()
    except mysql.connector.Error as err:
        # Report and roll back; one bad record must not stop the analysis
        print(f"DB Error {date_str} {hour} : {err}")
        conn.rollback()


def is_running_under_thonny():
    """Return True when the 'THONNY_USER_DIR' environment variable is set."""
    return 'THONNY_USER_DIR' in os.environ


# Routines to access the E-Smith dbs
def parse_entity_line(line):
    """
    Parses a single line of key-value pairs.

    The line has the form ``entity_name=type|key|value|key|value...``.

    :param line: Single line string to be parsed
    :return: Tuple (entity_name, dictionary with keys and values); the entity
             type is stored under the key 'type'
    :raises ValueError: if the first part contains no '='
    """
    parts = line.split('|')

    # First part contains the entity name and type in the format 'entity_name=type'
    entity_part = parts.pop(0)
    # maxsplit=1: an '=' inside the type text must not break the unpacking
    entity_name, entity_type = entity_part.split('=', 1)

    entity_dict = {'type': entity_type}
    # The remaining parts alternate key, value; zip() drops a trailing key
    # that has no value, exactly as the index loop it replaces did.
    entity_dict.update(zip(parts[0::2], parts[1::2]))

    return entity_name, entity_dict


def parse_config(config_string):
    """
    Parses a multi-line configuration string where each line is an entity with key-value pairs.

    Blank lines and lines starting with '#' are skipped.

    :param config_string: Multi-line string to be parsed
    :return: Dictionary of dictionaries with entity names as keys
    """
    config_dict = {}

    for line in config_string.strip().split('\n'):
        line = line.strip()
        # Skip comments, and blank lines (which have no 'name=type' part to parse)
        if not line or line.startswith('#'):
            continue
        entity_name, entity_dict = parse_entity_line(line)
        config_dict[entity_name] = entity_dict

    return config_dict


def read_config_file(file_path):
    """
    Reads a configuration file and parses its contents.

    :param file_path: Path to the configuration file
    :return: Parsed configuration dictionary
    """
    with open(file_path, 'r') as file:
        config_string = file.read()

    return parse_config(config_string)


def get_value(config_dict, entity, key, default=None):
    """
    Retrieves the value corresponding to the given key from a specific entity.

    :param config_dict: Dictionary of dictionaries with parsed config
    :param entity: Entity from which to retrieve the key's value
    :param key: Key whose value needs to be retrieved
    :param default: Default value to return if the entity or key does not exist
    :return: Value corresponding to the key, or the default value if the entity or key does not exist
    """
    return config_dict.get(entity, {}).get(key, default)


# Private IP ranges checked by is_private_ip(); built once, not on every call
_PRIVATE_RANGES = (
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
)


def is_private_ip(ip):
    """
    Return True if ip lies in 10.0.0.0/8, 172.16.0.0/12 or 192.168.0.0/16.

    :param ip: IP address as a string
    :return: False for anything that does not parse as an IP address
    """
    try:
        # Convert string to an address object
        ip_addr = ipaddress.ip_address(ip)
    except ValueError:
        return False

    return any(ip_addr in private_range for private_range in _PRIVATE_RANGES)


def truncate_microseconds(timestamp):
    """
    Return the timestamp with its fractional-seconds part removed.

    :param timestamp: String of the form '<date time>.<fraction>'
    :return: The part before the '.'
    :raises ValueError: if the timestamp does not contain exactly one '.'
    """
    try:
        main_part, _fraction = timestamp.split('.')
    except (ValueError, AttributeError, TypeError) as e:
        print(f"{e} {timestamp}")
        raise ValueError(f"Malformed timestamp: {timestamp!r}") from e
    return main_part


def read_in_relevant_log_file(file_path,analysis_date=yesterday):
    """
    Read the log file and keep the entries stamped with the analysis date.

    :param file_path: Path of the log file
    :param analysis_date: datetime whose date selects the entries to keep
    :return: List [log_entries, skip_record_count, ignore_record_count] where
             log_entries is a list of (datetime, rest_of_line) tuples,
             skip_record_count counts lines with no usable timestamp and
             ignore_record_count counts lines belonging to another date
    """
    # Read the file and split each line into a list - timestamp and the rest
    log_entries = []
    skip_record_count = 0
    ignore_record_count = 0
    wanted_date = analysis_date.date()

    with codecs.open(file_path, 'rb','utf-8', errors='replace') as file:
        try:
            for line in file:
                # extract time stamp
                try:
                    entry = split_timestamp_and_data(line)
                    timestamp_str = truncate_microseconds(entry[0])
                except ValueError:
                    skip_record_count += 1
                    continue

                # Parse the timestamp string into a datetime object
                # (the microseconds have already been dropped)
                try:
                    timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
                except ValueError as e:
                    print(f"ValueError {e} on timestamp extract {timestamp_str}:{entry[1]}")
                    # No valid timestamp for this line: skip it instead of
                    # falling through with an unbound or stale `timestamp`.
                    skip_record_count += 1
                    continue

                if timestamp.date() == wanted_date:
                    log_entries.append((timestamp, entry[1]))
                else:
                    ignore_record_count += 1
        except UnicodeDecodeError:
            # Best effort, as before: stop reading and return what was collected
            pass

    return [log_entries,skip_record_count,ignore_record_count]


def filter_summary_records(log_entries):
    """
    Return just the summary records - those whose data contains a backtick.

    :param log_entries: List of (timestamp, data) tuples
    :return: List [filtered_log_entries, skipped_entry_count]
    """
    filtered_log_entries = [line for line in log_entries if '`' in line[1]]
    skipped_entry_count = len(log_entries) - len(filtered_log_entries)
    return [filtered_log_entries,skipped_entry_count]


def sort_log_entries(log_entries):
    """
    Sort the records on timestamp and return them as a dictionary.

    The timestamps only have one-second resolution, so several records can
    share one. A plain dictionary keyed on the timestamp would keep only the
    last of them; records that collide are therefore stored under a key
    nudged forward by one microsecond at a time. Keys remain datetime objects
    in the same hour, and iteration order is the sorted order.

    :param log_entries: List of (timestamp, data) tuples
    :return: Dictionary {timestamp: data}
    """
    sorted_dict = {}
    for timestamp, data in sorted(log_entries, key=lambda entry: entry[0]):
        key = timestamp
        while key in sorted_dict:
            key += timedelta(microseconds=1)
        sorted_dict[key] = data
    return sorted_dict


# (name, position) of every value parse_data() extracts. Several names share
# a position on purpose: which meaning applies depends on the kind of record.
_PARSED_FIELD_POSITIONS = (
    ('id', 0),
    ('action', 1),
    ('logterse', 2),
    ('ip', 3),
    ('sendurl', 4),        #1
    ('sendurl1', 5),       #2
    ('from-email', 6),     #3
    ('error-reason', 6),   #3
    ('to-email', 7),       #4
    ('error-plugin', 8),   #5
    ('action1', 8),        #5
    ('error-number', 9),   #6
    ('sender', 10),        #7
    ('error-msg', 10),     #7
    ('spam-status', 11),   #8
    ('error-result', 11),  #8
    # Add more fields as necessary
)


def parse_data(data):
    """
    Split the data string into parts and map them to named fields.

    The text before the first backtick is split at whitespace; the text
    between the first and second backtick is split at tabs.

    :param data: Data part of a summary log line
    :return: Dictionary of named fields, each stripped of surrounding
             whitespace, or None where the line has too few fields
    """
    parts = data.split('`')
    fields1 = parts[0].strip().split()
    fields2 = parts[1].split('\t') if len(parts) > 1 else []
    # then merge them
    fields = fields1 + fields2

    return {
        name: (fields[position].strip() if position < len(fields) else None)
        for name, position in _PARSED_FIELD_POSITIONS
    }


def count_entries_by_hour(log_entries):
    """
    Count the entries falling in each hour.

    :param log_entries: Iterable of dictionaries with a 'timestamp' entry in
                        the format '%Y-%m-%d %H:%M:%S'
    :return: defaultdict mapping 'yyyy-mm-dd hh' to the number of entries
    """
    hourly_counts = defaultdict(int)
    for entry in log_entries:
        # Extract hour from the timestamp.
        # `datetime` here is the class (from datetime import datetime), so
        # strptime is called on it directly.
        timestamp = entry['timestamp']
        hour = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d %H')
        hourly_counts[hour] += 1
    return hourly_counts


def initialize_2d_array(num_hours, column_headers_len,reporting_date):
    """
    Create the zero-filled counts table.

    :param num_hours: Number of rows wanted; one more row is added
    :param column_headers_len: Number of columns
    :param reporting_date: Not used by this function
    :return: List of num_hours + 1 rows, each a list of zeroes
    """
    num_hours += 1  # Adjust for the zeroth hour
    # Initialize the 2D list with zeroes
    return [[0] * column_headers_len for _ in range(num_hours)]


def search_2d_list(target, data):
    """
    Search for a target string in a 2D list of variable-length lists of strings.

    :param target: str, the string to search for
    :param data: list of lists of str, the 2D list to search
    :return: int, the row number where the target string is found, or -1 if not found
    """
    for row_idx, row in enumerate(data):
        if target in row:
            return row_idx
    return -1  # Return -1 if not found


def check_html2text_installed():
    """
    Check that the html2text command is available by running 'which html2text'.

    :return: True if a path was found (it is printed), False otherwise
    """
    try:
        result = subprocess.run(
            ['which', 'html2text'],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        # If the command finds html2text, it will output the path
        html2text_path = result.stdout.decode('utf-8').strip()
        if not html2text_path:
            raise FileNotFoundError
    except (subprocess.CalledProcessError, FileNotFoundError):
        # FileNotFoundError covers both an empty answer and a missing
        # 'which' command; neither was caught before.
        print("html2text is not installed. Please install it using your package manager.", file=sys.stderr)
        return False

    print(f"html2text is installed at: {html2text_path}")
    return True


def html_to_text(input_file, output_file):
    """
    Convert an html file to text by running html2text on it.

    Exits the program if html2text is not installed or reports an error.

    :param input_file: Path of the html file to convert
    :param output_file: Path of the text file to write
    """
    if not check_html2text_installed():
        sys.exit(1)

    try:
        # Run the html2text command with -b0 --pad-tables parameters
        result = subprocess.run(
            ['html2text', '-b0', '--pad-tables', input_file],
            check=True,               # Raise a CalledProcessError on non-zero exit
            stdout=subprocess.PIPE,   # Capture stdout
            stderr=subprocess.PIPE    # Capture stderr
        )

        # Write the stdout from the command to the output file
        with open(output_file, 'w', encoding='utf-8') as outfile:
            outfile.write(result.stdout.decode('utf-8'))

        print(f"Converted {input_file} to {output_file}")

    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e.stderr.decode('utf-8')}", file=sys.stderr)
        sys.exit(e.returncode)


def get_html2text_version():
    """
    Return the output of 'html2text --version'.

    :return: Version text, or None if the command could not be run
    """
    try:
        # universal_newlines makes stdout a string
        result = subprocess.run(['html2text', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        return result.stdout.strip()
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError (e.g. FileNotFoundError) is what a missing command raises
        print(f"Error occurred while checking html2text version: {e}", file=sys.stderr)
        return None


def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=50, fill='█', print_end="\r"):
    r"""
    Call in a loop to create a terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        print_end   - Optional  : end character (e.g. "\r", "\r\n") (Str)
    Raises ValueError if total is zero.
    """
    if total == 0:
        raise ValueError("Progress total is zero")
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + '-' * (length - filled_length)
    print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
    # Print New Line on Complete
    if iteration == total:
        print()


def insert_string_after(original:str, to_insert:str, after:str) -> str:
    """
    Insert to_insert into original after the first occurrence of after.

    :param original: The original string.
    :param to_insert: The string to be inserted.
    :param after: The set of characters after which the string will be inserted.
    :return: The new string with to_insert inserted after after; the original
             string unchanged (and a message printed) if after is not found.
    """
    position = original.find(after)
    if position == -1:
        print(f"insert_string_after:({after}) string is not found in original")
        return original
    # Position of the insertion point
    insert_pos = position + len(after)
    return original[:insert_pos] + to_insert + original[insert_pos:]


def split_timestamp_and_data(log_entry: str) -> list:
    """
    Split a log entry into timestamp and the rest of the data.

    :param log_entry: The log entry as a string.
    :return: A list with two entries: [timestamp, rest_of_data].
    :raises ValueError: if the entry has fewer than three space-separated parts
    """
    # The timestamp is always the first part, up to the first space after the milliseconds
    parts = log_entry.split(' ', 2)
    if len(parts) < 3:
        raise ValueError(f"The log entry format is incorrect {parts}")

    timestamp = ' '.join(parts[:2])
    rest_of_data = parts[2]
    return [timestamp, rest_of_data]


def render_sub_table(table_title,table_headers,found_values,get_character=None):
    """
    Render a sub table (value, count, percentage) through its Chameleon template.

    :param table_title: Title passed to the template
    :param table_headers: Column headers passed to the template
    :param found_values: Dictionary {key: count}
    :param get_character: Optional function called with each key; its result
                          is added to the row as a fourth element
    :return: The rendered html
    :raises ValueError: if the template cannot be created or rendered
    """
    # Get the total
    total_sum = sum(found_values.values())

    def percentage(value):
        # All-zero counts would otherwise divide by zero
        if not total_sum:
            return "0.0"
        return f"{round(value / total_sum * 100, 2)}"

    # Create a list of tuples with each tuple containing (key, value, percentage)
    if get_character:
        sub_result = [(key, value, percentage(value), f"{get_character(key)}")
                      for key, value in found_values.items()]
    else:
        sub_result = [(key, value, percentage(value))
                      for key, value in found_values.items()]

    # Sort by percentage in descending order
    sub_result.sort(key=lambda row: float(row[2]), reverse=True)

    sub_template_path = template_dir+'mailstats-sub-table.html.pt'
    # Load the template
    with open(sub_template_path, 'r') as template_file:
        template_content = template_file.read()

    # Creating and rendering are guarded separately so that each failure is
    # reported with its own message (nested, a render error was re-wrapped
    # and reported as a template error).
    try:
        template = PageTemplate(template_content)
    except Exception as e:
        raise ValueError(f"{table_title}: A chameleon controller template error occurred: {e}") from e

    try:
        rendered_html = template(array_2d=sub_result, column_headers=table_headers, title=table_title)
    except Exception as e:
        raise ValueError(f"{table_title}: A chameleon controller render error occurred: {e}") from e

    return rendered_html


def get_character_in_reject_list(code):
    """
    Return "*" if code occurs in the module-level BadCountries value, else "".

    BadCountries is read from the configuration in the main section and is
    None when it is not configured; that is treated as "no bad countries".
    """
    if BadCountries and code in BadCountries:
        return "*"
    return ""


def read_html_from_file(filepath):
    """
    Reads HTML content from a given file and inserts the css file contents.

    Args:
        filepath (str): Path to the HTML file.

    Returns:
        str: HTML content of the file with ../css/mailstats.css (relative to
        the directory of the html file) inserted.
    """
    # Need to add in here the contents of the css file at the end of the head section.
    with open(filepath, 'r', encoding='utf-8') as file:
        html_contents = file.read()
    print("reading from html file")

    # Get Filepath
    css_path = os.path.dirname(filepath)+"/../css/mailstats.css"

    # Read in CSS
    with open(css_path, 'r', encoding='utf-8') as file:
        css_contents = file.read()

    # NOTE(review): the `after` marker is an empty string here, so the css is
    # inserted at the very start of the document, while the comment above says
    # it belongs at the end of the head section - confirm the intended marker.
    html_contents = insert_string_after(html_contents,"\n"+css_contents,"")
    return html_contents


def read_text_from_file(filepath):
    """
    Reads plain text content from a given file.

    Args:
        filepath (str): Path to the text file.

    Returns:
        str: Text content of the file, or None if it could not be read.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            return file.read()
    except (OSError, UnicodeDecodeError):
        print(f"{filepath} not found")
        return None


def send_email(subject, from_email, to_email, smtp_server, smtp_port, HTML_content=None, Text_content=None, smtp_user=None, smtp_password=None):
    """
    Sends an email with an HTML and/or a plain text body.

    Args:
        subject (str): The subject of the email.
        from_email (str): The sender's email address.
        to_email (str): The recipient's email address.
        smtp_server (str): SMTP server address.
        smtp_port (int): SMTP server port.
        HTML_content (str, optional): The HTML content to send in the email.
        Text_content (str, optional): The plain text content to send in the email.
        smtp_user (str, optional): SMTP server username. Default is None.
        smtp_password (str, optional): SMTP server password. Default is None.
    """
    #Example (which works!)
    # send_email(
    #     subject="Your subject",
    #     from_email="mailstats@bjsystems.co.uk",
    #     to_email="brianr@bjsystems.co.uk",
    #     smtp_server="mail.bjsystems.co.uk",
    #     smtp_port=25
    #     HTML_content=html_content,
    #     Text_content=Text_content,
    # )

    # Set up the email
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = to_email

    # In a multipart/alternative message the LAST part is the preferred one
    # (RFC 2046), so the plain text goes in first and the HTML last.
    if Text_content:
        msg.attach(MIMEText(Text_content, 'plain'))
    if HTML_content:
        msg.attach(MIMEText(HTML_content, 'html'))

    # Sending the email
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()  # Upgrade the connection to secure
        if smtp_user and smtp_password:
            server.login(smtp_user, smtp_password)  # Authenticate only if credentials are provided
        server.sendmail(from_email, to_email, msg.as_string())


def replace_between(text, start, end, replacement):
    """
    Replace every span running from start to the next end (both included).

    :param text: Text to work on
    :param start: Literal text marking the beginning of a span
    :param end: Literal text marking the end of a span
    :param replacement: Text put in place of each span, inserted literally
    :return: The text with the replacements made
    """
    # Escaping start and end in case they contain special regex characters
    pattern = re.escape(start) + '.*?' + re.escape(end)
    # re.DOTALL lets '.' match newlines too. The replacement is supplied by a
    # function so that backslashes in it are not read as regex escapes.
    return re.sub(pattern, lambda _match: replacement, text, flags=re.DOTALL)


def get_heading():
    """
    Build the heading text of the report.

    Reads these module-level values, set by the analysis in the main section:
    SATagLevel, SARejectLevel, warnnoreject, totalexternalsmtpsessions,
    totalinternalsmtpsessions, connection_type_counts, emailperhour, spamavg,
    rejectspamavg, hamavg, DMARCSendCount, hamcount and DMARCOkCount.
    Runs "freshclam -V" and "spamassassin -V" to obtain version information.

    :return: The heading as one string with its lines joined by line breaks
    """
    # Clam Version/DB Count/Last DB update
    clam_output = subprocess.getoutput("freshclam -V")
    clam_info = f"Clam Version/DB Count/Last DB update: {clam_output}"

    # SpamAssassin Version
    sa_output = subprocess.getoutput("spamassassin -V")
    sa_info = f"SpamAssassin Version: {sa_output}"

    # Tag level and Reject level
    tag_reject_info = f"Tag level: {SATagLevel}; Reject level: {SARejectLevel} {warnnoreject}"

    # SMTP connection stats
    smtp_stats = (f"External SMTP connections accepted: {totalexternalsmtpsessions}\n"
                  f"Internal SMTP connections accepted: {totalinternalsmtpsessions}")

    if len(connection_type_counts)>0:
        for connect_type in connection_type_counts.keys():
            # The loop variable is connect_type (the original interpolated an
            # undefined name here, which raised NameError).
            smtp_stats = smtp_stats + f"\nCount of {connect_type} connections:{connection_type_counts[connect_type]}"

    smtp_stats = smtp_stats + (f"\nEmails per hour: {emailperhour:.1f}/hr\n"
                               f"Average spam score (accepted): {spamavg or 0:.2f}\n"
                               f"Average spam score (rejected): {rejectspamavg or 0:.2f}\n"
                               f"Average ham score: {hamavg or 0:.2f}\n"
                               f"Number of DMARC reporting emails sent: {DMARCSendCount or 0} (not shown on table)")

    # DMARC approved emails
    dmarc_info = ""
    if hamcount != 0:
        dmarc_ok_percentage = DMARCOkCount * 100 / hamcount
        dmarc_info = f"Number of emails approved through DMARC: {DMARCOkCount or 0} ({dmarc_ok_percentage:.2f}% of Ham count)"

    # Accumulate all strings
    header_str = "\n".join([clam_info, sa_info, tag_reject_info, smtp_stats, dmarc_info])
    # switch newlines to html line breaks
    # NOTE(review): the replacement literal was not legible in the reviewed
    # copy of this file; "<br />" is assumed - confirm the exact tag.
    header_str = header_str.replace("\n","<br />")
    return header_str


if __name__ == "__main__":
    try:
        chameleon_version = pkg_resources.get_distribution("Chameleon").version
    except pkg_resources.DistributionNotFound:
        chameleon_version = "Version information not available"
    python_version = sys.version
    python_version = python_version[:8]
    current_datetime = datetime.now()
    formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M")

    # Command line parameters
    parser = argparse.ArgumentParser(description="Mailstats")
    parser.add_argument('-d', '--date', help='Specify a valid date (yyyy-mm-dd) for the analysis', default=formatted_yesterday)
    parser.add_argument('-ef', '--emailfile', help='Save an html file of the email sent (y/N)', default='n')
    parser.add_argument('-tf', '--textfile', help='Save a txt file of the html page (y/N)', default='n')
    parser.add_argument('--version', action='version', version='%(prog)s '+Mailstats_version+" built on "+build_date_time)
    parser.add_argument('-db', '--dbsave', help='Force save of summary logs in DB (y/N)', default='n')

    args = parser.parse_args()

    analysis_date = args.date
    # and check its format is valid
    try:
        datetime.strptime(analysis_date, '%Y-%m-%d')
    except ValueError:
        print("Specify a valid date (yyyy-mm-dd) for the analysis")
        quit()

    anaysis_date_obj = datetime.strptime(analysis_date, '%Y-%m-%d')
    noemailfile = args.emailfile.lower() == 'n'
    notextfile = args.textfile.lower() == 'n'
    isThonny = is_running_under_thonny()
    forceDbSave = args.dbsave.lower() == 'y'

    #E-Smith Config DBs
    if isThonny:
        db_dir = "/home/brianr/SME11Build/GITFiles/smecontribs/smeserver-mailstats/"
    else:
        db_dir = "/home/e-smith/db/"

    #From SMEServer DB
    ConfigDB = read_config_file(db_dir+"configuration")
    DomainName = get_value(ConfigDB, "DomainName", "type") #'bjsystems.co.uk' # $cdb->get('DomainName')->value;

    hello_string = "Mailstats:"+Mailstats_version+' for '+DomainName+" at "+formatted_datetime+" for "+analysis_date
    print(hello_string)
    version_string = "Chameleon:"+chameleon_version+" Python:"+python_version
if isThonny: version_string = version_string + "...under Thonny" print(version_string) RHSenabled = get_value(ConfigDB, "qpsmtpd", "RHSBL","disabled") == "enabled" #True #( $cdb->get('qpsmtpd')->prop('RHSBL') eq 'enabled' ); DNSenabled = get_value(ConfigDB, "qpsmtpd", "DNSBL","disabled") == "enabled" #True #( $cdb->get('qpsmtpd')->prop('DNSBL') eq 'enabled' ); SARejectLevel = int(get_value(ConfigDB, "spamassassin", "RejectLevel","12")) #12 #$cdb->get('spamassassin')->prop('RejectLevel'); SATagLevel = int(get_value(ConfigDB, "spamassassin", "TagLevel","4")) #4 #$cdb->get('spamassassin')->prop('TagLevel'); if SARejectLevel == 0: warnnoreject = "(*Warning* 0 = no reject)" else: warnnoreject = "" EmailAddress = get_value(ConfigDB,"mailstats","Email","admin@"+DomainName) if '@' not in EmailAddress: EmailAddress = EmailAddress+"@"+DomainName EmailTextOrHTML = get_value(ConfigDB,"mailstats","EmailTextOrHTML","Both") #Text or Both or None EmailHost = get_value(ConfigDB,"mailstats","EmailHost","localhost") #Default will be localhost EmailPort = int(get_value(ConfigDB,"mailstats","EmailPort","25")) EMailSMTPUser = get_value(ConfigDB,"mailstats","EmailUser") #None = default => no authenticatioon needed EMailSMTPPassword = get_value(ConfigDB,"mailstats","EmailPassword") BadCountries = get_value(ConfigDB,"qpsmtpd","BadCountries") # Db save control saveData = get_value(ConfigDB,"mailstats","SaveDataToMySQL","no") == 'yes' or forceDbSave if saveData: DBName = "mailstats"; DBHost = get_value(ConfigDB,'mailstats','DBHost',"localhost") DBPort = get_value(ConfigDB,'mailstats','DBPort',"3306") DBName = 'mailstats' DBPassw = 'mailstats' DBUser = 'mailstats' UnixSocket = "/var/lib/mysql/mysql.sock" # see if the DB exists # Try to Establish a database connection try: conn = mysql.connector.connect( host=DBHost, user=DBUser, password=DBPassw, database=DBName, port=DBPort, unix_socket=UnixSocket ) cursor = conn.cursor() # Create table if it doesn't exist cursor.execute(""" CREATE TABLE IF 
NOT EXISTS SummaryLogs ( id INT AUTO_INCREMENT PRIMARY KEY, Date DATE, Hour INT, logData TEXT ) """) # and prune the DB here if needed. # Delete existing records for the given date try: delete_query = """ DELETE FROM SummaryLogs WHERE Date = %s """ cursor.execute(delete_query, (analysis_date,)) #Don't forget the syntactic sugar of the extra comma to make it a tuple! # Get the number of records deleted rows_deleted = cursor.rowcount print(rows_deleted) #quit() if rows_deleted > 0: print(f"Deleted {rows_deleted} rows for {analysis_date} ") except mysql.connector.Error as e: print(f"SQL Delete failed ({delete_query}) ({e}) ") except mysql.connector.Error as e: print(f"Unable to connect to {DBName} on {DBHost} port {DBPort} error ({e}) ") saveData = False # Not sure we need these... # if (ConfigDB,"qpsmtpd","RHSBL").lower() == 'enabled': # RBLList = get_value(ConfigDB,"qpsmtpd","RBLList") # else: # RBLList = "" # if (ConfigDB,"qpsmtpd","RBLList").lower() == 'enabled': # SBLLIst = get_value(ConfigDB,"qpsmtpd","SBLLIst") # else: # RBLList = "" # if (ConfigDB,"qpsmtpd","RBLList").lower() == 'enabled': # UBLList = get_value(ConfigDB,"qpsmtpd","UBLLIst") # else: # RBLList = "" FetchmailIP = '127.0.0.200'; #Apparent Ip address of fetchmail deliveries WebmailIP = '127.0.0.1'; #Apparent Ip of Webmail sender localhost = 'localhost'; #Apparent sender for webmail FETCHMAIL = 'FETCHMAIL'; #Sender from fetchmail when Ip address not 127.0.0.200 - when qpsmtpd denies the email MAILMAN = "bounces"; #sender when mailman sending when orig is localhost DMARCDomain="dmarc"; #Pattern to recognised DMARC sent emails (this not very reliable, as the email address could be anything) DMARCOkPattern="dmarc: pass"; #Pattern to use to detect DMARC approval num_hours = 25 # Represents hours from 0 to 23 - adds extra one for column totals and another for percentages log_file = logs_dir+'current.log' log_entries,skip_count,ignored_count = read_in_relevant_log_file(log_file,anaysis_date_obj) # if 
len(log_entries) == 0: # print(f"No records found in {log_file}") # quit() # else: print(f"Found {len(log_entries)} entries in log for for {anaysis_date_obj.strftime('%Y-%m-%d')} Ignored: {ignored_count} skipped: {skip_count}") summary_log_entries,skip_count = filter_summary_records(log_entries) print(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries") sorted_log_dict = sort_log_entries(summary_log_entries) print(f"Sorted {len(sorted_log_dict)} entries") columnHeaders = ['Count','WebMail','Local','MailMan','Relay','DMARC','Virus','RBL/DNS','Geoip.','Non.Conf.','Karma','Rej.Load','Del.Spam','Qued.Spam?',' Ham','TOTALS','PERCENT'] # dict for each colum identifying plugin that increments count columnPlugin = [''] * 17 columnPlugin[Hour] = [] columnPlugin[WebMail] = [] columnPlugin[Local] = [] columnPlugin[MailMan] = [] columnPlugin[DMARC] = ['dmarc'] columnPlugin[Virus] = ['pattern_filter', 'virus::pattern_filter','virus::clamav'] columnPlugin[RBLDNS] = ['rhsbl', 'dnsbl','uribl'] columnPlugin[Geoip] = ['check_badcountries'] columnPlugin[NonConf] = ['check_earlytalker','check_relay','check_norelay', 'require_resolvable_fromhost' ,'check_basicheaders','check_badmailfrom','check_badrcptto_patterns' ,'check_badrcptto','check_spamhelo','check_goodrcptto extn','rcpt_ok' ,'check_goodrcptto','check_smtp_forward','count_unrecognized_commands','tls','auth::auth_cvm_unix_local' ,'auth::auth_imap', 'earlytalker','resolvable_fromhost','relay','headers','mailfrom','badrcptto','helo' ,'check_smtp_forward','sender_permitted_from'] columnPlugin[RejLoad] = ['loadcheck'] columnPlugin[DelSpam] = [] columnPlugin[QuedSpam] = [] columnPlugin[Ham] = [] columnPlugin[TOTALS] = [] columnPlugin[PERCENT] = [] columnPlugin[Karma] = ['karma'] columnHeaders_len = len(columnHeaders) columnCounts_2d = initialize_2d_array(num_hours, columnHeaders_len,analysis_date) virus_pattern = re.compile(r"Virus found: (.*)") found_viruses = defaultdict(int) found_qpcodes = 
defaultdict(int) qpcodes_pattern = re.compile(r"(\(.*\)).*'") i = 0; sorted_len= len(sorted_log_dict) #unless none to show spamavg = 0; spamqueuedcount = 0 hamcount = 0 hamavg = 0 rejectspamcount = 0 rejectspamavg = 0 DMARCSendCount = 0 totalexamined = 0 if sorted_len > 0: if isThonny: # Initial call to print the progress bar print_progress_bar(0, sorted_len, prefix='Progress:', suffix='Complete', length=50) for timestamp, data in sorted_log_dict.items(): i += 1 totalexamined += 1 if isThonny: print_progress_bar(i, sorted_len, prefix='Scanning for main table:', suffix='Complete', length=50) # Count of in which hour it falls # Parse the timestamp string into a datetime object dt = timestamp hour = dt.hour # parse the data parsed_data = parse_data(data) # Save the data here if necessary if saveData: save_summaries_to_db(anaysis_date_obj.strftime('%Y-%m-%d'),hour,parsed_data) # Increment Count in which headings it falls #Hourly count and column total columnCounts_2d[hour][Hour] += 1 columnCounts_2d[ColTotals][Hour] += 1 #Row Totals columnCounts_2d[hour][TOTALS] += 1 #Total totals columnCounts_2d[ColTotals][TOTALS] += 1 # first spot the fetchmail and local deliveries. 
# ---------------------------------------------------------------------------
# Per-record classification for the main table (one pass per summary record).
# By this point the record has been added to columnCounts_2d[hour][Hour] and
# columnCounts_2d[hour][TOTALS], and to the same columns of the ColTotals row.
# The statements that follow bump further columns, always for both the
# record's hour row and the ColTotals row:
#   - Local / Relay / WebMail / MailMan, decided from parsed_data['sendurl'],
#     ['sendurl1'], ['ip'] and ['action1']; a sender containing DMARCDomain
#     only increments DMARCSendCount.
#   - Ham when parsed_data['action'] == '(queue)'.
#   - hamavg/hamcount for a 'spam-status' starting with "no" whose score is
#     below float(SATagLevel); DelSpam or QuedSpam (plus the matching score
#     sums) for a status starting with "yes".
#   - RBLDNS or the KARMA column for 'naughty' rejections, chosen by the
#     "(dnsbl)" / "(karma)" / "(helo)" prefix of parsed_data['error-msg'].
#   - for '(deny)' records, the column index that
#     search_2d_list(parsed_data['error-plugin'], columnPlugin) returns
#     (-1 is treated as "no column").
# Rejection reasons are tallied in found_qpcodes and "Virus found:" details in
# found_viruses. Once the loop is done, every row's and column's share of
# columnCounts_2d[ColTotals][TOTALS] is stored as a string such as "12.3%"
# (or the integer 0 when that grand total is 0).
#
# NOTE(review): the "(karma)" branch indexes with KARMA, whereas columnPlugin
#   is filled in through Karma; neither name is defined in this part of the
#   file - confirm both exist and address the same column.
# NOTE(review): SATagLevel is wrapped in float() before comparison but
#   SARejectLevel is compared as-is - confirm it is already numeric.
# NOTE(review): columnPlugin[Relay] is never assigned and keeps its ''
#   placeholder, and 'check_smtp_forward' is listed twice under NonConf.
# NOTE(review): spam_pattern is rebuilt with re.compile() for every record;
#   it does not depend on the record and could be compiled once up front.
# ---------------------------------------------------------------------------
#Local send if DomainName in parsed_data['sendurl']: columnCounts_2d[hour][Local] += 1 columnCounts_2d[ColTotals][Local] += 1 #Relay or webmail elif not is_private_ip(parsed_data['ip']) and is_private_ip(parsed_data['sendurl1']) and parsed_data['action1'] == 'queued': #Relay columnCounts_2d[hour][Relay] += 1 columnCounts_2d[ColTotals][Relay] += 1 elif WebmailIP in parsed_data['sendurl1'] and not is_private_ip(parsed_data['ip']): #webmail columnCounts_2d[hour][WebMail] += 1 columnCounts_2d[ColTotals][WebMail] += 1 elif localhost in parsed_data['sendurl']: # but not if it comes from fetchmail if not FETCHMAIL in parsed_data['sendurl1']: # might still be from mailman here if MAILMAN in parsed_data['sendurl1']: #$mailmansendcount++; #$localsendtotal++; columnCounts_2d[hour][MailMan] += 1 columnCounts_2d[ColTotals][MailMan] += 1 #$counts{$abshour}{$CATMAILMAN}++; #$localflag = 1; else: #Or sent to the DMARC server #check for email address in $DMARC_Report_emails string #my $logemail = $log_items[4]; if DMARCDomain in parsed_data['from-email']: #(index($DMARC_Report_emails,$logemail)>=0) or #$localsendtotal++; DMARCSendCount += 1 #localflag = 1; else: # ignore incoming localhost spoofs if not 'msg denied before queued' in parsed_data['error-msg']: #Webmail #$localflag = 1; #$WebMailsendtotal++; columnCounts_2d[hour][WebMail] += 1 columnCounts_2d[ColTotals][WebMail] += 1 #$WebMailflag = 1; else: #$localflag = 1; #$WebMailsendtotal++; #$WebMailflag = 1; columnCounts_2d[hour][WebMail] += 1 columnCounts_2d[ColTotals][WebMail] += 1 #Queued email if parsed_data['action'] == '(queue)': columnCounts_2d[hour][Ham] += 1 columnCounts_2d[ColTotals][Ham] += 1 # spamassassin not rejected if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str): if parsed_data['spam-status'].lower().startswith('no'): #Extract other parameters from this string # example: No, score=-3.9 spam_pattern = re.compile(r'score=(-?\d+\.\d+) required=(-?\d+\.\d+)') match = 
re.search(spam_pattern, parsed_data['spam-status']) if match: score = float(match.group(1)) #print(score,SATagLevel) if score < float(SATagLevel): # Accumulate allowed score (inc negatives?) hamavg += score hamcount += 1 #spamassasin rejects if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str): if parsed_data['spam-status'].lower().startswith('yes'): #Extract other parameters from this string # example: Yes, score=10.3 required=4.0 autolearn=disable spam_pattern = re.compile(r'score=(-?\d+\.\d+) required=(-?\d+\.\d+)') match = re.search(spam_pattern, parsed_data['spam-status']) if match: score = float(match.group(1)) required = float(match.group(2)) #print(f"{parsed_data['spam-status']} / {score} {required}") if score >= SARejectLevel: columnCounts_2d[hour][DelSpam] += 1 columnCounts_2d[ColTotals][DelSpam] += 1 rejectspamavg += score rejectspamcount += 1 elif score >= required: columnCounts_2d[hour][QuedSpam] += 1 columnCounts_2d[ColTotals][QuedSpam] += 1 spamavg += score spamqueuedcount += 1 # Count the qpsmtpd codes if parsed_data['error-plugin'].strip() == 'naughty': #print(f"Found naughty {parsed_data['error-msg']}") if parsed_data['error-msg'].startswith("(dnsbl)"): #print("Found dnsbl") columnCounts_2d[hour][RBLDNS]+= 1 columnCounts_2d[ColTotals][RBLDNS]+= 1 elif parsed_data['error-msg'].startswith("(karma)"): columnCounts_2d[hour][KARMA] += 1 columnCounts_2d[ColTotals][KARMA]+= 1 elif parsed_data['error-msg'].startswith("(helo)"): columnCounts_2d[hour][RBLDNS] += 1 columnCounts_2d[ColTotals][RBLDNS]+= 1 else: match = qpcodes_pattern.match(parsed_data['action1']) if match: rejReason = match.group(1) found_qpcodes[parsed_data['error-plugin']+"-"+rejReason] += 1 else: found_qpcodes[parsed_data['action1']] += 1 #Now increment the column which the plugin name indicates if parsed_data['action'] == '(deny)' and parsed_data['error-plugin']: #print(f"Found plugin {parsed_data['error-plugin']}") if parsed_data['error-plugin']: 
row = search_2d_list(parsed_data['error-plugin'],columnPlugin) #print(row,parsed_data['error-plugin']) if not row == -1: #print(f"Found row: {row}") columnCounts_2d[hour][row] += 1 columnCounts_2d[ColTotals][row] += 1 # a few ad hoc extra extractons of data if row == Virus: match = virus_pattern.match(parsed_data['action1']) if match: found_viruses[match.group(1)] += 1 else: found_viruses[parsed_data['action1']] += 1 else: found_qpcodes[parsed_data['action1']] += 1 if isThonny: print() #seperate the [progress bar] # Compute percentages total_Count = columnCounts_2d[ColTotals][TOTALS] #Column of percentages for row in range(ColTotals): if total_Count == 0: percentage_of_total = 0 else: percentage_of_total = f"{round(round(columnCounts_2d[row][TOTALS] / total_Count,4) * 100,1)}%" columnCounts_2d[row][PERCENT] = percentage_of_total #Row of percentages for col in range(TOTALS): if total_Count == 0: percentage_of_total = 0 else: percentage_of_total = f"{round(round(columnCounts_2d[ColTotals][col] / total_Count,4) * 100,1)}%" columnCounts_2d[ColPercent][col] = percentage_of_total # and drop in the 100% to make it look correct! 
# ---------------------------------------------------------------------------
# Summary statistics, sub-tables, HTML/text output and optional e-mail.
# The remainder of the script, in order:
#   1. sets the three TOTALS/PERCENT corner cells to '100%', computes
#      emailperhour = totalexamined / 24 and turns the accumulated spam / ham
#      score sums into averages (each division is skipped when its count is 0);
#   2. walks every raw log entry, matching data[1] against four regexes to
#      count internal vs external SMTP sessions (is_private_ip on the peer
#      address), GeoIP countries, "dmarc: pass" lines and connection types;
#   3. renders template_dir+'mailstats.html.pt' with Chameleon and then adds
#      the heading, the qpsmtpd-codes table, the GeoIP table and the
#      navigation links through insert_string_after();
#   4. closes cursor and conn when saveData is set;
#   5. writes html_page_dir+'mailstats_for_'+analysis_date+'.html' (spaces
#      replaced by '_') and, when get_html2text_version() returns
#      '2019.9.26', converts it with html_to_text() into a temporary file
#      that is renamed to <output_path>.txt unless notextfile is set;
#   6. when EmailTextOrHTML is "HTML", "Text" or "Both", prepares the
#      matching content and calls send_email(), with SMTP credentials if
#      EMailSMTPUser is set.
#
# NOTE(review): both Chameleon try/except pairs only print the exception, yet
#   rendered_html and Nav_str are used immediately afterwards; a failed
#   render would surface as a NameError. The two messages also look swapped:
#   the handler around the template(...) call reports "template Exception",
#   the one around PageTemplate(...) reports "render Exception".
# NOTE(review): every insert_string_after() call passes "" as its marker and
#   the navigation / "See in browser" literals are rendered with
#   PreviousDate, NextDate, TodayDate and DomainName - verify the literals
#   contain the expected markup and ${...} placeholders.
# NOTE(review): tempfile.mktemp() is deprecated in the standard library
#   because the returned name can be claimed by another process before use;
#   os.rename() can also fail if the temporary directory and html_page_dir
#   are on different filesystems.
# NOTE(review): the authenticated send_email() call passes
#   html_content=email_content in addition to HTML_content; email_content is
#   not assigned anywhere in this part of the file and the unauthenticated
#   call has no such argument - check against send_email()'s signature. The
#   authenticated call is also not wrapped in try/except like the other one.
# NOTE(review): the fallback text "No text avaiable as html2text (was not "
#   looks truncated; fqdn and j are assigned but not read in this section.
# ---------------------------------------------------------------------------
columnCounts_2d[ColPercent][PERCENT] = '100%' columnCounts_2d[ColTotals][PERCENT] = '100%' columnCounts_2d[ColPercent][TOTALS] = '100%' #other stats emailperhour = (totalexamined / 24) if not spamqueuedcount == 0: spamavg = spamavg / spamqueuedcount if not rejectspamcount == 0: rejectspamavg = rejectspamavg / rejectspamcount if not hamcount == 0: hamavg = hamavg / hamcount # Now scan for the other lines in the log of interest found_countries = defaultdict(int) geoip_pattern = re.compile(r".*check_badcountries: GeoIP Country: (.*)") dmarc_pattern = re.compile(r".*dmarc: pass") helo_pattern = re.compile(r".*Accepted connection.*?from (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) \/ ([\w.-]+)") connect_type_pattern = re.compile(r".*connect via (.*)") total_countries = 0 DMARCOkCount = 0 totalinternalsmtpsessions = 0 totalexternalsmtpsessions = 0 i = 0 j = 0 log_len = len(log_entries) connection_type_counts = defaultdict(int) if log_len > 0: if isThonny: print_progress_bar(0, log_len, prefix='Progress:', suffix='Complete', length=50) for data in log_entries: i += 1 if isThonny: print_progress_bar(i, log_len, prefix='Scanning for sub tables:', suffix='Complete', length=50) # Match initial connection message match = helo_pattern.match(data[1]) if match: ip = match.group(1) fqdn = match.group(2) if is_private_ip(ip): totalinternalsmtpsessions += 1 else: totalexternalsmtpsessions += 1 continue #Pull out Geoip countries for analysis table match = geoip_pattern.match(data[1]) if match: j += 1 country = match.group(1) found_countries[country] += 1 total_countries += 1 continue #Pull out DMARC approvals match = dmarc_pattern.match(data[1]) if match: DMARCOkCount += 1 continue #Pull out type of connection match = connect_type_pattern.match(data[1]) if match: connection_type = match.group(1) connection_type_counts[connection_type] += 1 continue #print(columnCounts_2d) #quit() #Now apply the results to the chameleon template - main table # Path to the template file template_path = 
template_dir+'mailstats.html.pt' # Load the template with open(template_path, 'r') as template_file: template_content = template_file.read() # Create a Chameleon template instance try: template = PageTemplate(template_content) # Render the template with the 2D array data and column headers try: rendered_html = template(array_2d=columnCounts_2d, column_headers=columnHeaders, reporting_date=analysis_date, title=hello_string, version=version_string) except Exception as e: print(f"Chameleon template Exception {e}") except Exception as e: print(f"Chameleon render Exception {e}") total_html = rendered_html # Add in the header information rendered_html = get_heading() total_html = insert_string_after(total_html,rendered_html, "") #add in the subservient tables.. #qpsmtd codes qpsmtpd_headers = ["Reason",'Count','Percent'] qpsmtpd_title = 'Qpsmtpd codes league table:' rendered_html = render_sub_table(qpsmtpd_title,qpsmtpd_headers,found_qpcodes) # Add it to the total total_html = insert_string_after(total_html,rendered_html, "") #Geoip Country codes geoip_headers = ['Country','Count','Percent','Rejected?'] geoip_title = 'Geoip results:' rendered_html = render_sub_table(geoip_title,geoip_headers,found_countries,get_character_in_reject_list) # Add it to the total total_html = insert_string_after(total_html,rendered_html, "") if saveData: # Close the connection cursor.close() conn.close() #Add in navigation html - next/previous/see in browser day_format = "%Y-%m-%d" # Convert the time string to a datetime object date_obj = datetime.strptime(analysis_date, day_format) # Compute the next date by adding one day next_date = date_obj + timedelta(days=1) # Compute the previous date by subtracting one day previous_date = date_obj - timedelta(days=1) # Convert the datetime objects back to strings in the desired format next_date_str = next_date.strftime(day_format) previous_date_str = previous_date.strftime(day_format) navigation_str_html = "
\ Previous\
Index of files
\ Next\
" try: template = PageTemplate(navigation_str_html) try: Nav_str = template(PreviousDate=previous_date_str,NextDate=next_date_str,TodayDate=analysis_date,DomainName=DomainName) except Exception as e: print(f"Chameleon nav template Exception {e}") except Exception as e: print(f"Chameleon nav render Exception {e}") # And insert it total_html = insert_string_after(total_html,Nav_str, "") # Write the rendered HTML to a file output_path = html_page_dir+'mailstats_for_'+analysis_date output_path = output_path.replace(' ','_') with open(output_path+'.html', 'w') as output_file: output_file.write(total_html) #and create a text version if the local version of html2text is suffiicent if get_html2text_version() == '2019.9.26': # Get a temporary file name temp_file_name = tempfile.mktemp() html_to_text(output_path+'.html',temp_file_name) print(f"Rendered HTML saved to {temp_file_name}") # and save it if required if not notextfile: text_file_path = output_path+'.txt' # and rename it os.rename(temp_file_name, text_file_path) else: text_file_path = temp_file_name else: text_file_path = "" html_content = None text_content = None #Now see if Email required if EmailTextOrHTML: if EmailTextOrHTML == "HTML" or EmailTextOrHTML == "Both": # Send html email (default)) filepath = html_page_dir+"mailstats_for_"+analysis_date+".html" html_content = read_html_from_file(filepath) # Replace the Navigation by a "See in browser" prompt replace_str = f"
See in browser
" html_content = replace_between(html_content, "
", ">Next
", replace_str) if not noemailfile: # Write out the email html to a web page email_file = html_page_dir + "Email_mailstats_for_"+analysis_date with open(email_file+'.html', 'w') as output_file: output_file.write(html_content) if EmailTextOrHTML == "Text" or EmailTextOrHTML == "Both": #filepath = html_page_dir+"mailstats_for_"+analysis_date+".txt" if not text_file_path == "": text_content = read_text_from_file(text_file_path) else: text_content = "No text avaiable as html2text (was not " if EMailSMTPUser: # Send authenticated print("Sending authenticated") send_email( html_content=email_content, subject="Mailstats for "+analysis_date, from_email="mailstats@"+DomainName, to_email=EmailAddress, smtp_server=EmailHost, smtp_port=EmailPort, HTML_content=html_content, Text_content=text_content, smtp_user=EMailSMTPUser, smtp_password=EMailSMTPPassword ) else: # No authentication print(f"Sending non authenticated {EmailAddress} {EmailHost}") try: send_email( subject="Mailstats for "+analysis_date, from_email="mailstats@"+DomainName, to_email=EmailAddress, smtp_server=EmailHost, smtp_port=EmailPort, HTML_content=html_content, Text_content=text_content ) except Exception as e: print(f"Email Exception {e}")