Optimise journal access speeding up processing
This commit is contained in:
@@ -173,100 +173,145 @@ def replace_bracket_content(input_filename, output_filename):
|
|||||||
|
|
||||||
|
|
||||||
def get_logs_from_Journalctl(date='yesterday'):
|
def get_logs_from_Journalctl(date='yesterday'):
|
||||||
# JSON-pretty output example from journalctl
|
# JSON-pretty output example from journalctl
|
||||||
# {
|
# {
|
||||||
# "__CURSOR" : "s=21b4f015be0c4f1fb71ac439a8365ee7;i=385c;b=dd778625547f4883b572daf53ae93cd4;m=ca99d6d;t=62d6316802b05;x=71b24e9f19f3b99a",
|
# "__CURSOR" : "s=21b4f015be0c4f1fb71ac439a8365ee7;i=385c;b=dd778625547f4883b572daf53ae93cd4;m=ca99d6d;t=62d6316802b05;x=71b24e9f19f3b99a",
|
||||||
# "__REALTIME_TIMESTAMP" : "1738753462774533",
|
# "__REALTIME_TIMESTAMP" : "1738753462774533",
|
||||||
# "__MONOTONIC_TIMESTAMP" : "212442477",
|
# "__MONOTONIC_TIMESTAMP" : "212442477",
|
||||||
# "_BOOT_ID" : "dd778625547f4883b572daf53ae93cd4",
|
# "_BOOT_ID" : "dd778625547f4883b572daf53ae93cd4",
|
||||||
# "_MACHINE_ID" : "f20b7edad71a44e59f9e9b68d4870b19",
|
# "_MACHINE_ID" : "f20b7edad71a44e59f9e9b68d4870b19",
|
||||||
# "PRIORITY" : "6",
|
# "PRIORITY" : "6",
|
||||||
# "SYSLOG_FACILITY" : "3",
|
# "SYSLOG_FACILITY" : "3",
|
||||||
# "_UID" : "0",
|
# "_UID" : "0",
|
||||||
# "_GID" : "0",
|
# "_GID" : "0",
|
||||||
# "_SYSTEMD_SLICE" : "system.slice",
|
# "_SYSTEMD_SLICE" : "system.slice",
|
||||||
# "_CAP_EFFECTIVE" : "1ffffffffff",
|
# "_CAP_EFFECTIVE" : "1ffffffffff",
|
||||||
# "_TRANSPORT" : "stdout",
|
# "_TRANSPORT" : "stdout",
|
||||||
# "_COMM" : "openssl",
|
# "_COMM" : "openssl",
|
||||||
# "_EXE" : "/usr/bin/openssl",
|
# "_EXE" : "/usr/bin/openssl",
|
||||||
# "_HOSTNAME" : "sme11.thereadclan.me.uk",
|
# "_HOSTNAME" : "sme11.thereadclan.me.uk",
|
||||||
# "_STREAM_ID" : "8bb0ef8920af4ae09b424a2e30abcdf7",
|
# "_STREAM_ID" : "8bb0ef8920af4ae09b424a2e30abcdf7",
|
||||||
# "SYSLOG_IDENTIFIER" : "qpsmtpd-init",
|
# "SYSLOG_IDENTIFIER" : "qpsmtpd-init",
|
||||||
# "MESSAGE" : "Generating DH parameters, 2048 bit long safe prime, generator 2",
|
# "MESSAGE" : "Generating DH parameters, 2048 bit long safe prime, generator 2",
|
||||||
# "_PID" : "2850",
|
# "_PID" : "2850",
|
||||||
# }
|
# }
|
||||||
# and the return from here:
|
# and the return from here:
|
||||||
# {
|
# {
|
||||||
# '_TRANSPORT': 'stdout', 'PRIORITY': 6, 'SYSLOG_FACILITY': 3, '_CAP_EFFECTIVE': '0', '_SYSTEMD_SLICE': 'system.slice',
|
# '_TRANSPORT': 'stdout', 'PRIORITY': 6, 'SYSLOG_FACILITY': 3, '_CAP_EFFECTIVE': '0', '_SYSTEMD_SLICE': 'system.slice',
|
||||||
# '_BOOT_ID': UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f'), '_MACHINE_ID': UUID('f20b7eda-d71a-44e5-9f9e-9b68d4870b19'),
|
# '_BOOT_ID': UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f'), '_MACHINE_ID': UUID('f20b7eda-d71a-44e5-9f9e-9b68d4870b19'),
|
||||||
# '_HOSTNAME': 'sme11.thereadclan.me.uk', '_STREAM_ID': '06c860deea374544a2b561f55394d728', 'SYSLOG_IDENTIFIER': 'qpsmtpd-forkserver',
|
# '_HOSTNAME': 'sme11.thereadclan.me.uk', '_STREAM_ID': '06c860deea374544a2b561f55394d728', 'SYSLOG_IDENTIFIER': 'qpsmtpd-forkserver',
|
||||||
# '_UID': 453, '_GID': 453, '_COMM': 'qpsmtpd-forkser', '_EXE': '/usr/bin/perl',
|
# '_UID': 453, '_GID': 453, '_COMM': 'qpsmtpd-forkser', '_EXE': '/usr/bin/perl',
|
||||||
# '_CMDLINE': '/usr/bin/perl -Tw /usr/bin/qpsmtpd-forkserver -u qpsmtpd -l 0.0.0.0 -p 25 -c 40 -m 5',
|
# '_CMDLINE': '/usr/bin/perl -Tw /usr/bin/qpsmtpd-forkserver -u qpsmtpd -l 0.0.0.0 -p 25 -c 40 -m 5',
|
||||||
# '_SYSTEMD_CGROUP': '/system.slice/qpsmtpd.service', '_SYSTEMD_UNIT': 'qpsmtpd.service',
|
# '_SYSTEMD_CGROUP': '/system.slice/qpsmtpd.service', '_SYSTEMD_UNIT': 'qpsmtpd.service',
|
||||||
# '_SYSTEMD_INVOCATION_ID': 'a2b7889a307748daaeb60173d31c5e0f', '_PID': 93647,
|
# '_SYSTEMD_INVOCATION_ID': 'a2b7889a307748daaeb60173d31c5e0f', '_PID': 93647,
|
||||||
# 'MESSAGE': '93647 Connection from localhost [127.0.0.1]',
|
# 'MESSAGE': '93647 Connection from localhost [127.0.0.1]',
|
||||||
# '__REALTIME_TIMESTAMP': datetime.datetime(2025, 4, 2, 0, 1, 11, 668929),
|
# '__REALTIME_TIMESTAMP': datetime.datetime(2025, 4, 2, 0, 1, 11, 668929),
|
||||||
# '__MONOTONIC_TIMESTAMP': journal.Monotonic(timestamp=datetime.timedelta(11, 53118, 613602),
|
# '__MONOTONIC_TIMESTAMP': journal.Monotonic(timestamp=datetime.timedelta(11, 53118, 613602),
|
||||||
# bootid=UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f')),
|
# bootid=UUID('465c6202-36ac-4a8b-98e9-1581e8fec68f')),
|
||||||
# '__CURSOR': 's=21b4f015be0c4f1fb71ac439a8365ee7;i=66d2c;b=465c620236ac4a8b98e91581e8fec68f;m=e9a65ed862;t=
|
# '__CURSOR': 's=21b4f015be0c4f1fb71ac439a8365ee7;i=66d2c;b=465c620236ac4a8b98e91581e8fec68f;m=e9a65ed862;t=
|
||||||
# }
|
# }
|
||||||
"""
|
"""
|
||||||
Retrieve and parse journalctl logs for a specific date and units,
|
Retrieve and parse journalctl logs for a specific date and units,
|
||||||
returning them as a sorted list of dictionaries.
|
returning them as a sorted list of dictionaries.
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
# Parse the input date to calculate the start and end of the day
|
|
||||||
if date.lower() == "yesterday":
|
|
||||||
target_date = datetime.now() - timedelta(days=1)
|
|
||||||
else:
|
|
||||||
target_date = datetime.strptime(date, "%Y-%m-%d")
|
|
||||||
|
|
||||||
# Define the time range for the specified date
|
|
||||||
since = target_date.strftime("%Y-%m-%d 00:00:00")
|
|
||||||
until = target_date.strftime("%Y-%m-%d 23:59:59")
|
|
||||||
|
|
||||||
# Convert times to microseconds for querying
|
|
||||||
since_microseconds = int(datetime.strptime(since, "%Y-%m-%d %H:%M:%S").timestamp() * 1_000_000)
|
|
||||||
until_microseconds = int(datetime.strptime(until, "%Y-%m-%d %H:%M:%S").timestamp() * 1_000_000)
|
|
||||||
|
|
||||||
# Open the systemd journal
|
|
||||||
j = journal.Reader()
|
|
||||||
|
|
||||||
# Set filters for units
|
|
||||||
j.add_match(_SYSTEMD_UNIT="qpsmtpd.service")
|
|
||||||
j.add_match(_SYSTEMD_UNIT="uqpsmtpd.service")
|
|
||||||
j.add_match(_SYSTEMD_UNIT="sqpsmtpd.service")
|
|
||||||
|
|
||||||
# Filter by time range
|
|
||||||
j.seek_realtime(since_microseconds // 1_000_000) # Convert back to seconds for seeking
|
|
||||||
|
|
||||||
# Retrieve logs within the time range
|
|
||||||
logs = []
|
|
||||||
log_count = 0
|
|
||||||
error_count = 0
|
|
||||||
for entry in j:
|
|
||||||
try:
|
|
||||||
entry_timestamp = entry.get('__REALTIME_TIMESTAMP', None)
|
|
||||||
entry_microseconds = int(entry_timestamp.timestamp() * 1_000_000)
|
|
||||||
if entry_timestamp and since_microseconds <= entry_microseconds <= until_microseconds:
|
|
||||||
log_count += 1
|
|
||||||
# takeout ASCII Escape sequences from the message
|
|
||||||
entry['MESSAGE'] = strip_ansi_codes(entry['MESSAGE'])
|
|
||||||
logs.append(entry)
|
|
||||||
except Exception as e:
|
|
||||||
logging.warning(f"Error - log line: {log_count} {entry['_PID']} {entry['SYSLOG_IDENTIFIER']} : {e}")
|
|
||||||
error_count += 1
|
|
||||||
if error_count:
|
|
||||||
logging.info(f"Had {error_count} errors on journal import - probably non character bytes")
|
|
||||||
# Sort logs by __REALTIME_TIMESTAMP in ascending order
|
|
||||||
sorted_logs = sorted(logs, key=lambda x: x.get("__REALTIME_TIMESTAMP", 0))
|
|
||||||
|
|
||||||
return sorted_logs
|
|
||||||
|
|
||||||
except Exception as e:
|
def to_us(ts):
|
||||||
logging.error(f"Unexpected error: {e}")
|
# Convert a journal timestamp (datetime or int/string microseconds) to integer microseconds
|
||||||
return {}
|
if ts is None:
|
||||||
|
return None
|
||||||
|
if hasattr(ts, "timestamp"):
|
||||||
|
return int(ts.timestamp() * 1_000_000)
|
||||||
|
try:
|
||||||
|
return int(ts)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Parse the input date to calculate start and end of the day
|
||||||
|
if isinstance(date, str) and date.lower() == "yesterday":
|
||||||
|
target_date = datetime.now() - timedelta(days=1)
|
||||||
|
elif isinstance(date, datetime):
|
||||||
|
target_date = date
|
||||||
|
else:
|
||||||
|
# Supports either a datetime.date-like object (has year attr) or a string YYYY-MM-DD
|
||||||
|
try:
|
||||||
|
target_date = datetime(date.year, date.month, date.day)
|
||||||
|
except Exception:
|
||||||
|
target_date = datetime.strptime(str(date), "%Y-%m-%d")
|
||||||
|
|
||||||
|
# Define the time range for the specified date
|
||||||
|
since_dt = datetime(target_date.year, target_date.month, target_date.day, 0, 0, 0, 0)
|
||||||
|
until_dt = datetime(target_date.year, target_date.month, target_date.day, 23, 59, 59, 999999)
|
||||||
|
since_microseconds = int(since_dt.timestamp() * 1_000_000)
|
||||||
|
until_microseconds = int(until_dt.timestamp() * 1_000_000)
|
||||||
|
|
||||||
|
# Open the systemd journal (system-only if supported)
|
||||||
|
try:
|
||||||
|
j = journal.Reader(flags=journal.SYSTEM_ONLY)
|
||||||
|
except Exception:
|
||||||
|
j = journal.Reader()
|
||||||
|
|
||||||
|
# Set filters for units (multiple add_match on same field => OR)
|
||||||
|
j.add_match(_SYSTEMD_UNIT="qpsmtpd.service")
|
||||||
|
j.add_match(_SYSTEMD_UNIT="uqpsmtpd.service")
|
||||||
|
j.add_match(_SYSTEMD_UNIT="sqpsmtpd.service")
|
||||||
|
|
||||||
|
# Filter by time range: seek to the start of the interval
|
||||||
|
j.seek_realtime(since_dt)
|
||||||
|
|
||||||
|
# Retrieve logs within the time range
|
||||||
|
logs = []
|
||||||
|
log_count = 0
|
||||||
|
error_count = 0
|
||||||
|
|
||||||
|
for entry in j:
|
||||||
|
try:
|
||||||
|
entry_timestamp = entry.get("__REALTIME_TIMESTAMP", None)
|
||||||
|
entry_microseconds = to_us(entry_timestamp)
|
||||||
|
if entry_microseconds is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Early stop once we pass the end of the window
|
||||||
|
if entry_microseconds > until_microseconds:
|
||||||
|
break
|
||||||
|
|
||||||
|
if entry_microseconds >= since_microseconds:
|
||||||
|
log_count += 1
|
||||||
|
# Strip ANSI escape sequences in MESSAGE (if present and is text/bytes)
|
||||||
|
try:
|
||||||
|
msg = entry.get("MESSAGE", "")
|
||||||
|
if isinstance(msg, (bytes, bytearray)):
|
||||||
|
msg = msg.decode("utf-8", "replace")
|
||||||
|
# Only call strip if ESC is present
|
||||||
|
if "\x1b" in msg:
|
||||||
|
msg = strip_ansi_codes(msg)
|
||||||
|
entry["MESSAGE"] = msg
|
||||||
|
except Exception as se:
|
||||||
|
# Keep original message, just note the issue at debug level
|
||||||
|
logging.debug(f"strip_ansi_codes failed: {se}")
|
||||||
|
|
||||||
|
logs.append(entry)
|
||||||
|
except Exception as e:
|
||||||
|
# Be defensive getting context fields to avoid raising inside logging
|
||||||
|
pid = entry.get("_PID", "?") if isinstance(entry, dict) else "?"
|
||||||
|
ident = entry.get("SYSLOG_IDENTIFIER", "?") if isinstance(entry, dict) else "?"
|
||||||
|
logging.warning(f"Error - log line: {log_count} {pid} {ident} : {e}")
|
||||||
|
error_count += 1
|
||||||
|
|
||||||
|
if error_count:
|
||||||
|
logging.info(f"Had {error_count} errors on journal import - probably non character bytes")
|
||||||
|
|
||||||
|
# Sort logs by __REALTIME_TIMESTAMP in ascending order (keep original behavior)
|
||||||
|
sorted_logs = sorted(logs, key=lambda x: to_us(x.get("__REALTIME_TIMESTAMP")) or 0)
|
||||||
|
|
||||||
|
logging.debug(f"Collected {len(sorted_logs)} entries for {since_dt.date()} "
|
||||||
|
f"between {since_dt} and {until_dt} (scanned {log_count} in-window)")
|
||||||
|
|
||||||
|
return sorted_logs
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Unexpected error: {e}")
|
||||||
|
return {}
|
||||||
|
|
||||||
def transform_to_dict(data, keys, iso_date):
|
def transform_to_dict(data, keys, iso_date):
|
||||||
"""
|
"""
|
||||||
|
Reference in New Issue
Block a user