#!/usr/bin/env python3 """ SME Server Utilities Module - Python 3.6.8 Compatible This module provides utilities for interfacing with SME Server's configuration database and system commands. Compatible with Python 3.6.8 """ import subprocess import os import logging import re # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SMEServerError(Exception): """Custom exception for SME Server related errors""" pass class SMEConfigDB: """Interface for SME Server configuration database""" def __init__(self): self.db_command = '/sbin/e-smith/db' def _run_db_command(self, args): """Run a database command and return success status and output""" try: cmd = [self.db_command] + args result = subprocess.run( cmd, capture_output=True, text=True, timeout=30 ) return result.returncode == 0, result.stdout.strip() except subprocess.TimeoutExpired: logger.error("Database command timed out: {}".format(args)) return False, "Command timed out" except FileNotFoundError: logger.error("Database command not found: {}".format(self.db_command)) return False, "Database command not available" except Exception as e: logger.error("Error running database command: {}".format(e)) return False, str(e) def get_account_info(self, username): """Get account information from the accounts database""" success, output = self._run_db_command(['accounts', 'show', username]) if not success: return None # Parse the output into a dictionary info = {'username': username} for line in output.split('\n'): if '=' in line: key, value = line.split('=', 1) info[key] = value return info def account_exists(self, username): """Check if an account exists in the accounts database""" success, _ = self._run_db_command(['accounts', 'show', username]) return success def get_account_type(self, username): """Get the type of an account (user, admin, etc.)""" info = self.get_account_info(username) return info.get('type') if info else None def is_user_account(self, username): """Check if the account is a user account (not system account)""" account_type = self.get_account_type(username) return account_type == 'user' class SMEPasswordManager: """Handle password operations for SME Server""" def __init__(self): self.config_db = SMEConfigDB() def validate_username(self, username): """Validate username format and existence""" if not username: return False, "Username cannot be empty" # Check username format (alphanumeric, underscore, hyphen) if not re.match(r'^[a-zA-Z0-9_-]+$', username): return False, "Username contains invalid characters" # Check if account exists if not self.config_db.account_exists(username): return False, "User account does not exist" # Check if it's a user account (not system account) if not self.config_db.is_user_account(username): return False, "Account is not a user account" return True, "Username is valid" def validate_password_strength(self, password): """Validate password meets SME Server requirements""" errors = [] if len(password) < 7: errors.append("Password must be at least 7 characters long") if len(password) > 127: errors.append("Password must be no more than 127 characters long") # Check for at least one letter and one number has_letter = bool(re.search(r'[a-zA-Z]', password)) has_number = bool(re.search(r'\d', password)) if not (has_letter and has_number): errors.append("Password must contain at least one letter and one number") # Check for forbidden characters (some systems don't allow certain chars) forbidden_chars = [':', ';', '|', '&', '!', '\\', '"', "'"] for char in forbidden_chars: if char in password: errors.append("Password cannot contain the character: {}".format(char)) break return errors def verify_current_password(self, username, password): """Verify the current password for a user using system authentication""" try: # Use the 'su' command to verify password # This is safer than directly accessing shadow files process = subprocess.Popen( ['su', username, '-c', 'true'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout, stderr = process.communicate(input=password + '\n', timeout=10) return process.returncode == 0 except subprocess.TimeoutExpired: logger.error("Password verification timed out") return False except Exception as e: logger.error("Error verifying password: {}".format(e)) return False def change_password(self, username, new_password): """Change user password and signal the update event""" try: # First, change the password using passwd command process = subprocess.Popen( ['passwd', username], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Send the new password twice (passwd asks for confirmation) input_data = "{}\n{}\n".format(new_password, new_password) stdout, stderr = process.communicate(input=input_data, timeout=30) if process.returncode != 0: logger.error("passwd command failed: {}".format(stderr)) return False, "Failed to change password: {}".format(stderr) # Signal the password update event signal_result = subprocess.run( ['/sbin/e-smith/signal-event', 'password-update', username], capture_output=True, text=True, timeout=60 ) if signal_result.returncode != 0: logger.error("signal-event failed: {}".format(signal_result.stderr)) return False, "Password changed but failed to update system: {}".format(signal_result.stderr) logger.info("Password successfully changed for user: {}".format(username)) return True, "Password changed successfully" except subprocess.TimeoutExpired: logger.error("Password change operation timed out") return False, "Password change operation timed out" except FileNotFoundError as e: logger.error("Required command not found: {}".format(e)) return False, "System command not available" except Exception as e: logger.error("Unexpected error changing password: {}".format(e)) return False, "Unexpected error: {}".format(str(e)) class SMESystemInfo: """Get SME Server system information""" @staticmethod def get_version(): """Get SME Server version""" try: # Try to read version from release file version_files = [ '/etc/e-smith-release', '/etc/sme-release', '/etc/redhat-release' ] for version_file in version_files: if os.path.exists(version_file): with open(version_file, 'r') as f: return f.read().strip() return "SME Server (version unknown)" except Exception: return "SME Server (version unknown)" @staticmethod def get_copyright_info(): """Get copyright information""" return { 'mitel': 'Copyright 1999-2006 Mitel Corporation', 'rights': 'All rights reserved.', 'koozali': 'Copyright (C) 2013 - 2021 Koozali Foundation Inc.' }