Sort out failure on spamstats not found

Brian Read 2024-06-16 17:15:23 +01:00
parent 767ade0e0d
commit d5c387d12e


@@ -4,11 +4,16 @@
#
# This script provides daily SpamFilter statistics.
#
# Mailstats
#
# optional arguments:
# -h, --help show this help message and exit
# -d DATE, --date DATE Specify a valid date (yyyy-mm-dd) for the analysis
#
# Re-written in python from Mailstats.pl (Perl) to conform to SME11 / Postfix / qpsmtpd log formats
# and html output added
#
# Todo
# 1. Make "yesterday" parameterised
# 2. Other stats
# 3. Extra bits for sub tables
#
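# Example invocation (script file name assumed for illustration):
#   python3 mailstats.py -d 2024-06-15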
@@ -172,32 +177,18 @@ def truncate_microseconds(timestamp):
    # Remove the microseconds completely if they exist
    return truncated_timestamp.split('.')[0]
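    # For example (illustrative value):
    #   '2024-06-16 10:01:02.123456' -> '2024-06-16 10:01:02'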
# def filter_yesterdays_entries(log_entries):
#     # Determine yesterday's date
#     yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).date()
#     # Filter entries for yesterday's date
#     yesterday_entries = []
#     for timestamp, data in log_entries:
#         truncated_timestamp = truncate_microseconds(timestamp)
#         entry_date = datetime.datetime.strptime(truncated_timestamp, '%Y-%m-%d %H:%M:%S').date()
#         if entry_date == yesterday:
#             parsed_data = parse_data(data)
#             yesterday_entries.append((truncated_timestamp, parsed_data))
#     return yesterday_entries
def read_in_yesterday_log_file(file_path):
def read_in_relevant_log_file(file_path, analysis_date=yesterday):
    # Read the file and split each line into a list - timestamp and the rest
    # Get current date and calculate yesterday's date
    log_entries = []
    skip_record_count = 0;
    skip_record_count = 0
    ignore_record_count = 0
    with codecs.open(file_path, 'rb', 'utf-8', errors='replace') as file:
        try:
            for line in file:
                # extract time stamp
                try:
                    entry = split_timestamp_and_data(line)
                    # compare with yesterday
                    # compare with analysis date
                    timestamp_str = truncate_microseconds(entry[0])
                except ValueError as e:
                    #print(f"ValueError {e} on timestamp create {timestamp_str}:{entry[0]} {entry[1]}")
@@ -209,12 +200,16 @@ def read_in_yesterday_log_file(file_path):
                    timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
                except ValueError as e:
                    print(f"ValueError {e} on timestamp extract {timestamp_str}:{entry[1]}")
                if timestamp.date() == yesterday.date():
                #print(f"{timestamp.date()} {analysis_date.date()}")
                #quit()
                if timestamp.date() == analysis_date.date():
                    log_entries.append((timestamp, entry[1]))
                else:
                    ignore_record_count += 1
        except UnicodeDecodeError as e:
            #print(f"{line} {len(log_entries)} {e} ")
            pass
    return [log_entries, skip_record_count]
    return [log_entries, skip_record_count, ignore_record_count]
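# The date filter above reduces to this pattern; a minimal sketch with a
# hypothetical sample line, not the script's exact parsing:
#
#   from datetime import datetime, date
#   sample = ["2024-06-16 10:01:02.123456 `sample-data"]
#   target = date(2024, 6, 16)
#   kept = [(datetime.strptime(raw[:19], "%Y-%m-%d %H:%M:%S"), raw)
#           for raw in sample
#           if datetime.strptime(raw[:19], "%Y-%m-%d %H:%M:%S").date() == target]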
def filter_summary_records(log_entries):
    # Return just the summary records
@@ -236,31 +231,6 @@ def sort_log_entries(log_entries):
    sorted_dict = {entry[0]: entry[1] for entry in sorted_entries}
    return sorted_dict
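# The comprehension above relies on dicts preserving insertion order
# (Python 3.7+); a standalone sketch with illustrative data:
#
#   from datetime import datetime
#   entries = [(datetime(2024, 6, 16, 9), "b"), (datetime(2024, 6, 16, 7), "a")]
#   ordered = {ts: data for ts, data in sorted(entries, key=lambda e: e[0])}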
# def read_and_filter_yesterday_log(file_path):
#     # Read the file and split each line into a dictionary
#     log_entries = []
#     with open(file_path, 'r') as file:
#         for line in file:
#             if '`' in line:
#                 parts = line.split(' ')
#                 if parts:
#                     # Combine parts to form the complete timestamp
#                     timestamp = ' '.join(parts[:2])
#                     data = ' '.join(parts[2:])  # The rest of the line after date and time
#                     log_entries.append((timestamp, data))
#     # Filter the entries to keep only those from yesterday
#     filtered_entries = filter_yesterdays_entries(log_entries)
#     # Sort the filtered log entries based on the truncated timestamp
#     sorted_entries = sorted(filtered_entries, key=lambda x: datetime.datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S'))
#     # Create a dictionary
#     sorted_dict = {entry[0]: entry[1] for entry in sorted_entries}
#     return sorted_dict
def parse_data(data):
    # Split data string into parts and map to named fields.
    # Adjust the field names and parsing logic according to your data format.
@@ -619,6 +589,7 @@ if __name__ == "__main__":
    # Command line parameters
    parser = argparse.ArgumentParser(description="Mailstats")
    parser.add_argument('-d', '--date', help='Specify a valid date (yyyy-mm-dd) for the analysis', default=formatted_yesterday)
    parser.add_argument('-ef', '--emailfile', help='Save an html file of the email sent (y/N)', default='n')
    args = parser.parse_args()
    analysis_date = args.date
    # and check its format is valid
@@ -630,6 +601,8 @@ if __name__ == "__main__":
    #print(analysis_date)
    #quit()
    analysis_date_obj = datetime.strptime(analysis_date, '%Y-%m-%d')
    noemailfile = args.emailfile.lower() == 'n'
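    # The yyyy-mm-dd check comes down to strptime raising ValueError on a
    # malformed string; a minimal sketch (the script's actual error
    # handling sits outside this hunk):
    #
    #   from datetime import datetime
    #   try:
    #       datetime.strptime('2024-06-16', '%Y-%m-%d')
    #   except ValueError:
    #       print("Expected a valid yyyy-mm-dd date")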
    isThonny = is_running_under_thonny()
    #E-Smith Config DBs
@@ -677,12 +650,12 @@ if __name__ == "__main__":
    num_hours = 25  # Represents hours from 0 to 23 - adds an extra one for column totals and another for percentages
    log_file = logs_dir+'current.log'
    log_entries,skip_count = read_in_yesterday_log_file(log_file)
    log_entries, skip_count, ignored_count = read_in_relevant_log_file(log_file, analysis_date_obj)
    # if len(log_entries) == 0:
    #     print(f"No records found in {log_file}")
    #     quit()
    # else:
    print(f"Found {len(log_entries)} entries in log for {analysis_date} skipped {skip_count}")
    print(f"Found {len(log_entries)} entries in log for {analysis_date_obj.strftime('%Y-%m-%d')} Ignored: {ignored_count} skipped: {skip_count}")
    summary_log_entries, skip_count = filter_summary_records(log_entries)
    print(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries")
    sorted_log_dict = sort_log_entries(summary_log_entries)
@@ -756,25 +729,26 @@ if __name__ == "__main__":
                columnCounts_2d[hour][Ham] += 1
                columnCounts_2d[ColTotals][Ham] += 1
            #spamassassin
            if parsed_data['spam-status'].lower().startswith('yes'):
                #Extract other parameters from this string
                # example: Yes, score=10.3 required=4.0 autolearn=disable
                spam_pattern = r'score=([\d.]+)\s+required=([\d.]+)'
                match = re.search(spam_pattern, parsed_data['spam-status'])
                if match:
                    score = float(match.group(1))
                    required = float(match.group(2))
                    #print(f"{parsed_data['spam-status']} / {score} {required}")
                    if score >= SARejectLevel:
                        columnCounts_2d[hour][DelSpam] += 1
                        columnCounts_2d[ColTotals][DelSpam] += 1
                    elif score >= required:
                        columnCounts_2d[hour][QuedSpam] += 1
                        columnCounts_2d[ColTotals][QuedSpam] += 1
            #Local send
            elif DomainName in parsed_data['sendurl']:
                columnCounts_2d[hour][Local] += 1
                columnCounts_2d[ColTotals][Local] += 1
            if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str):
                if parsed_data['spam-status'].lower().startswith('yes'):
                    #Extract other parameters from this string
                    # example: Yes, score=10.3 required=4.0 autolearn=disable
                    spam_pattern = r'score=([\d.]+)\s+required=([\d.]+)'
                    match = re.search(spam_pattern, parsed_data['spam-status'])
                    if match:
                        score = float(match.group(1))
                        required = float(match.group(2))
                        #print(f"{parsed_data['spam-status']} / {score} {required}")
                        if score >= SARejectLevel:
                            columnCounts_2d[hour][DelSpam] += 1
                            columnCounts_2d[ColTotals][DelSpam] += 1
                        elif score >= required:
                            columnCounts_2d[hour][QuedSpam] += 1
                            columnCounts_2d[ColTotals][QuedSpam] += 1
            #Local send
            elif DomainName in parsed_data['sendurl']:
                columnCounts_2d[hour][Local] += 1
                columnCounts_2d[ColTotals][Local] += 1
            #Relay or webmail
            elif not is_private_ip(parsed_data['ip']) and is_private_ip(parsed_data['sendurl1']) and parsed_data['action1'] == 'queued':
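# Classification only proceeds when a spam-status string is actually
# present. A standalone sketch of the same logic (names and the reject
# threshold are illustrative):
#
#   import re
#
#   def classify(spam_status, reject_level=10.0):
#       if not isinstance(spam_status, str) or not spam_status.lower().startswith('yes'):
#           return 'ham'
#       m = re.search(r'score=([\d.]+)\s+required=([\d.]+)', spam_status)
#       if not m:
#           return 'ham'
#       score, required = float(m.group(1)), float(m.group(2))
#       if score >= reject_level:
#           return 'deleted'
#       if score >= required:
#           return 'queued'
#       return 'ham'
#
#   classify('Yes, score=10.3 required=4.0 autolearn=disable')  # -> 'deleted'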
@@ -875,8 +849,6 @@ if __name__ == "__main__":
    dmarc_pattern = re.compile(r".*dmarc: pass")
    total_countries = 0
    DMARCOkCount = 0
    # Pick up all log_entries = read_yesterday_log_file(data_file)
    #sorted_log_dict = sort_log_entries(log_entries)
    i = 0
    j = 0
@@ -982,17 +954,17 @@ if __name__ == "__main__":
        # Send html email (default)
        filepath = html_page_dir+"mailstats_for_"+analysis_date+".html"
        html_content = read_html_from_file(filepath)
        print(len(html_content))
        #print(len(html_content))
        # Replace the Navigation by a "See in browser" prompt
        replace_str = f"<div class='divseeinbrowser' style='text-align:center;'><a class='seeinbrowser' href='http://{DomainName}/mailstats/mailstats_for_{analysis_date}.html'>See in browser</a></div>"
        print(len(replace_str))
        print(len(html_content))
        #print(len(replace_str))
        #print(len(html_content))
        html_content = replace_between(html_content, "<div class='linksattop'>", ">Next</a></div>", replace_str)
        # Write out the email html to a web page
        email_file = html_page_dir + "Email_mailstats_for_"+analysis_date
        with open(email_file+'.html', 'w') as output_file:
            output_file.write(html_content)
        #print(html_content)
        if not noemailfile:
            # Write out the email html to a web page
            email_file = html_page_dir + "Email_mailstats_for_"+analysis_date
            with open(email_file+'.html', 'w') as output_file:
                output_file.write(html_content)
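    # replace_between appears to splice out everything from the first
    # marker through the end marker; a hedged sketch of such a helper
    # (the script's own definition lives elsewhere in the file):
    #
    #   def replace_between_sketch(text, start, end, replacement):
    #       i = text.find(start)
    #       if i == -1:
    #           return text
    #       j = text.find(end, i)
    #       if j == -1:
    #           return text
    #       return text[:i] + replacement + text[j + len(end):]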
    if EmailTextOrHTML == "Text" or EmailTextOrHTML == "Both":
        filepath = html_page_dir+"mailstats_for_"+analysis_date+".txt"
        text_content = read_text_from_file(filepath)