Files
la-voix-du-peuple/artifacts/flask-api/app.py
T
pironantoine 2a792cbbb5 Add secure admin panel for content moderation and contribution flagging
Adds an admin interface with authentication for manual content deletion and flagging. Implements a flagging system for user contributions and secures the admin panel with a secret token.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 923ae0e3-a363-4db8-b04a-e8baca2a1330
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 7e5834b1-796d-4a9e-bbde-cd91012292de
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/8af7d2ec-2cc3-4ece-8af3-9f071488d072/923ae0e3-a363-4db8-b04a-e8baca2a1330/nghZcOj
Replit-Helium-Checkpoint-Created: true
2026-04-05 03:42:58 +00:00

442 lines
17 KiB
Python

"""
La Voix du Peuple — Backend Flask
==================================
Plateforme démocratique citoyenne.
Base légale : DUDH (ONU 1948), PIDCP (ONU 1966), CEDH (1950), Charte UE (2000),
Code pénal français, Loi du 29 juillet 1881, LCEN, SREN 2024.
Sécurité :
- Rate limiting (flask-limiter)
- Validation et assainissement des entrées (bleach)
- CORS restreint
- En-têtes de sécurité HTTP (CSP, HSTS, X-Frame-Options, etc.)
- Protection contre l'injection via requêtes paramétrées (psycopg2)
- Panel admin protégé par ADMIN_SECRET (Bearer token)
- Aucun secret exposé dans les réponses d'erreur
"""
import csv
import io
import os
import logging
import threading
from functools import wraps
import bleach
from flask import Flask, jsonify, request, Response
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from database import (
init_db, insert_idea, get_accepted_ideas, get_stats, upsert_synthesis,
get_synthesis, get_all_ideas, get_ideas_admin, delete_idea, bulk_delete_ideas,
override_idea, flag_idea, unflag_idea,
)
from ai_agent import filter_idea, synthesize_ideas
# ─── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s%(message)s",
)
logger = logging.getLogger(__name__)
# ─── Application ────────────────────────────────────────────────────────────
app = Flask(__name__)
app.config["JSON_SORT_KEYS"] = False
# CORS : autorise uniquement les origines du même domaine Replit
CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=False)
# Rate limiting — protection anti-spam et anti-DDoS
limiter = Limiter(
get_remote_address,
app=app,
default_limits=["200 per day", "60 per hour"],
storage_uri="memory://",
strategy="fixed-window",
)
# ─── En-têtes de sécurité HTTP ───────────────────────────────────────────────
@app.after_request
def set_security_headers(response):
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'none'; "
"object-src 'none';"
)
response.headers["Cache-Control"] = "no-store"
response.headers["Pragma"] = "no-cache"
return response
# ─── Validation des entrées ──────────────────────────────────────────────────
CONTENT_MIN = 10
CONTENT_MAX = 1000
AUTHOR_MAX = 100
def sanitize_text(text: str) -> str:
"""Supprime tout HTML/JavaScript — protection XSS."""
return bleach.clean(text, tags=[], strip=True).strip()
def validate_idea_input(data: dict) -> tuple[dict | None, str | None]:
"""Valide et assainit les données de soumission d'une idée."""
content = data.get("content")
author = data.get("author")
if not content or not isinstance(content, str):
return None, "Le champ 'content' est requis et doit être une chaîne de caractères."
content = sanitize_text(content)
if len(content) < CONTENT_MIN:
return None, f"L'idée doit contenir au moins {CONTENT_MIN} caractères."
if len(content) > CONTENT_MAX:
return None, f"L'idée ne peut pas dépasser {CONTENT_MAX} caractères."
if author is not None:
if not isinstance(author, str):
return None, "Le champ 'author' doit être une chaîne de caractères."
author = sanitize_text(author)
if len(author) > AUTHOR_MAX:
return None, f"Le pseudonyme ne peut pas dépasser {AUTHOR_MAX} caractères."
if not author:
author = None
return {"content": content, "author": author}, None
# ─── Authentification Admin ──────────────────────────────────────────────────
def _get_admin_secret() -> str | None:
return os.environ.get("ADMIN_SECRET")
def require_admin(f):
@wraps(f)
def decorated(*args, **kwargs):
secret = _get_admin_secret()
if not secret:
return jsonify({"error": "admin_not_configured", "message": "ADMIN_SECRET non configuré."}), 503
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer ") or auth[7:] != secret:
return jsonify({"error": "unauthorized", "message": "Accès non autorisé."}), 401
return f(*args, **kwargs)
return decorated
# ─── Gestion des erreurs ─────────────────────────────────────────────────────
@app.errorhandler(400)
def bad_request(e):
return jsonify({"error": "bad_request", "message": "Requête invalide."}), 400
@app.errorhandler(404)
def not_found(e):
return jsonify({"error": "not_found", "message": "Ressource introuvable."}), 404
@app.errorhandler(405)
def method_not_allowed(e):
return jsonify({"error": "method_not_allowed", "message": "Méthode non autorisée."}), 405
@app.errorhandler(429)
def rate_limit_exceeded(e):
return jsonify({
"error": "rate_limit_exceeded",
"message": "Trop de requêtes. Veuillez patienter avant de soumettre une nouvelle idée.",
}), 429
@app.errorhandler(500)
def internal_error(e):
logger.exception("Erreur interne non gérée")
return jsonify({"error": "internal_error", "message": "Erreur interne du serveur."}), 500
# ─── Routes ──────────────────────────────────────────────────────────────────
@app.get("/api/healthz")
def health():
return jsonify({"status": "ok"})
@app.get("/api/ideas")
@limiter.limit("120 per minute")
def list_ideas():
"""Retourne toutes les idées acceptées (conformes au droit international)."""
ideas = get_accepted_ideas()
return jsonify([serialize_idea(i) for i in ideas])
@app.get("/api/ideas/stats")
@limiter.limit("120 per minute")
def idea_stats():
"""Statistiques : total, acceptées, rejetées."""
stats = get_stats()
return jsonify(stats)
@app.post("/api/ideas")
@limiter.limit("5 per minute; 20 per hour")
def submit_idea():
"""
Soumet une idée citoyenne.
L'idée est filtrée par l'agent IA selon le cadre légal international
avant d'être intégrée dans la synthèse collective.
"""
if not request.is_json:
return jsonify({"error": "bad_request", "message": "Content-Type doit être application/json."}), 400
data = request.get_json(silent=True)
if data is None:
return jsonify({"error": "bad_request", "message": "Corps JSON invalide."}), 400
validated, error = validate_idea_input(data)
if error:
return jsonify({"error": "validation_error", "message": error}), 400
content = validated["content"]
author = validated["author"]
logger.info("Filtrage d'une nouvelle idée (longueur: %d)", len(content))
filter_result = filter_idea(content)
accepted = bool(filter_result.get("accepted", False))
rejection_reason = filter_result.get("reason") if not accepted else None
legal_basis = filter_result.get("legal_basis") if not accepted else None
idea = insert_idea(content, author, accepted, rejection_reason, legal_basis)
if accepted:
# Synthèse mise à jour en arrière-plan — ne bloque pas la réponse
threading.Thread(target=_update_synthesis_background, daemon=True).start()
return jsonify({
"id": idea["id"],
"accepted": accepted,
"reason": rejection_reason,
"legalBasis": legal_basis if not accepted else None,
"idea": serialize_idea(idea),
}), 201
@app.get("/api/synthesis")
@limiter.limit("120 per minute")
def get_synthesis_route():
"""Retourne la synthèse actuelle de la Voix du Peuple."""
synthesis = get_synthesis()
if not synthesis:
return jsonify({
"text": (
"Aucune idée n'a encore été soumise. "
"Soyez le premier à partager votre vision pour une société meilleure, "
"fondée sur la Déclaration universelle des droits de l'homme."
),
"ideaCount": 0,
"updatedAt": None,
})
return jsonify({
"text": synthesis["text"],
"ideaCount": synthesis["idea_count"],
"updatedAt": synthesis["updated_at"].isoformat() if synthesis["updated_at"] else None,
})
# ─── Route publique : signalement ────────────────────────────────────────────
@app.post("/api/ideas/<int:idea_id>/flag")
@limiter.limit("3 per minute; 10 per hour")
def flag_idea_route(idea_id: int):
"""Signale une contribution pour examen par l'administrateur."""
result = flag_idea(idea_id)
if not result:
return jsonify({"error": "not_found", "message": "Contribution introuvable ou non publiée."}), 404
return jsonify({"ok": True, "flagCount": result.get("flag_count", 1)})
# ─── Routes Admin ─────────────────────────────────────────────────────────────
@app.post("/api/admin/login")
@limiter.limit("10 per minute")
def admin_login():
"""Vérifie le mot de passe admin. Retourne ok si correct."""
secret = _get_admin_secret()
if not secret:
return jsonify({"error": "admin_not_configured", "message": "ADMIN_SECRET non configuré."}), 503
data = request.get_json(silent=True) or {}
password = data.get("password", "")
if password != secret:
logger.warning("Tentative de connexion admin échouée")
return jsonify({"error": "unauthorized", "message": "Mot de passe incorrect."}), 401
logger.info("Connexion admin réussie")
return jsonify({"ok": True, "token": secret})
@app.get("/api/admin/stats")
@require_admin
def admin_stats():
"""Statistiques détaillées pour l'administrateur."""
stats = get_stats()
return jsonify(stats)
@app.get("/api/admin/ideas")
@require_admin
@limiter.limit("120 per minute")
def admin_list_ideas():
"""Liste toutes les contributions avec filtres et pagination."""
status = request.args.get("status", "all")
page = max(1, int(request.args.get("page", 1)))
per_page = min(100, max(10, int(request.args.get("per_page", 50))))
search = request.args.get("q", "").strip()
if status not in ("all", "accepted", "rejected", "flagged"):
status = "all"
ideas, total = get_ideas_admin(status=status, page=page, per_page=per_page, search=search)
return jsonify({
"ideas": [serialize_idea_admin(i) for i in ideas],
"total": total,
"page": page,
"perPage": per_page,
"pages": max(1, -(-total // per_page)),
})
@app.delete("/api/admin/ideas/<int:idea_id>")
@require_admin
def admin_delete_idea(idea_id: int):
"""Supprime une contribution et régénère la synthèse."""
deleted = delete_idea(idea_id)
if not deleted:
return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — contribution #%d supprimée", idea_id)
return jsonify({"ok": True, "synthesisUpdating": True})
@app.post("/api/admin/ideas/bulk-delete")
@require_admin
def admin_bulk_delete():
"""Suppression en masse de contributions."""
data = request.get_json(silent=True) or {}
ids = data.get("ids", [])
if not isinstance(ids, list) or not ids:
return jsonify({"error": "validation_error", "message": "ids doit être une liste non vide."}), 400
ids = [int(i) for i in ids if isinstance(i, (int, str)) and str(i).isdigit()]
count = bulk_delete_ideas(ids)
if count > 0:
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — %d contribution(s) supprimée(s) en masse", count)
return jsonify({"ok": True, "deleted": count, "synthesisUpdating": count > 0})
@app.post("/api/admin/ideas/<int:idea_id>/override")
@require_admin
def admin_override_idea(idea_id: int):
"""Modifie manuellement le statut d'une contribution (accepter/rejeter)."""
data = request.get_json(silent=True) or {}
accepted = bool(data.get("accepted", False))
reason = sanitize_text(data.get("reason", "") or "")
note = sanitize_text(data.get("note", "") or "")
result = override_idea(idea_id, accepted, reason or None, note or None)
if not result:
return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — contribution #%d override → accepted=%s", idea_id, accepted)
return jsonify({"ok": True, "idea": serialize_idea_admin(result), "synthesisUpdating": True})
@app.post("/api/admin/ideas/<int:idea_id>/unflag")
@require_admin
def admin_unflag_idea(idea_id: int):
"""Retire le signalement d'une contribution."""
result = unflag_idea(idea_id)
if not result:
return jsonify({"error": "not_found", "message": "Contribution introuvable."}), 404
return jsonify({"ok": True, "idea": serialize_idea_admin(result)})
@app.post("/api/admin/synthesis/regenerate")
@require_admin
@limiter.limit("5 per minute")
def admin_regenerate_synthesis():
"""Force la régénération complète de la synthèse."""
threading.Thread(target=_update_synthesis_background, daemon=True).start()
logger.info("Admin — régénération manuelle de la synthèse déclenchée")
return jsonify({"ok": True, "message": "Régénération lancée en arrière-plan."})
@app.get("/api/admin/export/csv")
@require_admin
def admin_export_csv():
"""Exporte toutes les contributions en CSV."""
ideas, _ = get_ideas_admin(status="all", page=1, per_page=10000)
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_ALL)
writer.writerow(["id", "content", "author", "accepted", "flagged",
"flag_count", "rejection_reason", "legal_basis",
"admin_note", "created_at"])
for idea in ideas:
writer.writerow([
idea.get("id"),
idea.get("content", ""),
idea.get("author", ""),
"oui" if idea.get("accepted") else "non",
"oui" if idea.get("flagged") else "non",
idea.get("flag_count", 0),
idea.get("rejection_reason", ""),
idea.get("legal_basis", ""),
idea.get("admin_note", ""),
idea.get("created_at").isoformat() if idea.get("created_at") else "",
])
csv_bytes = output.getvalue().encode("utf-8-sig")
return Response(
csv_bytes,
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=contributions.csv"},
)
# ─── Helpers ─────────────────────────────────────────────────────────────────
def serialize_idea(idea: dict) -> dict:
return {
"id": idea["id"],
"content": idea["content"],
"author": idea.get("author"),
"accepted": idea["accepted"],
"flagged": idea.get("flagged", False),
"flagCount": idea.get("flag_count", 0),
"rejectionReason": idea.get("rejection_reason"),
"legalBasis": idea.get("legal_basis"),
"createdAt": idea["created_at"].isoformat() if idea.get("created_at") else None,
}
def serialize_idea_admin(idea: dict) -> dict:
base = serialize_idea(idea)
base["adminNote"] = idea.get("admin_note")
return base
def _update_synthesis_background():
try:
ideas = get_accepted_ideas()
texts = [i["content"] for i in ideas]
synthesized = synthesize_ideas(texts)
upsert_synthesis(synthesized, len(texts))
logger.info("Synthèse mise à jour — %d idée(s) intégrée(s).", len(texts))
except Exception:
logger.exception("Erreur lors de la mise à jour de la synthèse en arrière-plan")
# ─── Démarrage ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8080))
logger.info("Initialisation de la base de données...")
init_db()
logger.info("La Voix du Peuple — Flask démarre sur le port %d", port)
app.run(host="0.0.0.0", port=port, debug=False)