#!/usr/bin/env python3
"""
Enhanced SME Server Utilities Module - Python 3.6.8 Compatible

This module provides utilities for interfacing with SME Server's
configuration database and system commands with enhanced password
strength validation.

Compatible with Python 3.6.8
"""

import hashlib
import logging
import os
import re
import subprocess

# Set up logging.
# NOTE: basicConfig() runs at import time and configures the root logger
# (only if no handlers are installed yet); kept so existing callers see
# the same log output as before.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SMEServerError(Exception):
    """Custom exception for SME Server related errors."""


class SMEConfigDB:
    """Interface for SME Server configuration database.

    Every operation shells out to the ``db`` tool whose path is stored in
    ``self.db_command`` and reports failure through return values rather
    than exceptions.
    """

    # Password strength levels accepted by the getter/setter below.
    VALID_STRENGTH_LEVELS = ('none', 'normal', 'strong')

    def __init__(self):
        self.db_command = '/sbin/e-smith/db'

    def _run_db_command(self, args):
        """Run a database command and return success status and output.

        Args:
            args: sequence of string arguments appended to ``self.db_command``.

        Returns:
            ``(ok, text)``: ``ok`` is True when the command exited with
            status 0 and ``text`` is its stripped stdout.  On timeout,
            missing executable or any other error ``ok`` is False and
            ``text`` is a short error description.
        """
        try:
            cmd = [self.db_command] + list(args)
            # stdout/stderr=PIPE + universal_newlines are the Python 3.6
            # spelling of capture_output=True / text=True, which only exist
            # from Python 3.7 on and raise TypeError on 3.6.8.
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                timeout=30,
            )
            return result.returncode == 0, result.stdout.strip()
        except subprocess.TimeoutExpired:
            logger.error("Database command timed out: {}".format(args))
            return False, "Command timed out"
        except FileNotFoundError:
            logger.error("Database command not found: {}".format(self.db_command))
            return False, "Database command not available"
        except Exception as e:
            logger.error("Error running database command: {}".format(e))
            return False, str(e)

    def get_account_info(self, username):
        """Get account information from the accounts database.

        Returns:
            ``None`` when the ``accounts show`` command fails, otherwise a
            dict holding ``'username'`` plus one entry per ``key=value``
            line of the command output.  Keys are stripped of surrounding
            whitespace.
        """
        success, output = self._run_db_command(['accounts', 'show', username])
        if not success:
            return None

        info = {'username': username}
        for line in output.splitlines():
            if '=' not in line:
                continue
            key, value = line.split('=', 1)
            # Property lines may be indented; do not keep that in the key.
            key = key.strip()
            info[key] = value
            # NOTE(review): ``db <file> show <key>`` is expected to print a
            # header line of the form ``<key>=<type>`` - confirm against the
            # target SME release.  Expose that value as 'type' so
            # get_account_type() can find it; an explicit ``type=`` line in
            # the output still overrides it.
            if key == username:
                info.setdefault('type', value)
        return info

    def account_exists(self, username):
        """Check if an account exists in the accounts database.

        The account is considered present when ``accounts show`` succeeds.
        """
        success, _ = self._run_db_command(['accounts', 'show', username])
        return success

    def get_account_type(self, username):
        """Get the type of an account (user, admin, etc.).

        Returns ``None`` when the account cannot be read or no type is
        present in the parsed record.
        """
        info = self.get_account_info(username)
        return info.get('type') if info else None

    def is_user_account(self, username):
        """Check if the account is a user account (not system account)."""
        return self.get_account_type(username) == 'user'

    def get_password_strength_setting(self):
        """Get the password strength setting from configuration database.

        Returns:
            ``'none'``, ``'normal'`` or ``'strong'``.  Falls back to
            ``'normal'`` when the lookup fails or yields anything else.
        """
        success, output = self._run_db_command(
            ['configuration', 'getprop', 'passwordstrength', 'Passwordstrength'])

        if success and output:
            strength = output.strip().lower()
            if strength in self.VALID_STRENGTH_LEVELS:
                return strength

        # Default to 'normal' if not set or invalid
        return 'normal'

    def set_password_strength_setting(self, strength):
        """Set the password strength setting in configuration database.

        Args:
            strength: ``'none'``, ``'normal'`` or ``'strong'`` (any case).

        Returns:
            ``(ok, text)``; an invalid or non-string ``strength`` is
            rejected without running the db command.
        """
        level = strength.lower() if isinstance(strength, str) else None
        if level not in self.VALID_STRENGTH_LEVELS:
            return False, "Invalid strength level. Must be 'none', 'normal', or 'strong'"

        return self._run_db_command(
            ['configuration', 'setprop', 'passwordstrength', 'Passwordstrength', level])


