Files

437 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Corrected SME Server Utilities Module - Python 3.6.8 Compatible
This module provides utilities for interfacing with SME Server's
configuration database using the correct passwordstrength structure:
passwordstrength=configuration
Admin=strong
Ibays=strong
Users=strong
Compatible with Python 3.6.8
"""
import subprocess
import os
import logging
import re
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SMEServerError(Exception):
"""Custom exception for SME Server related errors"""
pass
class SMEConfigDB:
"""Interface for SME Server configuration database"""
def __init__(self):
self.db_command = '/sbin/e-smith/db'
def _run_db_command(self, args):
"""Run a database command and return success status and output"""
try:
cmd = [self.db_command] + args
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
return result.returncode == 0, result.stdout.strip()
except subprocess.TimeoutExpired:
logger.error("Database command timed out: {}".format(args))
return False, "Command timed out"
except FileNotFoundError:
logger.error("Database command not found: {}".format(self.db_command))
return False, "Database command not available"
except Exception as e:
logger.error("Error running database command: {}".format(e))
return False, str(e)
def get_account_info(self, username):
"""Get account information from the accounts database"""
success, output = self._run_db_command(['accounts', 'show', username])
if not success:
return None
# Parse the output into a dictionary
info = {'username': username}
for line in output.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
info[key] = value
return info
def account_exists(self, username):
"""Check if an account exists in the accounts database"""
success, _ = self._run_db_command(['accounts', 'show', username])
return success
def get_account_type(self, username):
"""Get the type of an account (user, admin, etc.)"""
info = self.get_account_info(username)
return info.get('type') if info else None
def is_user_account(self, username):
"""Check if the account is a user account (not system account)"""
account_type = self.get_account_type(username)
return account_type == 'user'
def is_admin_account(self, username):
"""Check if the account is an admin account"""
account_type = self.get_account_type(username)
return account_type == 'admin'
def get_password_strength_setting(self, account_type='Users'):
"""Get the password strength setting for specific account type
Args:
account_type: 'Users', 'Admin', or 'Ibays'
"""
success, output = self._run_db_command(['configuration', 'getprop', 'passwordstrength', account_type])
if success and output:
strength = output.strip().lower()
if strength in ['none', 'normal', 'strong']:
return strength
# Default to 'normal' if not set or invalid
return 'normal'
def set_password_strength_setting(self, strength, account_type='Users'):
"""Set the password strength setting for specific account type
Args:
strength: 'none', 'normal', or 'strong'
account_type: 'Users', 'Admin', or 'Ibays'
"""
if strength.lower() not in ['none', 'normal', 'strong']:
return False, "Invalid strength level. Must be 'none', 'normal', or 'strong'"
success, output = self._run_db_command(['configuration', 'setprop', 'passwordstrength', account_type, strength.lower()])
return success, output
def get_all_password_strength_settings(self):
"""Get all password strength settings"""
success, output = self._run_db_command(['configuration', 'show', 'passwordstrength'])
settings = {
'Users': 'normal',
'Admin': 'normal',
'Ibays': 'normal'
}
if success and output:
for line in output.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip().lower()
if key in ['Users', 'Admin', 'Ibays'] and value in ['none', 'normal', 'strong']:
settings[key] = value
return settings
class BasicPasswordValidator:
"""Basic password validation for fallback when external library not available"""
def __init__(self):
# Common weak passwords for basic validation
self.common_passwords = {
'password', 'password123', '123456', '123456789', 'qwerty', 'abc123',
'password1', 'admin', 'administrator', 'root', 'user', 'guest',
'welcome', 'login', 'pass', 'secret', 'default', 'changeme',
'letmein', 'monkey', 'dragon', 'master', 'shadow', 'superman'
}
def validate_password_strength(self, password, strength_level='normal'):
"""Basic password validation"""
errors = []
if strength_level == 'none':
if len(password) < 1:
errors.append("Password cannot be empty")
return errors
elif strength_level == 'normal':
errors.extend(self._validate_normal_strength(password))
elif strength_level == 'strong':
errors.extend(self._validate_normal_strength(password))
errors.extend(self._validate_strong_basic(password))
return errors
def _validate_normal_strength(self, password):
"""Validate normal strength requirements"""
errors = []
if len(password) < 12:
errors.append("Password must be at least 12 characters long")
if len(password) > 127:
errors.append("Password must be no more than 127 characters long")
has_upper = any(c.isupper() for c in password)
has_lower = any(c.islower() for c in password)
has_numeric = any(c.isdigit() for c in password)
has_non_alpha = any(not c.isalnum() for c in password)
missing_types = []
if not has_upper:
missing_types.append("uppercase letter")
if not has_lower:
missing_types.append("lowercase letter")
if not has_numeric:
missing_types.append("number")
if not has_non_alpha:
missing_types.append("special character")
if missing_types:
errors.append("Password must contain at least one: {}".format(", ".join(missing_types)))
return errors
def _validate_strong_basic(self, password):
"""Basic strong validation"""
errors = []
if password.lower() in self.common_passwords:
errors.append("Password is too common and easily guessable")
# Basic keyboard pattern check
keyboard_patterns = ['qwerty', 'asdfgh', 'zxcvbn', '123456', '654321']
password_lower = password.lower()
for pattern in keyboard_patterns:
if pattern in password_lower:
errors.append("Password contains keyboard patterns that are easily guessable")
break
return errors
class SMEPasswordManager:
"""Handle password operations for SME Server with corrected DB structure"""
def __init__(self):
self.config_db = SMEConfigDB()
self.basic_validator = BasicPasswordValidator()
# Try to import external password validation library
self.external_validator = None
try:
import zxcvbn
self.external_validator = zxcvbn
logger.info("Using zxcvbn library for password validation")
except ImportError:
logger.info("zxcvbn library not available, using basic validation")
def validate_username(self, username):
"""Validate username format and existence"""
if not username:
return False, "Username cannot be empty"
# Check username format (alphanumeric, underscore, hyphen)
if not re.match(r'^[a-zA-Z0-9_-]+$', username):
return False, "Username contains invalid characters"
# Check if account exists
if not self.config_db.account_exists(username):
return False, "User account does not exist"
# Check if it's a user account (not system account)
if not self.config_db.is_user_account(username):
return False, "Account is not a user account"
return True, "Username is valid"
def validate_password_strength(self, password, username=None):
"""Validate password using external library or fallback to basic validation"""
# Get appropriate password strength setting
strength_level = self.config_db.get_password_strength_setting('Users')
if self.external_validator:
return self._validate_with_zxcvbn(password, strength_level, username)
else:
return self.basic_validator.validate_password_strength(password, strength_level)
def _validate_with_zxcvbn(self, password, strength_level, username=None):
"""Validate password using zxcvbn library"""
errors = []
if strength_level == 'none':
if len(password) < 1:
errors.append("Password cannot be empty")
return errors
# Basic length and complexity checks first
if len(password) < 12:
errors.append("Password must be at least 12 characters long")
if len(password) > 127:
errors.append("Password must be no more than 127 characters long")
# Character type requirements for normal and strong
if strength_level in ['normal', 'strong']:
has_upper = any(c.isupper() for c in password)
has_lower = any(c.islower() for c in password)
has_numeric = any(c.isdigit() for c in password)
has_non_alpha = any(not c.isalnum() for c in password)
missing_types = []
if not has_upper:
missing_types.append("uppercase letter")
if not has_lower:
missing_types.append("lowercase letter")
if not has_numeric:
missing_types.append("number")
if not has_non_alpha:
missing_types.append("special character")
if missing_types:
errors.append("Password must contain at least one: {}".format(", ".join(missing_types)))
# Use zxcvbn for advanced validation on strong passwords
if strength_level == 'strong' and not errors:
user_inputs = [username] if username else []
result = self.external_validator.zxcvbn(password, user_inputs)
# zxcvbn scores: 0-4 (0 = very weak, 4 = very strong)
if result["score"] < 3: # Require score of 3 or higher for strong
feedback = result.get('feedback', {})
warning = feedback.get('warning', '')
suggestions = feedback.get('suggestions', [])
if warning:
errors.append("Password weakness: {}".format(warning))
for suggestion in suggestions:
errors.append("Suggestion: {}".format(suggestion))
if not warning and not suggestions:
errors.append("Password is not strong enough (zxcvbn score: {}/4)".format(result['score']))
return errors
def get_password_strength_info(self, account_type='Users'):
"""Get current password strength setting and requirements"""
strength_level = self.config_db.get_password_strength_setting(account_type)
descriptions = {
'none': 'No specific password requirements',
'normal': 'Minimum 12 characters with uppercase, lowercase, number, and special character',
'strong': 'Normal requirements plus advanced security validation using zxcvbn library' if self.external_validator else 'Normal requirements plus basic pattern protection'
}
return {
'level': strength_level,
'description': descriptions.get(strength_level, 'Unknown strength level'),
'account_type': account_type,
'using_zxcvbn': self.external_validator is not None
}
def verify_current_password(self, username, password):
"""Verify the current password for a user using system authentication"""
try:
# Use the 'su' command to verify password
process = subprocess.Popen(
['su', username, '-c', 'true'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input=password + '\n', timeout=10)
return process.returncode == 0
except subprocess.TimeoutExpired:
logger.error("Password verification timed out")
return False
except Exception as e:
logger.error("Error verifying password: {}".format(e))
return False
def change_password(self, username, new_password):
"""Change user password and signal the update event"""
try:
# First, change the password using passwd command
process = subprocess.Popen(
['passwd', username],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Send the new password twice (passwd asks for confirmation)
input_data = "{}\n{}\n".format(new_password, new_password)
stdout, stderr = process.communicate(input=input_data, timeout=30)
if process.returncode != 0:
logger.error("passwd command failed: {}".format(stderr))
return False, "Failed to change password: {}".format(stderr)
# Signal the password update event
signal_result = subprocess.run(
['/sbin/e-smith/signal-event', 'password-update', username],
capture_output=True,
text=True,
timeout=60
)
if signal_result.returncode != 0:
logger.error("signal-event failed: {}".format(signal_result.stderr))
return False, "Password changed but failed to update system: {}".format(signal_result.stderr)
logger.info("Password successfully changed for user: {}".format(username))
return True, "Password changed successfully"
except subprocess.TimeoutExpired:
logger.error("Password change operation timed out")
return False, "Password change operation timed out"
except FileNotFoundError as e:
logger.error("Required command not found: {}".format(e))
return False, "System command not available"
except Exception as e:
logger.error("Unexpected error changing password: {}".format(e))
return False, "Unexpected error: {}".format(str(e))
class SMESystemInfo:
"""Get SME Server system information"""
@staticmethod
def get_version():
"""Get SME Server version"""
try:
version_files = [
'/etc/e-smith-release',
'/etc/sme-release',
'/etc/redhat-release'
]
for version_file in version_files:
if os.path.exists(version_file):
with open(version_file, 'r') as f:
return f.read().strip()
return "SME Server (version unknown)"
except Exception:
return "SME Server (version unknown)"
@staticmethod
def get_copyright_info():
"""Get copyright information"""
return {
'mitel': 'Copyright 1999-2006 Mitel Corporation',
'rights': 'All rights reserved.',
'koozali': 'Copyright (C) 2013 - 2021 Koozali Foundation Inc.'
}