""" La Voix du Peuple — Backend Flask ================================== Copyright (C) 2026 billisdead Licence : European Union Public Licence v. 1.2 (EUPL-1.2) Plateforme démocratique citoyenne. Base légale : DUDH (ONU 1948), PIDCP (ONU 1966), CEDH (1950), Charte UE (2000), Code pénal français, Loi du 29 juillet 1881, LCEN, SREN 2024. Sécurité : - Rate limiting IP + fingerprint (flask-limiter, Redis si REDIS_URL défini) - Honeypot anti-bot (champ caché + vérification serveur) - Fingerprinting non-PII (FingerprintJS hash SHA-256, sans cookie tiers) - Détection de flood (> FLOOD_THRESHOLD soumissions / 5 min / même IP) - Cooldown session (cookie httpOnly signé HMAC-SHA256, si SECRET_KEY défini) - hCaptcha stub (activer via HCAPTCHA_SECRET_KEY + VITE_HCAPTCHA_SITE_KEY) - Validation et assainissement des entrées (bleach) - CORS restreint - En-têtes de sécurité HTTP - Panel admin protégé par ADMIN_SECRET (Bearer token) """ import csv import datetime as dt import hashlib import hmac import html as html_module import io import json import logging import os import threading import time import urllib.parse import urllib.request from functools import wraps import bleach from flask import Flask, jsonify, make_response, request, Response from flask_cors import CORS from flask_limiter import Limiter from flask_limiter.util import get_remote_address from database import ( init_db, insert_idea, get_accepted_ideas, get_stats, upsert_synthesis, get_synthesis, get_all_ideas, get_ideas_admin, delete_idea, bulk_delete_ideas, override_idea, flag_idea, unflag_idea, create_consent, get_public_contributions, get_public_stats, create_consultation, get_consultation_by_slug, list_consultations, close_consultation, get_consultations_to_autoclose, get_consultation_stats, get_consultation_contributions, delete_consultation, ) from ai_agent import filter_idea, synthesize_ideas # ─── Logging ──────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s — %(message)s", ) logger = logging.getLogger(__name__) # ─── Constantes anti-abus ──────────────────────────────────────────────────── RATE_LIMIT_CONTRIBUTIONS = os.environ.get( "RATE_LIMIT_CONTRIBUTIONS", "5 per minute;3 per hour" ) CONTRIBUTION_COOLDOWN_SECONDS = int( os.environ.get("CONTRIBUTION_COOLDOWN_SECONDS", "3600") ) FLOOD_THRESHOLD = int(os.environ.get("FLOOD_THRESHOLD", "10")) FLOOD_WINDOW_SECONDS = 300 _flood_tracker: dict[str, list[float]] = {} _flood_lock = threading.Lock() # ─── Application ──────────────────────────────────────────────────────────── app = Flask(__name__) app.config["JSON_SORT_KEYS"] = False CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=False) _redis_url = os.environ.get("REDIS_URL", "") _storage_uri = _redis_url if _redis_url else "memory://" limiter = Limiter( get_remote_address, app=app, default_limits=["200 per day", "60 per hour"], storage_uri=_storage_uri, strategy="fixed-window", ) # ─── Boucle de fermeture automatique des consultations ─────────────────────── def _autoclose_consultations_loop() -> None: """Vérifie toutes les 60 s les consultations expirées et les ferme automatiquement.""" while True: time.sleep(60) try: expired = get_consultations_to_autoclose() for c in expired: closed = close_consultation(c["id"]) if closed: logger.info( "Consultation '%s' fermée automatiquement (échéance dépassée)", c["slug"] ) synthesis = get_synthesis(consultation_id=c["id"]) threading.Thread( target=_trigger_webhook, args=(closed, synthesis), daemon=True, ).start() except Exception: logger.exception("Erreur dans la boucle de fermeture automatique des consultations") threading.Thread(target=_autoclose_consultations_loop, daemon=True).start() # ─── En-têtes de sécurité HTTP ─────────────────────────────────────────────── @app.after_request def set_security_headers(response: Response) -> Response: response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" # Les routes /export/print contiennent un légitime if request.path.endswith("/export/print"): csp = "default-src 'self'; script-src 'unsafe-inline'; object-src 'none';" else: csp = "default-src 'self'; script-src 'none'; object-src 'none';" response.headers["Content-Security-Policy"] = csp response.headers["Cache-Control"] = "no-store" response.headers["Pragma"] = "no-cache" return response # ─── Validation des entrées ────────────────────────────────────────────────── CONTENT_MIN = 10 CONTENT_MAX = 1000 AUTHOR_MAX = 100 def sanitize_text(text: str) -> str: """Supprime tout HTML/JavaScript — protection XSS.""" return bleach.clean(text, tags=[], strip=True).strip() def validate_idea_input(data: dict) -> tuple[dict | None, str | None]: """Valide et assainit les données de soumission d'une idée.""" content = data.get("content") author = data.get("author") if not content or not isinstance(content, str): return None, "Le champ 'content' est requis et doit être une chaîne de caractères." content = sanitize_text(content) if len(content) < CONTENT_MIN: return None, f"L'idée doit contenir au moins {CONTENT_MIN} caractères." if len(content) > CONTENT_MAX: return None, f"L'idée ne peut pas dépasser {CONTENT_MAX} caractères." if author is not None: if not isinstance(author, str): return None, "Le champ 'author' doit être une chaîne de caractères." author = sanitize_text(author) if len(author) > AUTHOR_MAX: return None, f"Le pseudonyme ne peut pas dépasser {AUTHOR_MAX} caractères." if not author: author = None return {"content": content, "author": author}, None # ─── Authentification Admin ────────────────────────────────────────────────── def _get_admin_secret() -> str | None: return os.environ.get("ADMIN_SECRET") def require_admin(f): @wraps(f) def decorated(*args, **kwargs): secret = _get_admin_secret() if not secret: return jsonify({"error": "admin_not_configured", "message": "ADMIN_SECRET non configuré."}), 503 auth = request.headers.get("Authorization", "") if not auth.startswith("Bearer ") or auth[7:] != secret: return jsonify({"error": "unauthorized", "message": "Accès non autorisé."}), 401 return f(*args, **kwargs) return decorated # ─── Helpers anti-abus ─────────────────────────────────────────────────────── def get_fingerprint_key() -> str: """Clé de rate limiting : fingerprint hashé si présent, sinon IP.""" visitor_id = request.headers.get("X-Visitor-Id", "").strip() if visitor_id: return "fp:" + hashlib.sha256(visitor_id.encode()).hexdigest()[:32] return "ip:" + get_remote_address() def _sign_cooldown(secret: str) -> str: ts = int(time.time()) msg = ts.to_bytes(8, "big") sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()[:16] return f"{ts}.{sig}" def _verify_cooldown(cookie: str, secret: str, cooldown_seconds: int) -> bool: try: ts_str, sig = cookie.rsplit(".", 1) ts = int(ts_str) msg = ts.to_bytes(8, "big") expected = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()[:16] if not hmac.compare_digest(sig, expected): return False return (time.time() - ts) < cooldown_seconds except Exception: return False def _verify_hcaptcha(token: str) -> bool: """ Vérifie un token hCaptcha. Renvoie True si HCAPTCHA_SECRET_KEY n'est pas configuré (stub désactivé). """ secret = os.environ.get("HCAPTCHA_SECRET_KEY", "").strip() if not secret: return True if not token: return False try: payload = urllib.parse.urlencode({"secret": secret, "response": token}).encode() req = urllib.request.Request( "https://hcaptcha.com/siteverify", data=payload, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) with urllib.request.urlopen(req, timeout=5) as resp: result = json.loads(resp.read()) return bool(result.get("success", False)) except Exception: logger.warning("Vérification hCaptcha impossible (erreur réseau) — skip") return True def _check_flood(ip: str, fingerprint_hash: str | None) -> bool: key = fingerprint_hash if fingerprint_hash else ip now = time.time() with _flood_lock: times = _flood_tracker.get(key, []) times = [t for t in times if now - t < FLOOD_WINDOW_SECONDS] times.append(now) _flood_tracker[key] = times return len(times) > FLOOD_THRESHOLD # ─── Helpers consultations ──────────────────────────────────────────────────── def _is_consultation_open(c: dict) -> bool: """Retourne True si la consultation est dans sa fenêtre temporelle et non fermée.""" if c.get("closed_at"): return False now = dt.datetime.now(dt.timezone.utc) starts_at = c.get("starts_at") if starts_at and starts_at.tzinfo and starts_at > now: return False ends_at = c.get("ends_at") if ends_at and ends_at.tzinfo and ends_at < now: return False return True def serialize_consultation(c: dict) -> dict: return { "id": c["id"], "slug": c["slug"], "title": c["title"], "subject": c["subject"], "introMessage": c.get("intro_message"), "organizerName": c.get("organizer_name"), "organizerLogoUrl": c.get("organizer_logo_url"), "startsAt": c["starts_at"].isoformat() if c.get("starts_at") else None, "endsAt": c["ends_at"].isoformat() if c.get("ends_at") else None, "closedAt": c["closed_at"].isoformat() if c.get("closed_at") else None, "createdAt": c["created_at"].isoformat() if c.get("created_at") else None, "isOpen": _is_consultation_open(c), } def _trigger_webhook(consultation: dict, synthesis: dict | None) -> None: """Envoie les résultats de clôture au webhook configuré (non-bloquant).""" url = consultation.get("webhook_url") if not url: return try: payload = { "event": "consultation_closed", "consultation": { "slug": consultation["slug"], "title": consultation["title"], "closedAt": consultation["closed_at"].isoformat() if consultation.get("closed_at") else None, }, "synthesis": { "text": synthesis["text"] if synthesis else None, "ideaCount": synthesis["idea_count"] if synthesis else 0, }, } data = json.dumps(payload).encode() req = urllib.request.Request( url, data=data, headers={ "Content-Type": "application/json", "User-Agent": "LaVoixDuPeuple/1.0", }, ) with urllib.request.urlopen(req, timeout=10) as resp: logger.info("Webhook consultation '%s' → HTTP %d", consultation["slug"], resp.status) except Exception: logger.warning("Webhook consultation '%s' échoué (non-bloquant)", consultation.get("slug")) # ─── Gestion des erreurs ───────────────────────────────────────────────────── @app.errorhandler(400) def bad_request(e): return jsonify({"error": "bad_request", "message": "Requête invalide."}), 400 @app.errorhandler(404) def not_found(e): return jsonify({"error": "not_found", "message": "Ressource introuvable."}), 404 @app.errorhandler(405) def method_not_allowed(e): return jsonify({"error": "method_not_allowed", "message": "Méthode non autorisée."}), 405 @app.errorhandler(429) def rate_limit_exceeded(e): return jsonify({ "error": "rate_limit_exceeded", "message": "Trop de requêtes. Veuillez patienter avant de soumettre une nouvelle idée.", }), 429 @app.errorhandler(500) def internal_error(e): logger.exception("Erreur interne non gérée") return jsonify({"error": "internal_error", "message": "Erreur interne du serveur."}), 500 # ─── Routes publiques ───────────────────────────────────────────────────────── @app.get("/api/healthz") def health(): return jsonify({"status": "ok"}) @app.get("/api/ideas") @limiter.limit("120 per minute") def list_ideas(): """Retourne toutes les idées globales acceptées (consultation_id IS NULL).""" ideas = get_accepted_ideas(consultation_id=None) return jsonify([serialize_idea(i) for i in ideas]) @app.get("/api/ideas/stats") @limiter.limit("120 per minute") def idea_stats(): stats = get_stats() return jsonify(stats) @app.post("/api/ideas") @limiter.limit(RATE_LIMIT_CONTRIBUTIONS, key_func=get_fingerprint_key) def submit_idea(): """ Soumet une idée citoyenne dans le contexte global. Protections anti-abus (dans l'ordre) : 1. Honeypot — rejet silencieux si champ leurre rempli 2. hCaptcha — vérification si HCAPTCHA_SECRET_KEY configuré 3. Cooldown cookie — rejet si soumission trop récente 4. Rate limiting — 3/heure par IP ou fingerprint (configurable) 5. Flood detection — alerte si > 10 soumissions / 5 min 6. Fingerprinting non-PII — hash de l'identifiant FingerprintJS 7. Filtrage IA — cadre légal international """ if not request.is_json: return jsonify({"error": "bad_request", "message": "Content-Type doit être application/json."}), 400 data = request.get_json(silent=True) if data is None: return jsonify({"error": "bad_request", "message": "Corps JSON invalide."}), 400 if data.get("_hp"): logger.info("Honeypot déclenché — soumission ignorée silencieusement") return jsonify({"id": 0, "accepted": True, "reason": None, "legalBasis": None}), 201 hcaptcha_token = ( (data.get("_h") or "") or request.headers.get("X-HCaptcha-Token", "") ) if not _verify_hcaptcha(hcaptcha_token): logger.warning("hCaptcha échoué — IP: %s", get_remote_address()) return jsonify({"error": "captcha_failed", "message": "Vérification CAPTCHA échouée. Veuillez réessayer."}), 400 secret_key = os.environ.get("SECRET_KEY", "").strip() if CONTRIBUTION_COOLDOWN_SECONDS > 0 and secret_key: cv = request.cookies.get("_cv", "") if cv and _verify_cooldown(cv, secret_key, CONTRIBUTION_COOLDOWN_SECONDS): logger.info("Cooldown actif — soumission rejetée (même session)") return jsonify({ "error": "cooldown", "message": "Vous avez déjà contribué récemment. Veuillez patienter avant de soumettre une nouvelle idée.", }), 429 validated, error = validate_idea_input(data) if error: return jsonify({"error": "validation_error", "message": error}), 400 content = validated["content"] author = validated["author"] raw_fp = request.headers.get("X-Visitor-Id", "").strip() fingerprint_hash = ( hashlib.sha256(raw_fp.encode()).hexdigest()[:32] if raw_fp else None ) client_ip = get_remote_address() if _check_flood(client_ip, fingerprint_hash): logger.warning( "ALERTE FLOOD — IP: %s | fingerprint: %s | seuil: %d/5min", client_ip, fingerprint_hash, FLOOD_THRESHOLD, ) logger.info("Filtrage d'une nouvelle idée (longueur: %d)", len(content)) filter_result = filter_idea(content) accepted = bool(filter_result.get("accepted", False)) rejection_reason = filter_result.get("reason") if not accepted else None legal_basis = filter_result.get("legal_basis") if not accepted else None idea = insert_idea(content, author, accepted, rejection_reason, legal_basis, fingerprint_hash) if accepted: threading.Thread(target=_update_synthesis_background, daemon=True).start() resp_data = { "id": idea["id"], "accepted": accepted, "reason": rejection_reason, "legalBasis": legal_basis if not accepted else None, "idea": serialize_idea(idea), } response = make_response(jsonify(resp_data), 201) if accepted and CONTRIBUTION_COOLDOWN_SECONDS > 0 and secret_key: response.set_cookie( "_cv", _sign_cooldown(secret_key), max_age=CONTRIBUTION_COOLDOWN_SECONDS, httponly=True, samesite="Lax", secure=request.is_secure, ) return response @app.get("/api/synthesis") @limiter.limit("120 per minute") def get_synthesis_route(): """Retourne la synthèse globale (consultation_id IS NULL).""" synthesis = get_synthesis(consultation_id=None) if not synthesis: return jsonify({ "text": ( "Aucune idée n'a encore été soumise. " "Soyez le premier à partager votre vision pour une société meilleure, " "fondée sur la Déclaration universelle des droits de l'homme." ), "ideaCount": 0, "updatedAt": None, }) return jsonify({ "text": synthesis["text"], "ideaCount": synthesis["idea_count"], "updatedAt": synthesis["updated_at"].isoformat() if synthesis["updated_at"] else None, }) # ─── Routes publiques : consentement, stats, contributions ─────────────────── @app.post("/api/consent") @limiter.limit("5 per minute") def record_consent(): """Enregistre le consentement explicite d'un citoyen (art. 9.2.a RGPD).""" if not request.is_json: return jsonify({"error": "bad_request", "message": "Content-Type doit être application/json."}), 400 data = request.get_json(silent=True) or {} consent_version = sanitize_text(str(data.get("consent_version", "1.0") or "1.0"))[:20] raw_fp = request.headers.get("X-Visitor-Id", "").strip() fingerprint_hash = ( hashlib.sha256(raw_fp.encode()).hexdigest()[:32] if raw_fp else "anonymous" ) create_consent(fingerprint_hash, consent_version) logger.info( "Consentement enregistré — fingerprint: %s... | version: %s", fingerprint_hash[:8], consent_version, ) return jsonify({"ok": True}), 201 @app.get("/api/stats/public") @limiter.limit("120 per minute") def public_stats(): """Statistiques publiques globales (hors consultations).""" return jsonify(get_public_stats()) @app.get("/api/contributions") @limiter.limit("60 per minute") def public_contributions(): """Contributions globales acceptées, paginées, anti-chronologiques.""" try: page = max(1, int(request.args.get("page", 1))) per_page = min(50, max(5, int(request.args.get("per_page", 20)))) except (ValueError, TypeError): page, per_page = 1, 20 contributions, total = get_public_contributions(page=page, per_page=per_page) pages = max(1, -(-total // per_page)) return jsonify({ "contributions": [ { "id": c["id"], "content": c["content"], "author": c.get("author"), "createdAt": c["created_at"].isoformat() if c.get("created_at") else None, } for c in contributions ], "total": total, "page": page, "perPage": per_page, "pages": pages, }) @app.get("/api/contributions/export/json") @limiter.limit("10 per minute") def export_contributions_json(): """Export JSON intégral des contributions globales acceptées.""" contributions, _ = get_public_contributions(page=1, per_page=10000) payload = [ { "id": c["id"], "content": c["content"], "author": c.get("author"), "createdAt": c["created_at"].isoformat() if c.get("created_at") else None, } for c in contributions ] return Response( json.dumps(payload, ensure_ascii=False, indent=2), mimetype="application/json", headers={"Content-Disposition": "attachment; filename=contributions.json"}, ) @app.get("/api/contributions/export/csv") @limiter.limit("10 per minute") def export_contributions_csv(): """Export CSV des contributions globales acceptées (champs publics uniquement).""" contributions, _ = get_public_contributions(page=1, per_page=10000) output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_ALL) writer.writerow(["id", "content", "author", "created_at"]) for c in contributions: writer.writerow([ c.get("id"), c.get("content", ""), c.get("author", ""), c["created_at"].isoformat() if c.get("created_at") else "", ]) return Response( output.getvalue().encode("utf-8-sig"), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=contributions.csv"}, ) # ─── Route publique : signalement ──────────────────────────────────────────── @app.post("/api/ideas//flag") @limiter.limit("3 per minute; 10 per hour") def flag_idea_route(idea_id: int): """Signale une contribution pour examen par l'administrateur.""" result = flag_idea(idea_id) if not result: return jsonify({"error": "not_found", "message": "Contribution introuvable ou non publiée."}), 404 return jsonify({"ok": True, "flagCount": result.get("flag_count", 1)}) # ─── Routes publiques : consultations (P5) ─────────────────────────────────── @app.get("/api/consultations") @limiter.limit("120 per minute") def list_consultations_route(): """Liste les consultations (actives par défaut, toutes si include_closed=true).""" include_closed = request.args.get("include_closed", "false").lower() == "true" consultations = list_consultations(include_closed=include_closed) return jsonify([serialize_consultation(c) for c in consultations]) @app.get("/api/consultations/") @limiter.limit("120 per minute") def get_consultation_route(slug: str): """Détails et statistiques d'une consultation.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 stats = get_consultation_stats(consultation["id"]) return jsonify({**serialize_consultation(consultation), "stats": stats}) @app.get("/api/consultations//synthesis") @limiter.limit("120 per minute") def get_consultation_synthesis_route(slug: str): """Synthèse collective d'une consultation.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 synthesis = get_synthesis(consultation_id=consultation["id"]) if not synthesis: return jsonify({ "text": "Aucune contribution n'a encore été soumise pour cette consultation.", "ideaCount": 0, "updatedAt": None, }) return jsonify({ "text": synthesis["text"], "ideaCount": synthesis["idea_count"], "updatedAt": synthesis["updated_at"].isoformat() if synthesis["updated_at"] else None, }) @app.get("/api/consultations//contributions") @limiter.limit("60 per minute") def get_consultation_contributions_route(slug: str): """Contributions acceptées d'une consultation, paginées.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 try: page = max(1, int(request.args.get("page", 1))) per_page = min(50, max(5, int(request.args.get("per_page", 20)))) except (ValueError, TypeError): page, per_page = 1, 20 contributions, total = get_consultation_contributions( consultation["id"], page=page, per_page=per_page ) pages = max(1, -(-total // per_page)) return jsonify({ "contributions": [ { "id": c["id"], "content": c["content"], "author": c.get("author"), "createdAt": c["created_at"].isoformat() if c.get("created_at") else None, } for c in contributions ], "total": total, "page": page, "perPage": per_page, "pages": pages, }) @app.post("/api/consultations//ideas") @limiter.limit(RATE_LIMIT_CONTRIBUTIONS, key_func=get_fingerprint_key) def submit_consultation_idea(slug: str): """Soumet une idée dans le contexte d'une consultation ciblée.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 if not _is_consultation_open(consultation): return jsonify({ "error": "consultation_closed", "message": "Cette consultation est terminée ou n'a pas encore débuté.", }), 403 if not request.is_json: return jsonify({"error": "bad_request", "message": "Content-Type doit être application/json."}), 400 data = request.get_json(silent=True) if data is None: return jsonify({"error": "bad_request", "message": "Corps JSON invalide."}), 400 if data.get("_hp"): return jsonify({"id": 0, "accepted": True, "reason": None}), 201 hcaptcha_token = (data.get("_h") or "") or request.headers.get("X-HCaptcha-Token", "") if not _verify_hcaptcha(hcaptcha_token): return jsonify({"error": "captcha_failed", "message": "Vérification CAPTCHA échouée."}), 400 validated, error = validate_idea_input(data) if error: return jsonify({"error": "validation_error", "message": error}), 400 content = validated["content"] author = validated["author"] raw_fp = request.headers.get("X-Visitor-Id", "").strip() fingerprint_hash = hashlib.sha256(raw_fp.encode()).hexdigest()[:32] if raw_fp else None if _check_flood(get_remote_address(), fingerprint_hash): logger.warning("ALERTE FLOOD — consultation '%s' | IP: %s", slug, get_remote_address()) filter_result = filter_idea(content) accepted = bool(filter_result.get("accepted", False)) rejection_reason = filter_result.get("reason") if not accepted else None legal_basis = filter_result.get("legal_basis") if not accepted else None idea = insert_idea( content, author, accepted, rejection_reason, legal_basis, fingerprint_hash, consultation["id"], ) if accepted: threading.Thread( target=_update_synthesis_background, kwargs={"consultation_id": consultation["id"]}, daemon=True, ).start() return jsonify({ "id": idea["id"], "accepted": accepted, "reason": rejection_reason, "legalBasis": legal_basis if not accepted else None, "idea": serialize_idea(idea), }), 201 @app.get("/api/consultations//export/print") @limiter.limit("10 per minute") def export_consultation_print(slug: str): """Rendu HTML de la synthèse pour impression/export PDF côté client.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 synthesis = get_synthesis(consultation_id=consultation["id"]) stats = get_consultation_stats(consultation["id"]) synthesis_text = synthesis["text"] if synthesis else "Aucune synthèse disponible." idea_count = synthesis["idea_count"] if synthesis else 0 title_esc = html_module.escape(consultation["title"]) subject_esc = html_module.escape(consultation["subject"]) organizer_esc = html_module.escape(consultation.get("organizer_name") or "") synthesis_esc = html_module.escape(synthesis_text).replace("\n", "
") now_str = dt.datetime.now().strftime("%d/%m/%Y %H:%M") html_content = f""" {title_esc}

