Add shell script to use API and update security for website
This commit is contained in:
376
website.py
376
website.py
@@ -3,15 +3,18 @@ import sys
|
||||
import secrets
|
||||
import tempfile
|
||||
import subprocess
|
||||
import re
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from flask import Flask, request, render_template_string, jsonify, send_file, redirect, url_for, flash
|
||||
from werkzeug.utils import secure_filename
|
||||
from werkzeug.exceptions import RequestEntityTooLarge
|
||||
from flask_talisman import Talisman
|
||||
from flask_limiter import Limiter
|
||||
from flask_limiter.util import get_remote_address
|
||||
from flask_cors import CORS
|
||||
from functools import wraps
|
||||
from dotenv import load_dotenv
|
||||
from flask import send_file
|
||||
import json
|
||||
|
||||
# --- ENV GENERATOR ---
|
||||
@@ -21,6 +24,7 @@ def generate_env():
|
||||
with open(".env", "w") as f:
|
||||
f.write(f"FLASK_SECRET_KEY={secret_key}\n")
|
||||
f.write(f"FLASK_API_TOKEN={api_token}\n")
|
||||
f.write(f"FLASK_DEBUG=False\n")
|
||||
print("✅ .env generated")
|
||||
print(f"FLASK_SECRET_KEY={secret_key}")
|
||||
print(f"FLASK_API_TOKEN={api_token}")
|
||||
@@ -37,57 +41,302 @@ MOJO_FMT_PATH = os.path.join(BASE_DIR, "mojofmt.py")
|
||||
|
||||
SECRET_KEY = os.environ.get('FLASK_SECRET_KEY')
|
||||
API_TOKEN = os.environ.get('FLASK_API_TOKEN')
|
||||
DEBUG_MODE = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
|
||||
|
||||
if not SECRET_KEY or not API_TOKEN:
|
||||
raise RuntimeError("FLASK_SECRET_KEY and FLASK_API_TOKEN must be set (use --genenv to create .env)")
|
||||
|
||||
# --- FLASK APP CONFIGURATION ---
|
||||
app = Flask(__name__)
|
||||
app.secret_key = SECRET_KEY
|
||||
|
||||
# Enhanced app configuration
|
||||
app.config.update(
|
||||
SESSION_COOKIE_HTTPONLY=True,
|
||||
SESSION_COOKIE_SECURE=True,
|
||||
SESSION_COOKIE_SAMESITE='Lax'
|
||||
SESSION_COOKIE_SAMESITE='Lax',
|
||||
MAX_CONTENT_LENGTH=1024 * 1024, # 1MB limit
|
||||
DEBUG=DEBUG_MODE
|
||||
)
|
||||
|
||||
# --- LOGGING CONFIGURATION ---
|
||||
if not app.debug:
|
||||
if not os.path.exists('logs'):
|
||||
os.mkdir('logs')
|
||||
file_handler = RotatingFileHandler('logs/app.log', maxBytes=10240, backupCount=10)
|
||||
file_handler.setFormatter(logging.Formatter(
|
||||
'%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s'
|
||||
))
|
||||
file_handler.setLevel(logging.INFO)
|
||||
app.logger.addHandler(file_handler)
|
||||
app.logger.setLevel(logging.INFO)
|
||||
app.logger.info('Flask application startup')
|
||||
|
||||
# --- SECURITY CONFIGURATION ---
|
||||
def configure_security_headers(app):
|
||||
"""Configure security headers for Flask-Talisman 1.0.0 compatibility"""
|
||||
|
||||
# Enhanced CSP configuration - different for development vs production
|
||||
if app.debug:
|
||||
# Development CSP - allows inline styles and scripts for easier development
|
||||
csp = {
|
||||
'default-src': ["'self'"],
|
||||
'script-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"],
|
||||
'style-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"],
|
||||
'img-src': ["'self'", 'data:'],
|
||||
'font-src': ["'self'", 'https://cdn.jsdelivr.net'],
|
||||
'connect-src': ["'self'"],
|
||||
'frame-ancestors': ["'none'"],
|
||||
'base-uri': ["'self'"],
|
||||
'form-action': ["'self'"]
|
||||
}
|
||||
else:
|
||||
# Production CSP - strict security policy
|
||||
csp = {
|
||||
'default-src': ["'self'"],
|
||||
'script-src': ["'self'", 'https://cdn.jsdelivr.net'],
|
||||
'style-src': ["'self'", 'https://cdn.jsdelivr.net'],
|
||||
'img-src': ["'self'", 'data:'],
|
||||
'font-src': ["'self'", 'https://cdn.jsdelivr.net'],
|
||||
'connect-src': ["'self'"],
|
||||
'frame-ancestors': ["'none'"],
|
||||
'base-uri': ["'self'"],
|
||||
'form-action': ["'self'"]
|
||||
}
|
||||
|
||||
# Initialize Talisman with enhanced configuration
|
||||
Talisman(app,
|
||||
content_security_policy=csp,
|
||||
force_https=not app.debug, # Only force HTTPS in production
|
||||
strict_transport_security=not app.debug, # Only enable HSTS in production
|
||||
strict_transport_security_max_age=31536000 if not app.debug else 0,
|
||||
strict_transport_security_include_subdomains=not app.debug,
|
||||
frame_options='DENY',
|
||||
x_content_type_options=True,
|
||||
referrer_policy='strict-origin-when-cross-origin'
|
||||
)
|
||||
|
||||
# Manual headers for Flask-Talisman 1.0.0 compatibility
|
||||
@app.after_request
|
||||
def add_security_headers(response):
|
||||
# Disable deprecated X-XSS-Protection (version 1.0.0 compatibility)
|
||||
response.headers['X-XSS-Protection'] = '0'
|
||||
|
||||
# Add Permissions-Policy for privacy (version 1.0.0 compatibility)
|
||||
response.headers['Permissions-Policy'] = 'browsing-topics=()'
|
||||
|
||||
# Additional security headers
|
||||
response.headers['X-Download-Options'] = 'noopen'
|
||||
response.headers['X-Permitted-Cross-Domain-Policies'] = 'none'
|
||||
|
||||
return response
|
||||
|
||||
# Apply security configuration
|
||||
configure_security_headers(app)
|
||||
|
||||
# Enable CORS for all routes
|
||||
CORS(app)
|
||||
|
||||
# CSP — only self and cdn.jsdelivr.net for scripts/styles
|
||||
csp = {
|
||||
'default-src': ["'self'"],
|
||||
'script-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"],
|
||||
'style-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"],
|
||||
}
|
||||
Talisman(app, content_security_policy=csp)
|
||||
# Enhanced rate limiting configuration
|
||||
limiter = Limiter(
|
||||
key_func=get_remote_address,
|
||||
app=app,
|
||||
default_limits=["1000/day", "100/hour"],
|
||||
storage_uri="memory://" # Use Redis in production
|
||||
)
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address, app=app, default_limits=["100/hour"])
|
||||
# --- INPUT VALIDATION ---
|
||||
def validate_input_text(text: str) -> bool:
|
||||
"""Validate input text for security"""
|
||||
# Size limit (1MB)
|
||||
if len(text.encode('utf-8')) > 1024 * 1024:
|
||||
return False
|
||||
|
||||
# Content validation - only allow printable characters and common whitespace
|
||||
if not re.match(r'^[\x20-\x7E\s]*$', text):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def get_formatter_version():
|
||||
def validate_api_input(data):
|
||||
"""Validate API input data"""
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("Invalid data format")
|
||||
|
||||
input_text = data.get("input_text", "")
|
||||
if not isinstance(input_text, str):
|
||||
raise ValueError("input_text must be a string")
|
||||
|
||||
if len(input_text.strip()) == 0:
|
||||
raise ValueError("input_text cannot be empty")
|
||||
|
||||
if len(input_text.encode('utf-8')) > 1024 * 1024: # 1MB
|
||||
raise ValueError("input_text too large")
|
||||
|
||||
return True
|
||||
|
||||
# --- FILE UPLOAD VALIDATION ---
|
||||
ALLOWED_EXTENSIONS = {'.ep'}
|
||||
MAX_FILE_SIZE = 1024 * 1024 # 1MB
|
||||
|
||||
def validate_file_upload(file):
|
||||
"""Enhanced file validation"""
|
||||
if not file or not file.filename:
|
||||
return False, "No file provided"
|
||||
|
||||
# Check file extension
|
||||
filename = secure_filename(file.filename)
|
||||
if not any(filename.endswith(ext) for ext in ALLOWED_EXTENSIONS):
|
||||
return False, "Invalid file type"
|
||||
|
||||
# Check file size
|
||||
file.seek(0, 2) # Seek to end
|
||||
size = file.tell()
|
||||
file.seek(0) # Reset
|
||||
|
||||
if size > MAX_FILE_SIZE:
|
||||
return False, "File too large"
|
||||
|
||||
# Basic content validation
|
||||
try:
|
||||
content = file.read().decode('utf-8')
|
||||
file.seek(0) # Reset
|
||||
|
||||
# Validate content is text
|
||||
if not content.isprintable() and not all(c in '\n\r\t' for c in content if not c.isprintable()):
|
||||
return False, "Invalid file content"
|
||||
|
||||
except UnicodeDecodeError:
|
||||
return False, "File must be valid UTF-8 text"
|
||||
|
||||
return True, "Valid file"
|
||||
|
||||
# --- SECURE SUBPROCESS EXECUTION ---
|
||||
def get_formatter_version():
|
||||
"""Get formatter version safely"""
|
||||
try:
|
||||
if not os.path.exists(MOJO_FMT_PATH):
|
||||
app.logger.warning(f"Formatter script not found at {MOJO_FMT_PATH}")
|
||||
return "Unknown"
|
||||
|
||||
result = subprocess.run(
|
||||
["python3", MOJO_FMT_PATH, "--version"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True
|
||||
text=True,
|
||||
timeout=10 # Add timeout
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return result.stdout.strip()
|
||||
else:
|
||||
app.logger.warning(f"Could not get formatter version: {result.stderr}")
|
||||
return "Unknown"
|
||||
except subprocess.TimeoutExpired:
|
||||
app.logger.warning("Formatter version check timed out")
|
||||
return "Unknown"
|
||||
except Exception as e:
|
||||
app.logger.warning(f"Could not get formatter version: {e}")
|
||||
return "Unknown"
|
||||
|
||||
def run_mojofmt(input_text: str) -> str:
|
||||
"""Secure mojofmt execution with comprehensive validation"""
|
||||
# Validate input first
|
||||
if not validate_input_text(input_text):
|
||||
raise ValueError("Invalid input text")
|
||||
|
||||
app.logger.debug("Running mojofmt")
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
in_path = os.path.join(tmpdir, "input.txt")
|
||||
out_path = os.path.join(tmpdir, "output.txt")
|
||||
|
||||
# Secure file writing
|
||||
try:
|
||||
with open(in_path, 'w', encoding='utf-8') as f:
|
||||
f.write(input_text)
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to write input file: {e}")
|
||||
raise RuntimeError("Failed to write input file")
|
||||
|
||||
# Validate formatter script exists
|
||||
if not os.path.exists(MOJO_FMT_PATH):
|
||||
app.logger.error(f"Formatter script not found at {MOJO_FMT_PATH}")
|
||||
raise RuntimeError("Formatter script not found")
|
||||
|
||||
# Secure subprocess execution with timeout
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['python3', MOJO_FMT_PATH, '-o', out_path, in_path],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
timeout=30, # Add timeout
|
||||
cwd=tmpdir # Set working directory
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
app.logger.error("Formatting operation timed out")
|
||||
raise RuntimeError("Formatting operation timed out")
|
||||
except Exception as e:
|
||||
app.logger.error(f"Subprocess execution failed: {e}")
|
||||
raise RuntimeError("Formatting failed")
|
||||
|
||||
if result.returncode != 0:
|
||||
# Don't expose internal error details
|
||||
app.logger.error(f"mojofmt failed with return code {result.returncode}: {result.stderr}")
|
||||
raise RuntimeError("Formatting failed")
|
||||
|
||||
try:
|
||||
with open(out_path, 'r', encoding='utf-8') as f:
|
||||
return f.read()
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to read output file: {e}")
|
||||
raise RuntimeError("Failed to read output file")
|
||||
|
||||
FORMATTER_VERSION = get_formatter_version()
|
||||
|
||||
# --- AUTHENTICATION ---
|
||||
def require_api_token(f):
|
||||
"""API token authentication decorator (unchanged)"""
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
auth = request.headers.get('Authorization', '')
|
||||
if not auth.startswith('Bearer ') or auth[len('Bearer '):] != API_TOKEN:
|
||||
app.logger.warning(f"Unauthorized API access attempt from {request.remote_addr}")
|
||||
return jsonify({"error": "Unauthorized"}), 401
|
||||
return f(*args, **kwargs)
|
||||
return decorated
|
||||
|
||||
# --- ERROR HANDLERS ---
|
||||
@app.errorhandler(RequestEntityTooLarge)
|
||||
def handle_file_too_large(e):
|
||||
app.logger.warning(f"File too large from {request.remote_addr}")
|
||||
return jsonify({"error": "File too large"}), 413
|
||||
|
||||
@app.errorhandler(400)
|
||||
def handle_bad_request(e):
|
||||
app.logger.warning(f"Bad request from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": "Bad request"}), 400
|
||||
|
||||
@app.errorhandler(404)
|
||||
def handle_not_found(e):
|
||||
return jsonify({"error": "Not found"}), 404
|
||||
|
||||
@app.errorhandler(429)
|
||||
def handle_rate_limit(e):
|
||||
app.logger.warning(f"Rate limit exceeded from {request.remote_addr}")
|
||||
return jsonify({"error": "Rate limit exceeded"}), 429
|
||||
|
||||
@app.errorhandler(500)
|
||||
def handle_internal_error(e):
|
||||
app.logger.error(f"Internal server error: {e}")
|
||||
return jsonify({"error": "Internal server error"}), 500
|
||||
|
||||
@app.errorhandler(Exception)
|
||||
def handle_exception(e):
|
||||
"""Global exception handler"""
|
||||
app.logger.error(f"Unhandled exception: {e}", exc_info=True)
|
||||
return jsonify({"error": "Internal server error"}), 500
|
||||
|
||||
# --- HTML TEMPLATE (unchanged) ---
|
||||
HTML_TEMPLATE = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
@@ -459,29 +708,15 @@ HTML_TEMPLATE = """
|
||||
</html>
|
||||
"""
|
||||
|
||||
def run_mojofmt(input_text: str) -> str:
|
||||
app.logger.debug("Running mojofmt")
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
in_path = os.path.join(tmpdir, "input.txt")
|
||||
out_path = os.path.join(tmpdir, "output.txt")
|
||||
with open(in_path, 'w', encoding='utf-8') as f:
|
||||
f.write(input_text)
|
||||
result = subprocess.run(
|
||||
['python3', MOJO_FMT_PATH, '-o', out_path, in_path],
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"mojofmt failed:\n{result.stderr.strip()}")
|
||||
with open(out_path, 'r', encoding='utf-8') as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
# --- ROUTES ---
|
||||
@app.route("/", methods=["GET", "POST"])
|
||||
def index():
|
||||
"""Main page route with enhanced security"""
|
||||
# Handle both GET and POST requests
|
||||
# POST requests are redirected to use AJAX instead
|
||||
if request.method == "POST":
|
||||
# If someone tries to submit the form traditionally, redirect to GET
|
||||
app.logger.info(f"Traditional form submission redirected from {request.remote_addr}")
|
||||
return redirect(url_for('index'))
|
||||
|
||||
# Serve the HTML template
|
||||
@@ -490,52 +725,93 @@ def index():
|
||||
formatter_version=FORMATTER_VERSION
|
||||
)
|
||||
|
||||
|
||||
@app.route("/api/format_ajax", methods=["POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit("5/minute") # Stricter rate limiting
|
||||
def api_format_ajax():
|
||||
"""AJAX endpoint for formatting text"""
|
||||
"""AJAX endpoint for formatting text with enhanced security"""
|
||||
if not request.is_json:
|
||||
app.logger.warning(f"Non-JSON request to format_ajax from {request.remote_addr}")
|
||||
return jsonify({"error": "JSON body required"}), 400
|
||||
|
||||
data = request.get_json()
|
||||
input_text = data.get("input_text", "")
|
||||
remove_empty = bool(data.get("remove_empty", False))
|
||||
|
||||
if not input_text.strip():
|
||||
return jsonify({"error": "No input data provided."}), 400
|
||||
|
||||
try:
|
||||
data = request.get_json()
|
||||
validate_api_input(data) # Enhanced input validation
|
||||
|
||||
input_text = data.get("input_text", "")
|
||||
remove_empty = bool(data.get("remove_empty", False))
|
||||
|
||||
app.logger.info(f"Processing format request from {request.remote_addr}, size: {len(input_text)} chars")
|
||||
|
||||
formatted_text = run_mojofmt(input_text)
|
||||
if remove_empty:
|
||||
formatted_text = "\n".join(
|
||||
line for line in formatted_text.splitlines() if line.strip()
|
||||
)
|
||||
|
||||
app.logger.info(f"Successfully formatted text for {request.remote_addr}")
|
||||
return jsonify({"formatted_text": formatted_text})
|
||||
|
||||
except ValueError as e:
|
||||
app.logger.warning(f"Validation error from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": str(e)}), 400
|
||||
except RuntimeError as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
app.logger.error(f"Runtime error from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": "Processing failed"}), 500
|
||||
except Exception as e:
|
||||
app.logger.error(f"Unexpected error from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": "Internal server error"}), 500
|
||||
|
||||
@app.route("/api/format", methods=["POST"])
|
||||
@limiter.limit("10/minute")
|
||||
@limiter.limit("5/minute") # Stricter rate limiting
|
||||
@require_api_token
|
||||
def api_format():
|
||||
"""Original API endpoint with token authentication"""
|
||||
"""Original API endpoint with token authentication and enhanced security"""
|
||||
if not request.is_json:
|
||||
app.logger.warning(f"Non-JSON request to format API from {request.remote_addr}")
|
||||
return jsonify({"error": "JSON body required"}), 400
|
||||
data = request.get_json()
|
||||
text = data.get("text", "")
|
||||
remove_empty = bool(data.get("remove_empty"))
|
||||
if not text:
|
||||
return jsonify({"error": "Missing 'text'"}), 400
|
||||
|
||||
try:
|
||||
data = request.get_json()
|
||||
|
||||
# Validate input using the same validation as AJAX endpoint
|
||||
input_data = {
|
||||
"input_text": data.get("text", ""),
|
||||
"remove_empty": data.get("remove_empty", False)
|
||||
}
|
||||
validate_api_input(input_data)
|
||||
|
||||
text = input_data["input_text"]
|
||||
remove_empty = bool(input_data["remove_empty"])
|
||||
|
||||
app.logger.info(f"Processing authenticated API request from {request.remote_addr}, size: {len(text)} chars")
|
||||
|
||||
formatted = run_mojofmt(text)
|
||||
if remove_empty:
|
||||
formatted = "\n".join([line for line in formatted.splitlines() if line.strip()])
|
||||
|
||||
app.logger.info(f"Successfully processed authenticated API request from {request.remote_addr}")
|
||||
return jsonify({"formatted_text": formatted})
|
||||
|
||||
except ValueError as e:
|
||||
app.logger.warning(f"API validation error from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": str(e)}), 400
|
||||
except RuntimeError as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
app.logger.error(f"API runtime error from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": "Processing failed"}), 500
|
||||
except Exception as e:
|
||||
app.logger.error(f"API unexpected error from {request.remote_addr}: {e}")
|
||||
return jsonify({"error": "Internal server error"}), 500
|
||||
|
||||
# --- HEALTH CHECK ENDPOINT ---
|
||||
@app.route("/health", methods=["GET"])
|
||||
def health_check():
|
||||
"""Health check endpoint"""
|
||||
return jsonify({
|
||||
"status": "healthy",
|
||||
"version": FORMATTER_VERSION,
|
||||
"debug": app.debug
|
||||
})
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=8000, debug=True)
|
||||
app.logger.info(f"Starting Flask application in {'debug' if app.debug else 'production'} mode")
|
||||
app.run(host="0.0.0.0", port=8000, debug=app.debug)
|
Reference in New Issue
Block a user