import os
import tempfile
import subprocess
from flask import (
Flask, request, render_template_string,
jsonify, send_file, redirect, url_for, flash
)
from werkzeug.utils import secure_filename
from flask_talisman import Talisman
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from functools import wraps
from dotenv import load_dotenv
# Load environment variables from .env (for local dev only)
load_dotenv()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MOJO_FMT_PATH = os.path.join(BASE_DIR, "mojofmt.py")
# Get secrets from environment variables
SECRET_KEY = os.environ.get('FLASK_SECRET_KEY')
API_TOKEN = os.environ.get('FLASK_API_TOKEN')
if not SECRET_KEY or not API_TOKEN:
raise RuntimeError("FLASK_SECRET_KEY and FLASK_API_TOKEN must be set")
app = Flask(__name__)
app.secret_key = SECRET_KEY
# Secure cookies
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SECURE=True, # requires HTTPS in production
SESSION_COOKIE_SAMESITE='Lax'
)
# Security headers with Flask‑Talisman (CSP allowing only self + cdn.jsdelivr.net)
csp = {
'default-src': ["'self'"],
'script-src': ["'self'", 'https://cdn.jsdelivr.net'],
'style-src': ["'self'", 'https://cdn.jsdelivr.net', "'unsafe-inline'"],
}
Talisman(app, content_security_policy=csp)
# Rate limiting
limiter = Limiter(key_func=get_remote_address, app=app, default_limits=["100/hour"])
# Token authentication decorator
def require_api_token(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.headers.get('Authorization', '')
if not auth.startswith('Bearer ') or auth[len('Bearer '):] != API_TOKEN:
return jsonify({"error": "Unauthorized"}), 401
return f(*args, **kwargs)
return decorated
# ----------------------------- HTML TEMPLATE -----------------------------
HTML_TEMPLATE = """
Mojolicious Template Code Formatter
Mojolicious Template Code Formatter
{% with messages = get_flashed_messages() %}
{% if messages %}
{% for m in messages %}- {{ m }}
{% endfor %}
{% endif %}
{% endwith %}
"""
# ----------------------------- Core logic -----------------------------
def run_mojofmt(input_text: str) -> str:
with tempfile.TemporaryDirectory() as tmpdir:
in_path = os.path.join(tmpdir, "input.txt")
out_path = os.path.join(tmpdir, "output.txt")
with open(in_path, 'w', encoding='utf-8') as f:
f.write(input_text)
result = subprocess.run(
['python3', MOJO_FMT_PATH, '-o', out_path, in_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if result.returncode != 0:
raise RuntimeError(f"mojofmt failed:\n{result.stderr.strip()}")
with open(out_path, 'r', encoding='utf-8') as f:
return f.read()
@app.route("/", methods=["GET", "POST"])
def index():
input_text, formatted_text = "", None
if request.method == "POST":
action = request.form.get("action")
f_obj = request.files.get("input_file")
input_text = request.form.get("input_text", "")
if f_obj and f_obj.filename:
try:
input_text = f_obj.read().decode('utf-8')
except Exception as e:
flash(f"Error reading uploaded file: {e}")
return render_template_string(HTML_TEMPLATE, input_text=input_text)
if action in ("format", "download"):
if not input_text.strip():
flash("No input data provided.")
return render_template_string(HTML_TEMPLATE, input_text=input_text)
try:
formatted_text = run_mojofmt(input_text)
except RuntimeError as e:
flash(str(e))
return render_template_string(HTML_TEMPLATE, input_text=input_text)
if action == "download":
tmpfile = tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', suffix='.txt')
tmpfile.write(formatted_text)
tmpfile.close()
return redirect(url_for('download_file', filename=os.path.basename(tmpfile.name)))
return render_template_string(HTML_TEMPLATE, input_text=input_text, formatted_text=formatted_text)
@app.route("/download/")
def download_file(filename):
safe_name = secure_filename(filename)
path = os.path.join(tempfile.gettempdir(), safe_name)
if not os.path.exists(path):
return "File not found", 404
resp = send_file(path, as_attachment=True, download_name="formatted_output.txt")
try:
os.unlink(path)
except Exception:
pass
return resp
@app.route("/api/format", methods=["POST"])
@limiter.limit("10/minute")
@require_api_token
def api_format():
if not request.is_json:
return jsonify({"error": "JSON body required"}), 400
data = request.get_json()
text = data.get("text", "")
if not text:
return jsonify({"error": "Missing 'text'"}), 400
try:
formatted = run_mojofmt(text)
return jsonify({"formatted_text": formatted})
except RuntimeError as e:
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, debug=True) # debug=False in prod