Refactor import and categorisation for re-use of data in second scan

Brian Read 2024-06-03 16:15:27 +01:00
parent ad1962753b
commit b2440be6d0


@@ -10,7 +10,7 @@
# Todo
# 1. Make "yesterday" parameterised
#
import datetime
from datetime import datetime, timedelta
import sys
from chameleon import PageTemplateFile,PageTemplate
import pkg_resources
@@ -25,6 +25,9 @@ Mailstats_version = '1.2'
script_dir = os.path.dirname(os.path.abspath(__file__))
data_file_path = script_dir+'/../../../'
now = datetime.now()
yesterday = now - timedelta(days=1)
formatted_yesterday = yesterday.strftime("%Y-%m-%d")
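One possible shape for the "Make yesterday parameterised" TODO above; this is a sketch only, not part of this commit, and the command-line handling is hypothetical:

    # Hypothetical: accept an optional YYYY-MM-DD argument, default to yesterday
    # e.g.  mailstats.py 2024-06-02
    if len(sys.argv) > 1:
        yesterday = datetime.strptime(sys.argv[1], "%Y-%m-%d")
        formatted_yesterday = yesterday.strftime("%Y-%m-%d")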
# Column numbering
Hour = 0
@@ -68,59 +71,109 @@ def is_private_ip(ip):
def truncate_microseconds(timestamp):
# Split timestamp into main part and microseconds
try:
main_part, microseconds = timestamp.split('.')
# Truncate the last three digits of the microseconds
truncated_microseconds = microseconds[:-3]
# Combine the main part and truncated microseconds
truncated_timestamp = f"{main_part}.{truncated_microseconds}"
except Exception as e:
print(f"{e} {timestamp}")
raise ValueError
# Remove the microseconds completely if they exist
return truncated_timestamp.split('.')[0]
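For reference, a minimal sketch of the expected behaviour on a hypothetical timestamp:

    # Drops the last three microsecond digits, then the whole fractional part
    assert truncate_microseconds('2024-06-02 11:54:02.123456789') == '2024-06-02 11:54:02'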
def filter_yesterdays_entries(log_entries):
# Determine yesterday's date
yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).date()
# def filter_yesterdays_entries(log_entries):
# # Determine yesterday's date
# yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).date()
# # Filter entries for yesterday's date
# yesterday_entries = []
# for timestamp, data in log_entries:
# truncated_timestamp = truncate_microseconds(timestamp)
# entry_date = datetime.datetime.strptime(truncated_timestamp, '%Y-%m-%d %H:%M:%S').date()
# if entry_date == yesterday:
# parsed_data = parse_data(data)
# yesterday_entries.append((truncated_timestamp, parsed_data))
# Filter entries for yesterday's date
yesterday_entries = []
for timestamp, data in log_entries:
truncated_timestamp = truncate_microseconds(timestamp)
entry_date = datetime.datetime.strptime(truncated_timestamp, '%Y-%m-%d %H:%M:%S').date()
if entry_date == yesterday:
parsed_data = parse_data(data)
yesterday_entries.append((truncated_timestamp, parsed_data))
# return yesterday_entries
return yesterday_entries
def read_and_filter_yesterday_log(file_path):
# Read the file and split each line into a dictionary
def read_in_yesterday_log_file(file_path):
# Read the file and split each line into a list - timestamp and the rest
# (yesterday's date is now computed once at module level)
log_entries = []
skip_record_count = 0
with open(file_path, 'r') as file:
for line in file:
if '`' in line:
parts = line.split(' ')
if parts:
# Combine parts to form the complete timestamp
timestamp = ' '.join(parts[:2])
data = ' '.join(parts[2:]) # The rest of the line after date and time
log_entries.append((timestamp, data))
for line in file:
#extract time stamp
try:
entry = split_timestamp_and_data(line)
# compare with yesterday
timestamp_str = truncate_microseconds(entry[0])
except ValueError as e:
#print(f"ValueError {e} on timestamp create {timestamp_str}:{entry[0]} {entry[1]}")
skip_record_count += 1
continue
# Parse the timestamp string into a datetime object
# Ignoring extra microseconds
try:
timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
except ValueError as e:
print(f"ValueError {e} on timestamp extract {timestamp_str}:{entry[1]}")
skip_record_count += 1
continue
if timestamp.date() == yesterday.date():
log_entries.append((timestamp, entry[1]))
return [log_entries, skip_record_count]
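A usage sketch, mirroring the call made in __main__ further down (data_file_path is set near the top of the script):

    entries, skipped = read_in_yesterday_log_file(data_file_path + 'current.log')
    print(f"Kept {len(entries)} records, skipped {skipped} malformed ones")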
# Filter the entries to keep only those from yesterday
filtered_entries = filter_yesterdays_entries(log_entries)
def filter_summary_records(log_entries):
# Return just the summary records
filtered_log_entries = []
skipped_entry_count = 0
for line in log_entries:
#print(line)
#quit()
if '`' in line[1]:
filtered_log_entries.append(line)
else:
skipped_entry_count += 1
return [filtered_log_entries,skipped_entry_count]
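A sketch of the backtick convention with two hypothetical entries; only the first carries the backtick-delimited summary payload, so only it survives the filter:

    entries = [
        (datetime(2024, 6, 2, 11, 54, 2), "message `field1\tfield2"),
        (datetime(2024, 6, 2, 11, 54, 3), "plain status line"),
    ]
    kept, skipped = filter_summary_records(entries)
    assert len(kept) == 1 and skipped == 1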
# Sort the filtered log entries based on the truncated timestamp
sorted_entries = sorted(filtered_entries, key=lambda x: datetime.datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S'))
# Create a dictionary
def sort_log_entries(log_entries):
# Sort the records, based on the timestamp
sorted_entries = sorted(log_entries, key=lambda x: x[0])
# and return a dictionary
sorted_dict = {entry[0]: entry[1] for entry in sorted_entries}
return sorted_dict
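Design note: dicts preserve insertion order in Python 3.7+, so sorting the tuples before building the dict is enough to make later iteration run in timestamp order. One caveat: entries sharing an identical timestamp collide on the dict key, and only the last survives. A quick sketch with hypothetical entries:

    entries = [
        (datetime(2024, 6, 2, 12, 0, 0), "second"),
        (datetime(2024, 6, 2, 11, 0, 0), "first"),
    ]
    assert list(sort_log_entries(entries).values()) == ["first", "second"]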
# def read_and_filter_yesterday_log(file_path):
# # Read the file and split each line into a dictionary
# log_entries = []
# with open(file_path, 'r') as file:
# for line in file:
# if '`' in line:
# parts = line.split(' ')
# if parts:
# # Combine parts to form the complete timestamp
# timestamp = ' '.join(parts[:2])
# data = ' '.join(parts[2:]) # The rest of the line after date and time
# log_entries.append((timestamp, data))
# # Filter the entries to keep only those from yesterday
# filtered_entries = filter_yesterdays_entries(log_entries)
# # Sort the filtered log entries based on the truncated timestamp
# sorted_entries = sorted(filtered_entries, key=lambda x: datetime.datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S'))
# # Create a dictionary
# sorted_dict = {entry[0]: entry[1] for entry in sorted_entries}
# return sorted_dict
def parse_data(data):
# Split data string into parts and map to named fields.
# Adjust the field names and parsing logic according to your data format.
# Split at the backtick - before it, fields are separated by spaces; after it, by tabs
parts = data.split('`')
#print(parts[0],parts[1])
#print(f"{parts[0]}:{parts[1]}")
fields1 = parts[0].strip().split() if len(parts) > 0 else []
fields2 = parts[1].split('\t') if len(parts) > 1 else []
# then merge them
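To make the split concrete, a hypothetical payload (field names and values are illustrative; the merge into named fields continues in the hunk below):

    raw = "(queue) msg 12345 `192.0.2.10\tmail.example.com\tqueued"
    head, tail = raw.split('`')
    assert head.strip().split() == ['(queue)', 'msg', '12345']
    assert tail.split('\t') == ['192.0.2.10', 'mail.example.com', 'queued']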
@@ -251,6 +304,8 @@ def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, lengt
fill - Optional : bar fill character (Str)
print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
if total == 0:
raise ValueError("Progress total is zero")
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
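A worked example of the bar arithmetic, assuming the defaults decimals=1 and length=50:

    # iteration=5, total=20:
    #   percent       = "{0:.1f}".format(100 * 5 / 20)  -> "25.0"
    #   filled_length = int(50 * 5 // 20)               -> 12
    #   bar           = fill * 12 + '-' * 38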
@@ -269,17 +324,36 @@ def insert_string_after(original:str, to_insert:str, after:str) -> str:
:return: The new string with to_insert inserted after after.
"""
position = original.find(after)
print(position)
#print(position)
if position == -1:
# 'after' string is not found in 'original'
return original
print(f"{len(after)}")
#print(f"{len(after)}")
# Position of the insertion point
insert_pos = position + len(after)
return original[:insert_pos] + to_insert + original[insert_pos:]
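Two sketch cases, one with the marker present and one without:

    assert insert_string_after("<head></head>", "<style/>", "<head>") == "<head><style/></head>"
    assert insert_string_after("abc", "X", "zzz") == "abc"  # 'after' not found: unchanged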
def split_timestamp_and_data(log_entry: str) -> list:
"""
Split a log entry into timestamp and the rest of the data.
:param log_entry: The log entry as a string.
:return: A list with two entries: [timestamp, rest_of_data].
"""
# The timestamp is always the first part, up to the first space after the milliseconds
parts = log_entry.split(' ', 2)
if len(parts) < 3:
raise ValueError(f"The log entry format is incorrect {parts}")
timestamp = ' '.join(parts[:2])
rest_of_data = parts[2]
#print(f"{timestamp} {rest_of_data}")
return [timestamp, rest_of_data]
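A sketch on a hypothetical log line; the first two space-separated parts form the timestamp and everything after the second space is the data:

    line = "2024-06-02 11:54:02.123456789 message `payload"
    ts, rest = split_timestamp_and_data(line)
    assert ts == "2024-06-02 11:54:02.123456789"
    assert rest == "message `payload"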
if __name__ == "__main__":
try:
chameleon_version = pkg_resources.get_distribution("Chameleon").version
@@ -287,10 +361,8 @@ if __name__ == "__main__":
chameleon_version = "Version information not available"
python_version = sys.version
python_version = python_version[:8]
current_datetime = datetime.datetime.now()
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M")
yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).date()
formatted_yesterday = yesterday.strftime("%Y-%m-%d")
#From SMEServer DB
DomainName = 'bjsystems.co.uk' # $cdb->get('DomainName')->value;
@@ -306,14 +378,25 @@ if __name__ == "__main__":
MAILMAN = "bounces"; #sender when mailman sending when orig is localhost
DMARCDomain="dmarc"; #Pattern to recognised DMARC sent emails (this not very reliable, as the email address could be anything)
DMARCOkPattern="dmarc: pass"; #Pattern to use to detect DMARC approval
hello_string = "Mailstats:"+Mailstats_version+' for '+DomainName+" at "+formatted_datetime
hello_string = "Mailstats:"+Mailstats_version+' for '+DomainName+" at "+formatted_datetime+" for "+formatted_yesterday
print(hello_string)
version_string = "Chameleon:"+chameleon_version+" Python:"+python_version
print(version_string)
num_hours = 25 # Represents hours 0 to 23, plus an extra one for column totals and another for percentages
data_file = data_file_path+'current.log'
sorted_log_dict = read_and_filter_yesterday_log(data_file)
log_entries,skip_count = read_in_yesterday_log_file(data_file)
if len(log_entries) == 0:
print(f"No records found in {data_file}")
sys.exit()
else:
print(f"Found {len(log_entries)} entries in log for for {formatted_yesterday} skipped {skip_count}")
summary_log_entries,skip_count = filter_summary_records(log_entries)
print(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries")
sorted_log_dict = sort_log_entries(summary_log_entries)
print(f"Sorted {len(sorted_log_dict)} entries")
columnHeaders = ['Count','WebMail','Local','MailMan','Relay','DMARC','Virus','RBL/DNS','Geoip.','Non.Conf.','Karma','Rej.Load','Del.Spam','Qued.Spam?',' Ham','TOTALS','PERCENT']
# dict for each column identifying the plugin that increments its count
columnPlugin = [''] * 17
@@ -353,13 +436,17 @@ if __name__ == "__main__":
print_progress_bar(0, sorted_len, prefix='Progress:', suffix='Complete', length=50)
for timestamp, data in sorted_log_dict.items():
i += 1
print_progress_bar(i, sorted_len, prefix='Progress:', suffix='Complete', length=50)
print_progress_bar(i, sorted_len, prefix='Scanning for main table:', suffix='Complete', length=50)
#print(f"{i*100/len}%")
# Count of in which hour it falls
#hour = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d %H')
# Parse the timestamp string into a datetime object
dt = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
dt = timestamp
hour = dt.hour
# parse the data
#print(data)
parsed_data = parse_data(data)
#print(f"parsed_data['action']:{parsed_data['action']}\n")
# Increment Count in which headings it falls
#Hourly count and column total
@@ -370,19 +457,19 @@ if __name__ == "__main__":
#Total totals
columnCounts_2d[ColTotals][TOTALS] += 1
#Queued email
if data['action'] == '(queue)':
if parsed_data['action'] == '(queue)':
columnCounts_2d[hour][Ham] += 1
columnCounts_2d[ColTotals][Ham] += 1
#spamassassin
if data['spam-status'].lower().startswith('yes'):
if parsed_data['spam-status'].lower().startswith('yes'):
#Extract other parameters from this string
# example: Yes, score=10.3 required=4.0 autolearn=disable
spam_pattern = r'score=([\d.]+)\s+required=([\d.]+)'
match = re.search(spam_pattern, data['spam-status'])
match = re.search(spam_pattern, parsed_data['spam-status'])
if match:
score = float(match.group(1))
required = float(match.group(2))
#print(f"{data['spam-status']} / {score} {required}")
#print(f"{parsed_data['spam-status']} / {score} {required}")
if score >= SARejectLevel:
columnCounts_2d[hour][DelSpam] += 1
columnCounts_2d[ColTotals][DelSpam] += 1
@@ -390,26 +477,26 @@ if __name__ == "__main__":
columnCounts_2d[hour][QuedSpam] += 1
columnCounts_2d[ColTotals][QuedSpam] += 1
#Local send
elif DomainName in data['sendurl']:
elif DomainName in parsed_data['sendurl']:
columnCounts_2d[hour][Local] += 1
columnCounts_2d[ColTotals][Local] += 1
#Relay or webmail
elif not is_private_ip(data['ip']) and is_private_ip(data['sendurl1']) and data['action1'] == 'queued':
elif not is_private_ip(parsed_data['ip']) and is_private_ip(parsed_data['sendurl1']) and parsed_data['action1'] == 'queued':
#Relay
if data['action1'] == 'queued':
if parsed_data['action1'] == 'queued':
columnCounts_2d[hour][Relay] += 1
columnCounts_2d[ColTotals][Relay] += 1
elif WebmailIP in data['sendurl1'] and not is_private_ip(data['ip']):
elif WebmailIP in parsed_data['sendurl1'] and not is_private_ip(parsed_data['ip']):
#webmail
columnCounts_2d[hour][WebMail] += 1
columnCounts_2d[ColTotals][WebMail] += 1
elif localhost in data['sendurl']:
elif localhost in parsed_data['sendurl']:
# but not if it comes from fetchmail
if not FETCHMAIL in data['sendurl1']:
if FETCHMAIL not in parsed_data['sendurl1']:
# might still be from mailman here
if MAILMAN in data['sendurl1']:
if MAILMAN in parsed_data['sendurl1']:
#$mailmansendcount++;
#$localsendtotal++;
columnCounts_2d[hour][MailMan] += 1
@@ -420,13 +507,13 @@ if __name__ == "__main__":
#Or sent to the DMARC server
#check for email address in $DMARC_Report_emails string
#my $logemail = $log_items[4];
if DMARCDomain in data['from-email']: #(index($DMARC_Report_emails,$logemail)>=0) or
if DMARCDomain in parsed_data['from-email']: #(index($DMARC_Report_emails,$logemail)>=0) or
#$localsendtotal++;
#$DMARCSendCount++;
localflag = 1
else:
# ignore incoming localhost spoofs
if not 'msg denied before queued' in data['error-msg']:
if 'msg denied before queued' not in parsed_data['error-msg']:
#Webmail
#$localflag = 1;
#$WebMailsendtotal++;
@@ -441,51 +528,58 @@ if __name__ == "__main__":
columnCounts_2d[ColTotals][WebMail] += 1
#Now increment the column which the plugin name indicates
if data ['action'] == '(deny)' and data['error-plugin']:
#print(f"Found plugin {data['error-plugin']}")
if data['error-plugin']:
row = search_2d_list(data['error-plugin'],columnPlugin)
if parsed_data['action'] == '(deny)' and parsed_data['error-plugin']:
#print(f"Found plugin {parsed_data['error-plugin']}")
if parsed_data['error-plugin']:
row = search_2d_list(parsed_data['error-plugin'],columnPlugin)
if not row == -1:
#print(f"Found row: {row}")
columnCounts_2d[hour][row] += 1
columnCounts_2d[ColTotals][row] += 1
# a few ad hoc extra extractions of data
if row == Virus:
match = virus_pattern.match(data['action1'])
match = virus_pattern.match(parsed_data['action1'])
if match:
found_viruses[match.group(1)] += 1
else:
found_viruses[data['action1']] += 1
elif data['error-plugin'] == 'naughty':
match = qpcodes_pattern.match(data['action1'])
found_viruses[parsed_data['action1']] += 1
elif parsed_data['error-plugin'] == 'naughty':
match = qpcodes_pattern.match(parsed_data['action1'])
if match:
rejReason = match.group(1)
found_qpcodes[data['error-plugin']+"-"+rejReason] += 1
found_qpcodes[parsed_data['error-plugin']+"-"+rejReason] += 1
else:
found_qpcodes['Unknown'] += 1
else:
found_qpcodes[data['action1']] += 1
found_qpcodes[parsed_data['action1']] += 1
print()
# Now scan for the other lines in the log of interest
found_countries = defaultdict(int)
geoip_pattern = re.compile(r"check_badcountries: GeoIP Country: (.*)")
dmarc_pattern = re.compile(r"dmarc: pass")
total_countries = 0
DMARCOkCount = 0
with open(data_file, 'r') as file:
# Re-use the log entries already read in by read_in_yesterday_log_file for this second scan
sorted_log_dict = sort_log_entries(log_entries)
i = 0
for line in file:
sorted_len = len(sorted_log_dict)
print_progress_bar(0, sorted_len, prefix='Progress:', suffix='Complete', length=50)
for timestamp, data in sorted_log_dict.items():
i += 1
print_progress_bar(i, sorted_len, prefix='Scanning for sub tables:', suffix='Complete', length=50)
#Pull out Geoip countries for analysis table
match = geoip_pattern.match(line)
match = geoip_pattern.match(data)
if match:
country = match.group(1)
found_countries[country] += 1
total_countries += 1
continue
#Pull out DMARC approvals
match = dmarc_pattern.match(line)
match = dmarc_pattern.match(data)
if match:
DMARCOkCount += 1
continue
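A note on this second pass: re.match anchors at the start of the data string (the text left after the timestamp was split off), so a record only counts if it begins with the pattern; re.search would be needed to find it mid-string. A quick check against a hypothetical record:

    sample = "check_badcountries: GeoIP Country: GB"
    m = geoip_pattern.match(sample)
    assert m and m.group(1) == "GB"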
@@ -539,3 +633,5 @@ if __name__ == "__main__":
html_to_text(output_path+'.html',output_path+'.txt')
print(f"Rendered HTML saved to {output_path}.html/txt")