#
# Mailstats.py
#
#
# This script provides daily SpamFilter statistics.
#
# Mailstats
#
# usage: mailstats.py [-h] [-d DATE] [-ef EMAILFILE] [-tf TEXTFILE] [--version]
# [-db DBSAVE]
#
# Mailstats
#
# optional arguments:
# -h, --help show this help message and exit
# -d DATE, --date DATE Specify a valid date (yyyy-mm-dd) for the analysis
# -ef EMAILFILE, --emailfile EMAILFILE
# Save an html file of the email sent (y/N)
# -tf TEXTFILE, --textfile TEXTFILE
# Save a txt file of the html page (y/N)
# --version show program's version number and exit
# -db DBSAVE, --dbsave DBSAVE
# Force save of summary logs in DB (y/N)
#
#
# (June 2024 - bjr) Re-written in Python from Mailstats.pl (Perl) to conform to SME11 / Postfix / qpsmtpd log formats
# and html output added
#
# Todo:
# 2 Other stats
# 3. Extra bits for sub tables - DONE
# 4. Percent char causes sort to fail - look at adding it in the template - DONE
# 5. Chase disparity in counts between old mailstats and this - Some of it DONE
# 6. Count emails delivered over ports 25/587/465 (SMTPS?)
# 7. Arrange that the spec file overwrites the date even if it has been overwritten before
# 8. Allow mailstats pages to be public or private (=> templating the fragment) - DONE
# 9. Update format of the summarylogs page - DONE but still WIP
# 10. Add in links to summarylogs in web pages - DONE but still WIP
# 11. Move showSummaryLogs.php to individual directory "/opt/mailstats/php"
# 12. Make sure other directories not visible through apache
#
# Future:
# 1. Write summary line for each transaction to DB and link to it through cell in main table - DONE (write to DB)
# 2. Make DB password something more obscure.
# 3. Prune the DB according to parameter - delete corresponding page in opt/mailstats/html
# 4. Prune the html directory according to parameter
#
# Even more Future (if ever)
# 2. Link each summary line through DB to actual transaction lines
#
# Centos7:
# yum install python3-chameleon --enablerepo=epel
# yum install html2text --enablerepo=epel
# yum install mysql-connector-python --enablerepo=epel (not sure if this is required as well as the pip3)
# pip3 install mysql-connector
# pip3 install numpy
# pip3 install plotly
# pip3 install pandas
# NOTE: No matplotlib
#
# Rocky8: (probably - not yet checked this)
#
# dnf install python3-chameleon --enablerepo=epel
# dnf install html2text --enablerepo=epel
# dnf install python3-matplotlib
# pip3 install numpy
# pip3 install pymysql
# pip3 install pandas
#
#
from datetime import datetime, timedelta
import sys
from chameleon import PageTemplateFile,PageTemplate
import pkg_resources
import re
import ipaddress
import subprocess
import os
from collections import defaultdict
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import codecs
import argparse
import tempfile
#import mysql.connector
import numpy as np
#import plotly.graph_objects as go
#import plotly.express as px
import colorsys
import pymysql
import json
from systemd import journal
import logging
# Configure logging
log_dir_path = "/var/log/mailstats"
# Check if the directory exists, and create it if it doesn't
if not os.path.exists(log_dir_path):
os.makedirs(log_dir_path)
logging.basicConfig(level=logging.INFO, # Default level of messages to log
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Log to console
logging.FileHandler(log_dir_path+"/mailstats.log") # Log to a file
])
enable_graphs = True; #This could be a DB entry if required.
try:
import matplotlib.pyplot as plt
except ImportError:
logging.warning("Matplotlib is not installed - no graphs")
enable_graphs = False;
Mailstats_version = '1.2'
build_date_time = "2024-06-18 12:03:40OURCE"
build_date_time = build_date_time[:19] #Take out crap that sneaks in.
#if build_date_time == "2024-06-18 12:03:40OURCE":
# build_date_time = "Unknown"
script_dir = os.path.dirname(os.path.abspath(__file__))
data_file_path = script_dir+'/../..' #back to the top
now = datetime.now()
yesterday = now - timedelta(days=1)
formatted_yesterday = yesterday.strftime("%Y-%m-%d")
#html_page_path = data_file_path+"/home/e-smith/files/ibays/mesdb/html/mailstats/"
html_page_dir = data_file_path+"/opt/mailstats/html/"
template_dir = data_file_path+"/opt/mailstats/templates/"
logs_dir = data_file_path+"/opt/mailstats/logs/"
# Column numbering (easy to renumber or add one in)
Hour = 0
WebMail = Hour + 1
Local = WebMail + 1
MailMan = Local + 1
Relay = MailMan + 1
DMARC = Relay + 1
Virus = DMARC + 1
RBLDNS = Virus + 1
Geoip = RBLDNS + 1
NonConf = Geoip + 1
RejLoad = NonConf + 1
Karma = RejLoad + 1
DelSpam = Karma + 1
QuedSpam = DelSpam + 1
Ham = QuedSpam + 1
TOTALS = Ham + 1
PERCENT = TOTALS + 1
ColTotals = 24
ColPercent = 25
def strip_ansi_codes(text):
ansi_escape = re.compile(r'\x1b\[[0-9;]*m')
return ansi_escape.sub('', text)
def replace_bracket_content(input_filename, output_filename):
import re
with open(input_filename, 'r', encoding='utf-8') as infile:
content = infile.read()
# Pattern to capture digits/spaces inside brackets
pattern = r'\[([\d\s]*)\]\(\./showSummaryLogs\.php\?date=\d{4}-\d{2}-\d{2}&hour=\d{1,2}\)'
# Pad the captured group to a minimum width of 8 characters
replaced_content = re.sub(pattern, lambda m: f"{m.group(1):8}", content)
with open(output_filename, 'w', encoding='utf-8') as outfile:
outfile.write(replaced_content)
return f"Replacements completed. Output written to {output_filename}"
def get_logs_from_Journalctl(date='yesterday'):
# JSON-pretty output example from journalctl
# {
# "__CURSOR" : "s=21b4f015be0c4f1fb71ac439a8365ee7;i=385c;b=dd778625547f4883b572daf53ae93cd4;m=ca99d6d;t=62d6316802b05;x=71b24e9f19f3b99a",
# "__REALTIME_TIMESTAMP" : "1738753462774533",
# "__MONOTONIC_TIMESTAMP" : "212442477",
# "_BOOT_ID" : "dd778625547f4883b572daf53ae93cd4",
# "_MACHINE_ID" : "f20b7edad71a44e59f9e9b68d4870b19",
# "PRIORITY" : "6",
# "SYSLOG_FACILITY" : "3",
# "_UID" : "0",
# "_GID" : "0",
# "_SYSTEMD_SLICE" : "system.slice",
# "_CAP_EFFECTIVE" : "1ffffffffff",
# "_TRANSPORT" : "stdout",
# "_COMM" : "openssl",
# "_EXE" : "/usr/bin/openssl",
# "_HOSTNAME" : "sme11.thereadclan.me.uk",
# "_STREAM_ID" : "8bb0ef8920af4ae09b424a2e30abcdf7",
# "SYSLOG_IDENTIFIER" : "qpsmtpd-init",
# "MESSAGE" : "Generating DH parameters, 2048 bit long safe prime, generator 2",
# "_PID" : "2850",
# }
# and the return from here:
# {
# '_TRANSPORT': 'stdout', 'PRIORITY': 6, 'SYSLOG_FACILITY': 3, '_CAP_EFFECTIVE': '0', '_SYSTEMD_SLICE': 'system.slice',
# '_BOOT_ID': UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f'), '_MACHINE_ID': UUID('f20b7eda-d71a-44e5-9f9e-9b68d4870b19'),
# '_HOSTNAME': 'sme11.thereadclan.me.uk', '_STREAM_ID': '06c860deea374544a2b561f55394d728', 'SYSLOG_IDENTIFIER': 'qpsmtpd-forkserver',
# '_UID': 453, '_GID': 453, '_COMM': 'qpsmtpd-forkser', '_EXE': '/usr/bin/perl',
# '_CMDLINE': '/usr/bin/perl -Tw /usr/bin/qpsmtpd-forkserver -u qpsmtpd -l 0.0.0.0 -p 25 -c 40 -m 5',
# '_SYSTEMD_CGROUP': '/system.slice/qpsmtpd.service', '_SYSTEMD_UNIT': 'qpsmtpd.service',
# '_SYSTEMD_INVOCATION_ID': 'a2b7889a307748daaeb60173d31c5e0f', '_PID': 93647,
# 'MESSAGE': '93647 Connection from localhost [127.0.0.1]',
# '__REALTIME_TIMESTAMP': datetime.datetime(2025, 4, 2, 0, 1, 11, 668929),
# '__MONOTONIC_TIMESTAMP': journal.Monotonic(timestamp=datetime.timedelta(11, 53118, 613602),
# bootid=UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f')),
# '__CURSOR': 's=21b4f015be0c4f1fb71ac439a8365ee7;i=66d2c;b=465c620236ac4a8b98e91581e8fec68f;m=e9a65ed862;t=
# }
"""
Retrieve and parse journalctl logs for a specific date and units,
returning them as a sorted list of dictionaries.
"""
def to_us(ts):
# Convert a journal timestamp (datetime or int/string microseconds) to integer microseconds
if ts is None:
return None
if hasattr(ts, "timestamp"):
return int(ts.timestamp() * 1_000_000)
try:
return int(ts)
except Exception:
return None
try:
# Parse the input date to calculate start and end of the day
if isinstance(date, str) and date.lower() == "yesterday":
target_date = datetime.now() - timedelta(days=1)
elif isinstance(date, datetime):
target_date = date
else:
# Supports either a datetime.date-like object (has year attr) or a string YYYY-MM-DD
try:
target_date = datetime(date.year, date.month, date.day)
except Exception:
target_date = datetime.strptime(str(date), "%Y-%m-%d")
# Define the time range for the specified date
since_dt = datetime(target_date.year, target_date.month, target_date.day, 0, 0, 0, 0)
until_dt = datetime(target_date.year, target_date.month, target_date.day, 23, 59, 59, 999999)
since_microseconds = int(since_dt.timestamp() * 1_000_000)
until_microseconds = int(until_dt.timestamp() * 1_000_000)
# Open the systemd journal (system-only if supported)
try:
j = journal.Reader(flags=journal.SYSTEM_ONLY)
except Exception:
j = journal.Reader()
# Set filters for units (multiple add_match on same field => OR)
j.add_match(_SYSTEMD_UNIT="qpsmtpd.service")
j.add_match(_SYSTEMD_UNIT="uqpsmtpd.service")
j.add_match(_SYSTEMD_UNIT="sqpsmtpd.service")
# Filter by time range: seek to the start of the interval
j.seek_realtime(since_dt)
# Retrieve logs within the time range
logs = []
log_count = 0
error_count = 0
for entry in j:
try:
entry_timestamp = entry.get("__REALTIME_TIMESTAMP", None)
entry_microseconds = to_us(entry_timestamp)
if entry_microseconds is None:
continue
# Early stop once we pass the end of the window
if entry_microseconds > until_microseconds:
break
if entry_microseconds >= since_microseconds:
log_count += 1
# Strip ANSI escape sequences in MESSAGE (if present and is text/bytes)
try:
msg = entry.get("MESSAGE", "")
if isinstance(msg, (bytes, bytearray)):
msg = msg.decode("utf-8", "replace")
# Only call strip if ESC is present
if "\x1b" in msg:
msg = strip_ansi_codes(msg)
entry["MESSAGE"] = msg
except Exception as se:
# Keep original message, just note the issue at debug level
logging.debug(f"strip_ansi_codes failed: {se}")
logs.append(entry)
except Exception as e:
# Be defensive getting context fields to avoid raising inside logging
pid = entry.get("_PID", "?") if isinstance(entry, dict) else "?"
ident = entry.get("SYSLOG_IDENTIFIER", "?") if isinstance(entry, dict) else "?"
logging.warning(f"Error - log line: {log_count} {pid} {ident} : {e}")
error_count += 1
if error_count:
logging.info(f"Had {error_count} errors on journal import - probably non character bytes")
# Sort logs by __REALTIME_TIMESTAMP in ascending order (keep original behavior)
sorted_logs = sorted(logs, key=lambda x: to_us(x.get("__REALTIME_TIMESTAMP")) or 0)
logging.debug(f"Collected {len(sorted_logs)} entries for {since_dt.date()} "
f"between {since_dt} and {until_dt} (scanned {log_count} in-window)")
return sorted_logs
except Exception as e:
logging.error(f"Unexpected error: {e}")
return []
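# Minimal usage sketch (assumes the systemd journal is readable by the current user):
#   entries = get_logs_from_Journalctl("2025-04-02")   # or "yesterday", or a datetime
#   for e in entries:
#       print(e["__REALTIME_TIMESTAMP"], e.get("MESSAGE", ""))
# Each entry is the dict-like record returned by systemd.journal.Reader, filtered to the
# qpsmtpd/uqpsmtpd/sqpsmtpd units and sorted by timestamp.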
def transform_to_dict(data, keys, iso_date):
"""
Transforms a 26x17 list of lists into a list of dictionaries with specified keys.
Args:
data (list): A 26x17 list of lists.
keys (list): A 1D array specifying the keys for the dictionaries.
iso_date (str): A date in ISO format to prepend to each row number.
Returns:
list: A list of dictionaries with transformed data.
"""
# Validate input dimensions
if len(data) != 26:
raise ValueError("Input data must have 26 rows.")
if len(keys) != len(data[0]): # Account for the new column
raise ValueError(f"Keys must match the number of columns after transformation {len(keys)} {len(data[0])}")
# Remove rows 25 and 26
filtered_data = data[:24]
# and same for keys
modified_keys = keys[1:-2]
# Add new column with ISO date and row number
transformed_data = []
for i, row in enumerate(filtered_data):
new_column_value = f"{i}" #f"{iso_date},{i}"
transformed_row = [new_column_value] + row[1:-2] # Remove first and last two columns
transformed_data.append(transformed_row)
# Convert each row into a dictionary using supplied keys
result = [dict(zip(["Time"] + modified_keys, row)) for row in transformed_data]
return result
def create_graph(data_dict, graph_type="line", output_file="graph.png",iso_date='1970-01-01'):
"""
Creates a graph from nested list data with hours as x-axis.
Args:
data_dict (list): List structure where:
- Each element is a list representing hour data
- First element is the hour (0-23)
- Remaining elements are counts for different types/categories
graph_type (str): Type of graph to create ("line", "bar", "scatter", "pie").
output_file (str): Path to save the image file.
"""
# Check if data is empty
if not data_dict:
raise ValueError("Input data cannot be empty")
# Extract hours (from the "Time" key)
hours = [row["Time"] for row in data_dict] # First column is the hour / row number
# Extract types (keys excluding "Time")
types = [key for key in data_dict[0].keys() if key != "Time"] # Dynamically get keys except "Time"
# Extract counts for each type
counts = {typ: [row[typ] for row in data_dict] for typ in types}
plt.figure(figsize=(10, 6)) # Create a figure
# Generate different types of graphs based on the input parameter
if graph_type == "line":
for typ in types:
plt.plot(hours, counts[typ], label=typ, marker='o')
plt.title(f"Line Graph for {iso_date}")
plt.xlabel("Hours")
plt.ylabel("Counts")
elif graph_type == "bar":
bottom = [0] * len(hours)
for typ in types:
plt.bar(hours, counts[typ], bottom=bottom, label=typ)
bottom = [b + y for b, y in zip(bottom, counts[typ])]
plt.title(f"Bar Graph for {iso_date}")
plt.xlabel("Hours")
plt.ylabel("Counts")
elif graph_type == "scatter":
for typ in types:
plt.scatter(hours, counts[typ], label=typ)
plt.title(f"Scatter Plot for {iso_date}")
plt.xlabel("Hours")
plt.ylabel("Counts")
elif graph_type == "pie":
total_counts = {typ: sum(counts[typ]) for typ in types}
total_sum = sum(total_counts.values())
threshold_percent = 0.01 * total_sum
# Separate filtered counts and "Other" counts
filtered_counts = {}
other_total = 0
for typ, value in total_counts.items():
if value > 0 and value >= threshold_percent:
filtered_counts[typ] = value
else:
other_total += value
# Add "Other" category if there are values below the threshold
if other_total > 0:
filtered_counts["Other"] = other_total
# Prepare data for the pie chart
labels = filtered_counts.keys()
sizes = filtered_counts.values()
# Plot the pie chart
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
plt.title(f"Pie Chart for {iso_date}")
else:
raise ValueError(f"Unsupported graph type: {graph_type}")
if graph_type != "pie":
plt.xticks(hours)
plt.grid(alpha=0.3)
plt.legend()
# Save the graph to a file
plt.tight_layout()
plt.savefig(output_file)
plt.close()
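# Usage sketch (hypothetical output path; needs matplotlib, see enable_graphs above):
#   rows = transform_to_dict(columnCounts_2d, columnHeaders, "2025-01-01")
#   create_graph(rows, "pie", "/opt/mailstats/html/graph_2025-01-01.png", "2025-01-01")
# "line", "bar" and "scatter" plot one series per column key against the hour;
# "pie" sums each column and folds categories below ~1% of the total into "Other".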
# def convert_to_numeric(data):
# """
# Converts all values in a nested list or dictionary to numeric types (int or float).
# """
# for i in range(len(data)):
# for j in range(1, len(data[i])): # Skip the first column (hour)
# try:
# data[i][j] = float(data[i][j]) # Convert to float
# except ValueError:
# raise ValueError(f"Non-numeric value found: {data[i][j]}")
# return data
def save_summaries_to_db(cursor, conn, date_str, hour, parsed_data):
global count_records_to_db
json_data = json.dumps(parsed_data)
insert_query = """
INSERT INTO SummaryLogs (Date, Hour, logData)
VALUES (%s, %s, %s)
"""
try:
# Check if the cursor is open (pymysql has no explicit is_closed; handle by try/except)
cursor.execute(insert_query, (date_str, hour, json_data))
conn.commit()
count_records_to_db += 1
except pymysql.Error as err:
# Handle cursor closed or other DB errors
if 'closed' in str(err).lower():
logging.error(f"DB Error {date_str} {hour} : Cursor is closed. Check connection handling.")
else:
logging.error(f"DB Error {date_str} {hour} : {err}")
conn.rollback()
except Exception as ex:
logging.error(f"Unexpected DB Error {date_str} {hour} : {ex}")
conn.rollback()
def is_running_under_thonny():
# Check for the 'THONNY_USER_DIR' environment variable
return 'THONNY_USER_DIR' in os.environ
# Routines to access the E-Smith dbs
def parse_entity_line(line):
"""
Parses a single line of key-value pairs.
:param line: Single line string to be parsed
:return: Dictionary with keys and values
"""
parts = line.split('|')
# First part contains the entity name and type in the format 'entity_name=type'
entity_part = parts.pop(0)
entity_name, entity_type = entity_part.split('=')
entity_dict = {'type': entity_type}
for i in range(0, len(parts)-1, 2):
key = parts[i]
value = parts[i+1]
entity_dict[key] = value
return entity_name, entity_dict
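# Illustrative example (hypothetical e-smith record of the form "name=type|prop|value|..."):
#   parse_entity_line("mailstats=service|Email|admin|SaveDataToMySQL|yes")
#   -> ("mailstats", {"type": "service", "Email": "admin", "SaveDataToMySQL": "yes"})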
def parse_config(config_string):
"""
Parses a multi-line configuration string where each line is an entity with key-value pairs.
:param config_string: Multi-line string to be parsed
:return: Dictionary of dictionaries with entity names as keys
"""
config_dict = {}
lines = config_string.strip().split('\n')
for line in lines:
line = line.strip()
if line.startswith('#'): # Skip lines that start with '#'
continue
entity_name, entity_dict = parse_entity_line(line)
config_dict[entity_name] = entity_dict
return config_dict
def read_config_file(file_path):
"""
Reads a configuration file and parses its contents.
:param file_path: Path to the configuration file
:return: Parsed configuration dictionary
"""
with open(file_path, 'r') as file:
config_string = file.read()
return parse_config(config_string)
def get_value(config_dict, entity, key, default=None):
"""
Retrieves the value corresponding to the given key from a specific entity.
:param config_dict: Dictionary of dictionaries with parsed config
:param entity: Entity from which to retrieve the key's value
:param key: Key whose value needs to be retrieved
:param default: Default value to return if the entity or key does not exist
:return: Value corresponding to the key, or the default value if the entity or key does not exist
"""
return config_dict.get(entity, {}).get(key, default)
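# Usage sketch (the same properties are read further down in this script):
#   reject_level = get_value(ConfigDB, "spamassassin", "RejectLevel", "12")
# returns the stored value, or "12" when the entity or the property is missing.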
def is_private_ip(ip):
try:
# Convert string to an IPv4Address object
ip_addr = ipaddress.ip_address(ip)
except ValueError:
return False
# Define private IP ranges
private_ranges = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
]
# Check if the IP address is within any of these ranges
for private_range in private_ranges:
if ip_addr in private_range:
return True
return False
def truncate_microseconds(timestamp):
# Split timestamp into main part and microseconds
try:
main_part, microseconds = timestamp.split('.')
# Truncate the last three digits of the microseconds
truncated_microseconds = microseconds[:-3]
# Combine the main part and truncated microseconds
truncated_timestamp = f"{main_part}.{truncated_microseconds}"
except Exception as e:
logging.error(f"{e} {timestamp}")
raise ValueError
# Remove the microseconds completely if they exist
return truncated_timestamp.split('.')[0]
def read_in_relevant_log_file(file_path,analysis_date=yesterday):
# Read the file and split each line into a list - timestamp and the rest
log_entries = []
skip_record_count = 0
ignore_record_count = 0
# Get the year of yesterday
yesterday = datetime.now() - timedelta(days=1)
yesterday_year = yesterday.year
line_count = 0;
with codecs.open(file_path, 'rb','utf-8', errors='replace') as file:
try:
for Line in file:
line_count += 1
#extract time stamp
try:
entry = split_timestamp_and_data(Line)
# compare with anal date
timestamp_str = entry[0]; #truncate_microseconds(entry[0])
except ValueError as e:
logging.error(f"ValueError {e} on timestamp create {timestamp_str}:{entry[0]} {entry[1]}")
skip_record_count += 1
continue
# Parse the timestamp string into a datetime object
# Ignoring extra microseconds
try:
timestamp = datetime.strptime(timestamp_str, "%b %d %H:%M:%S")
# and add in the year of yesterday
timestamp = timestamp.replace(year=yesterday_year)
except (ValueError, TypeError) as e:
logging.error(f"Error {e} line {line_count} on timestamp extract {timestamp_str}:{entry[1]}")
ignore_record_count += 1
continue
if timestamp.date() == analysis_date.date():
log_entries.append((timestamp, entry[1]))
else:
ignore_record_count += 1
except UnicodeDecodeError as e:
pass
return [log_entries,skip_record_count,ignore_record_count]
def filter_summary_records(log_entries):
# Return just the summary records
filtered_log_entries = []
skipped_entry_count = 0
for line in log_entries:
if '`' in line['MESSAGE']:
filtered_log_entries.append(line)
else:
skipped_entry_count += 1
return [filtered_log_entries,skipped_entry_count]
def sort_log_entries(log_entries):
# Sort the records, based on the timestamp
sorted_entries = sorted(log_entries, key=lambda x: x['__REALTIME_TIMESTAMP'])
# and return a dictionary
sorted_dict = {entry['__REALTIME_TIMESTAMP']: entry['MESSAGE'] for entry in sorted_entries}
return sorted_dict
def parse_data(data):
# Split data string into parts and map to named fields.
# Adjust the field names and parsing logic according to your data format.
# Split at the backtick - before it fields split at space, after, fields split at tab
parts = data.split('`')
fields0 = ["",""] #Add in dummy to make it the same as before, saves changing all the numbers below.
fields1 = parts[0].strip().split() if len(parts) > 0 else []
fields2 = parts[1].split('\t') if len(parts) > 1 else []
# then merge them
fields = fields0 + fields1 + fields2
# and mapping:
try:
return_dict = {
'sme': fields[0].strip() if len(fields) > 0 else "",
'qpsmtpd': fields[1].strip() if len(fields) > 1 else "",
'id': fields[2].strip() if len(fields) > 2 else "",
'action': fields[3].strip() if len(fields) > 3 else "", #5
'logterse': fields[4].strip() if len(fields) > 4 else "",
'ip': fields[5].strip() if len(fields) > 5 else "",
'sendurl': fields[6].strip() if len(fields) > 6 else "", #1
'sendurl1': fields[7].strip() if len(fields) > 7 else "", #2
'from-email': fields[8].strip() if len(fields) > 8 else "", #3
'error-reason': fields[8].strip() if len(fields) > 9 else "", #3
'to-email': fields[9].strip() if len(fields) > 9 else "", #4
'error-plugin': fields[10].strip() if len(fields) > 10 else "", #5
'action1': fields[10].strip() if len(fields) > 10 else "", #5
'error-number' : fields[11].strip() if len(fields) > 11 else "", #6
'sender': fields[12].strip() if len(fields) > 12 else "", #7
'virus': fields[12].strip() if len(fields) > 12 else "", #7
'error-msg' :fields[13].strip() if len(fields) > 13 else "", #7
'spam-status': fields[13].strip() if len(fields) > 13 else "", #8
'error-result': fields[14].strip() if len(fields) > 14 else "",#8
# Add more fields as necessary
}
except:
logging.error(f"error:len:{len(fields)}")
return_dict = create_empty_return()
return return_dict
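# Split mechanics shown with hypothetical tokens rather than a real log line:
#   data   = "A B C D`f1\tf2\tf3"
#   fields = ["", ""] + ["A", "B", "C", "D"] + ["f1", "f2", "f3"]
# so 'id' is fields[2] ("A"), 'ip' is fields[5] ("D"), 'sendurl' is fields[6] ("f1"), etc.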
def safe_strip(lst, index):
if 0 <= index < len(lst):
value = lst[index]
if value is not None:
return value.strip()
return ""
def create_empty_return():
# Return dictionary with all keys, values None
keys = [
'sme', 'qpsmtpd', 'id', 'action', 'logterse', 'ip', 'sendurl', 'sendurl1',
'from-email', 'error-reason', 'to-email', 'error-plugin', 'action1', 'error-number',
'sender', 'virus', 'error-msg', 'spam-status', 'error-result'
]
return {key: "" for key in keys}
# def count_entries_by_hour(log_entries):
# hourly_counts = defaultdict(int)
# for entry in log_entries:
# # Extract hour from the timestamp
# timestamp = entry['timestamp']
# hour = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d %H')
# hourly_counts[hour] += 1
# return hourly_counts
def initialize_2d_array(num_hours, column_headers_len,reporting_date):
num_hours += 1 # Adjust for the zeroth hour
# Initialize the 2D list with zeroes
return [[0] * column_headers_len for _ in range(num_hours)]
def search_2d_list(target, data):
"""
Search for a target string in a 2D list of variable-length lists of strings.
:param target: str, the string to search for
:param data: list of lists of str, the 2D list to search
:return: int, the row number where the target string is found, or -1 if not found
"""
for row_idx, row in enumerate(data):
if target in row:
return row_idx
return -1 # Return -1 if not found
def check_html2text_installed():
try:
# Check if html2text is installed by running 'which html2text'
result = subprocess.run(
['which', 'html2text'],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# If the command finds html2text, it will output the path
html2text_path = result.stdout.decode('utf-8').strip()
if not html2text_path:
raise FileNotFoundError
logging.debug(f"html2text is installed at: {html2text_path}")
return True
except subprocess.CalledProcessError:
logging.error("html2text is not installed. Please install it using your package manager.", file=sys.stderr)
return False
def html_to_text(input_file, output_file):
if not check_html2text_installed():
sys.exit(1)
try:
# Run the html2text command with -b0 --pad-tables parameters
result = subprocess.run(
['html2text', '-b0', '--pad-tables', input_file],
check=True, # Raise a CalledProcessError on non-zero exit
stdout=subprocess.PIPE, # Capture stdout
stderr=subprocess.PIPE # Capture stderr
)
# Write the stdout from the command to the output file
with open(output_file, 'w', encoding='utf-8') as outfile:
outfile.write(result.stdout.decode('utf-8'))
logging.debug(f"Converted {input_file} to {output_file}")
except subprocess.CalledProcessError as e:
logging.error(f"Error occurred: {e.stderr.decode('utf-8')}", file=sys.stderr)
sys.exit(e.returncode)
def get_html2text_version():
try:
result = subprocess.run(['html2text', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# Ensure the result is treated as a string in Python 3.6+
return result.stdout.strip()
except subprocess.CalledProcessError as e:
logging.error(f"Error occurred while checking html2text version: {e}", file=sys.stderr)
return None
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=50, fill='█', print_end="\r"):
"""
Call in a loop to create a terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
if total == 0:
raise ValueError("Progress total is zero")
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
# Print a new line on completion
if iteration == total:
print()
def insert_string_after(original:str, to_insert:str, after:str) -> str:
"""
Insert to_insert into original after the first occurrence of after.
:param original: The original string.
:param to_insert: The string to be inserted.
:param after: The set of characters after which the string will be inserted.
:return: The new string with to_insert inserted after after.
"""
position = original.find(after)
if position == -1:
logging.error(f"insert_string_after:({after}) string is not found in original")
return original
# Position of the insertion point
insert_pos = position + len(after)
return original[:insert_pos] + to_insert + original[insert_pos:]
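# Example mirroring how read_html_from_file() below uses this helper:
#   insert_string_after("<head><!--css here--></head>", "<style>p{}</style>", "<!--css here-->")
#   -> "<head><!--css here--><style>p{}</style></head>"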
def split_timestamp_and_data(log_entry: str) -> list:
"""
Split a log entry into timestamp and the rest of the data.
:param log_entry: The log entry as a string.
:return: A list with two entries: [timestamp, rest_of_data].
"""
# The timestamp is always the first part, up to the first space after the milliseconds
# SME11 - the timestamp looks like this: "Dec 29 07:42:00 sme11 qpsmtpd-forkserver[942177]:<the rest>"
#
match = re.match(r'(\w{3} \d{1,2} \d{2}:\d{2}:\d{2}) (.+)', log_entry)
if match:
timestamp = match.group(1)
rest_of_line = match.group(2).strip() # Strip any leading spaces
else:
timestamp = None
rest_of_line = log_entry # If no match, return the whole line
return [timestamp, rest_of_line]
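# Example using the SME11 line format quoted in the comment above:
#   split_timestamp_and_data("Dec 29 07:42:00 sme11 qpsmtpd-forkserver[942177]: some text")
#   -> ["Dec 29 07:42:00", "sme11 qpsmtpd-forkserver[942177]: some text"]
# A line without a leading "Mon DD HH:MM:SS" stamp comes back as [None, whole_line].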
def render_sub_table(table_title, table_headers, found_values, get_character=None, suppress_threshold=False):
#Check if any data provided
if len(found_values) != 0:
# Get the total
original_total = 0 # Initialize total variable
if isinstance(found_values, dict):
# If found_values is a dictionary, we operate as previously
total_sum = sum(found_values.values())
original_total = total_sum
if not BadCountries:
get_character = None
if get_character:
sub_result = [(key, value,
f"{round(value / total_sum * 100, 2)}%",
f"{get_character(key)}") for key, value in found_values.items()]
else:
sub_result = [(key, value,
f"{round(value / total_sum * 100, 2)}%" ) for key, value in found_values.items()]
elif isinstance(found_values, list):
# If found_values is a list of values
if all(isinstance(v, (int, float)) for v in found_values):
total_sum = sum(found_values)
original_total = total_sum
sub_result = [(i, value,
f"{round(value / total_sum * 100, 2)}%") for i, value in enumerate(found_values)]
# If found_values is a list of dictionaries
elif all(isinstance(v, dict) for v in found_values):
# Example assumes first key is used for identification and others are numeric
# Convert to 2D array
sub_result = [list(entry.values()) for entry in found_values]
# Calculate the total of the first numeric entry (index 1)
total = sum(row[1] for row in sub_result)
original_total = total
# Append percentage of the total for each entry
for row in sub_result:
percentage = f"{round(row[1] / total * 100, 2) if total else 0}%" # Handle division by zero
row.append(percentage)
else:
raise ValueError("found_values must be either a list of numbers or a list of dictionaries.")
else:
raise TypeError("found_values must be a dictionary or a list.")
sub_result.sort(key=lambda x: float(x[1]), reverse=True) # Sort by count in descending order
# Dynamic threshold calculation
if not suppress_threshold:
dynamic_threshold = max(1, 100 / (original_total**0.5)) if original_total > 0 else 0
dynamic_threshold = round(dynamic_threshold,1)
logging.debug(f"Threshold for {table_title} set to {dynamic_threshold}% ")
else:
dynamic_threshold=0
absolute_floor = 50 # Minimum absolute value threshold
# Filter results using early termination
filtered_sub_result = []
for row in sub_result:
value = row[1]
percentage = (value / original_total * 100) if original_total else 0
# Exit condition: below both thresholds
if percentage < dynamic_threshold and value < absolute_floor:
break
filtered_sub_result.append(row)
sub_result = filtered_sub_result # Keep only significant rows
sub_template_path = template_dir+'mailstats-sub-table.html.pt'
# Load the template
with open(sub_template_path, 'r') as template_file:
template_content = template_file.read()
# Create a Chameleon template instance
try:
template = PageTemplate(template_content)
# Render the template with the 2D array data and column headers
try:
rendered_html = template(array_2d=sub_result, column_headers=table_headers,
title=table_title, classname=get_first_word(table_title),
threshold=dynamic_threshold)
except Exception as e:
raise ValueError(f"{table_title}: A chameleon controller render error occurred: {e}")
except Exception as e:
raise ValueError(f"{table_title}: A chameleon controller template error occurred: {e}")
else:
rendered_html = f"<div class='{get_first_word(table_title)}'><h2>{table_title}</h2>No data for {table_title}</div>"
return rendered_html
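# Usage sketch (hypothetical counts; requires the mailstats-sub-table.html.pt template
# under template_dir and the BadCountries global set up in the main section):
#   html = render_sub_table("Viruses found", ["Virus", "Count", "Percent"],
#                           {"Eicar-Test-Signature": 3, "Some.Other.Sig": 1})
# A dict becomes (key, count, "percent%") rows sorted by count; output is cut off at the
# first row falling below both the dynamic threshold and the absolute floor of 50,
# unless suppress_threshold is True.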
def get_character_in_reject_list(code):
if code in BadCountries:
return "*"
else:
return ""
def get_first_word(text):
return text.split(None, 1)[0]
def read_html_from_file(filepath):
"""
Reads HTML content from a given file.
Args:
filepath (str): Path to the HTML file.
Returns:
str: HTML content of the file.
"""
# Need to add in here the contents of the css file at the end of the head section.
with open(filepath, 'r', encoding='utf-8') as file:
html_contents = file.read()
logging.debug("Reading from html file")
# Get Filepath
css_path = os.path.dirname(filepath)+"/../css/mailstats.css"
# Read in CSS
with open(css_path, 'r', encoding='utf-8') as file:
css_contents = file.read()
html_contents = insert_string_after(html_contents,"\n<style>"+css_contents+"</style>","<!--css here-->")
return html_contents
def read_text_from_file(filepath):
"""
Reads plain text content from a given file.
Args:
filepath (str): Path to the text file.
Returns:
str: Text content of the file.
"""
try:
with open(filepath, 'r', encoding='utf-8') as file:
return file.read()
except:
logging.error(f"{filepath} not found")
return
def send_email(subject, from_email, to_email, smtp_server, smtp_port, HTML_content=None, Text_content=None, smtp_user=None, smtp_password=None):
"""
Sends an HTML email.
Args:
html_content (str): The HTML content to send in the email.
subject (str): The subject of the email.
from_email (str): The sender's email address.
to_email (str): The recipient's email address.
smtp_server (str): SMTP server address.
smtp_port (int): SMTP server port.
smtp_user (str, optional): SMTP server username. Default is None.
smtp_password (str, optional): SMTP server password. Default is None.
"""
#Example (which works!)
# send_email(
# subject="Your subject",
# from_email="mailstats@bjsystems.co.uk",
# to_email="brianr@bjsystems.co.uk",
# smtp_server="mail.bjsystems.co.uk",
# smtp_port=25
# HTML_content=html_content,
# Text_content=Text_content,
# )
# Set up the email
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = from_email
msg['To'] = to_email
if HTML_content:
part = MIMEText(HTML_content, 'html')
msg.attach(part)
if Text_content:
part = MIMEText(Text_content, 'plain')
msg.attach(part)
# Sending the email
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls() # Upgrade the connection to secure
if smtp_user and smtp_password:
server.login(smtp_user, smtp_password) # Authenticate only if credentials are provided
server.sendmail(from_email, to_email, msg.as_string())
def replace_between(text, start, end, replacement):
# Escaping start and end in case they contain special regex characters
pattern = re.escape(start) + '.*?' + re.escape(end)
# Using re.DOTALL to match any character including newline
replaced_text = re.sub(pattern, replacement, text, flags=re.DOTALL)
return replaced_text
def assemble_heading_row(label,value):
return f"<tr><td>{label}</td><td>{value}</td><tr>"
def get_heading():
#
# Needed from the analysis:
# SATagLevel - done
# SARejectLevel - done
# warnnoreject - done
# totalexamined - done
# emailperhour - done
# spamavg - done
# rejectspamavg - done
# hamavg - done
# DMARCSendCount - done
# hamcount - done
# DMARCOkCount - done
# Clam Version/DB Count/Last DB update
clam_output = subprocess.getoutput("freshclam -V")
clam_info = assemble_heading_row("Clam Version/DB Count/Last DB update:", clam_output)
# SpamAssassin Version
sa_output = subprocess.getoutput("spamassassin -V")
sa_info = assemble_heading_row("SpamAssassin Version: ",sa_output)
# Tag level and Reject level
tag_reject_info = assemble_heading_row("Tag level:",SATagLevel)
tag_reject_info += assemble_heading_row("Reject level: ",f"{SARejectLevel} {warnnoreject}")
# SMTP connection stats
smtp_stats = assemble_heading_row("External SMTP connections accepted:",totalexternalsmtpsessions)
smtp_stats += assemble_heading_row("Internal SMTP connections accepted:",totalinternalsmtpsessions)
if len(connection_type_counts)>0:
for connection_type in connection_type_counts.keys():
smtp_stats += assemble_heading_row(f"\nCount of {connection_type} connections:",connection_type_counts[connection_type])
if len(total_ports)>0:
for port_number in total_ports.keys():
smtp_stats += assemble_heading_row(f"\nCount of port {port_number} connections: ",total_ports[port_number])
rows = [
assemble_heading_row("Emails per hour:", f"{(emailperhour if emailperhour is not None else 0):.1f}/hr"),
assemble_heading_row("Average spam score (accepted):", f"{(spamavg if spamavg is not None else 0):.2f}"),
assemble_heading_row("Average spam score (rejected):", f"{(rejectspamavg if rejectspamavg is not None else 0):.2f}"),
assemble_heading_row("Average ham score:", f"{(hamavg if hamavg is not None else 0):.2f}"),
assemble_heading_row("Number of DMARC reporting emails sent:", f"{DMARCSendCount if DMARCSendCount is not None else 0} (not shown on table)"),
]
smtp_stats += " ".join(rows) # or "\n".join(rows) if assemble_heading_row doesnt add its own newline
# DMARC approved emails
dmarc_info = ""
if hamcount != 0:
dmarc_ok_percentage = DMARCOkCount * 100 / hamcount
dmarc_info = assemble_heading_row("Number of emails approved through DMARC:",f"{DMARCOkCount or 0} ({dmarc_ok_percentage:.2f}% of Ham count)")
# Accumulate all strings
#header_str = "<br />".join([clam_info, sa_info, tag_reject_info, smtp_stats, dmarc_info])
# switch newlines to <br />
#header_str = header_str.replace("\n","<br />")
header_str1 = clam_info + sa_info + tag_reject_info
header_str2 = smtp_stats + dmarc_info
return header_str1,header_str2
def scan_mail_users():
#
# Count emails left in junkmail folders for each user
#
base_path = '/home/e-smith/files/users'
users_info = defaultdict(int)
# List of junk mail directories to check
junk_mail_directories = [
'Maildir/.Junk/cur',
'Maildir/.Junk/new',
'Maildir/.Junkmail/cur',
'Maildir/.Junkmail/new',
'Maildir/.junk/cur',
'Maildir/.junk/new',
'Maildir/.junkmail/cur',
'Maildir/.junkmail/new'
]
# Iterate through each user directory
for user in os.listdir(base_path):
user_path = os.path.join(base_path, user)
# Check if it is a directory
if os.path.isdir(user_path):
total_junk_count = 0
# Check each junk mail path and accumulate counts
for junk_dir in junk_mail_directories:
junk_mail_path = os.path.join(user_path, junk_dir)
# Check if the Junk directory actually exists
if os.path.exists(junk_mail_path):
try:
# Count the number of junk mail files in that directory
junk_count = len(os.listdir(junk_mail_path))
total_junk_count += junk_count
except Exception as e:
logging.error(f"Error counting junk mails in {junk_mail_path} for user {user}: {e}")
if total_junk_count != 0:
users_info[user] = total_junk_count
return users_info
def get_first_email_with_domain(email_string, domain):
"""
Returns the first email address in the comma-separated string that matches the specified domain.
If there is only one email, it returns that email regardless of the domain.
Args:
email_string (str): A string of comma-separated email addresses.
domain (str): The domain to filter email addresses by.
Returns:
str: The first email address that matches the domain, or the single email if only one is provided, or None if no match is found.
"""
# Remove leading and trailing whitespace and split the email string
emails = [email.strip() for email in email_string.split(',')]
# Check if there is only one email
if len(emails) == 1:
return emails[0] # Return the single email directly
# Iterate through the list of emails
for email in emails:
# Check if the email ends with the specified domain
if email.endswith('@' + domain):
return email # Return the first matching email
return None # Return None if no matching email is found
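# Example (hypothetical addresses):
#   get_first_email_with_domain("alice@example.org, bob@mydomain.net", "mydomain.net")
#   -> "bob@mydomain.net"
# A single address is returned as-is regardless of domain; no match returns None.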
def display_keys_and_values(data):
"""
Display all keys and values for a list of dictionaries or an array (list of lists).
Args:
data (list): A list of dictionaries or a list of lists.
"""
if not isinstance(data, list):
raise ValueError("Input must be a list.")
if all(isinstance(item, dict) for item in data):
# Handle list of dictionaries
for index, dictionary in enumerate(data):
print(f"Item {index + 1}:")
for key, value in dictionary.items():
print(f" {key}: {value}")
print() # Add a blank line between items
elif all(isinstance(item, list) for item in data):
# Handle array (list of lists)
for index, item in enumerate(data):
print(f"Item {index + 1}:")
for i, value in enumerate(item):
print(f" Column {i + 1}: {value}")
print() # Add a blank line between items
else:
raise ValueError("Input must be a list of dictionaries or a list of lists.")
def extract_blacklist_domain(text):
"""
Compare 'text' against comma-separated URL strings from global vars
RBLList, SBLList, and UBLList. Return the first matching entry or "".
Match is done on exact hostname substring OR the base domain (eTLD+1),
so 'black.uribl.com' will match text containing 'lookup.uribl.com'.
"""
s = text if isinstance(text, str) else str(text or "")
s_lower = s.lower()
logging.debug(f"extract blacklist called:{text}")
combined = ",".join([RBLList, SBLList, UBLList])
def hostname_from(sval: str) -> str:
sval = (sval or "").strip().lower()
if "://" in sval:
# Strip scheme using simple split to avoid needing urlparse
sval = sval.split("://", 1)[1]
# Strip path and port if present
sval = sval.split("/", 1)[0]
sval = sval.split(":", 1)[0]
# Remove leading wildcards/dots
sval = sval.lstrip(".")
if sval.startswith("*."):
sval = sval[2:]
return sval
def base_domain(hostname: str) -> str:
parts = hostname.split(".")
if len(parts) >= 3 and parts[-2] in ("co", "org", "gov", "ac") and parts[-1] == "uk":
return ".".join(parts[-3:])
if len(parts) >= 2:
return ".".join(parts[-2:])
return hostname
def boundary_re(term: str):
# Match term when not part of a larger domain label
return re.compile(r"(?<![A-Za-z0-9-])" + re.escape(term) + r"(?![A-Za-z0-9-])")
for part in combined.split(","):
entry = part.strip()
logging.debug(f"Comparing: {entry}")
if not entry:
continue
entry_host = hostname_from(entry)
entry_base = base_domain(entry_host)
# 1) Try matching the full entry host (e.g., black.uribl.com)
if entry_host and boundary_re(entry_host).search(s_lower):
return entry
# 2) Fallback: match by base domain (e.g., uribl.com) to catch lookup.uribl.com, etc.
if entry_base and boundary_re(entry_base).search(s_lower):
return entry
return ""
def set_log_level(level):
"""Dynamically adjust logging level (e.g., 'DEBUG', 'INFO', 'ERROR')."""
numeric_level = getattr(logging, level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {level}")
logging.getLogger().setLevel(numeric_level)
def format_duration(seconds: float) -> str:
"""Convert seconds to human-readable HH:MM:SS format."""
return str(timedelta(seconds=seconds))
DB_CONFIG_PATH = '/etc/mailstats/db.php'
def parse_php_config(path):
# Read file as text and extract key-value pairs using regex
try:
with open(path, 'r') as f:
content = f.read()
cfg = {}
for match in re.finditer(r"'(\w+)'\s*=>\s*'([^']*)'", content):
cfg[match.group(1)] = match.group(2)
return cfg
except Exception as e:
logging.error(f"Could not parse PHP config file: {e}")
return {}
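# Illustrative example of the PHP fragment this expects (hypothetical credentials):
#   'host' => 'localhost', 'user' => 'mailstats', 'pass' => 'secret', 'name' => 'mailstats'
# parse_php_config() would return
#   {'host': 'localhost', 'user': 'mailstats', 'pass': 'secret', 'name': 'mailstats'}
# which load_db_config() below merges with the MAILSTATS_DB_* environment variables.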
def load_db_config():
db_host = os.environ.get('MAILSTATS_DB_HOST', 'localhost')
db_user = os.environ.get('MAILSTATS_DB_USER', '')
db_pass = os.environ.get('MAILSTATS_DB_PASS', '')
db_name = os.environ.get('MAILSTATS_DB_NAME', '')
if db_user == '' or db_pass == '' or db_name == '':
if os.path.isfile(DB_CONFIG_PATH) and os.access(DB_CONFIG_PATH, os.R_OK):
cfg = parse_php_config(DB_CONFIG_PATH)
db_host = cfg.get('host', db_host)
db_user = cfg.get('user', db_user)
db_pass = cfg.get('pass', db_pass)
db_name = cfg.get('name', db_name)
if db_user == '' or db_pass == '' or db_name == '':
logging.error('DB credentials missing (env and config file).')
raise RuntimeError('DB credentials missing (env and config file)')
return db_host, db_user, db_pass, db_name
if __name__ == "__main__":
start_time = datetime.now()
try:
chameleon_version = pkg_resources.get_distribution("Chameleon").version
except pkg_resources.DistributionNotFound:
chameleon_version = "Version information not available"
python_version = sys.version
#python_version = python_version[:8]
python_version = re.match(r'^\d+\.\d+\.\d+',python_version).group(0); #Extract the version number
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M")
# Command line parameters
parser = argparse.ArgumentParser(description="Mailstats")
parser.add_argument('-d', '--date', help='Specify a valid date (yyyy-mm-dd) for the analysis', default=formatted_yesterday)
parser.add_argument('-ef', '--emailfile', help='Save an html file of the email sent (y/N)', default='n')
parser.add_argument('-tf', '--textfile', help='Save a txt file of the html page (y/N)', default='n')
parser.add_argument('--version', action='version', version='%(prog)s '+Mailstats_version+" built on "+build_date_time)
parser.add_argument('-db', '--dbsave', help='Force save of summary logs in DB (y/N)', default='n')
args = parser.parse_args()
analysis_date = args.date
# and check its format is valid
try:
datetime.strptime(analysis_date, '%Y-%m-%d')
except ValueError:
logging.error("Specify a valid date (yyyy-mm-dd) for the analysis")
quit(1)
anaysis_date_obj = datetime.strptime(analysis_date, '%Y-%m-%d')
noemailfile = args.emailfile.lower() == 'n'
notextfile = args.textfile.lower() == 'n'
isThonny = is_running_under_thonny()
forceDbSave = args.dbsave.lower() == 'y'
#E-Smith Config DBs
if isThonny:
db_dir = "/home/brianr/SME11Build/GITFiles/smecontribs/smeserver-mailstats/"
else:
db_dir = "/home/e-smith/db/"
#From SMEServer DB
ConfigDB = read_config_file(db_dir+"configuration")
DomainName = get_value(ConfigDB, "DomainName", "type") #'bjsystems.co.uk' # $cdb->get('DomainName')->value;
SystemName = get_value(ConfigDB, "SystemName", "type")
hello_string = "Mailstats:"+Mailstats_version+' for '+SystemName+"."+DomainName+" for "+analysis_date+" printed at:"+formatted_datetime
logging.info(hello_string)
version_string = "Chameleon:"+chameleon_version+" Python:"+python_version
if isThonny:
version_string = version_string + "...under Thonny"
logging.debug(f"{version_string} and built on {build_date_time}")
RHSenabled = get_value(ConfigDB, "qpsmtpd", "RHSBL","disabled") == "enabled" #True #( $cdb->get('qpsmtpd')->prop('RHSBL') eq 'enabled' );
DNSenabled = get_value(ConfigDB, "qpsmtpd", "DNSBL","disabled") == "enabled" #True #( $cdb->get('qpsmtpd')->prop('DNSBL') eq 'enabled' );
SARejectLevel = int(get_value(ConfigDB, "spamassassin", "RejectLevel","12")) #12 #$cdb->get('spamassassin')->prop('RejectLevel');
SATagLevel = int(get_value(ConfigDB, "spamassassin", "TagLevel","4")) #4 #$cdb->get('spamassassin')->prop('TagLevel');
if SARejectLevel == 0:
warnnoreject = "(*Warning* 0 = no reject)"
else:
warnnoreject = ""
EmailAddress = get_value(ConfigDB,"mailstats","Email","admin@"+DomainName)
if '@' not in EmailAddress:
EmailAddress = EmailAddress+"@"+DomainName
EmailTextorHTML = get_value(ConfigDB,"mailstats","TextorHTML","Both") #Text or Both or None
EmailHost = get_value(ConfigDB,"mailstats","EmailHost","localhost") #Default will be localhost
EmailPort = int(get_value(ConfigDB,"mailstats","EmailPort","25"))
EMailSMTPUser = get_value(ConfigDB,"mailstats","EmailUser") #None = default => no authentication needed
EMailSMTPPassword = get_value(ConfigDB,"mailstats","EmailPassword")
BadCountries = get_value(ConfigDB,"qpsmtpd","BadCountries")
wanted_mailstats_email = get_value(ConfigDB,"mailstats","CountMailstatsEmail", "no")
count_records_to_db = 0;
# Db save control
saveData = get_value(ConfigDB,"mailstats","SaveDataToMySQL","no") == 'yes' or forceDbSave
logging.debug(f"Save Mailstats to DB set:{saveData} ")
if saveData:
# Database config retrieval
try:
DBHost, DBUser, DBPassw, DBName = load_db_config()
DBPort = 3306 # If you want configurability, load this from config too
UnixSocket = "/var/lib/mysql/mysql.sock"
except RuntimeError as err:
logging.error(f"Database config error: {err}")
saveData = False
# Try to establish a database connection
try:
conn = pymysql.connect(
host=DBHost,
user=DBUser,
password=DBPassw,
database=DBName,
port=DBPort,
unix_socket=UnixSocket,
cursorclass=pymysql.cursors.DictCursor
)
cursor = conn.cursor()
# Check if the table exists before creating it
check_table_query = "SHOW TABLES LIKE 'SummaryLogs'"
cursor.execute(check_table_query)
table_exists = cursor.fetchone()
if not table_exists:
cursor.execute("""
CREATE TABLE IF NOT EXISTS SummaryLogs (
id INT AUTO_INCREMENT PRIMARY KEY,
Date DATE,
Hour INT,
logData TEXT
)
""")
# Delete existing records for the given date
try:
delete_query = """
DELETE FROM SummaryLogs
WHERE Date = %s
"""
cursor.execute(delete_query, (analysis_date,))
rows_deleted = cursor.rowcount
if rows_deleted > 0:
logging.debug(f"Deleted {rows_deleted} rows for {analysis_date}")
except pymysql.Error as e:
logging.error(f"SQL Delete failed ({delete_query}) ({e})")
# Commit changes & close resources after all DB operations
conn.commit()
#cursor.close()
#conn.close()
except pymysql.Error as e:
logging.error(f"Unable to connect to {DBName} on {DBHost} port {DBPort} error ({e})")
saveData = False
nolinks = not saveData
# Needed to identify blacklist used to reject emails.
if get_value(ConfigDB,"qpsmtpd","RHSBL").lower() == 'enabled':
RBLList = get_value(ConfigDB,"qpsmtpd","RBLList")
else:
RBLList = ""
if get_value(ConfigDB,"qpsmtpd","DNSBL").lower() == 'enabled':
SBLList = get_value(ConfigDB,"qpsmtpd","SBLList")
else:
SBLList = ""
if get_value(ConfigDB,"qpsmtpd","URIBL").lower() == 'enabled':
UBLList = get_value(ConfigDB,"qpsmtpd","UBLList")
else:
UBLList = ""
FetchmailIP = '127.0.0.200'; #Apparent Ip address of fetchmail deliveries
WebmailIP = '127.0.0.1'; #Apparent Ip of Webmail sender
localhost = 'localhost'; #Apparent sender for webmail
FETCHMAIL = 'FETCHMAIL'; #Sender from fetchmail when Ip address not 127.0.0.200 - when qpsmtpd denies the email
MAILMAN = "bounces"; #sender when mailman sending when orig is localhost
DMARCDomain="dmarc"; #Pattern to recognised DMARC sent emails (this not very reliable, as the email address could be anything)
DMARCOkPattern="dmarc: pass"; #Pattern to use to detect DMARC approval
num_hours = 25 # Represents hours from 0 to 23 - adds extra one for column totals and another for percentages
#log_file = logs_dir+'current.log'
#log_entries,skip_count,ignored_count = read_in_relevant_log_file(log_file,anaysis_date_obj)
log_entries = get_logs_from_Journalctl(analysis_date)
logging.debug(f"Found {len(log_entries)} entries in log for for {anaysis_date_obj.strftime('%Y-%m-%d')}") #Ignored: {ignored_count} skipped: {skip_count}")
summary_log_entries,skip_count = filter_summary_records(log_entries)
logging.debug(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries")
sorted_log_dict = sort_log_entries(summary_log_entries)
logging.debug(f"Sorted {len(sorted_log_dict)} entries")
#print(f"{sorted_log_dict}")
#quit(1)
columnHeaders = ['Count','WebMail','Local','MailMan','Relay','DMARC','Virus','RBL/DNS','Geoip.','Non.Conf.','Rej.Load','Karma','Del.Spam','Qued.Spam?',' Ham','TOTALS','PERCENT'] # Order must match the column index constants defined above
# dict for each colum identifying plugin that increments count
columnPlugin = [''] * 17
columnPlugin[Hour] = []
columnPlugin[WebMail] = []
columnPlugin[Local] = []
columnPlugin[MailMan] = []
columnPlugin[DMARC] = ['dmarc']
columnPlugin[Virus] = ['pattern_filter', 'virus::pattern_filter','virus::clamav','virus::clamdscan']
columnPlugin[RBLDNS] = ['rhsbl', 'dnsbl','uribl']
columnPlugin[Geoip] = ['check_badcountries']
columnPlugin[NonConf] = ['check_earlytalker','check_relay','check_norelay', 'require_resolvable_fromhost'
,'check_basicheaders','check_badmailfrom','check_badrcptto_patterns'
,'check_badrcptto','check_spamhelo','check_goodrcptto extn','rcpt_ok'
,'check_goodrcptto','check_smtp_forward','count_unrecognized_commands','tls','auth::auth_cvm_unix_local'
,'auth::auth_imap', 'earlytalker','resolvable_fromhost','relay','headers','mailfrom','badrcptto','helo'
,'check_smtp_forward','sender_permitted_from']
columnPlugin[RejLoad] = ['loadcheck']
columnPlugin[DelSpam] = []
columnPlugin[QuedSpam] = []
columnPlugin[Ham] = []
columnPlugin[TOTALS] = []
columnPlugin[PERCENT] = []
columnPlugin[Karma] = ['karma']
columnHeaders_len = len(columnHeaders)
columnCounts_2d = initialize_2d_array(num_hours, columnHeaders_len,analysis_date)
virus_pattern = re.compile(r"Virus found: (.*)")
found_viruses = defaultdict(int)
recipients_found = []
found_qpcodes = defaultdict(int)
total_ports = defaultdict(int)
blacklist_found = defaultdict(int)
qpcodes_pattern = re.compile(r"(\(.*\)).*'")
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' #extract email from rejected message
i = 0;
sorted_len= len(sorted_log_dict)
#unless none to show
spamavg = 0;
spamqueuedcount = 0
hamcount = 0
hamavg = 0
rejectspamcount = 0
rejectspamavg = 0
DMARCSendCount = 0
totalexamined = 0
total_qpsmtpd = 0
total_sqpsmtpd = 0
total_uqpsmtpd = 0
count_ignored_mailstats = 0;
if sorted_len > 0:
if isThonny:
# Initial call to print the progress bar
print_progress_bar(0, sorted_len, prefix='Progress:', suffix='Complete', length=50)
for timestamp, data in sorted_log_dict.items():
i += 1
totalexamined += 1
if isThonny:
print_progress_bar(i, sorted_len, prefix='Scanning for main table:', suffix='Complete', length=50)
# Count of in which hour it falls
# Parse the timestamp string into a datetime object
dt = timestamp
hour = dt.hour
# parse the data
parsed_data = parse_data(data)
#Take out the mailstats email if necessary
if wanted_mailstats_email == 'no':
if 'mailstats' in parsed_data['from-email'] and DomainName in parsed_data['from-email']:
count_ignored_mailstats +=1
continue
# Save the data here if necessary
if saveData:
save_summaries_to_db(cursor,conn,anaysis_date_obj.strftime('%Y-%m-%d'),hour,parsed_data)
#Count the number of emails through each of qpsmtpd, uqpsmtpd and sqpsmtpd
# the forkserver column in the log indicates it.
if parsed_data['qpsmtpd'].startswith ('qpsmtpd'):
total_ports['25'] +=1
elif parsed_data['qpsmtpd'].startswith ('sqpsmtpd'):
total_ports['465'] +=1
elif parsed_data['qpsmtpd'].startswith ('uqpsmtpd'):
total_ports['587'] +=1
# Increment Count in which headings it falls
#Hourly count and column total
columnCounts_2d[hour][Hour] += 1
columnCounts_2d[ColTotals][Hour] += 1
#Row Totals
columnCounts_2d[hour][TOTALS] += 1
#Total totals
columnCounts_2d[ColTotals][TOTALS] += 1
# first spot the fetchmail and 'local' deliveries.
#Local send
if DomainName in parsed_data['sendurl']:
columnCounts_2d[hour][Local] += 1
columnCounts_2d[ColTotals][Local] += 1
#Relay or webmail
elif not is_private_ip(parsed_data['ip']) and is_private_ip(parsed_data['sendurl1']) and parsed_data['action1'] == 'queued':
#Relay
columnCounts_2d[hour][Relay] += 1
columnCounts_2d[ColTotals][Relay] += 1
elif WebmailIP in parsed_data['sendurl1'] and not is_private_ip(parsed_data['ip']):
#webmail
columnCounts_2d[hour][WebMail] += 1
columnCounts_2d[ColTotals][WebMail] += 1
elif localhost in parsed_data['sendurl']:
# but not if it comes from fetchmail
if not FETCHMAIL in parsed_data['sendurl1']:
# might still be from mailman here
if MAILMAN in parsed_data['sendurl1']:
#$mailmansendcount++;
#$localsendtotal++;
columnCounts_2d[hour][MailMan] += 1
columnCounts_2d[ColTotals][MailMan] += 1
#$counts{$abshour}{$CATMAILMAN}++;
#$localflag = 1;
else:
#Or sent to the DMARC server
#check for email address in $DMARC_Report_emails string
#my $logemail = $log_items[4];
if DMARCDomain in parsed_data['from-email']: #(index($DMARC_Report_emails,$logemail)>=0) or
#$localsendtotal++;
DMARCSendCount += 1
#localflag = 1;
else:
# ignore incoming localhost spoofs
if parsed_data['error-msg'] and not 'msg denied before queued' in parsed_data['error-msg']:
#Webmail
#$localflag = 1;
#$WebMailsendtotal++;
columnCounts_2d[hour][WebMail] += 1
columnCounts_2d[ColTotals][WebMail] += 1
#$WebMailflag = 1;
else:
#$localflag = 1;
#$WebMailsendtotal++;
#$WebMailflag = 1;
columnCounts_2d[hour][WebMail] += 1
columnCounts_2d[ColTotals][WebMail] += 1
#Queued email
if parsed_data['action1'] == 'queued':
columnCounts_2d[hour][Ham] += 1
columnCounts_2d[ColTotals][Ham] += 1
# spamassassin not rejected
if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str):
if parsed_data['spam-status'].lower().startswith('no'):
#Extract other parameters from this string
# example: No, score=-3.9
spam_pattern = re.compile(r'score=(-?\d+\.\d+) required=(-?\d+\.\d+)')
match = re.search(spam_pattern, parsed_data['spam-status'])
if match:
score = float(match.group(1))
if score < float(SATagLevel):
# Accumulate allowed score (inc negatives?)
hamavg += score
hamcount += 1
#spamassasin rejects
Isqueuedspam = False;
if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str):
if parsed_data['spam-status'].lower().startswith('yes'):
#Extract other parameters from this string
# example: Yes, score=10.3 required=4.0 autolearn=disable
spam_pattern = re.compile(r'score=(-?\d+\.\d+) required=(-?\d+\.\d+)')
match = re.search(spam_pattern, parsed_data['spam-status'])
if match:
score = float(match.group(1))
required = float(match.group(2))
if score >= SARejectLevel:
columnCounts_2d[hour][DelSpam] += 1
columnCounts_2d[ColTotals][DelSpam] += 1
rejectspamavg += score
rejectspamcount += 1
elif score >= required:
columnCounts_2d[hour][QuedSpam] += 1
columnCounts_2d[ColTotals][QuedSpam] += 1
spamavg += score
spamqueuedcount += 1
Isqueuedspam = True #for recipient stats below
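# Worked example with illustrative values only (required=4.0, SARejectLevel=12): a message
# scoring 15.2 is counted as DelSpam (rejected outright), one scoring 6.1 as QuedSpam
# (queued but tagged); scores below 'required' normally arrive with a "No" spam-status and
# are handled by the ham branch above.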
# Count the qpsmtpd codes
if parsed_data['error-plugin'].strip() == 'naughty':
if parsed_data['error-msg'].startswith("(dnsbl)"):
columnCounts_2d[hour][RBLDNS]+= 1
columnCounts_2d[ColTotals][RBLDNS]+= 1
elif parsed_data['error-msg'].startswith("(karma)"):
columnCounts_2d[hour][KARMA] += 1
columnCounts_2d[ColTotals][KARMA]+= 1
elif parsed_data['error-msg'].startswith("(helo)"):
columnCounts_2d[hour][RBLDNS] += 1
columnCounts_2d[ColTotals][RBLDNS]+= 1
else:
match = qpcodes_pattern.match(parsed_data['action1'])
if match:
rejReason = match.group(1)
found_qpcodes[parsed_data['error-plugin']+"-"+rejReason] += 1
else:
found_qpcodes[parsed_data['action1']] += 1
#Check for blacklist rejection
error_plugin = parsed_data['error-plugin'].strip()
if error_plugin == 'rhsbl' or error_plugin == 'dnsbl':
blacklist_domain = extract_blacklist_domain(parsed_data['sender'])
if blacklist_domain:
blacklist_found[blacklist_domain] += 1
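# extract_blacklist_domain is assumed to pull the RBL/RHSBL hostname out of the sender/
# reject text, so blacklist_found ends up keyed by list name for the 'Blacklist used'
# sub table built later on.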
# Log each recipient with accept/deny and spam-tagged counts
# Try to find an existing record for the email
action = parsed_data["action1"] # Extract action
if parsed_data['error-plugin'] == 'check_smtp_forward':
#extract rejected email address from sender
match = re.search(email_pattern, parsed_data['sender'])
# If a match is found, return the email address
if match:
email = match.group(0)
else:
email = "unknown (no email found in smtp reject message)"
elif parsed_data['error-plugin'] == 'check_badcountries':
email = "Unknown (Bad Country)"
elif not is_private_ip(parsed_data['ip']) and parsed_data["to-email"]:
#Only look at internal recipients from outside
#Take out the chevrons
email = parsed_data["to-email"].replace('<', '').replace('>', '')
email = get_first_email_with_domain(email,DomainName) # Extract email
if not email:
logging.error(f"Incoming email with no internal email address: {email} {DomainName}")
email = "Unknown (no internal email found)"
else:
if not is_private_ip(parsed_data['ip']):
email = "Unknown (non conf?)"
else:
email = None
if email:
record = next((item for item in recipients_found if item['email'] == email), None)
if not record:
# If email is not in the array, we add it
record = {"email": email,"accept": 0,"deny": 0,"spam-tagged": 0}
recipients_found.append(record)
# Update the deny or accept count based on action
if action != "queued":
record["deny"] += 1
else:
record["accept"] += 1
#and see if it is spam tagged
if Isqueuedspam:
record["spam-tagged"] += 1
#Now increment the column which the plugin name indicates
if parsed_data['error-msg'] and "msg denied before queued" in parsed_data['error-msg'] and parsed_data['virus']:
if parsed_data['error-plugin']:
row = search_2d_list(parsed_data['error-plugin'],columnPlugin)
if row != -1:
columnCounts_2d[hour][row] += 1
columnCounts_2d[ColTotals][row] += 1
# a few ad hoc extra extractions of data
if row == Virus:
match = virus_pattern.match(parsed_data['virus'])
if match:
found_viruses[match.group(1)] += 1
else:
found_viruses[parsed_data['virus']] += 1
else:
found_qpcodes[parsed_data['error-plugin']] += 1
if isThonny:
logging.error("")  # separate the output from the [progress bar]
if count_ignored_mailstats > 0:
logging.debug(f"Ignored {count_ignored_mailstats} mailstats emails")
# Compute percentages
total_Count = columnCounts_2d[ColTotals][TOTALS]
#Column of percentages
for row in range(ColTotals):
if total_Count == 0:
percentage_of_total = 0
else:
percentage_of_total = f"{round(round(columnCounts_2d[row][TOTALS] / total_Count,4) * 100,1)}%"
columnCounts_2d[row][PERCENT] = percentage_of_total
#Row of percentages
for col in range(TOTALS):
if total_Count == 0:
percentage_of_total = 0
else:
percentage_of_total = f"{round(round(columnCounts_2d[ColTotals][col] / total_Count,4) * 100,1)}%"
columnCounts_2d[ColPercent][col] = percentage_of_total
# and drop in the 100% to make it look correct!
columnCounts_2d[ColPercent][PERCENT] = '100%'
columnCounts_2d[ColTotals][PERCENT] = '100%'
columnCounts_2d[ColPercent][TOTALS] = '100%'
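# Example of the percentage formula above (illustrative numbers): a row total of 25 against
# a grand total of 200 gives round(round(25 / 200, 4) * 100, 1) = 12.5, rendered as "12.5%".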
#other stats
emailperhour = (totalexamined / 24)
if spamqueuedcount != 0:
spamavg = spamavg / spamqueuedcount
if rejectspamcount != 0:
rejectspamavg = rejectspamavg / rejectspamcount
if hamcount != 0:
hamavg = hamavg / hamcount
# Now scan for the other lines in the log of interest
found_countries = defaultdict(int)
geoip_pattern = re.compile(r".*check_badcountries: GeoIP Country: (.*)")
dmarc_pattern = re.compile(r".*dmarc: pass")
helo_pattern = re.compile(r".*Accepted connection.*?from (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) \/ ([\w.-]+)")
connect_type_pattern = re.compile(r".*connect via (.*)")
tls_type_pattern = re.compile(r".*Go ahead with (.*)")
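# Illustrative fragments of the journal lines these patterns are meant to match (assumed
# formats, not verbatim log output):
#   geoip_pattern        "... check_badcountries: GeoIP Country: China"
#   helo_pattern         "... Accepted connection ... from 203.0.113.5 / mail.example.com"
#   connect_type_pattern "... connect via ESMTPSA", tls_type_pattern "... Go ahead with TLSv1.2 ..."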
total_countries = 0
DMARCOkCount = 0
totalinternalsmtpsessions = 0
totalexternalsmtpsessions = 0
i = 0
j = 0
log_len = len(log_entries)
connection_type_counts = defaultdict(int)
if log_len > 0:
if isThonny:
print_progress_bar(0, log_len, prefix='Progress:', suffix='Complete', length=50)
for data in log_entries:
i += 1
if isThonny:
print_progress_bar(i, log_len, prefix='Scanning for sub tables:', suffix='Complete', length=50)
# Match initial connection message
IsInternal = True
try:
match = helo_pattern.match(data['MESSAGE'])
if match:
ip = match.group(1)
fqdn = match.group(2)
if is_private_ip(ip):
totalinternalsmtpsessions += 1
else:
totalexternalsmtpsessions += 1
IsInternal = False
continue
except Exception as e:
logging.error(f" Helo pattern error {e} {data['MESSAGE']} {analysis_date}")
continue
#Pull out Geoip countries for analysis table
try:
match = geoip_pattern.match(data['MESSAGE'])
if match:
j += 1
country = match.group(1)
found_countries[country] += 1
total_countries += 1
continue
except Exception as e:
logging.error(f"Geoip pattern error {e} {data['MESSAGE']} {analysis_date}")
continue
#Pull out DMARC approvals
match = dmarc_pattern.match(data['MESSAGE'])
if match:
DMARCOkCount += 1
continue
#Pull out type of connection
match = connect_type_pattern.match(data['MESSAGE'])
if match:
connection_type = match.group(1)
connection_type_counts[connection_type] += 1
continue
match = tls_type_pattern.match(data['MESSAGE'])
if match:
connection_type = match.group(1)
connection_type_counts[connection_type] += 1
continue
#Compute next and previous dates
day_format = "%Y-%m-%d"
# Convert the time string to a datetime object
date_obj = datetime.strptime(analysis_date, day_format)
# Compute the next date by adding one day
next_date = date_obj + timedelta(days=1)
# Compute the previous date by subtracting one day
previous_date = date_obj - timedelta(days=1)
# Convert the datetime objects back to strings in the desired format
next_date_str = next_date.strftime(day_format)
previous_date_str = previous_date.strftime(day_format)
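# e.g. an analysis_date of "2024-06-02" (illustrative) gives previous_date_str "2024-06-01"
# and next_date_str "2024-06-03", which feed the Previous/Next links on the generated page.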
# Create graphs of data
# yLabels = [f'{i:02d}:00' for i in range(len(columnCounts_2d))]
# stacked_Bar_html = create_stacked_bar_graph(columnCounts_2d,columnHeaders,yLabels,html_page_dir+'stacked_bar_'+analysis_date+'.html')
# heatmap_html = create_heatmap(columnCounts_2d,columnHeaders,yLabels,html_page_dir+'heatmap_'+analysis_date+'.html')
# line_graph_html = create_line_chart(columnCounts_2d,columnHeaders,yLabels,html_page_dir+'line_graph_'+analysis_date+'.html')
columnCounts_2d_dict = transform_to_dict(columnCounts_2d,columnHeaders,analysis_date)
#Export as json for testing
# with open("/opt/mailstats/html/colCounts_2d.json", "w") as json_file:
# json.dump(columnCounts_2d, json_file)
# with open("/opt/mailstats/html/colCounts_2d-dict", "w") as json_file:
# json.dump(columnCounts_2d_dict, json_file)
# with open("/opt/mailstats/html/keys.json", "w") as json_file:
# json.dump(columnHeaders, json_file)
if enable_graphs:
create_graph(columnCounts_2d_dict, "line", html_page_dir+"line_graph_"+analysis_date+".png",analysis_date)
create_graph(columnCounts_2d_dict, "bar", html_page_dir+"bar_graph_"+analysis_date+".png",analysis_date)
create_graph(columnCounts_2d_dict, "scatter", html_page_dir+"scatter_graph_"+analysis_date+".png",analysis_date)
create_graph(columnCounts_2d_dict, "pie", html_page_dir+"pie_chart_"+analysis_date+".png",analysis_date)
#Now apply the results to the chameleon template - main table
# Path to the template file
template_path = template_dir+'mailstats.html.pt'
# Load the template
with open(template_path, 'r') as template_file:
template_content = template_file.read()
#Use the hello string to create a suitable heading for the web page
html_title = hello_string.replace("printed at:"," <span class='greyed-out'>printed at:")
html_title += "</span>"
# Create a Chameleon template instance
try:
template = PageTemplate(template_content)
# Render the template with the 2D array data and column headers
try:
rendered_html = template(array_2d=columnCounts_2d, column_headers=columnHeaders,
reporting_date=analysis_date,
title=html_title,
version=version_string,
nolinks=nolinks,
PreviousDate=previous_date_str,
NextDate=next_date_str,
DomainName=DomainName,
SystemName=SystemName,
enable_graphs=enable_graphs
)
except Exception as e:
logging.error(f"Chameleon template Exception {e}")
except Exception as e:
logging.error(f"Chameleon render Exception {e}")
total_html = rendered_html
# Add in the header information
header_rendered_html1,header_rendered_html2 = get_heading()
total_html = insert_string_after(total_html,header_rendered_html1, "<!---Add in table1 information here -->")
total_html = insert_string_after(total_html,header_rendered_html2, "<!---Add in table2 information here -->")
header_rendered_html = header_rendered_html1 + header_rendered_html2
# add in the subservient tables (remember: each is inserted at the same marker, so they appear in the reverse order of the insertions below!)
#virus codes
virus_headers = ["Virus",'Count','Percent']
virus_title = 'Viruses found'
virus_rendered_html = render_sub_table(virus_title,virus_headers,found_viruses,suppress_threshold=True)
# Add it to the total
total_html = insert_string_after(total_html,virus_rendered_html, "<!---Add in sub tables here -->")
# qpsmtpd codes
qpsmtpd_headers = ["Reason",'Count','Percent']
qpsmtpd_title = 'Qpsmtpd codes league table'
qpsmtpd_rendered_html = render_sub_table(qpsmtpd_title,qpsmtpd_headers,found_qpcodes)
# Add it to the total
total_html = insert_string_after(total_html,qpsmtpd_rendered_html, "<!---Add in sub tables here -->")
#Junk mails
junk_mail_count_headers = ['Username','Count', 'Percent']
junk_mail_counts = scan_mail_users()
junk_mail_count_title = 'Junk mail counts'
junk_rendered_html = render_sub_table(junk_mail_count_title,junk_mail_count_headers,junk_mail_counts,suppress_threshold=True)
# Add it to the total
total_html = insert_string_after(total_html,junk_rendered_html, "<!---Add in sub tables here -->")
#Recipient counts
recipient_count_headers = ["Email",'Queued','Rejected','Spam tagged','Accepted Percent']
recipient_count_title = 'Incoming email recipients'
recipient_rendered_html = render_sub_table(recipient_count_title,recipient_count_headers,recipients_found,suppress_threshold=True)
# Add it to the total
total_html = insert_string_after(total_html,recipient_rendered_html, "<!---Add in sub tables here -->")
#Geoip Country codes
geoip_headers = ['Country','Count','Percent','Rejected?']
geoip_title = 'Geoip results'
geoip_rendered_html = render_sub_table(geoip_title,geoip_headers,found_countries,get_character_in_reject_list)
# Add it to the total
total_html = insert_string_after(total_html,geoip_rendered_html, "<!---Add in sub tables here -->")
#Blacklist counts
blacklist_headers = ['URL','Count','Percent']
blacklist_title = 'Blacklist used'
blacklist_rendered_html = render_sub_table(blacklist_title,blacklist_headers,blacklist_found,suppress_threshold=True)
# Add it to the total
total_html = insert_string_after(total_html,blacklist_rendered_html, "<!---Add in sub tables here -->")
if saveData:
# Close the connection
cursor.close()
conn.close()
# Write the rendered HTML to a file
output_path = html_page_dir+'mailstats_for_'+analysis_date
output_path = output_path.replace(' ','_')
with open(output_path+'.html', 'w') as output_file:
output_file.write(total_html)
# and create a text version if the local version of html2text is sufficient
if get_html2text_version() == '2019.9.26':
# Get temporary file
temp_file_name = tempfile.mktemp()
temp_file_name1 = tempfile.mktemp()
# see if html has links in the table entries, if not then use the current html file, else generate one
if not nolinks:
# i.e. links in html
# Render the template with the 2D array data and column headers
try:
rendered_html = template(array_2d=columnCounts_2d, column_headers=columnHeaders,
reporting_date=analysis_date,
title="",
version=version_string,
nolinks=True,
PreviousDate=previous_date_str,
NextDate=next_date_str,
DomainName=DomainName,
SystemName=SystemName,
enable_graphs=False
)
except Exception as e:
logging.error(f"Chameleon template Exception {e}")
# Need to add the sub tables
full_rendered_html = ''.join([
html_title+"<br />",
header_rendered_html,
rendered_html,
blacklist_rendered_html,
geoip_rendered_html,
recipient_rendered_html,
junk_rendered_html,
qpsmtpd_rendered_html,
virus_rendered_html
])
# delete next and prev
start = full_rendered_html.find("Previous")
end = full_rendered_html.find("Table")
full_rendered_html = full_rendered_html[:start] + full_rendered_html[end:]
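# Note: this assumes both the "Previous" and "Table" markers are present in the rendered
# HTML; if either find() returned -1 the slice above would keep or drop the wrong span.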
with open(temp_file_name, 'w') as output_file:
output_file.write(full_rendered_html)
else:
temp_file_name = output_path+'.html'
html_to_text(temp_file_name,temp_file_name1)
logging.debug(f"Rendered HTML saved to {temp_file_name1}")
# and save it if required
if not notextfile:
text_file_path = output_path+'.txt'
# and rename it
os.rename(temp_file_name1, text_file_path)
else:
text_file_path = temp_file_name1
else:
text_file_path = ""
logging.debug(f"Written {count_records_to_db} records to DB")
html_content = None
text_content = None
#Now see if Email required
if EmailTextorHTML:
if EmailTextorHTML == "HTML" or EmailTextorHTML == "Both":
# Send html email (default)
filepath = html_page_dir+"mailstats_for_"+analysis_date+".html"
html_content = read_html_from_file(filepath)
# Replace the Navigation by a "See in browser" prompt
replace_str = f"<div class='divseeinbrowser'><a class='seeinbrowser' href='http://{SystemName}.{DomainName}/mailstats/mailstats_for_{analysis_date}.html'>See in browser</a></div>"
html_content = replace_between(html_content, "<div class='linksattop'>", ">Next</a></div>", replace_str)
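# replace_between is assumed to swap everything from the opening nav div up to the closing
# ">Next</a></div>" for the single "See in browser" link, so the emailed copy points back
# at the served page instead of carrying the Previous/Next navigation.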
if not noemailfile:
# Write out the email html to a web page
email_file = html_page_dir + "Email_mailstats_for_"+analysis_date
with open(email_file+'.html', 'w') as output_file:
output_file.write(html_content)
if EmailTextorHTML == "Text" or EmailTextorHTML == "Both":
#filepath = html_page_dir+"mailstats_for_"+analysis_date+".txt"
if text_file_path != "":
text_content = read_text_from_file(text_file_path)
else:
text_content = "No text avaiable (as html2text was not installed) "
if EMailSMTPUser:
# Send authenticated
logging.debug("Sending authenticated")
send_email(
subject="Mailstats for "+analysis_date,
from_email="mailstats@"+DomainName,
to_email=EmailAddress,
smtp_server=EmailHost,
smtp_port=EmailPort,
HTML_content=html_content,
Text_content=text_content,
smtp_user=EMailSMTPUser,
smtp_password=EMailSMTPPassword
)
else:
# No authentication
logging.debug(f"Sending non authenticated {EmailAddress} {EmailHost}")
try:
send_email(
subject="Mailstats for "+analysis_date,
from_email="mailstats@"+DomainName,
to_email=EmailAddress,
smtp_server=EmailHost,
smtp_port=EmailPort,
HTML_content=html_content,
Text_content=text_content
)
except Exception as e:
logging.error(f"Email Exception {e}")
finish_time = datetime.now()
duration = (finish_time - start_time).total_seconds()
logging.info(
f"Mailstats finished at {finish_time.strftime('%Y-%m-%d %H:%M:%S')}"+f" Time taken: {duration:.2f} seconds"
)