{title_esc}

{f'

{organizer_esc}

' if organizer_esc else ""}

{subject_esc}

Exporté le {now_str} — {idea_count} contribution(s) intégrée(s)

{synthesis_esc}
""" return Response(html_content, mimetype="text/html") # ─── Routes Admin ───────────────────────────────────────────────────────────── @app.post("/api/admin/login") @limiter.limit("10 per minute") def admin_login(): """Vérifie le mot de passe admin. Retourne ok si correct.""" secret = _get_admin_secret() if not secret: return jsonify({"error": "admin_not_configured", "message": "ADMIN_SECRET non configuré."}), 503 data = request.get_json(silent=True) or {} password = data.get("password", "") if password != secret: logger.warning("Tentative de connexion admin échouée") return jsonify({"error": "unauthorized", "message": "Mot de passe incorrect."}), 401 logger.info("Connexion admin réussie") return jsonify({"ok": True, "token": secret}) @app.get("/api/admin/stats") @require_admin def admin_stats(): """Statistiques détaillées pour l'administrateur (toutes contributions confondues).""" stats = get_stats() return jsonify(stats) @app.get("/api/admin/ideas") @require_admin @limiter.limit("120 per minute") def admin_list_ideas(): """Liste toutes les contributions avec filtres et pagination.""" status = request.args.get("status", "all") page = max(1, int(request.args.get("page", 1))) per_page = min(100, max(10, int(request.args.get("per_page", 50)))) search = request.args.get("q", "").strip() if status not in ("all", "accepted", "rejected", "flagged"): status = "all" ideas, total = get_ideas_admin(status=status, page=page, per_page=per_page, search=search) return jsonify({ "ideas": [serialize_idea_admin(i) for i in ideas], "total": total, "page": page, "perPage": per_page, "pages": max(1, -(-total // per_page)), }) @app.delete("/api/admin/ideas/") @require_admin def admin_delete_idea(idea_id: int): """Supprime une contribution et régénère la synthèse du contexte concerné.""" deleted = delete_idea(idea_id) if not deleted: return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404 threading.Thread(target=_update_synthesis_background, daemon=True).start() logger.info("Admin — contribution #%d supprimée", idea_id) return jsonify({"ok": True, "synthesisUpdating": True}) @app.post("/api/admin/ideas/bulk-delete") @require_admin def admin_bulk_delete(): """Suppression en masse de contributions.""" data = request.get_json(silent=True) or {} ids = data.get("ids", []) if not isinstance(ids, list) or not ids: return jsonify({"error": "validation_error", "message": "ids doit être une liste non vide."}), 400 ids = [int(i) for i in ids if isinstance(i, (int, str)) and str(i).isdigit()] count = bulk_delete_ideas(ids) if count > 0: threading.Thread(target=_update_synthesis_background, daemon=True).start() logger.info("Admin — %d contribution(s) supprimée(s) en masse", count) return jsonify({"ok": True, "deleted": count, "synthesisUpdating": count > 0}) @app.post("/api/admin/ideas//override") @require_admin def admin_override_idea(idea_id: int): """Modifie manuellement le statut d'une contribution.""" data = request.get_json(silent=True) or {} accepted = bool(data.get("accepted", False)) reason = sanitize_text(data.get("reason", "") or "") note = sanitize_text(data.get("note", "") or "") result = override_idea(idea_id, accepted, reason or None, note or None) if not result: return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404 threading.Thread(target=_update_synthesis_background, daemon=True).start() logger.info("Admin — contribution #%d override → accepted=%s", idea_id, accepted) return jsonify({"ok": True, "idea": serialize_idea_admin(result), "synthesisUpdating": True}) @app.post("/api/admin/ideas//unflag") @require_admin def admin_unflag_idea(idea_id: int): """Retire le signalement d'une contribution.""" result = unflag_idea(idea_id) if not result: return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404 return jsonify({"ok": True, "idea": serialize_idea_admin(result)}) @app.post("/api/admin/synthesis/regenerate") @require_admin @limiter.limit("5 per minute") def admin_regenerate_synthesis(): """Force la régénération complète de la synthèse globale.""" threading.Thread(target=_update_synthesis_background, daemon=True).start() logger.info("Admin — régénération manuelle de la synthèse déclenchée") return jsonify({"ok": True, "message": "Régénération lancée en arrière-plan."}) @app.get("/api/admin/export/csv") @require_admin def admin_export_csv(): """Exporte toutes les contributions en CSV (admin).""" ideas, _ = get_ideas_admin(status="all", page=1, per_page=10000) output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_ALL) writer.writerow([ "id", "content", "author", "accepted", "flagged", "flag_count", "rejection_reason", "legal_basis", "admin_note", "fingerprint_hash", "consultation_id", "created_at", ]) for idea in ideas: writer.writerow([ idea.get("id"), idea.get("content", ""), idea.get("author", ""), "oui" if idea.get("accepted") else "non", "oui" if idea.get("flagged") else "non", idea.get("flag_count", 0), idea.get("rejection_reason", ""), idea.get("legal_basis", ""), idea.get("admin_note", ""), idea.get("fingerprint_hash", ""), idea.get("consultation_id", ""), idea.get("created_at").isoformat() if idea.get("created_at") else "", ]) return Response( output.getvalue().encode("utf-8-sig"), mimetype="text/csv", headers={"Content-Disposition": "attachment; filename=contributions.csv"}, ) # ─── Routes Admin : consultations (P5) ─────────────────────────────────────── @app.get("/api/admin/consultations") @require_admin def admin_list_consultations(): """Liste toutes les consultations (actives et fermées) avec leurs statistiques.""" consultations = list_consultations(include_closed=True) result = [] for c in consultations: stats = get_consultation_stats(c["id"]) result.append({**serialize_consultation(c), "stats": stats}) return jsonify(result) @app.post("/api/admin/consultations") @require_admin @limiter.limit("20 per minute") def admin_create_consultation(): """Crée une nouvelle consultation ciblée.""" data = request.get_json(silent=True) or {} slug_raw = sanitize_text(str(data.get("slug", "") or ""))[:100].lower() # Normalisation : uniquement alphanumériques et tirets slug = "".join(c if c.isalnum() or c == "-" else "-" for c in slug_raw) slug = "-".join(filter(None, slug.split("-"))) # supprime les tirets multiples/en bord title = sanitize_text(str(data.get("title", "") or ""))[:200] subject = sanitize_text(str(data.get("subject", "") or ""))[:2000] intro_message = sanitize_text(str(data.get("introMessage", "") or ""))[:2000] or None organizer_name = sanitize_text(str(data.get("organizerName", "") or ""))[:200] or None webhook_url = sanitize_text(str(data.get("webhookUrl", "") or ""))[:500] or None # URL logo : uniquement HTTPS pour éviter le contenu mixte logo_raw = sanitize_text(str(data.get("organizerLogoUrl", "") or ""))[:500] organizer_logo_url = logo_raw if logo_raw.startswith("https://") else None if not slug or not title or not subject: return jsonify({"error": "validation_error", "message": "slug, title et subject sont requis."}), 400 try: starts_at_str = data.get("startsAt", "") starts_at = dt.datetime.fromisoformat(starts_at_str) if starts_at_str else dt.datetime.now(dt.timezone.utc) ends_at_str = data.get("endsAt", "") ends_at = dt.datetime.fromisoformat(ends_at_str) if ends_at_str else None except (ValueError, TypeError): return jsonify({"error": "validation_error", "message": "Format de date invalide (ISO 8601 requis)."}), 400 try: consultation = create_consultation( slug, title, subject, intro_message, organizer_name, organizer_logo_url, starts_at, ends_at, webhook_url, ) except Exception as e: if "unique" in str(e).lower() or "duplicate" in str(e).lower(): return jsonify({"error": "conflict", "message": f"Le slug '{slug}' est déjà utilisé."}), 409 raise logger.info("Admin — consultation '%s' créée", slug) return jsonify(serialize_consultation(consultation)), 201 @app.post("/api/admin/consultations//close") @require_admin def admin_close_consultation(slug: str): """Ferme manuellement une consultation et déclenche le webhook si configuré.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 closed = close_consultation(consultation["id"]) if not closed: return jsonify({"error": "already_closed", "message": "Cette consultation est déjà fermée."}), 409 synthesis = get_synthesis(consultation_id=consultation["id"]) threading.Thread(target=_trigger_webhook, args=(closed, synthesis), daemon=True).start() logger.info("Admin — consultation '%s' fermée manuellement", slug) return jsonify({"ok": True, "consultation": serialize_consultation(closed)}) @app.delete("/api/admin/consultations/") @require_admin def admin_delete_consultation(slug: str): """Supprime une consultation et toutes ses contributions/synthèse associées.""" slug = sanitize_text(slug)[:100] consultation = get_consultation_by_slug(slug) if not consultation: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 deleted = delete_consultation(consultation["id"]) if not deleted: return jsonify({"error": "not_found", "message": "Consultation introuvable."}), 404 logger.info("Admin — consultation '%s' supprimée", slug) return jsonify({"ok": True}) # ─── Helpers sérialiseurs ───────────────────────────────────────────────────── def serialize_idea(idea: dict) -> dict: return { "id": idea["id"], "content": idea["content"], "author": idea.get("author"), "accepted": idea["accepted"], "flagged": idea.get("flagged", False), "flagCount": idea.get("flag_count", 0), "rejectionReason": idea.get("rejection_reason"), "legalBasis": idea.get("legal_basis"), "createdAt": idea["created_at"].isoformat() if idea.get("created_at") else None, } def serialize_idea_admin(idea: dict) -> dict: base = serialize_idea(idea) base["adminNote"] = idea.get("admin_note") base["fingerprintHash"] = idea.get("fingerprint_hash") base["consultationId"] = idea.get("consultation_id") return base def _update_synthesis_background(consultation_id: int | None = None) -> None: """Met à jour la synthèse d'un contexte donné en arrière-plan.""" try: ideas = get_accepted_ideas(consultation_id=consultation_id) texts = [i["content"] for i in ideas] synthesized = synthesize_ideas(texts) upsert_synthesis(synthesized, len(texts), consultation_id=consultation_id) logger.info( "Synthèse mise à jour (consultation_id=%s) — %d idée(s).", consultation_id, len(texts), ) except Exception: logger.exception("Erreur lors de la mise à jour de la synthèse en arrière-plan") # ─── Démarrage ──────────────────────────────────────────────────────────────── if __name__ == "__main__": port = int(os.environ.get("PORT", 8080)) logger.info("Initialisation de la base de données...") init_db() logger.info( "La Voix du Peuple — Flask démarre sur le port %d (storage: %s)", port, "Redis" if _redis_url else "mémoire", ) app.run(host="0.0.0.0", port=port, debug=False)