Files
la-voix-du-peuple/artifacts/flask-api/app.py
T
billisdead 45edc1fa77 Licence EUPL-1.2 + hardening anti-abus
P1 — Licence :
- Ajout du fichier LICENSE (EUPL-1.2 complet)
- README mis à jour : section licence, table docs, vars d'environnement
- En-têtes EUPL ajoutés dans les fichiers sources principaux (Flask, React)

P2 — Hardening anti-abus :
- Rate limiting Redis-ready (REDIS_URL) avec clé fingerprint + IP
- Honeypot anti-bot : champ caché côté client + vérification serveur
- Fingerprinting non-PII via FingerprintJS (hash SHA-256, colonne ideas.fingerprint_hash)
- Cooldown session : cookie httpOnly signé HMAC-SHA256 (SECRET_KEY requis)
- Détection de flood : alerte WARNING si > FLOOD_THRESHOLD soumissions / 5 min
- hCaptcha stub : intégré, activable via HCAPTCHA_SECRET_KEY + VITE_HCAPTCHA_SITE_KEY
- Nouvelles dépendances : redis (backend), @fingerprintjs/fingerprintjs + @hcaptcha/react-hcaptcha (frontend)
- docs/SECURITE_ANTI_ABUS.md : documentation complète des seuils et de la configuration

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 18:05:46 +02:00

624 lines
24 KiB
Python