class PasswordStrengthValidator:
    """Enhanced password strength validation with configurable levels.

    Supported levels are ``'none'``, ``'normal'`` and ``'strong'``.  All
    public and private checks return a list of human-readable error
    strings (empty when the password passes) or a boolean.
    """

    def __init__(self):
        # Common weak passwords and patterns for strong validation
        self.common_passwords = {
            'password', 'password123', '123456', '123456789', 'qwerty', 'abc123',
            'password1', 'admin', 'administrator', 'root', 'user', 'guest',
            'welcome', 'login', 'pass', 'secret', 'default', 'changeme',
            'letmein', 'monkey', 'dragon', 'master', 'shadow', 'superman',
            'michael', 'jennifer', 'jordan', 'michelle', 'daniel', 'andrew'
        }

        # These are matched against the LOWER-CASED password (see
        # _validate_strong_crypto), so '^[a-z]+$' flags any letters-only
        # password and '^[A-Z]+$' can never match on its own.
        self.common_patterns = [
            r'^(.)\1+$',  # All same character
            r'^\d+$',  # All numbers
            r'^[a-z]+$',  # All lowercase
            r'^[A-Z]+$',  # All uppercase
            r'^(abc|123|qwe|asd|zxc)',  # Common sequences
            r'(password|admin|user|guest|login)',  # Common words
        ]

    def validate_password_strength(self, password, strength_level='normal'):
        """Validate password based on configured strength level.

        Args:
            password: candidate password; ``None`` is treated as empty.
            strength_level: ``'none'``, ``'normal'`` or ``'strong'``
                (case-insensitive).  Any other value is handled as
                ``'normal'`` so that a bad setting never disables
                validation.

        Returns:
            List of error messages; empty when the password is acceptable.
        """
        if password is None:
            password = ''

        level = strength_level.strip().lower() if isinstance(strength_level, str) else 'normal'
        if level not in ('none', 'normal', 'strong'):
            # Fail closed: previously an unknown level skipped every check.
            level = 'normal'

        if level == 'none':
            # No validation - only basic length check
            return [] if password else ["Password cannot be empty"]

        # Normal validation: 12+ chars, complexity requirements
        errors = self._validate_normal_strength(password)
        if level == 'strong':
            # Strong validation: Normal + crypto testing
            errors.extend(self._validate_strong_crypto(password))
        return errors

    def _validate_normal_strength(self, password):
        """Validate normal strength requirements.

        Checks length (12 to 127 characters) and the presence of an
        uppercase letter, a lowercase letter, a digit and a character
        outside ``[a-zA-Z0-9]``.
        """
        errors = []

        # Minimum 12 characters
        if len(password) < 12:
            errors.append("Password must be at least 12 characters long")

        # Maximum length check
        if len(password) > 127:
            errors.append("Password must be no more than 127 characters long")

        # Character type requirements, in the order they are reported
        required_types = (
            (r'[A-Z]', "uppercase letter"),
            (r'[a-z]', "lowercase letter"),
            (r'\d', "number"),
            (r'[^a-zA-Z0-9]', "special character"),
        )
        missing_types = [label for pattern, label in required_types
                         if not re.search(pattern, password)]

        if missing_types:
            errors.append("Password must contain at least one: {}".format(", ".join(missing_types)))

        return errors

    def _validate_strong_crypto(self, password):
        """Validate strong crypto requirements.

        Runs the common-password, common-pattern, keyboard-pattern,
        repeated-sequence and dictionary-word checks; each contributes at
        most one error message.
        """
        errors = []
        password_lower = password.lower()

        # Check against common passwords
        if password_lower in self.common_passwords:
            errors.append("Password is too common and easily guessable")

        # Check for common patterns
        if any(re.search(pattern, password_lower) for pattern in self.common_patterns):
            errors.append("Password contains common patterns that are easily guessable")

        # Check for keyboard patterns (forwards and reversed)
        keyboard_patterns = [
            'qwertyuiop', 'asdfghjkl', 'zxcvbnm',
            '1234567890', '0987654321',
            'qwerty', 'asdfgh', 'zxcvbn'
        ]
        if any(pattern in password_lower or pattern[::-1] in password_lower
               for pattern in keyboard_patterns):
            errors.append("Password contains keyboard patterns that are easily guessable")

        # Check for repeated sequences
        if self._has_repeated_sequences(password):
            errors.append("Password contains repeated sequences that reduce security")

        # Check for dictionary words (basic check)
        if self._contains_dictionary_words(password):
            errors.append("Password contains common dictionary words")

        return errors

    def _has_repeated_sequences(self, password):
        """Check for repeated character sequences.

        Returns True when any 3-character substring occurs at two
        different positions (case-sensitive).  Overlapping occurrences
        count, so ``'aaaa'`` and ``'ababab'`` are detected; the previous
        ``str.count`` based check only saw non-overlapping repeats.
        """
        seen = set()
        for start in range(len(password) - 2):
            chunk = password[start:start + 3]
            if chunk in seen:
                return True
            seen.add(chunk)
        return False

    def _contains_dictionary_words(self, password):
        """Basic check for common dictionary words.

        Returns True when the lower-cased password contains any word of a
        small built-in list as a substring.
        """
        common_words = [
            'password', 'admin', 'user', 'login', 'welcome', 'secret',
            'computer', 'internet', 'security', 'system', 'network',
            'server', 'database', 'application', 'software', 'hardware'
        ]

        password_lower = password.lower()
        return any(len(word) >= 4 and word in password_lower for word in common_words)


