diff --git a/root/usr/bin/mailstats.py b/root/usr/bin/mailstats.py
index a0fee4b..7a4b7b2 100644
--- a/root/usr/bin/mailstats.py
+++ b/root/usr/bin/mailstats.py
@@ -4,11 +4,16 @@
 #
 # This script provides daily SpamFilter statistics.
 #
+# Mailstats
+#
+# optional arguments:
+#   -h, --help            show this help message and exit
+#   -d DATE, --date DATE  Specify a valid date (yyyy-mm-dd) for the analysis
+#
 # Re-written in python from Mailstats.pl (Perl) to conform to SME11 / Postfix / qpsmtpd log formats
 # and html output added
 #
 # Todo
-# 1. Make "yesterday" parameterised
 # 2 Other stats
 # 3. Extra bits for sub tables
 #
@@ -172,32 +177,18 @@ def truncate_microseconds(timestamp):
     # Remove the microseconds completely if they exist
     return truncated_timestamp.split('.')[0]
 
-# def filter_yesterdays_entries(log_entries):
-    # # Determine yesterday's date
-    # yesterday = (datetime.datetime.now() - datetime.timedelta(days=1)).date()
-    # # Filter entries for yesterday's date
-    # yesterday_entries = []
-    # for timestamp, data in log_entries:
-        # truncated_timestamp = truncate_microseconds(timestamp)
-        # entry_date = datetime.datetime.strptime(truncated_timestamp, '%Y-%m-%d %H:%M:%S').date()
-        # if entry_date == yesterday:
-            # parsed_data = parse_data(data)
-            # yesterday_entries.append((truncated_timestamp, parsed_data))
-
-    # return yesterday_entries
-
-def read_in_yesterday_log_file(file_path):
+def read_in_relevant_log_file(file_path,analysis_date=yesterday):
     # Read the file and split each line into a list - timestamp and the rest
-    # Get current date and calculate yesterday's date
     log_entries = []
-    skip_record_count = 0;
+    skip_record_count = 0
+    ignore_record_count = 0
     with codecs.open(file_path, 'rb','utf-8', errors='replace') as file:
         try:
             for Line in file:
                 #extract time stamp
                 try:
                     entry = split_timestamp_and_data(Line)
-                    # compare with yesterday
+                    # compare with analysis date
                     timestamp_str = truncate_microseconds(entry[0])
                 except ValueError as e:
                     #print(f"ValueError {e} on timestamp create {timestamp_str}:{entry[0]} {entry[1]}")
@@ -209,12 +200,16 @@ def read_in_yesterday_log_file(file_path):
                     timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
                 except ValueError as e:
                     print(f"ValueError {e} on timestamp extract {timestamp_str}:{entry[1]}")
-                if timestamp.date() == yesterday.date():
+                #print(f"{timestamp.date()} {analysis_date.date()}")
+                #quit()
+                if timestamp.date() == analysis_date.date():
                     log_entries.append((timestamp, entry[1]))
+                else:
+                    ignore_record_count += 1
         except UnicodeDecodeError as e:
             #print(f"{Line} {len(log_entries)} {e} ")
             pass
-    return [log_entries,skip_record_count]
+    return [log_entries,skip_record_count,ignore_record_count]
 
 def filter_summary_records(log_entries):
     # Return just the summary records
@@ -236,31 +231,6 @@ def sort_log_entries(log_entries):
 
     sorted_dict = {entry[0]: entry[1] for entry in sorted_entries}
     return sorted_dict
-
-# def read_and_filter_yesterday_log(file_path):
-    # # Read the file and split each line into a dictionary
-    # log_entries = []
-    # with open(file_path, 'r') as file:
-        # for line in file:
-            # if '`' in line:
-                # parts = line.split(' ')
-                # if parts:
-                    # # Combine parts to form the complete timestamp
-                    # timestamp = ' '.join(parts[:2])
-                    # data = ' '.join(parts[2:])  # The rest of the line after date and time
-                    # log_entries.append((timestamp, data))
-
-    # # Filter the entries to keep only those from yesterday
-    # filtered_entries = filter_yesterdays_entries(log_entries)
-
-    # # Sort the filtered log entries based on the truncated timestamp
-    # sorted_entries = sorted(filtered_entries, key=lambda x: datetime.datetime.strptime(x[0], '%Y-%m-%d %H:%M:%S'))
-
-    # # Create a dictionary
-    # sorted_dict = {entry[0]: entry[1] for entry in sorted_entries}
-
-    # return sorted_dict
-
 def parse_data(data):
     # Split data string into parts and map to named fields.
     # Adjust the field names and parsing logic according to your data format.
@@ -619,6 +589,7 @@ if __name__ == "__main__":
     # Command line parameters
     parser = argparse.ArgumentParser(description="Mailstats")
     parser.add_argument('-d', '--date', help='Specify a valid date (yyyy-mm-dd) for the analysis', default=formatted_yesterday)
