Fix missing blacklist URLs from report
This commit is contained in:
@@ -1173,11 +1173,63 @@ def display_keys_and_values(data):
|
|||||||
raise ValueError("Input must be a list of dictionaries or a list of lists.")
|
raise ValueError("Input must be a list of dictionaries or a list of lists.")
|
||||||
|
|
||||||
def extract_blacklist_domain(text):
|
def extract_blacklist_domain(text):
|
||||||
match = re.search(r'http://www\.surbl\.org', text)
|
"""
|
||||||
if match:
|
Compare 'text' against comma-separated URL strings from global vars
|
||||||
return "www.surbl.org"
|
RBLList, SBLList, and UBLList. Return the first matching entry or "".
|
||||||
return None
|
Match is done on exact hostname substring OR the base domain (eTLD+1),
|
||||||
|
so 'black.uribl.com' will match text containing 'lookup.uribl.com'.
|
||||||
|
"""
|
||||||
|
s = text if isinstance(text, str) else str(text or "")
|
||||||
|
s_lower = s.lower()
|
||||||
|
logging.info(f"extract blacklist called:{text}")
|
||||||
|
|
||||||
|
combined = ",".join([RBLList, SBLList, UBLList])
|
||||||
|
|
||||||
|
def hostname_from(sval: str) -> str:
|
||||||
|
sval = (sval or "").strip().lower()
|
||||||
|
if "://" in sval:
|
||||||
|
# Strip scheme using simple split to avoid needing urlparse
|
||||||
|
sval = sval.split("://", 1)[1]
|
||||||
|
# Strip path and port if present
|
||||||
|
sval = sval.split("/", 1)[0]
|
||||||
|
sval = sval.split(":", 1)[0]
|
||||||
|
# Remove leading wildcards/dots
|
||||||
|
sval = sval.lstrip(".")
|
||||||
|
if sval.startswith("*."):
|
||||||
|
sval = sval[2:]
|
||||||
|
return sval
|
||||||
|
|
||||||
|
def base_domain(hostname: str) -> str:
|
||||||
|
parts = hostname.split(".")
|
||||||
|
if len(parts) >= 3 and parts[-2] in ("co", "org", "gov", "ac") and parts[-1] == "uk":
|
||||||
|
return ".".join(parts[-3:])
|
||||||
|
if len(parts) >= 2:
|
||||||
|
return ".".join(parts[-2:])
|
||||||
|
return hostname
|
||||||
|
|
||||||
|
def boundary_re(term: str):
|
||||||
|
# Match term when not part of a larger domain label
|
||||||
|
return re.compile(r"(?<![A-Za-z0-9-])" + re.escape(term) + r"(?![A-Za-z0-9-])")
|
||||||
|
|
||||||
|
for part in combined.split(","):
|
||||||
|
entry = part.strip()
|
||||||
|
logging.info(f"Comparing: {entry}")
|
||||||
|
if not entry:
|
||||||
|
continue
|
||||||
|
|
||||||
|
entry_host = hostname_from(entry)
|
||||||
|
entry_base = base_domain(entry_host)
|
||||||
|
|
||||||
|
# 1) Try matching the full entry host (e.g., black.uribl.com)
|
||||||
|
if entry_host and boundary_re(entry_host).search(s_lower):
|
||||||
|
return entry
|
||||||
|
|
||||||
|
# 2) Fallback: match by base domain (e.g., uribl.com) to catch lookup.uribl.com, etc.
|
||||||
|
if entry_base and boundary_re(entry_base).search(s_lower):
|
||||||
|
return entry
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
def set_log_level(level):
|
def set_log_level(level):
|
||||||
"""Dynamically adjust logging level (e.g., 'DEBUG', 'INFO', 'ERROR')."""
|
"""Dynamically adjust logging level (e.g., 'DEBUG', 'INFO', 'ERROR')."""
|
||||||
numeric_level = getattr(logging, level.upper(), None)
|
numeric_level = getattr(logging, level.upper(), None)
|
||||||
@@ -1330,19 +1382,19 @@ if __name__ == "__main__":
|
|||||||
saveData = False
|
saveData = False
|
||||||
|
|
||||||
nolinks = not saveData
|
nolinks = not saveData
|
||||||
# Not sure we need these...
|
# Needed to identify blacklist used to reject emails.
|
||||||
# if (ConfigDB,"qpsmtpd","RHSBL").lower() == 'enabled':
|
if get_value(ConfigDB,"qpsmtpd","RHSBL").lower() == 'enabled':
|
||||||
# RBLList = get_value(ConfigDB,"qpsmtpd","RBLList")
|
RBLList = get_value(ConfigDB,"qpsmtpd","RBLList")
|
||||||
# else:
|
else:
|
||||||
# RBLList = ""
|
RBLList = ""
|
||||||
# if (ConfigDB,"qpsmtpd","RBLList").lower() == 'enabled':
|
if get_value(ConfigDB,"qpsmtpd","DNSBL").lower() == 'enabled':
|
||||||
# SBLLIst = get_value(ConfigDB,"qpsmtpd","SBLLIst")
|
SBLList = get_value(ConfigDB,"qpsmtpd","SBLList")
|
||||||
# else:
|
else:
|
||||||
# RBLList = ""
|
SBLList = ""
|
||||||
# if (ConfigDB,"qpsmtpd","RBLList").lower() == 'enabled':
|
if get_value(ConfigDB,"qpsmtpd","URIBL").lower() == 'enabled':
|
||||||
# UBLList = get_value(ConfigDB,"qpsmtpd","UBLLIst")
|
UBLList = get_value(ConfigDB,"qpsmtpd","UBLList")
|
||||||
# else:
|
else:
|
||||||
# RBLList = ""
|
UBLList = ""
|
||||||
|
|
||||||
FetchmailIP = '127.0.0.200'; #Apparent Ip address of fetchmail deliveries
|
FetchmailIP = '127.0.0.200'; #Apparent Ip address of fetchmail deliveries
|
||||||
WebmailIP = '127.0.0.1'; #Apparent Ip of Webmail sender
|
WebmailIP = '127.0.0.1'; #Apparent Ip of Webmail sender
|
||||||
@@ -1577,7 +1629,8 @@ if __name__ == "__main__":
|
|||||||
error_plugin = parsed_data['error-plugin'].strip()
|
error_plugin = parsed_data['error-plugin'].strip()
|
||||||
if error_plugin == 'rhsbl' or error_plugin == 'dnsbl':
|
if error_plugin == 'rhsbl' or error_plugin == 'dnsbl':
|
||||||
blacklist_domain = extract_blacklist_domain(parsed_data['sender'])
|
blacklist_domain = extract_blacklist_domain(parsed_data['sender'])
|
||||||
blacklist_found[blacklist_domain] += 1
|
if blacklist_domain:
|
||||||
|
blacklist_found[blacklist_domain] += 1
|
||||||
|
|
||||||
#Log the recipients and deny or accept and spam-tagged counts
|
#Log the recipients and deny or accept and spam-tagged counts
|
||||||
# Try to find an existing record for the email
|
# Try to find an existing record for the email
|
||||||
|
Reference in New Issue
Block a user