"""
La Voix du Peuple — Backend Flask
==================================
Copyright (C) 2026 billisdead
Licence : European Union Public Licence v. 1.2 (EUPL-1.2)
Plateforme démocratique citoyenne.
Base légale : DUDH (ONU 1948), PIDCP (ONU 1966), CEDH (1950), Charte UE (2000),
Code pénal français, Loi du 29 juillet 1881, LCEN, SREN 2024.
Sécurité :
- Rate limiting IP + fingerprint (flask-limiter, Redis si REDIS_URL défini)
- Honeypot anti-bot (champ caché + vérification serveur)
- Fingerprinting non-PII (FingerprintJS hash SHA-256, sans cookie tiers)
- Détection de flood (> FLOOD_THRESHOLD soumissions / 5 min / même IP)
- Cooldown session (cookie httpOnly signé HMAC-SHA256, si SECRET_KEY défini)
- hCaptcha stub (activer via HCAPTCHA_SECRET_KEY + VITE_HCAPTCHA_SITE_KEY)
- Validation et assainissement des entrées (bleach)
- CORS restreint
- En-têtes de sécurité HTTP
- Panel admin protégé par ADMIN_SECRET (Bearer token)
"""
import csv
import hashlib
import hmac
import io
import json
import logging
import os
import threading
import time
import urllib.parse
import urllib.request
from functools import wraps
import bleach
from flask import Flask, jsonify, make_response, request, Response
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from database import (
init_db, insert_idea, get_accepted_ideas, get_stats, upsert_synthesis,
get_synthesis, get_all_ideas, get_ideas_admin, delete_idea, bulk_delete_ideas,
override_idea, flag_idea, unflag_idea,
)
from ai_agent import filter_idea, synthesize_ideas
# ─── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s%(message)s",
)
logger = logging.getLogger(__name__)
# ─── Constantes anti-abus ────────────────────────────────────────────────────
# Rate limit pour POST /api/ideas (format flask-limiter)
RATE_LIMIT_CONTRIBUTIONS = os.environ.get(
"RATE_LIMIT_CONTRIBUTIONS", "5 per minute;3 per hour"
)
# Cooldown entre deux soumissions d'une même session (secondes)
CONTRIBUTION_COOLDOWN_SECONDS = int(
os.environ.get("CONTRIBUTION_COOLDOWN_SECONDS", "3600")
)
# Seuil de flood : nombre de soumissions / 5 min / IP avant alerte
FLOOD_THRESHOLD = int(os.environ.get("FLOOD_THRESHOLD", "10"))
FLOOD_WINDOW_SECONDS = 300 # 5 minutes
# État interne flood detection (en mémoire, reset au redémarrage)
_flood_tracker: dict[str, list[float]] = {}
_flood_lock = threading.Lock()
# ─── Application ────────────────────────────────────────────────────────────
app = Flask(__name__)
app.config["JSON_SORT_KEYS"] = False
# CORS : autorise uniquement les origines du même domaine en production
CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=False)
# Storage Redis si disponible, sinon mémoire (dev / instance unique)
_redis_url = os.environ.get("REDIS_URL", "")
_storage_uri = _redis_url if _redis_url else "memory://"
limiter = Limiter(
get_remote_address,
app=app,
default_limits=["200 per day", "60 per hour"],
storage_uri=_storage_uri,
strategy="fixed-window",
)
# ─── En-têtes de sécurité HTTP ───────────────────────────────────────────────
@app.after_request
def set_security_headers(response: Response) -> Response:
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'none'; "
"object-src 'none';"
)
response.headers["Cache-Control"] = "no-store"
response.headers["Pragma"] = "no-cache"
return response
# ─── Validation des entrées ──────────────────────────────────────────────────
CONTENT_MIN = 10
CONTENT_MAX = 1000
AUTHOR_MAX = 100
def sanitize_text(text: str) -> str:
"""Supprime tout HTML/JavaScript — protection XSS."""
return bleach.clean(text, tags=[], strip=True).strip()
def validate_idea_input(data: dict) -> tuple[dict | None, str | None]:
"""Valide et assainit les données de soumission d'une idée."""
content = data.get("content")
author = data.get("author")
if not content or not isinstance(content, str):
return None, "Le champ 'content' est requis et doit être une chaîne de caractères."
content = sanitize_text(content)
if len(content) < CONTENT_MIN:
return None, f"L'idée doit contenir au moins {CONTENT_MIN} caractères."
if len(content) > CONTENT_MAX:
return None, f"L'idée ne peut pas dépasser {CONTENT_MAX} caractères."
if author is not None:
if not isinstance(author, str):
return None, "Le champ 'author' doit être une chaîne de caractères."
author = sanitize_text(author)
if len(author) > AUTHOR_MAX:
return None, f"Le pseudonyme ne peut pas dépasser {AUTHOR_MAX} caractères."
if not author:
author = None
return {"content": content, "author": author}, None
# ─── Authentification Admin ──────────────────────────────────────────────────
def _get_admin_secret() -> str | None:
return os.environ.get("ADMIN_SECRET")
def require_admin(f):
@wraps(f)
def decorated(*args, **kwargs):
secret = _get_admin_secret()
if not secret:
return jsonify({"error": "admin_not_configured", "message": "ADMIN_SECRET non configuré."}), 503
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer ") or auth[7:] != secret:
return jsonify({"error": "unauthorized", "message": "Accès non autorisé."}), 401
return f(*args, **kwargs)
return decorated
# ─── Helpers anti-abus ───────────────────────────────────────────────────────
def get_fingerprint_key() -> str:
"""Clé de rate limiting : fingerprint hashé si présent, sinon IP."""
visitor_id = request.headers.get("X-Visitor-Id", "").strip()
if visitor_id:
return "fp:" + hashlib.sha256(visitor_id.encode()).hexdigest()[:32]
return "ip:" + get_remote_address()
def _sign_cooldown(secret: str) -> str:
"""Génère un token de cooldown signé HMAC-SHA256."""
ts = int(time.time())
msg = ts.to_bytes(8, "big")
sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()[:16]
return f"{ts}.{sig}"
def _verify_cooldown(cookie: str, secret: str, cooldown_seconds: int) -> bool:
"""Retourne True si le cooldown est encore actif pour ce cookie."""
try:
ts_str, sig = cookie.rsplit(".", 1)
ts = int(ts_str)
msg = ts.to_bytes(8, "big")
expected = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()[:16]
if not hmac.compare_digest(sig, expected):
return False
return (time.time() - ts) < cooldown_seconds
except Exception:
return False
def _verify_hcaptcha(token: str) -> bool:
"""
Vérifie un token hCaptcha.
Renvoie True si HCAPTCHA_SECRET_KEY n'est pas configuré (stub désactivé).
"""
secret = os.environ.get("HCAPTCHA_SECRET_KEY", "").strip()
if not secret:
return True # Stub désactivé — pas de clé configurée
if not token:
return False
try:
payload = urllib.parse.urlencode({"secret": secret, "response": token}).encode()
req = urllib.request.Request(
"https://hcaptcha.com/siteverify",
data=payload,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
with urllib.request.urlopen(req, timeout=5) as resp:
result = json.loads(resp.read())
return bool(result.get("success", False))
except Exception:
logger.warning("Vérification hCaptcha impossible (erreur réseau) — skip")
return True # En cas d'erreur réseau, on ne bloque pas
def _check_flood(ip: str, fingerprint_hash: str | None) -> bool:
"""
Enregistre la soumission et retourne True si le seuil de flood est dépassé.
Utilise l'empreinte si disponible, sinon l'IP seule.
"""
key = fingerprint_hash if fingerprint_hash else ip
now = time.time()
with _flood_lock:
times = _flood_tracker.get(key, [])
# Nettoyage des timestamps expirés
times = [t for t in times if now - t < FLOOD_WINDOW_SECONDS]
times.append(now)
_flood_tracker[key] = times
return len(times) > FLOOD_THRESHOLD
# ─── Gestion des erreurs ─────────────────────────────────────────────────────
@app.errorhandler(400)
def bad_request(e):
return jsonify({"error": "bad_request", "message": "Requête invalide."}), 400
@app.errorhandler(404)
def not_found(e):
return jsonify({"error": "not_found", "message": "Ressource introuvable."}), 404
@app.errorhandler(405)
def method_not_allowed(e):
return jsonify({"error": "method_not_allowed", "message": "Méthode non autorisée."}), 405
@app.errorhandler(429)
def rate_limit_exceeded(e):
return jsonify({
"error": "rate_limit_exceeded",
"message": "Trop de requêtes. Veuillez patienter avant de soumettre une nouvelle idée.",
}), 429
@app.errorhandler(500)
def internal_error(e):
logger.exception("Erreur interne non gérée")
return jsonify({"error": "internal_error", "message": "Erreur interne du serveur."}), 500
# ─── Routes publiques ─────────────────────────────────────────────────────────
@app.get("/api/healthz")
def health():
return jsonify({"status": "ok"})
@app.get("/api/ideas")
@limiter.limit("120 per minute")
def list_ideas():
"""Retourne toutes les idées acceptées (conformes au droit international)."""
ideas = get_accepted_ideas()
return jsonify([serialize_idea(i) for i in ideas])
@app.get("/api/ideas/stats")
@limiter.limit("120 per minute")
def idea_stats():
"""Statistiques publiques : total, acceptées, rejetées."""
stats = get_stats()
return jsonify(stats)
@app.post("/api/ideas")
@limiter.limit(RATE_LIMIT_CONTRIBUTIONS, key_func=get_fingerprint_key)
def submit_idea():
"""
Soumet une idée citoyenne.
Protections anti-abus (dans l'ordre) :
1. Honeypot — rejet silencieux si champ leurre rempli
2. hCaptcha — vérification si HCAPTCHA_SECRET_KEY configuré
3. Cooldown cookie — rejet si soumission trop récente
4. Rate limiting — 3/heure par IP ou fingerprint (configurable)
5. Flood detection — alerte si > 10 soumissions / 5 min
6. Fingerprinting non-PII — hash de l'identifiant FingerprintJS
7. Filtrage IA — cadre légal international
"""
if not request.is_json:
return jsonify({"error": "bad_request", "message": "Content-Type doit être application/json."}), 400
data = request.get_json(silent=True)
if data is None:
return jsonify({"error": "bad_request", "message": "Corps JSON invalide."}), 400
# 1. Honeypot — si le champ leurre est rempli, c'est un bot
if data.get("_hp"):
logger.info("Honeypot déclenché — soumission ignorée silencieusement")
# Réponse 201 factice pour ne pas informer le bot
return jsonify({"id": 0, "accepted": True, "reason": None, "legalBasis": None}), 201
# 2. hCaptcha (stub — skip si HCAPTCHA_SECRET_KEY non configuré)
hcaptcha_token = (
(data.get("_h") or "")
or request.headers.get("X-HCaptcha-Token", "")
)
if not _verify_hcaptcha(hcaptcha_token):
logger.warning("hCaptcha échoué — IP: %s", get_remote_address())
return jsonify({"error": "captcha_failed", "message": "Vérification CAPTCHA échouée. Veuillez réessayer."}), 400
# 3. Cooldown cookie — protection contre les soumissions en rafale d'une même session
secret_key = os.environ.get("SECRET_KEY", "").strip()
if CONTRIBUTION_COOLDOWN_SECONDS > 0 and secret_key:
cv = request.cookies.get("_cv", "")
if cv and _verify_cooldown(cv, secret_key, CONTRIBUTION_COOLDOWN_SECONDS):
logger.info("Cooldown actif — soumission rejetée (même session)")
return jsonify({
"error": "cooldown",
"message": "Vous avez déjà contribué récemment. Veuillez patienter avant de soumettre une nouvelle idée.",
}), 429
# 4. Validation et assainissement
validated, error = validate_idea_input(data)
if error:
return jsonify({"error": "validation_error", "message": error}), 400
content = validated["content"]
author = validated["author"]
# 5. Extraction et hashage du fingerprint (non-PII)
raw_fp = request.headers.get("X-Visitor-Id", "").strip()
fingerprint_hash = (
hashlib.sha256(raw_fp.encode()).hexdigest()[:32] if raw_fp else None
)
# 6. Détection de flood
client_ip = get_remote_address()
if _check_flood(client_ip, fingerprint_hash):
logger.warning(
"ALERTE FLOOD — IP: %s | fingerprint: %s | seuil: %d/5min",
client_ip, fingerprint_hash, FLOOD_THRESHOLD,
)
# 7. Filtrage IA selon le cadre légal international
logger.info("Filtrage d'une nouvelle idée (longueur: %d)", len(content))
filter_result = filter_idea(content)
accepted = bool(filter_result.get("accepted", False))
rejection_reason = filter_result.get("reason") if not accepted else None
legal_basis = filter_result.get("legal_basis") if not accepted else None
idea = insert_idea(content, author, accepted, rejection_reason, legal_basis, fingerprint_hash)
if accepted:
threading.Thread(target=_update_synthesis_background, daemon=True).start()
# Construction de la réponse
resp_data = {
"id": idea["id"],
"accepted": accepted,
"reason": rejection_reason,
"legalBasis": legal_basis if not accepted else None,
"idea": serialize_idea(idea),
}
response = make_response(jsonify(resp_data), 201)
# Cookie cooldown httpOnly — marque la session comme ayant contribué
if accepted and CONTRIBUTION_COOLDOWN_SECONDS > 0 and secret_key:
response.set_cookie(
"_cv",
_sign_cooldown(secret_key),
max_age=CONTRIBUTION_COOLDOWN_SECONDS,
httponly=True,
samesite="Lax",
secure=request.is_secure,
)
return response
@app.get("/api/synthesis")
@limiter.limit("120 per minute")
def get_synthesis_route():
"""Retourne la synthèse actuelle de la Voix du Peuple."""
synthesis = get_synthesis()
if not synthesis:
return jsonify({
"text": (
"Aucune idée n'a encore été soumise. "
"Soyez le premier à partager votre vision pour une société meilleure, "
"fondée sur la Déclaration universelle des droits de l'homme."
),
"ideaCount": 0,
"updatedAt": None,
})
return jsonify({
"text": synthesis["text"],
"ideaCount": synthesis["idea_count"],
"updatedAt": synthesis["updated_at"].isoformat() if synthesis["updated_at"] else None,
})
# ─── Route publique : signalement ────────────────────────────────────────────
@app.post("/api/ideas/<int:idea_id>/flag")
@limiter.limit("3 per minute; 10 per hour")
def flag_idea_route(idea_id: int):
"""Signale une contribution pour examen par l'administrateur."""
result = flag_idea(idea_id)
if not result:
return jsonify({"error": "not_found", "message": "Contribution introuvable ou non publiée."}), 404
return jsonify({"ok": True, "flagCount": result.get("flag_count", 1)})
# ─── Routes Admin ─────────────────────────────────────────────────────────────
@app.post("/api/admin/login")
@limiter.limit("10 per minute")
def admin_login():
"""Vérifie le mot de passe admin. Retourne ok si correct."""
secret = _get_admin_secret()
if not secret:
return jsonify({"error": "admin_not_configured", "message": "ADMIN_SECRET non configuré."}), 503
data = request.get_json(silent=True) or {}
password = data.get("password", "")
if password != secret:
logger.warning("Tentative de connexion admin échouée")
return jsonify({"error": "unauthorized", "message": "Mot de passe incorrect."}), 401
logger.info("Connexion admin réussie")
return jsonify({"ok": True, "token": secret})
@app.get("/api/admin/stats")
@require_admin
def admin_stats():
"""Statistiques détaillées pour l'administrateur."""
stats = get_stats()
return jsonify(stats)
@app.get("/api/admin/ideas")
@require_admin
@limiter.limit("120 per minute")
def admin_list_ideas():
"""Liste toutes les contributions avec filtres et pagination."""
status = request.args.get("status", "all")
page = max(1, int(request.args.get("page", 1)))
per_page = min(100, max(10, int(request.args.get("per_page", 50))))
search = request.args.get("q", "").strip()
if status not in ("all", "accepted", "rejected", "flagged"):
status = "all"
ideas, total = get_ideas_admin(status=status, page=page, per_page=per_page, search=search)
return jsonify({
"ideas": [serialize_idea_admin(i) for i in ideas],
"total": total,
"page": page,
"perPage": per_page,
"pages": max(1, -(-total // per_page)),
})
@app.delete("/api/admin/ideas/<int:idea_id>")
@require_admin
def admin_delete_idea(idea_id: int):
"""Supprime une contribution et régénère la synthèse."""
deleted = delete_idea(idea_id)
if not deleted:
return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — contribution #%d supprimée", idea_id)
return jsonify({"ok": True, "synthesisUpdating": True})
@app.post("/api/admin/ideas/bulk-delete")
@require_admin
def admin_bulk_delete():
"""Suppression en masse de contributions."""
data = request.get_json(silent=True) or {}
ids = data.get("ids", [])
if not isinstance(ids, list) or not ids:
return jsonify({"error": "validation_error", "message": "ids doit être une liste non vide."}), 400
ids = [int(i) for i in ids if isinstance(i, (int, str)) and str(i).isdigit()]
count = bulk_delete_ideas(ids)
if count > 0:
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — %d contribution(s) supprimée(s) en masse", count)
return jsonify({"ok": True, "deleted": count, "synthesisUpdating": count > 0})
@app.post("/api/admin/ideas/<int:idea_id>/override")
@require_admin
def admin_override_idea(idea_id: int):
"""Modifie manuellement le statut d'une contribution (accepter/rejeter)."""
data = request.get_json(silent=True) or {}
accepted = bool(data.get("accepted", False))
reason = sanitize_text(data.get("reason", "") or "")
note = sanitize_text(data.get("note", "") or "")
result = override_idea(idea_id, accepted, reason or None, note or None)
if not result:
return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — contribution #%d override → accepted=%s", idea_id, accepted)
return jsonify({"ok": True, "idea": serialize_idea_admin(result), "synthesisUpdating": True})
@app.post("/api/admin/ideas/<int:idea_id>/unflag")
@require_admin
def admin_unflag_idea(idea_id: int):
"""Retire le signalement d'une contribution."""
result = unflag_idea(idea_id)
if not result:
return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404
return jsonify({"ok": True, "idea": serialize_idea_admin(result)})
@app.post("/api/admin/synthesis/regenerate")
@require_admin
@limiter.limit("5 per minute")
def admin_regenerate_synthesis():
"""Force la régénération complète de la synthèse."""
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — régénération manuelle de la synthèse déclenchée")
return jsonify({"ok": True, "message": "Régénération lancée en arrière-plan."})
@app.get("/api/admin/export/csv")
@require_admin
def admin_export_csv():
"""Exporte toutes les contributions en CSV."""
ideas, _ = get_ideas_admin(status="all", page=1, per_page=10000)
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_ALL)
writer.writerow(["id", "content", "author", "accepted", "flagged",
"flag_count", "rejection_reason", "legal_basis",
"admin_note", "fingerprint_hash", "created_at"])
for idea in ideas:
writer.writerow([
idea.get("id"),
idea.get("content", ""),
idea.get("author", ""),
"oui" if idea.get("accepted") else "non",
"oui" if idea.get("flagged") else "non",
idea.get("flag_count", 0),
idea.get("rejection_reason", ""),
idea.get("legal_basis", ""),
idea.get("admin_note", ""),
idea.get("fingerprint_hash", ""),
idea.get("created_at").isoformat() if idea.get("created_at") else "",
])
csv_bytes = output.getvalue().encode("utf-8-sig")
return Response(
csv_bytes,
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=contributions.csv"},
)
# ─── Helpers ─────────────────────────────────────────────────────────────────
def serialize_idea(idea: dict) -> dict:
return {
"id": idea["id"],
"content": idea["content"],
"author": idea.get("author"),
"accepted": idea["accepted"],
"flagged": idea.get("flagged", False),
"flagCount": idea.get("flag_count", 0),
"rejectionReason": idea.get("rejection_reason"),
"legalBasis": idea.get("legal_basis"),
"createdAt": idea["created_at"].isoformat() if idea.get("created_at") else None,
}
def serialize_idea_admin(idea: dict) -> dict:
base = serialize_idea(idea)
base["adminNote"] = idea.get("admin_note")
base["fingerprintHash"] = idea.get("fingerprint_hash")
return base
def _update_synthesis_background() -> None:
try:
ideas = get_accepted_ideas()
texts = [i["content"] for i in ideas]
synthesized = synthesize_ideas(texts)
upsert_synthesis(synthesized, len(texts))
logger.info("Synthèse mise à jour — %d idée(s) intégrée(s).", len(texts))
except Exception:
logger.exception("Erreur lors de la mise à jour de la synthèse en arrière-plan")
# ─── Démarrage ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8080))
logger.info("Initialisation de la base de données...")
init_db()
logger.info(
"La Voix du Peuple — Flask démarre sur le port %d (storage: %s)",
port, "Redis" if _redis_url else "mémoire",
)
app.run(host="0.0.0.0", port=port, debug=False)