import os import sys import secrets import tempfile import subprocess import re import logging from logging.handlers import RotatingFileHandler from flask import Flask, request, render_template_string, jsonify, send_file, redirect, url_for, flash from werkzeug.utils import secure_filename from werkzeug.exceptions import RequestEntityTooLarge from flask_talisman import Talisman from flask_limiter import Limiter from flask_limiter.util import get_remote_address from flask_cors import CORS from functools import wraps from dotenv import load_dotenv import json # --- ENV GENERATOR --- def generate_env(): secret_key = secrets.token_urlsafe(32) api_token = secrets.token_hex(20) with open(".env", "w") as f: f.write(f"FLASK_SECRET_KEY={secret_key}\n") f.write(f"FLASK_API_TOKEN={api_token}\n") f.write(f"FLASK_DEBUG=False\n") print("✅ .env generated") print(f"FLASK_SECRET_KEY={secret_key}") print(f"FLASK_API_TOKEN={api_token}") print("⚠ Keep .env out of version control!") sys.exit(0) if "--genenv" in sys.argv: generate_env() # --- LOAD ENV --- load_dotenv() BASE_DIR = os.path.dirname(os.path.abspath(__file__)) MOJO_FMT_PATH = os.path.join(BASE_DIR, "mojofmt.py") SECRET_KEY = os.environ.get('FLASK_SECRET_KEY') API_TOKEN = os.environ.get('FLASK_API_TOKEN') DEBUG_MODE = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true' if not SECRET_KEY or not API_TOKEN: raise RuntimeError("FLASK_SECRET_KEY and FLASK_API_TOKEN must be set (use --genenv to create .env)") # --- FLASK APP CONFIGURATION --- app = Flask(__name__) app.secret_key = SECRET_KEY # Enhanced app configuration app.config.update( SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax', MAX_CONTENT_LENGTH=1024 * 1024, # 1MB limit DEBUG=DEBUG_MODE ) # --- LOGGING CONFIGURATION --- if not app.debug: if not os.path.exists('logs'): os.mkdir('logs') file_handler = RotatingFileHandler('logs/app.log', maxBytes=10240, backupCount=10) file_handler.setFormatter(logging.Formatter( '%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s' )) file_handler.setLevel(logging.INFO) app.logger.addHandler(file_handler) app.logger.setLevel(logging.INFO) app.logger.info('Flask application startup') # --- SECURITY CONFIGURATION --- def configure_security_headers(app): """Configure security headers for Flask-Talisman 1.0.0 compatibility""" # Enhanced CSP configuration - different for development vs production if app.debug: # Development CSP - allows inline styles and scripts for easier development csp = { 'default-src': ["'self'"], 'script-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"], 'style-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"], 'img-src': ["'self'", 'data:'], 'font-src': ["'self'", 'https://cdn.jsdelivr.net'], 'connect-src': ["'self'"], 'frame-ancestors': ["'none'"], 'base-uri': ["'self'"], 'form-action': ["'self'"] } else: # Production CSP - strict security policy csp = { 'default-src': ["'self'"], 'script-src': ["'self'", 'https://cdn.jsdelivr.net'], 'style-src': ["'self'", 'https://cdn.jsdelivr.net'], 'img-src': ["'self'", 'data:'], 'font-src': ["'self'", 'https://cdn.jsdelivr.net'], 'connect-src': ["'self'"], 'frame-ancestors': ["'none'"], 'base-uri': ["'self'"], 'form-action': ["'self'"] } # Initialize Talisman with enhanced configuration Talisman(app, content_security_policy=csp, force_https=not app.debug, # Only force HTTPS in production strict_transport_security=not app.debug, # Only enable HSTS in production strict_transport_security_max_age=31536000 if not app.debug else 0, strict_transport_security_include_subdomains=not app.debug, frame_options='DENY', x_content_type_options=True, referrer_policy='strict-origin-when-cross-origin' ) # Manual headers for Flask-Talisman 1.0.0 compatibility @app.after_request def add_security_headers(response): # Disable deprecated X-XSS-Protection (version 1.0.0 compatibility) response.headers['X-XSS-Protection'] = '0' # Add Permissions-Policy for privacy (version 1.0.0 compatibility) response.headers['Permissions-Policy'] = 'browsing-topics=()' # Additional security headers response.headers['X-Download-Options'] = 'noopen' response.headers['X-Permitted-Cross-Domain-Policies'] = 'none' return response # Apply security configuration configure_security_headers(app) # Enable CORS for all routes CORS(app) # Enhanced rate limiting configuration limiter = Limiter( key_func=get_remote_address, app=app, default_limits=["1000/day", "100/hour"], storage_uri="memory://" # Use Redis in production ) # --- INPUT VALIDATION --- def validate_input_text(text: str) -> bool: """Validate input text for security""" # Size limit (1MB) if len(text.encode('utf-8')) > 1024 * 1024: return False # Content validation - only allow printable characters and common whitespace if not re.match(r'^[\x20-\x7E\s]*$', text): return False return True def validate_api_input(data): """Validate API input data""" if not isinstance(data, dict): raise ValueError("Invalid data format") input_text = data.get("input_text", "") if not isinstance(input_text, str): raise ValueError("input_text must be a string") if len(input_text.strip()) == 0: raise ValueError("input_text cannot be empty") if len(input_text.encode('utf-8')) > 1024 * 1024: # 1MB raise ValueError("input_text too large") return True # --- FILE UPLOAD VALIDATION --- ALLOWED_EXTENSIONS = {'.ep'} MAX_FILE_SIZE = 1024 * 1024 # 1MB def validate_file_upload(file): """Enhanced file validation""" if not file or not file.filename: return False, "No file provided" # Check file extension filename = secure_filename(file.filename) if not any(filename.endswith(ext) for ext in ALLOWED_EXTENSIONS): return False, "Invalid file type" # Check file size file.seek(0, 2) # Seek to end size = file.tell() file.seek(0) # Reset if size > MAX_FILE_SIZE: return False, "File too large" # Basic content validation try: content = file.read().decode('utf-8') file.seek(0) # Reset # Validate content is text if not content.isprintable() and not all(c in '\n\r\t' for c in content if not c.isprintable()): return False, "Invalid file content" except UnicodeDecodeError: return False, "File must be valid UTF-8 text" return True, "Valid file" # --- SECURE SUBPROCESS EXECUTION --- def get_formatter_version(): """Get formatter version safely""" try: if not os.path.exists(MOJO_FMT_PATH): app.logger.warning(f"Formatter script not found at {MOJO_FMT_PATH}") return "Unknown" result = subprocess.run( ["python3", MOJO_FMT_PATH, "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10 # Add timeout ) if result.returncode == 0: return result.stdout.strip() else: app.logger.warning(f"Could not get formatter version: {result.stderr}") return "Unknown" except subprocess.TimeoutExpired: app.logger.warning("Formatter version check timed out") return "Unknown" except Exception as e: app.logger.warning(f"Could not get formatter version: {e}") return "Unknown" def run_mojofmt(input_text: str) -> str: """Secure mojofmt execution with comprehensive validation""" # Validate input first if not validate_input_text(input_text): raise ValueError("Invalid input text") app.logger.debug("Running mojofmt") with tempfile.TemporaryDirectory() as tmpdir: in_path = os.path.join(tmpdir, "input.txt") out_path = os.path.join(tmpdir, "output.txt") # Secure file writing try: with open(in_path, 'w', encoding='utf-8') as f: f.write(input_text) except Exception as e: app.logger.error(f"Failed to write input file: {e}") raise RuntimeError("Failed to write input file") # Validate formatter script exists if not os.path.exists(MOJO_FMT_PATH): app.logger.error(f"Formatter script not found at {MOJO_FMT_PATH}") raise RuntimeError("Formatter script not found") # Secure subprocess execution with timeout try: result = subprocess.run( ['python3', MOJO_FMT_PATH, '-o', out_path, in_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30, # Add timeout cwd=tmpdir # Set working directory ) except subprocess.TimeoutExpired: app.logger.error("Formatting operation timed out") raise RuntimeError("Formatting operation timed out") except Exception as e: app.logger.error(f"Subprocess execution failed: {e}") raise RuntimeError("Formatting failed") if result.returncode != 0: # Don't expose internal error details app.logger.error(f"mojofmt failed with return code {result.returncode}: {result.stderr}") raise RuntimeError("Formatting failed") try: with open(out_path, 'r', encoding='utf-8') as f: return f.read() except Exception as e: app.logger.error(f"Failed to read output file: {e}") raise RuntimeError("Failed to read output file") FORMATTER_VERSION = get_formatter_version() # --- AUTHENTICATION --- def require_api_token(f): """API token authentication decorator (unchanged)""" @wraps(f) def decorated(*args, **kwargs): auth = request.headers.get('Authorization', '') if not auth.startswith('Bearer ') or auth[len('Bearer '):] != API_TOKEN: app.logger.warning(f"Unauthorized API access attempt from {request.remote_addr}") return jsonify({"error": "Unauthorized"}), 401 return f(*args, **kwargs) return decorated # --- ERROR HANDLERS --- @app.errorhandler(RequestEntityTooLarge) def handle_file_too_large(e): app.logger.warning(f"File too large from {request.remote_addr}") return jsonify({"error": "File too large"}), 413 @app.errorhandler(400) def handle_bad_request(e): app.logger.warning(f"Bad request from {request.remote_addr}: {e}") return jsonify({"error": "Bad request"}), 400 @app.errorhandler(404) def handle_not_found(e): return jsonify({"error": "Not found"}), 404 @app.errorhandler(429) def handle_rate_limit(e): app.logger.warning(f"Rate limit exceeded from {request.remote_addr}") return jsonify({"error": "Rate limit exceeded"}), 429 @app.errorhandler(500) def handle_internal_error(e): app.logger.error(f"Internal server error: {e}") return jsonify({"error": "Internal server error"}), 500 @app.errorhandler(Exception) def handle_exception(e): """Global exception handler""" app.logger.error(f"Unhandled exception: {e}", exc_info=True) return jsonify({"error": "Internal server error"}), 500 # --- HTML TEMPLATE (unchanged) --- HTML_TEMPLATE = """