class SMEPasswordManager:
    """Handle password operations for SME Server with enhanced validation.

    Combines the configuration database (account lookups, strength
    setting), the strength validator and the system ``su`` / ``passwd`` /
    ``signal-event`` commands.
    """

    def __init__(self):
        self.config_db = SMEConfigDB()
        self.strength_validator = PasswordStrengthValidator()

    @staticmethod
    def _is_safe_username(username):
        """Return True if username can be passed to su/passwd as an operand.

        A leading '-' would be parsed by those commands as an option.
        """
        return isinstance(username, str) and bool(username) and not username.startswith('-')

    @staticmethod
    def _reap(process):
        """Kill a still-running child process and collect its exit status."""
        if process is None or process.poll() is not None:
            return
        try:
            process.kill()
            process.communicate()
        except (OSError, ValueError):
            # Best-effort cleanup only; the caller already reports failure.
            pass

    def validate_username(self, username):
        """Validate username format and existence.

        Returns:
            ``(ok, message)``.  The name must be non-empty, consist only of
            letters, digits, underscore and hyphen, exist in the accounts
            database and be of type ``user``.
        """
        if not username:
            return False, "Username cannot be empty"

        # Check username format (alphanumeric, underscore, hyphen).
        # fullmatch is used because '$' in re.match also matches just
        # before a trailing newline, which let 'name\n' through.
        if not isinstance(username, str) or not re.fullmatch(r'[a-zA-Z0-9_-]+', username):
            return False, "Username contains invalid characters"

        # Check if account exists
        if not self.config_db.account_exists(username):
            return False, "User account does not exist"

        # Check if it's a user account (not system account)
        if not self.config_db.is_user_account(username):
            return False, "Account is not a user account"

        return True, "Username is valid"

    def validate_password_strength(self, password):
        """Validate password meets configured SME Server requirements.

        Reads the current strength level from the configuration database
        and returns the validator's list of error messages.
        """
        strength_level = self.config_db.get_password_strength_setting()
        return self.strength_validator.validate_password_strength(password, strength_level)

    def get_password_strength_info(self):
        """Get current password strength setting and requirements.

        Returns a dict with the keys ``'level'`` and ``'description'``.
        """
        strength_level = self.config_db.get_password_strength_setting()
        return {
            'level': strength_level,
            'description': self._get_strength_description(strength_level)
        }

    def _get_strength_description(self, strength_level):
        """Get human-readable description of strength requirements."""
        descriptions = {
            'none': 'No specific password requirements',
            'normal': 'Minimum 12 characters with uppercase, lowercase, number, and special character',
            'strong': 'Normal requirements plus protection against common passwords and patterns'
        }
        return descriptions.get(strength_level, 'Unknown strength level')

    def verify_current_password(self, username, password):
        """Verify the current password for a user using system authentication.

        Runs ``su <username> -c true`` and feeds the password on stdin.

        Returns:
            True when ``su`` exits with status 0, False otherwise
            (including timeouts, errors and rejected input).
        """
        if not self._is_safe_username(username):
            return False
        # Only one line is read as the password, so an embedded line break
        # would make a different string than the one supplied be checked.
        if not isinstance(password, str) or '\n' in password or '\r' in password:
            return False

        process = None
        try:
            # Use the 'su' command to verify password
            # This is safer than directly accessing shadow files
            # NOTE(review): su commonly skips authentication when the
            # caller is root, and may insist on a terminal for the prompt -
            # confirm how this process is run on the target system.
            process = subprocess.Popen(
                ['su', username, '-c', 'true'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                # Python 3.6 spelling of text=True (added in 3.7).
                universal_newlines=True,
            )
            process.communicate(input=password + '\n', timeout=10)
            return process.returncode == 0
        except subprocess.TimeoutExpired:
            self._reap(process)
            logger.error("Password verification timed out")
            return False
        except Exception as e:
            self._reap(process)
            logger.error("Error verifying password: {}".format(e))
            return False

    def change_password(self, username, new_password):
        """Change user password and signal the update event.

        Runs ``passwd <username>`` with the new password supplied twice on
        stdin, then ``signal-event password-update <username>``.

        Returns:
            ``(ok, message)`` describing the outcome.
        """
        if not self._is_safe_username(username):
            return False, "Invalid username"
        password_text = "{}".format(new_password)
        if '\n' in password_text or '\r' in password_text:
            # A line break would split the two answers sent to passwd.
            return False, "Password must not contain line breaks"

        process = None
        try:
            # First, change the password using passwd command
            process = subprocess.Popen(
                ['passwd', username],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                # Python 3.6 spelling of text=True (added in 3.7).
                universal_newlines=True,
            )

            # Send the new password twice (passwd asks for confirmation)
            input_data = "{}\n{}\n".format(new_password, new_password)
            stdout, stderr = process.communicate(input=input_data, timeout=30)

            if process.returncode != 0:
                logger.error("passwd command failed: {}".format(stderr))
                return False, "Failed to change password: {}".format(stderr)

            # Signal the password update event
            signal_result = subprocess.run(
                ['/sbin/e-smith/signal-event', 'password-update', username],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                timeout=60,
            )

            if signal_result.returncode != 0:
                logger.error("signal-event failed: {}".format(signal_result.stderr))
                return False, "Password changed but failed to update system: {}".format(signal_result.stderr)

            logger.info("Password successfully changed for user: {}".format(username))
            return True, "Password changed successfully"

        except subprocess.TimeoutExpired:
            self._reap(process)
            logger.error("Password change operation timed out")
            return False, "Password change operation timed out"
        except FileNotFoundError as e:
            logger.error("Required command not found: {}".format(e))
            return False, "System command not available"
        except Exception as e:
            self._reap(process)
            logger.error("Unexpected error changing password: {}".format(e))
            return False, "Unexpected error: {}".format(str(e))


class SMESystemInfo:
    """Get SME Server system information."""

    # Release files probed by get_version(), in priority order.
    _VERSION_FILES = (
        '/etc/e-smith-release',
        '/etc/sme-release',
        '/etc/redhat-release',
    )
    _UNKNOWN_VERSION = "SME Server (version unknown)"

    @staticmethod
    def get_version(version_files=None):
        """Get SME Server version.

        Args:
            version_files: optional iterable of file paths to probe instead
                of the built-in release files (mainly useful for testing).

        Returns:
            The stripped content of the first candidate file that can be
            read and is not empty, or ``"SME Server (version unknown)"``
            when none qualifies.  A file that is missing, unreadable or
            empty no longer stops the search; the next candidate is tried.
        """
        if version_files is None:
            version_files = SMESystemInfo._VERSION_FILES

        for version_file in version_files:
            try:
                with open(version_file, 'r') as f:
                    content = f.read().strip()
            except (OSError, UnicodeDecodeError):
                # Missing, unreadable or undecodable: try the next file.
                continue
            if content:
                return content

        return SMESystemInfo._UNKNOWN_VERSION

    @staticmethod
    def get_copyright_info():
        """Get copyright information.

        Returns a new dict with the keys ``'mitel'``, ``'rights'`` and
        ``'koozali'``.
        """
        return {
            'mitel': 'Copyright 1999-2006 Mitel Corporation',
            'rights': 'All rights reserved.',
            'koozali': 'Copyright (C) 2013 - 2021 Koozali Foundation Inc.'
        }