Add in recipient email table

This commit is contained in:
Brian Read 2025-01-02 09:50:44 +00:00
parent 9be485a1a9
commit 372d2b45dd
3 changed files with 168 additions and 36 deletions

View File

@ -8,10 +8,9 @@
</thead>
<tbody>
<tr tal:repeat="item array_2d">
<td tal:repeat="cell item">${cell}<span tal:condition="repeat.cell.index == 2">%</span></td>
<td tal:repeat="cell item">${cell}</td>
</tr>
</tbody>
</table>
</div>
</div>

View File

@ -173,7 +173,8 @@
</script>
<!--
<p class="cssvalid">
<a href="http://jigsaw.w3.org/css-validator/check/referer">
<img style="border:0;width:88px;height:31px"
@ -187,6 +188,7 @@
src="http://www.w3.org/Icons/valid-xhtml10"
alt="Valid XHTML 1.0!" height="31" width="88" /></a>
</p>
-->
</div>
</body>
</html>
</html>

View File

@ -742,35 +742,78 @@ def split_timestamp_and_data(log_entry: str) -> list:
return [timestamp, rest_of_line]
def render_sub_table(table_title,table_headers,found_values,get_character=None):
# Get the total
total_sum = sum(found_values.values())
# and add in list with second element the percentage
# Create a list of tuples with each tuple containing (key, value, percentage)
if get_character:
sub_result = [(key, value,
f"{round(value / total_sum * 100, 2)}",
f"{get_character(key)}") for key, value in found_values.items()
]
else:
sub_result = [(key, value,
f"{round(value / total_sum * 100, 2)}") for key, value in found_values.items()
]
sub_result.sort(key=lambda x: float(x[2]), reverse=True) # Sort by percentage in descending order
sub_template_path = template_dir+'mailstats-sub-table.html.pt'
# Load the template
with open(sub_template_path, 'r') as template_file:
template_content = template_file.read()
# Create a Chameleon template instance
try:
template = PageTemplate(template_content)
# Render the template with the 2D array data and column headers
#print(f"render_sub_table:{table_title} {found_values}")
#Check if any data provided
if len(found_values) != 0:
# Get the total
if isinstance(found_values, dict):
# If found_values is a dictionary, we operate as previously
total_sum = sum(found_values.values())
if get_character:
sub_result = [(key, value,
f"{round(value / total_sum * 100, 2)}%",
f"{get_character(key)}") for key, value in found_values.items()]
else:
sub_result = [(key, value,
f"{round(value / total_sum * 100, 2)}%" ) for key, value in found_values.items()]
elif isinstance(found_values, list):
# If found_values is a list of values
if all(isinstance(v, (int, float)) for v in found_values):
total_sum = sum(found_values)
sub_result = [(i, value,
f"{round(value / total_sum * 100, 2)}%") for i, value in enumerate(found_values)]
# If found_values is a list of dictionaries
elif all(isinstance(v, dict) for v in found_values):
# Example assumes first key is used for identification and others are numeric
# Convert to 2D array
sub_result = [list(entry.values()) for entry in found_values]
# Calculate the total of the first numeric entry (index 1)
total = sum(row[1] for row in sub_result)
# Append percentage of the total for each entry
for row in sub_result:
percentage = f"{round(row[1] / total * 100, 2) if total else 0}%" # Handle division by zero
row.append(percentage)
# total_sum = sum(d.get('value', 0) for d in found_values) # Adjust 'value' if necessary
# if total_sum != 0:
# if get_character:
# sub_result = [(d.get('key', i), d.get('value', 0),
# f"{round(d.get('value', 0) / total_sum * 100, 2)}%",
# f"{get_character(d.get('key', i))}") for i, d in enumerate(found_values)]
# else:
# sub_result = [(d.get('key', i), d.get('value', 0),
# f"{round(d.get('value', 0) / total_sum * 100, 2)}%") for i, d in enumerate(found_values)]
# else:
# if get_character:
# sub_result = [(d.get('key', i), d.get('value', 0),
# f"{get_character(d.get('key', i))}") for i, d in enumerate(found_values)]
# else:
# sub_result = [(d.get('key', i), d.get('value', 0)) for i, d in enumerate(found_values)]
else:
raise ValueError("found_values must be either a list of numbers or a list of dictionaries.")
else:
raise TypeError("found_values must be a dictionary or a list.")
#print(f"Sub:{sub_result}")
sub_result.sort(key=lambda x: float(x[1]), reverse=True) # Sort by percentage in descending order
sub_template_path = template_dir+'mailstats-sub-table.html.pt'
# Load the template
with open(sub_template_path, 'r') as template_file:
template_content = template_file.read()
# Create a Chameleon template instance
try:
rendered_html = template(array_2d=sub_result, column_headers=table_headers, title=table_title)
template = PageTemplate(template_content)
# Render the template with the 2D array data and column headers
try:
rendered_html = template(array_2d=sub_result, column_headers=table_headers, title=table_title)
except Exception as e:
raise ValueError(f"{table_title}: A chameleon controller render error occurred: {e}")
except Exception as e:
raise ValueError(f"{table_title}: A chameleon controller render error occurred: {e}")
except Exception as e:
raise ValueError(f"{table_title}: A chameleon controller template error occurred: {e}")
raise ValueError(f"{table_title}: A chameleon controller template error occurred: {e}")
else:
rendered_html = f"<div class={table_title}><h2>{table_title}</h2>No data for {table_title}</div>"
return rendered_html
def get_character_in_reject_list(code):
@ -922,6 +965,44 @@ def get_heading():
# switch newlines to <br />
header_str = header_str.replace("\n","<br />")
return header_str
def scan_mail_users():
#
# Count emails left in junkmail folders for each user
#
base_path = '/home/e-smith/files/users'
users_info = defaultdict(int)
# List of junk mail directories to check
junk_mail_directories = [
'Maildir/.Junk/cur',
'Maildir/.Junk/new',
'Maildir/.Junkmail/cur',
'Maildir/.Junkmail/new'
]
# Iterate through each user directory
for user in os.listdir(base_path):
user_path = os.path.join(base_path, user)
# Check if it is a directory
if os.path.isdir(user_path):
total_junk_count = 0
# Check each junk mail path and accumulate counts
for junk_dir in junk_mail_directories:
junk_mail_path = os.path.join(user_path, junk_dir)
# Check if the Junk directory actually exists
if os.path.exists(junk_mail_path):
try:
# Count the number of junk mail files in that directory
junk_count = len(os.listdir(junk_mail_path))
total_junk_count += junk_count
except Exception as e:
print(f"Error counting junk mails in {junk_mail_path} for user {user}: {e}")
if total_junk_count != 0:
users_info[user] = total_junk_count
return users_info
if __name__ == "__main__":
try:
@ -949,7 +1030,7 @@ if __name__ == "__main__":
datetime.strptime(analysis_date, '%Y-%m-%d')
except ValueError:
print("Specify a valid date (yyyy-mm-dd) for the analysis")
(quit)()
quit(1)
anaysis_date_obj = datetime.strptime(analysis_date, '%Y-%m-%d')
noemailfile = args.emailfile.lower() == 'n'
@ -1109,6 +1190,8 @@ if __name__ == "__main__":
virus_pattern = re.compile(r"Virus found: (.*)")
found_viruses = defaultdict(int)
recipients_found = []
found_qpcodes = defaultdict(int)
qpcodes_pattern = re.compile(r"(\(.*\)).*'")
i = 0;
@ -1240,6 +1323,7 @@ if __name__ == "__main__":
hamavg += score
hamcount += 1
#spamassasin rejects
Isqueuedspam = False;
if parsed_data.get('spam-status') is not None and isinstance(parsed_data['spam-status'], str):
if parsed_data['spam-status'].lower().startswith('yes'):
#Extract other parameters from this string
@ -1259,6 +1343,7 @@ if __name__ == "__main__":
columnCounts_2d[ColTotals][QuedSpam] += 1
spamavg += score
spamqueuedcount += 1
Isqueuedspam = True #for recipient stats below
@ -1281,6 +1366,26 @@ if __name__ == "__main__":
else:
found_qpcodes[parsed_data['action1']] += 1
#Log the recipients and deny or accept and spam-tagged counts
# Try to find an existing record for the email
email = parsed_data["from-email"] # Extract email
action = parsed_data["action"] # Extract action
#print(f"{email} {action}")
record = next((item for item in recipients_found if item['email'] == email), None)
if not record:
# If email is not in the array, we add it
record = {"email": email, "deny": 0, "accept": 0,"spam-tagged": 0}
recipients_found.append(record)
# Update the deny or accept count based on action
if action != "(queue)":
record["deny"] += 1
else:
record["accept"] += 1
#and see if it is spam tagged
if Isqueuedspam:
record["spam-tagged"] += 1
#Now increment the column which the plugin name indicates
if parsed_data['action'] == '(deny)' and parsed_data['error-plugin']:
if parsed_data['error-plugin']:
@ -1299,6 +1404,7 @@ if __name__ == "__main__":
found_qpcodes[parsed_data['action1']] += 1
if isThonny:
print() #seperate the [progress bar]
# Compute percentages
total_Count = columnCounts_2d[ColTotals][TOTALS]
#Column of percentages
@ -1345,7 +1451,7 @@ if __name__ == "__main__":
log_len = len(log_entries)
#connection_type_counts = defaultdict(int)
connection_type_counts = {"qpsmtp":total_qpsmtpd,"sqpsmtp":total_sqpsmtpd,"uqpsmtp":total_uqpsmtpd}
print(f"Con:{connection_type_counts}")
#print(f"Con:{connection_type_counts}")
if log_len > 0:
if isThonny:
print_progress_bar(0, log_len, prefix='Progress:', suffix='Complete', length=50)
@ -1452,19 +1558,44 @@ if __name__ == "__main__":
#add in the subservient tables..
#qpsmtd codes
#print(f"{found_qpcodes}")
qpsmtpd_headers = ["Reason",'Count','Percent']
qpsmtpd_title = 'Qpsmtpd codes league table:'
qpsmtpd_title = 'Qpsmtpd codes league table'
rendered_html = render_sub_table(qpsmtpd_title,qpsmtpd_headers,found_qpcodes)
# Add it to the total
total_html = insert_string_after(total_html,rendered_html, "<!---Add in sub tables here -->")
#Geoip Country codes
geoip_headers = ['Country','Count','Percent','Rejected?']
geoip_title = 'Geoip results:'
geoip_title = 'Geoip results'
rendered_html = render_sub_table(geoip_title,geoip_headers,found_countries,get_character_in_reject_list)
# Add it to the total
total_html = insert_string_after(total_html,rendered_html, "<!---Add in sub tables here -->")
#Junk mails
junk_mail_count_headers = ['Usernane','Count', 'Percent']
junk_mail_counts = scan_mail_users()
junk_mail_count_title = 'Junk mail counts'
rendered_html = render_sub_table(junk_mail_count_title,junk_mail_count_headers,junk_mail_counts)
# Add it to the total
total_html = insert_string_after(total_html,rendered_html, "<!---Add in sub tables here -->")
#virus codes
virus_headers = ["Virus",'Count','Percent']
virus_title = 'Virus types found'
rendered_html = render_sub_table(virus_title,virus_headers,found_viruses)
# Add it to the total
total_html = insert_string_after(total_html,rendered_html, "<!---Add in sub tables here -->")
#Recipient counts
#print(f"{recipients_found}")
recipient_count_headers = ["Email",'Queued','Rejected','Spam tagged','Accepted Percent']
recipient_count_title = 'Recipient count and status '
rendered_html = render_sub_table(recipient_count_title,recipient_count_headers,recipients_found)
# Add it to the total
total_html = insert_string_after(total_html,rendered_html, "<!---Add in sub tables here -->")
if saveData:
# Close the connection
cursor.close()