+    parser.add_argument('-ef', '--emailfile', help='Save an html file of the email sent (y/N)', default='n')
     args = parser.parse_args()
     analysis_date = args.date
     # and check its format is valid
@@ -630,6 +601,8 @@ if __name__ == "__main__":
     #print(analysis_date)
     #quit()
 
+    anaysis_date_obj = datetime.strptime(analysis_date, '%Y-%m-%d')
+    noemailfile = args.emailfile.lower() == 'n'
     isThonny = is_running_under_thonny()
 
     #E-Smith Config DBs
@@ -677,12 +650,12 @@ if __name__ == "__main__":
     num_hours = 25 # Represents hours from 0 to 23 - adds extra one for column totals and another for percentages
 
     log_file = logs_dir+'current.log'
-    log_entries,skip_count = read_in_yesterday_log_file(log_file)
+    log_entries,skip_count,ignored_count = read_in_relevant_log_file(log_file,anaysis_date_obj)
     # if len(log_entries) == 0:
         # print(f"No records found in {log_file}")
         # quit()
     # else:
-    print(f"Found {len(log_entries)} entries in log for for {analysis_date} skipped {skip_count}")
+    print(f"Found {len(log_entries)} entries in log for {anaysis_date_obj.strftime('%Y-%m-%d')} Ignored: {ignored_count} skipped: {skip_count}")
     summary_log_entries,skip_count = filter_summary_records(log_entries)
     print(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries")
     sorted_log_dict = sort_log_entries(summary_log_entries)
@@ -756,25 +729,26 @@ if __name__ == "__main__":
             columnCounts_2d[hour][Ham] += 1
             columnCounts_2d[ColTotals][Ham] += 1
         #spamassasin
-        if parsed_data['spam-status'].lower().startswith('yes'):
-            #Extract other parameters from this string
-            # example: Yes, score=10.3 required=4.0 autolearn=disable
-            spam_pattern = r'score=([\d.]+)\s+required=([\d.]+)'
-            match = re.search(spam_pattern, parsed_data['spam-status'])
-            if match:
-                score = float(match.group(1))
-                required = float(match.group(2))
-                #print(f"{parsed_data['spam-status']} / {score} {required}")
-                if score >= SARejectLevel:
-                    columnCounts_2d[hour][DelSpam] += 1
-                    columnCounts_2d[ColTotals][DelSpam] += 1
-                elif score >= required:
-                    columnCounts_2d[hour][QuedSpam] += 1
-                    columnCounts_2d[ColTotals][QuedSpam] += 1
-        #Local send
-        elif DomainName in parsed_data['sendurl']:
-            columnCounts_2d[hour][Local] += 1
-            columnCounts_2d[ColTotals][Local] += 1
+        if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str):
+            if parsed_data['spam-status'].lower().startswith('yes'):
+                #Extract other parameters from this string
+                # example: Yes, score=10.3 required=4.0 autolearn=disable
+                spam_pattern = r'score=([\d.]+)\s+required=([\d.]+)'
+                match = re.search(spam_pattern, parsed_data['spam-status'])
+                if match:
+                    score = float(match.group(1))
+                    required = float(match.group(2))
+                    #print(f"{parsed_data['spam-status']} / {score} {required}")
+                    if score >= SARejectLevel:
+                        columnCounts_2d[hour][DelSpam] += 1
+                        columnCounts_2d[ColTotals][DelSpam] += 1
+                    elif score >= required:
+                        columnCounts_2d[hour][QuedSpam] += 1
+                        columnCounts_2d[ColTotals][QuedSpam] += 1
+        #Local send
+        elif DomainName in parsed_data['sendurl']:
+            columnCounts_2d[hour][Local] += 1
+            columnCounts_2d[ColTotals][Local] += 1
 
         #Relay or webmail
         elif not is_private_ip(parsed_data['ip']) and is_private_ip(parsed_data['sendurl1']) and parsed_data['action1'] == 'queued':
@@ -875,8 +849,6 @@ if __name__ == "__main__":
     dmarc_pattern = re.compile(r".*dmarc: pass")
     total_countries = 0
    DMARCOkCount = 0
-    # Pick up all log_entries = read_yesterday_log_file(data_file)
-    #sorted_log_dict = sort_log_entries(log_entries)
 
     i = 0
     j = 0
@@ -982,17 +954,17 @@ if __name__ == "__main__":
     # Send html email (default))
     filepath = html_page_dir+"mailstats_for_"+analysis_date+".html"
     html_content = read_html_from_file(filepath)
-    print(len(html_content))
+    #print(len(html_content))
     # Replace the Navigation by a "See in browser" prompt
     replace_str = f""
" - print(len(replace_str)) - print(len(html_content)) + #print(len(replace_str)) + #print(len(html_content)) html_content = replace_between(html_content, "