Fix SQL creation of user and move log analysis to use the python-systemd journal library
This commit is contained in:
		| @@ -91,4 +91,7 @@ CREATE TABLE IF NOT EXISTS `time` ( | |||||||
| ) ENGINE=MyISAM DEFAULT CHARSET=latin1; | ) ENGINE=MyISAM DEFAULT CHARSET=latin1; | ||||||
|  |  | ||||||
|  |  | ||||||
| grant all privileges on mailstats.* to 'mailstats'@'localhost' identified by 'mailstats'; | CREATE USER 'mailstats'@'localhost' IDENTIFIED BY 'mailstats'; | ||||||
|  | GRANT ALL PRIVILEGES ON mailstats.* TO 'mailstats'@'localhost'; | ||||||
|  | FLUSH PRIVILEGES; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -56,7 +56,7 @@ | |||||||
| # pip3 install numpy | # pip3 install numpy | ||||||
| # pip3 install plotly | # pip3 install plotly | ||||||
| # pip3 install pandas | # pip3 install pandas | ||||||
| # pip3 install matplotlib | # NOTE: No matplotlib | ||||||
| # | # | ||||||
| # Rocky8: (probably - not yet checked this) | # Rocky8: (probably - not yet checked this) | ||||||
| # | # | ||||||
| @@ -90,6 +90,8 @@ import plotly.express as px | |||||||
| import colorsys | import colorsys | ||||||
| import pymysql | import pymysql | ||||||
| import json | import json | ||||||
|  | from systemd import journal | ||||||
|  |  | ||||||
| enable_graphs = True; | enable_graphs = True; | ||||||
| try: | try: | ||||||
| 	import matplotlib.pyplot as plt | 	import matplotlib.pyplot as plt | ||||||
| @@ -136,6 +138,94 @@ PERCENT = TOTALS + 1 | |||||||
| ColTotals = 24 | ColTotals = 24 | ||||||
| ColPercent = 25 | ColPercent = 25 | ||||||
|  |  | ||||||
|  | def get_logs_from_Journalctl(date='yesterday'): | ||||||
|  | 	# JSON-pretty output example from journalctl | ||||||
|  | 	# { | ||||||
|  | 	# "__CURSOR" : "s=21b4f015be0c4f1fb71ac439a8365ee7;i=385c;b=dd778625547f4883b572daf53ae93cd4;m=ca99d6d;t=62d6316802b05;x=71b24e9f19f3b99a", | ||||||
|  | 	# "__REALTIME_TIMESTAMP" : "1738753462774533", | ||||||
|  | 	# "__MONOTONIC_TIMESTAMP" : "212442477", | ||||||
|  | 	# "_BOOT_ID" : "dd778625547f4883b572daf53ae93cd4", | ||||||
|  | 	# "_MACHINE_ID" : "f20b7edad71a44e59f9e9b68d4870b19", | ||||||
|  | 	# "PRIORITY" : "6", | ||||||
|  | 	# "SYSLOG_FACILITY" : "3", | ||||||
|  | 	# "_UID" : "0", | ||||||
|  | 	# "_GID" : "0", | ||||||
|  | 	# "_SYSTEMD_SLICE" : "system.slice", | ||||||
|  | 	# "_CAP_EFFECTIVE" : "1ffffffffff", | ||||||
|  | 	# "_TRANSPORT" : "stdout", | ||||||
|  | 	# "_COMM" : "openssl", | ||||||
|  | 	# "_EXE" : "/usr/bin/openssl", | ||||||
|  | 	# "_HOSTNAME" : "sme11.thereadclan.me.uk", | ||||||
|  | 	# "_STREAM_ID" : "8bb0ef8920af4ae09b424a2e30abcdf7", | ||||||
|  | 	# "SYSLOG_IDENTIFIER" : "qpsmtpd-init", | ||||||
|  | 	# "MESSAGE" : "Generating DH parameters, 2048 bit long safe prime, generator 2", | ||||||
|  | 	# "_PID" : "2850", | ||||||
|  | 	# } | ||||||
|  | 	# and the return from here: | ||||||
|  | 	# { | ||||||
|  | 		# '_TRANSPORT': 'stdout', 'PRIORITY': 6, 'SYSLOG_FACILITY': 3, '_CAP_EFFECTIVE': '0', '_SYSTEMD_SLICE': 'system.slice', | ||||||
|  | 		# '_BOOT_ID': UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f'), '_MACHINE_ID': UUID('f20b7eda-d71a-44e5-9f9e-9b68d4870b19'), | ||||||
|  | 		# '_HOSTNAME': 'sme11.thereadclan.me.uk', '_STREAM_ID': '06c860deea374544a2b561f55394d728', 'SYSLOG_IDENTIFIER': 'qpsmtpd-forkserver', | ||||||
|  | 		# '_UID': 453, '_GID': 453, '_COMM': 'qpsmtpd-forkser', '_EXE': '/usr/bin/perl', | ||||||
|  | 		# '_CMDLINE': '/usr/bin/perl -Tw /usr/bin/qpsmtpd-forkserver -u qpsmtpd -l 0.0.0.0 -p 25 -c 40 -m 5', | ||||||
|  | 		# '_SYSTEMD_CGROUP': '/system.slice/qpsmtpd.service', '_SYSTEMD_UNIT': 'qpsmtpd.service', | ||||||
|  | 		# '_SYSTEMD_INVOCATION_ID': 'a2b7889a307748daaeb60173d31c5e0f', '_PID': 93647, | ||||||
|  | 		# 'MESSAGE': '93647 Connection from localhost [127.0.0.1]', | ||||||
|  | 		# '__REALTIME_TIMESTAMP': datetime.datetime(2025, 4, 2, 0, 1, 11, 668929), | ||||||
|  | 		# '__MONOTONIC_TIMESTAMP': journal.Monotonic(timestamp=datetime.timedelta(11, 53118, 613602), | ||||||
|  | 		# bootid=UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f')), | ||||||
|  | 		# '__CURSOR': 's=21b4f015be0c4f1fb71ac439a8365ee7;i=66d2c;b=465c620236ac4a8b98e91581e8fec68f;m=e9a65ed862;t= | ||||||
|  | 	# } | ||||||
|  | 	""" | ||||||
|  | 	Retrieve and parse journalctl logs for a specific date and units, | ||||||
|  | 	returning them as a sorted list of dictionaries. | ||||||
|  | 	""" | ||||||
|  | 	try: | ||||||
|  | 		# Parse the input date to calculate the start and end of the day | ||||||
|  | 		if date.lower() == "yesterday": | ||||||
|  | 			target_date = datetime.now() - timedelta(days=1) | ||||||
|  | 		else: | ||||||
|  | 			target_date = datetime.strptime(date, "%Y-%m-%d") | ||||||
|  | 		 | ||||||
|  | 		# Define the time range for the specified date | ||||||
|  | 		since = target_date.strftime("%Y-%m-%d 00:00:00") | ||||||
|  | 		until = target_date.strftime("%Y-%m-%d 23:59:59") | ||||||
|  | 		 | ||||||
|  | 		# Convert times to microseconds for querying | ||||||
|  | 		since_microseconds = int(datetime.strptime(since, "%Y-%m-%d %H:%M:%S").timestamp() * 1_000_000) | ||||||
|  | 		until_microseconds = int(datetime.strptime(until, "%Y-%m-%d %H:%M:%S").timestamp() * 1_000_000) | ||||||
|  | 		print(f"{since_microseconds}  {until_microseconds} ") | ||||||
|  | 		 | ||||||
|  | 		# Open the systemd journal | ||||||
|  | 		j = journal.Reader() | ||||||
|  | 		 | ||||||
|  | 		# Set filters for units | ||||||
|  | 		j.add_match(_SYSTEMD_UNIT="qpsmtpd.service") | ||||||
|  | 		j.add_match(_SYSTEMD_UNIT="uqpsmtpd.service") | ||||||
|  | 		j.add_match(_SYSTEMD_UNIT="sqpsmtpd.service") | ||||||
|  | 		 | ||||||
|  | 		# Filter by time range | ||||||
|  | 		j.seek_realtime(since_microseconds // 1_000_000)  # Convert back to seconds for seeking | ||||||
|  | 		 | ||||||
|  | 		# Retrieve logs within the time range | ||||||
|  | 		logs = [] | ||||||
|  | 		for entry in j: | ||||||
|  | 			entry_timestamp = entry.get('__REALTIME_TIMESTAMP', None) | ||||||
|  | 			entry_microseconds = int(entry_timestamp.timestamp() * 1_000_000) | ||||||
|  | 			#print(f"{entry_microseconds}") | ||||||
|  | 			if entry_timestamp and since_microseconds <= entry_microseconds <= until_microseconds: | ||||||
|  | 				logs.append(entry) | ||||||
|  | 		 | ||||||
|  | 		# Sort logs by __REALTIME_TIMESTAMP in ascending order | ||||||
|  | 		sorted_logs = sorted(logs, key=lambda x: x.get("__REALTIME_TIMESTAMP", 0)) | ||||||
|  | 		 | ||||||
|  | 		return sorted_logs | ||||||
|  |  | ||||||
|  | 	except Exception as e: | ||||||
|  | 		print(f"Unexpected error: {e}") | ||||||
|  | 		return {} | ||||||
|  |  | ||||||
|  |  | ||||||
| def transform_to_dict(data, keys, iso_date): | def transform_to_dict(data, keys, iso_date): | ||||||
|     """ |     """ | ||||||
|     Transforms a 26x17 list of lists into a list of dictionaries with specified keys. |     Transforms a 26x17 list of lists into a list of dictionaries with specified keys. | ||||||
| @@ -145,7 +235,7 @@ def transform_to_dict(data, keys, iso_date): | |||||||
|         keys (list): A 1D array specifying the keys for the dictionaries. |         keys (list): A 1D array specifying the keys for the dictionaries. | ||||||
|         iso_date (str): A date in ISO format to prepend to each row number. |         iso_date (str): A date in ISO format to prepend to each row number. | ||||||
|  |  | ||||||
|     Returns: |     Returns: | ||||||
|         list: A list of dictionaries with transformed data. |         list: A list of dictionaries with transformed data. | ||||||
|     """ |     """ | ||||||
|     # Validate input dimensions |     # Validate input dimensions | ||||||
| @@ -446,7 +536,7 @@ def filter_summary_records(log_entries): | |||||||
| 	filtered_log_entries = [] | 	filtered_log_entries = [] | ||||||
| 	skipped_entry_count = 0 | 	skipped_entry_count = 0 | ||||||
| 	for line in log_entries: | 	for line in log_entries: | ||||||
| 		if '`' in line[1]: | 		if '`' in line['MESSAGE']: | ||||||
| 			filtered_log_entries.append(line) | 			filtered_log_entries.append(line) | ||||||
| 		else: | 		else: | ||||||
| 			skipped_entry_count += 1 | 			skipped_entry_count += 1 | ||||||
| @@ -454,9 +544,9 @@ def filter_summary_records(log_entries): | |||||||
| 	 | 	 | ||||||
| def sort_log_entries(log_entries): | def sort_log_entries(log_entries): | ||||||
| 	# Sort the records, based on the timestamp | 	# Sort the records, based on the timestamp | ||||||
| 	sorted_entries = sorted(log_entries, key=lambda x: x[0]) | 	sorted_entries = sorted(log_entries, key=lambda x: x['__REALTIME_TIMESTAMP']) | ||||||
| 	# and return a dictionary | 	# and return a dictionary | ||||||
| 	sorted_dict = {entry[0]: entry[1] for entry in sorted_entries} | 	sorted_dict = {entry['__REALTIME_TIMESTAMP']: entry['MESSAGE'] for entry in sorted_entries} | ||||||
| 	return sorted_dict | 	return sorted_dict | ||||||
| 	 | 	 | ||||||
| def parse_data(data): | def parse_data(data): | ||||||
| @@ -464,10 +554,13 @@ def parse_data(data): | |||||||
| 	# Adjust the field names and parsing logic according to your data format. | 	# Adjust the field names and parsing logic according to your data format. | ||||||
| 	# Split at the backtick - before it fields split at space, after, fields split at tab | 	# Split at the backtick - before it fields split at space, after, fields split at tab | ||||||
| 	parts = data.split('`') | 	parts = data.split('`') | ||||||
|  | 	fields0 = ["",""] | ||||||
| 	fields1 = parts[0].strip().split() if len(parts) > 0 else [] | 	fields1 = parts[0].strip().split() if len(parts) > 0 else [] | ||||||
| 	fields2 = parts[1].split('\t') if len(parts) > 1 else [] | 	fields2 = parts[1].split('\t') if len(parts) > 1 else [] | ||||||
| 	# then merge them | 	# then merge them | ||||||
| 	fields = fields1 + fields2 | 	fields = fields0 + fields1 + fields2 | ||||||
|  | 	print(f"{fields}") | ||||||
|  | 	#quit(1) | ||||||
| #	if fields[4] == ''local'host': | #	if fields[4] == ''local'host': | ||||||
| #		i = 0 | #		i = 0 | ||||||
| #		print(f"len:{len(fields)}") | #		print(f"len:{len(fields)}") | ||||||
| @@ -1073,8 +1166,10 @@ if __name__ == "__main__": | |||||||
| 	 | 	 | ||||||
| 	# Db save control | 	# Db save control | ||||||
| 	saveData = get_value(ConfigDB,"mailstats","SaveDataToMySQL","no") == 'yes' or forceDbSave | 	saveData = get_value(ConfigDB,"mailstats","SaveDataToMySQL","no") == 'yes' or forceDbSave | ||||||
|  | 	print(f"Save Mailstats to DB set:{saveData} ") | ||||||
|  |  | ||||||
| 	if saveData: | 	if saveData: | ||||||
|  | 		# Connect to MySQL DB for saving | ||||||
| 		DBName = "mailstats" | 		DBName = "mailstats" | ||||||
| 		DBHost = get_value(ConfigDB, 'mailstats', 'DBHost', "localhost") | 		DBHost = get_value(ConfigDB, 'mailstats', 'DBHost', "localhost") | ||||||
| 		DBPort = int(get_value(ConfigDB, 'mailstats', 'DBPort', "3306"))  # Ensure port is an integer | 		DBPort = int(get_value(ConfigDB, 'mailstats', 'DBPort', "3306"))  # Ensure port is an integer | ||||||
| @@ -1151,13 +1246,15 @@ if __name__ == "__main__": | |||||||
|  |  | ||||||
| 	num_hours = 25  # Represents hours from 0 to 23 - adds extra one for column totals and another for percentages | 	num_hours = 25  # Represents hours from 0 to 23 - adds extra one for column totals and another for percentages | ||||||
|  |  | ||||||
| 	log_file = logs_dir+'current.log' | 	#log_file = logs_dir+'current.log' | ||||||
| 	log_entries,skip_count,ignored_count = read_in_relevant_log_file(log_file,anaysis_date_obj) | 	#log_entries,skip_count,ignored_count = read_in_relevant_log_file(log_file,anaysis_date_obj) | ||||||
| 	print(f"Found {len(log_entries)} entries in log for for {anaysis_date_obj.strftime('%Y-%m-%d')} Ignored: {ignored_count} skipped: {skip_count}") | 	log_entries = get_logs_from_Journalctl(analysis_date) | ||||||
|  | 	print(f"Found {len(log_entries)} entries in log for for {anaysis_date_obj.strftime('%Y-%m-%d')}")  #Ignored: {ignored_count} skipped: {skip_count}") | ||||||
| 	summary_log_entries,skip_count = filter_summary_records(log_entries) | 	summary_log_entries,skip_count = filter_summary_records(log_entries) | ||||||
| 	print(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries") | 	print(f"Found {len(summary_log_entries)} summary entries and skipped {skip_count} entries") | ||||||
| 	sorted_log_dict = sort_log_entries(summary_log_entries) | 	sorted_log_dict = sort_log_entries(summary_log_entries) | ||||||
| 	print(f"Sorted {len(sorted_log_dict)} entries") | 	print(f"Sorted {len(sorted_log_dict)} entries") | ||||||
|  | 	#quit(1) | ||||||
|  |  | ||||||
| 	columnHeaders = ['Count','WebMail','Local','MailMan','Relay','DMARC','Virus','RBL/DNS','Geoip.','Non.Conf.','Karma','Rej.Load','Del.Spam','Qued.Spam?','  Ham','TOTALS','PERCENT'] | 	columnHeaders = ['Count','WebMail','Local','MailMan','Relay','DMARC','Virus','RBL/DNS','Geoip.','Non.Conf.','Karma','Rej.Load','Del.Spam','Qued.Spam?','  Ham','TOTALS','PERCENT'] | ||||||
| 	# dict for each colum identifying plugin that increments count | 	# dict for each colum identifying plugin that increments count | ||||||
| @@ -1497,7 +1594,7 @@ if __name__ == "__main__": | |||||||
| 			# Match initial connection message | 			# Match initial connection message | ||||||
| 			IsInternal = True | 			IsInternal = True | ||||||
| 			try: | 			try: | ||||||
| 				match = helo_pattern.match(data[1]) | 				match = helo_pattern.match(data['MESSAGE']) | ||||||
| 				if match: | 				if match: | ||||||
| 					ip  = match.group(1) | 					ip  = match.group(1) | ||||||
| 					fqdn = match.group(2) | 					fqdn = match.group(2) | ||||||
| @@ -1508,12 +1605,12 @@ if __name__ == "__main__": | |||||||
| 						IsInternal = False | 						IsInternal = False | ||||||
| 					continue | 					continue | ||||||
| 			except Exception as e: | 			except Exception as e: | ||||||
| 				print(f" Helo pattern error {e} {data[1]} {analysis_date}") | 				print(f" Helo pattern error {e} {data['MESSAGE']} {analysis_date}") | ||||||
| 				continue | 				continue | ||||||
| 					 | 					 | ||||||
| 			#Pull out Geoip countries for analysis table | 			#Pull out Geoip countries for analysis table | ||||||
| 			try: | 			try: | ||||||
| 				match = geoip_pattern.match(data[1]) | 				match = geoip_pattern.match(data['MESSAGE']) | ||||||
| 				if match: | 				if match: | ||||||
| 					j += 1 | 					j += 1 | ||||||
| 					country = match.group(1) | 					country = match.group(1) | ||||||
| @@ -1521,17 +1618,17 @@ if __name__ == "__main__": | |||||||
| 					total_countries += 1 | 					total_countries += 1 | ||||||
| 					continue | 					continue | ||||||
| 			except Exception as e: | 			except Exception as e: | ||||||
| 				print(f"Geoip pattern error {e} {data[1]} {analysis_date}") | 				print(f"Geoip pattern error {e} {data['MESSAGE']} {analysis_date}") | ||||||
| 				continue | 				continue | ||||||
| 			 | 			 | ||||||
| 			#Pull out DMARC approvals | 			#Pull out DMARC approvals | ||||||
| 			match = dmarc_pattern.match(data[1]) | 			match = dmarc_pattern.match(data['MESSAGE']) | ||||||
| 			if match: | 			if match: | ||||||
| 				DMARCOkCount += 1 | 				DMARCOkCount += 1 | ||||||
| 				continue | 				continue | ||||||
|  |  | ||||||
| 			#Pull out type of connection | 			#Pull out type of connection | ||||||
| 			match = connect_type_pattern.match(data[1]) | 			match = connect_type_pattern.match(data['MESSAGE']) | ||||||
| 			if match: | 			if match: | ||||||
| 				connection_type = match.group(1) | 				connection_type = match.group(1) | ||||||
| 				#print(f"ct:{connection_type}") | 				#print(f"ct:{connection_type}") | ||||||
| @@ -1539,7 +1636,7 @@ if __name__ == "__main__": | |||||||
| 				#print(f"Count:{connection_type_counts[connection_type]}") | 				#print(f"Count:{connection_type_counts[connection_type]}") | ||||||
| 				continue | 				continue | ||||||
| 				 | 				 | ||||||
| 			match = tls_type_pattern.match(data[1]) | 			match = tls_type_pattern.match(data['MESSAGE']) | ||||||
| 			if match: | 			if match: | ||||||
| 				connection_type = match.group(1) | 				connection_type = match.group(1) | ||||||
| 				#print(f"ct:{connection_type}") | 				#print(f"ct:{connection_type}") | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user