Files
la-voix-du-peuple/artifacts/flask-api/database.py
T
billisdead fbc1fad8b9 P5 — Mode consultation ciblée (Option B, implémentation complète)
Backend :
- Nouvelle table `consultations` (slug unique, fenêtre temporelle, webhook, logo)
- `ideas.consultation_id` FK nullable (NULL = contexte global home)
- `synthesis.consultation_id` FK nullable (synthèse par contexte)
- Boucle auto-fermeture (thread daemon, 60 s) — ferme + webhook à l'échéance
- Webhook de clôture : POST JSON (synthèse + métadonnées) via urllib.request
- Routes publiques : GET/POST /api/consultations/<slug>, synthèse, contributions, export/print
- Routes admin : list, create, close (+ webhook), delete (cascade explicite)
- CSP ajustée sur /export/print pour autoriser window.print()

Frontend :
- Nouvelle page /consultation/:slug — formulaire, synthèse live, contributions paginées, PDF
- Admin panel : onglet Consultations — liste, formulaire création, fermeture, suppression

Docs : DAT.md v1.5, DEX.md v1.7 (section P5, tables, routes, webhook)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 10:00:39 +02:00

495 lines
18 KiB
Python

"""
La Voix du Peuple — Couche d'accès à la base de données PostgreSQL
Copyright (C) 2026 billisdead — Licence EUPL-1.2
Utilise psycopg2 directement — pas d'ORM, code lisible et transparent.
"""
import os
import logging
import psycopg2
import psycopg2.extras
from contextlib import contextmanager
logger = logging.getLogger(__name__)
def get_connection():
    """Open a new PostgreSQL connection described by ``DATABASE_URL``.

    The connection string is read from the ``DATABASE_URL`` environment
    variable. The connection is created with ``RealDictCursor`` as its cursor
    factory, so cursors return rows that can be indexed by column name.

    Raises:
        RuntimeError: if ``DATABASE_URL`` is unset or empty.
    """
    dsn = os.environ.get("DATABASE_URL")
    if dsn:
        return psycopg2.connect(dsn, cursor_factory=psycopg2.extras.RealDictCursor)
    raise RuntimeError("DATABASE_URL est requis.")
@contextmanager
def db_cursor():
    """Yield a cursor on a fresh connection, committing when the block succeeds.

    A new connection is obtained from ``get_connection()`` on every use. If the
    managed block (or the commit itself) raises an ``Exception``, the
    connection is rolled back and the exception is re-raised. The connection
    is closed in every case.
    """
    connection = get_connection()
    try:
        try:
            with connection.cursor() as cursor:
                yield cursor
            connection.commit()
        except Exception:
            # Undo any partial work before letting the error propagate.
            connection.rollback()
            raise
    finally:
        connection.close()
def init_db() -> None:
    """Create the tables if they are missing and apply the schema migrations.

    Every statement uses ``IF NOT EXISTS``, so each one is a no-op when its
    table, column or index is already present. The statements are executed in
    the order listed, inside a single ``db_cursor()`` block.
    """
    schema_statements = (
        # consultations table — created BEFORE ideas because of the FK constraint.
        """
        CREATE TABLE IF NOT EXISTS consultations (
            id SERIAL PRIMARY KEY,
            slug VARCHAR(100) UNIQUE NOT NULL,
            title VARCHAR(200) NOT NULL,
            subject TEXT NOT NULL,
            intro_message TEXT,
            organizer_name VARCHAR(200),
            organizer_logo_url TEXT,
            starts_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            ends_at TIMESTAMPTZ,
            closed_at TIMESTAMPTZ,
            webhook_url TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS ideas (
            id SERIAL PRIMARY KEY,
            content TEXT NOT NULL,
            author VARCHAR(100),
            accepted BOOLEAN NOT NULL DEFAULT FALSE,
            rejection_reason TEXT,
            legal_basis TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        """,
        # Incremental migrations — idempotent.
        "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS legal_basis TEXT",
        "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flagged BOOLEAN NOT NULL DEFAULT FALSE",
        "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flag_count INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS admin_note TEXT",
        "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS fingerprint_hash VARCHAR(64)",
        # P5 — consultation_id: NULL = global contribution (home page).
        "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS consultation_id INTEGER REFERENCES consultations(id) ON DELETE SET NULL",
        """
        CREATE TABLE IF NOT EXISTS synthesis (
            id SERIAL PRIMARY KEY,
            text TEXT NOT NULL,
            idea_count INTEGER NOT NULL DEFAULT 0,
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        """,
        # P5 — one synthesis per context (NULL = global synthesis).
        "ALTER TABLE synthesis ADD COLUMN IF NOT EXISTS consultation_id INTEGER REFERENCES consultations(id) ON DELETE CASCADE",
        # GDPR consent audit table (art. 7.1 — burden of proof).
        """
        CREATE TABLE IF NOT EXISTS consents (
            id SERIAL PRIMARY KEY,
            fingerprint_hash VARCHAR(64) NOT NULL,
            consent_version VARCHAR(20) NOT NULL,
            consented_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_consents_fingerprint ON consents(fingerprint_hash)",
        "CREATE INDEX IF NOT EXISTS idx_ideas_consultation ON ideas(consultation_id)",
        "CREATE INDEX IF NOT EXISTS idx_synthesis_consultation ON synthesis(consultation_id)",
    )
    with db_cursor() as cur:
        for statement in schema_statements:
            cur.execute(statement)
    logger.info("Base de données initialisée.")
# ─── Idées ────────────────────────────────────────────────────────────────────
def insert_idea(
    content: str,
    author: str | None,
    accepted: bool,
    rejection_reason: str | None,
    legal_basis: str | None,
    fingerprint_hash: str | None = None,
    consultation_id: int | None = None,
) -> dict:
    """Insert one idea and return the stored row as a plain dict.

    ``consultation_id`` left at ``None`` stores the idea in the global context.
    The returned dict holds every column of the new row (``RETURNING *``).
    """
    values = (
        content,
        author,
        accepted,
        rejection_reason,
        legal_basis,
        fingerprint_hash,
        consultation_id,
    )
    with db_cursor() as cur:
        cur.execute(
            """
            INSERT INTO ideas
            (content, author, accepted, rejection_reason, legal_basis, fingerprint_hash, consultation_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            RETURNING *
            """,
            values,
        )
        inserted = cur.fetchone()
        return dict(inserted)
def get_accepted_ideas(consultation_id: int | None = None) -> list[dict]:
    """Return the accepted ideas of one context, oldest first.

    ``consultation_id=None`` selects the global context, i.e. the rows whose
    ``consultation_id`` column is NULL.
    """
    if consultation_id is not None:
        query = "SELECT * FROM ideas WHERE accepted = TRUE AND consultation_id = %s ORDER BY created_at ASC"
        params = (consultation_id,)
    else:
        query = "SELECT * FROM ideas WHERE accepted = TRUE AND consultation_id IS NULL ORDER BY created_at ASC"
        params = None
    with db_cursor() as cur:
        cur.execute(query, params)
        return list(map(dict, cur.fetchall()))
def get_all_ideas(limit: int = 50) -> list[dict]:
    """Return up to ``limit`` ideas of any status or context, newest first."""
    query = "SELECT * FROM ideas ORDER BY created_at DESC LIMIT %s"
    with db_cursor() as cur:
        cur.execute(query, (limit,))
        fetched = cur.fetchall()
        return [dict(record) for record in fetched]
def get_ideas_admin(
    status: str = "all",
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    consultation_id: int | None = None,
    global_only: bool = False,
) -> tuple[list[dict], int]:
    """Return one page of ideas for the admin view, plus the total match count.

    Args:
        status: ``"accepted"``, ``"rejected"`` (not accepted and not flagged)
            or ``"flagged"``; any other value applies no status filter.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size; negative values are treated as 0.
        search: case-insensitive substring looked up in ``content``. The
            characters ``%``, ``_`` and ``\\`` are matched literally.
        consultation_id: restrict to one consultation (ignored when
            ``global_only`` is true).
        global_only: restrict to ideas without a consultation.

    Returns:
        ``(rows, total)`` — the rows of the requested page (flagged first,
        then newest first) and the number of rows matching the filters
        regardless of pagination.
    """
    # PostgreSQL rejects negative LIMIT/OFFSET values, so clamp instead of
    # letting an out-of-range page number surface as a database error.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page

    conditions: list[str] = []
    params: list = []

    if global_only:
        conditions.append("consultation_id IS NULL")
    elif consultation_id is not None:
        conditions.append("consultation_id = %s")
        params.append(consultation_id)

    status_filters = {
        "accepted": "accepted = TRUE",
        "rejected": "accepted = FALSE AND flagged = FALSE",
        "flagged": "flagged = TRUE",
    }
    if status in status_filters:
        conditions.append(status_filters[status])

    if search:
        # Escape the LIKE metacharacters (backslash is the default escape
        # character) so the term is matched as plain text, not as a pattern.
        escaped = (
            search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        conditions.append("content ILIKE %s")
        params.append(f"%{escaped}%")

    # Only the constant fragments above are interpolated into the SQL text;
    # every user-supplied value travels through the parameter list.
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    with db_cursor() as cur:
        cur.execute(f"SELECT COUNT(*) as total FROM ideas {where}", params)
        total = cur.fetchone()["total"]
        cur.execute(
            f"SELECT * FROM ideas {where} ORDER BY flagged DESC, created_at DESC LIMIT %s OFFSET %s",
            params + [per_page, offset],
        )
        rows = [dict(row) for row in cur.fetchall()]
    return rows, total
def delete_idea(idea_id: int) -> bool:
    """Delete one idea by id; return True when a row was actually removed."""
    with db_cursor() as cur:
        cur.execute("DELETE FROM ideas WHERE id = %s RETURNING id", (idea_id,))
        removed = cur.fetchone()
        return removed is not None
def bulk_delete_ideas(idea_ids: list[int]) -> int:
    """Delete every idea whose id is in ``idea_ids``; return how many were removed.

    An empty list returns 0 without opening a database connection.
    """
    if idea_ids:
        with db_cursor() as cur:
            cur.execute(
                "DELETE FROM ideas WHERE id = ANY(%s) RETURNING id",
                (idea_ids,),
            )
            removed = cur.fetchall()
            return len(removed)
    return 0
def override_idea(
    idea_id: int,
    accepted: bool,
    reason: str | None,
    note: str | None,
) -> dict | None:
    """Overwrite the moderation decision of one idea.

    Sets ``accepted``, ``rejection_reason`` and ``admin_note`` from the
    arguments and clears the ``flagged`` marker. Returns the updated row as a
    dict, or ``None`` when no idea has this id.
    """
    with db_cursor() as cur:
        cur.execute(
            """
            UPDATE ideas
            SET accepted = %s,
                rejection_reason = %s,
                flagged = FALSE,
                admin_note = %s
            WHERE id = %s
            RETURNING *
            """,
            (accepted, reason, note, idea_id),
        )
        updated = cur.fetchone()
        if updated:
            return dict(updated)
        return None
def flag_idea(idea_id: int) -> dict | None:
    """Mark an accepted idea as flagged and increment its flag counter.

    Only rows with ``accepted = TRUE`` are touched. Returns the updated row as
    a dict, or ``None`` when the id is unknown or the idea is not accepted.
    """
    with db_cursor() as cur:
        cur.execute(
            """
            UPDATE ideas
            SET flagged = TRUE, flag_count = flag_count + 1
            WHERE id = %s AND accepted = TRUE
            RETURNING *
            """,
            (idea_id,),
        )
        updated = cur.fetchone()
        if updated:
            return dict(updated)
        return None
def unflag_idea(idea_id: int) -> dict | None:
    """Clear the ``flagged`` marker of one idea (``flag_count`` is left untouched).

    Returns the updated row as a dict, or ``None`` when no idea has this id.
    """
    query = "UPDATE ideas SET flagged = FALSE WHERE id = %s RETURNING *"
    with db_cursor() as cur:
        cur.execute(query, (idea_id,))
        updated = cur.fetchone()
        if updated:
            return dict(updated)
        return None
def get_stats() -> dict:
    """Return idea counters over the whole ``ideas`` table (all contexts).

    Returns:
        A dict with the integer keys ``total``, ``accepted``, ``rejected``
        (``accepted = FALSE``) and ``flagged``.
    """
    with db_cursor() as cur:
        # One aggregate query instead of four separate COUNTs: a single round
        # trip, and all four counters are computed from the same snapshot.
        cur.execute(
            """
            SELECT COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE accepted = TRUE) AS accepted,
                   COUNT(*) FILTER (WHERE accepted = FALSE) AS rejected,
                   COUNT(*) FILTER (WHERE flagged = TRUE) AS flagged
            FROM ideas
            """
        )
        counts = cur.fetchone()
        return {
            "total": counts["total"],
            "accepted": counts["accepted"],
            "rejected": counts["rejected"],
            "flagged": counts["flagged"],
        }
# ─── Synthèse ─────────────────────────────────────────────────────────────────
def upsert_synthesis(text: str, idea_count: int, consultation_id: int | None = None) -> dict:
    """Create or update the synthesis of one context and return the stored row.

    ``consultation_id=None`` targets the global synthesis; an integer targets
    that consultation's synthesis. An existing row gets its ``text``,
    ``idea_count`` and ``updated_at`` refreshed; otherwise a new row is
    inserted.
    """
    with db_cursor() as cur:
        # NOTE(review): lookup and write are separate statements; nothing
        # visible here prevents two concurrent callers from both inserting.
        if consultation_id is not None:
            cur.execute("SELECT id FROM synthesis WHERE consultation_id = %s LIMIT 1", (consultation_id,))
        else:
            cur.execute("SELECT id FROM synthesis WHERE consultation_id IS NULL LIMIT 1")
        existing = cur.fetchone()

        if not existing:
            cur.execute(
                """
                INSERT INTO synthesis (text, idea_count, consultation_id)
                VALUES (%s, %s, %s)
                RETURNING *
                """,
                (text, idea_count, consultation_id),
            )
        else:
            cur.execute(
                """
                UPDATE synthesis
                SET text = %s, idea_count = %s, updated_at = NOW()
                WHERE id = %s
                RETURNING *
                """,
                (text, idea_count, existing["id"]),
            )
        stored = cur.fetchone()
        return dict(stored)
def get_synthesis(consultation_id: int | None = None) -> dict | None:
    """Return the synthesis row of one context, or ``None`` when there is none.

    ``consultation_id=None`` selects the global synthesis.
    """
    if consultation_id is not None:
        query = "SELECT * FROM synthesis WHERE consultation_id = %s LIMIT 1"
        params = (consultation_id,)
    else:
        query = "SELECT * FROM synthesis WHERE consultation_id IS NULL LIMIT 1"
        params = None
    with db_cursor() as cur:
        cur.execute(query, params)
        found = cur.fetchone()
        if found:
            return dict(found)
        return None
# ─── RGPD — Consentements ─────────────────────────────────────────────────────
def create_consent(fingerprint_hash: str, consent_version: str) -> dict:
    """Record an explicit consent (GDPR art. 7.1 — proof) and return the new row."""
    values = (fingerprint_hash, consent_version)
    with db_cursor() as cur:
        cur.execute(
            """
            INSERT INTO consents (fingerprint_hash, consent_version)
            VALUES (%s, %s)
            RETURNING *
            """,
            values,
        )
        recorded = cur.fetchone()
        return dict(recorded)
# ─── Contributions publiques (contexte global) ────────────────────────────────
def get_public_contributions(page: int = 1, per_page: int = 20) -> tuple[list[dict], int]:
    """Return one page of global contributions that are accepted and not flagged.

    "Global" means ``consultation_id IS NULL``.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size; negative values are treated as 0.

    Returns:
        ``(rows, total)`` — the page rows (``id``, ``content``, ``author``,
        ``created_at``; newest first) and the total number of matching rows.
    """
    # PostgreSQL rejects negative LIMIT/OFFSET values, so clamp instead of
    # letting an out-of-range page number surface as a database error.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page
    with db_cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) as total FROM ideas WHERE accepted = TRUE AND flagged = FALSE AND consultation_id IS NULL"
        )
        total = cur.fetchone()["total"]
        cur.execute(
            """
            SELECT id, content, author, created_at
            FROM ideas
            WHERE accepted = TRUE AND flagged = FALSE AND consultation_id IS NULL
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """,
            (per_page, offset),
        )
        rows = [dict(row) for row in cur.fetchall()]
    return rows, total
def get_public_stats() -> dict:
    """Return the public statistics of the global context (consultations excluded).

    Returns:
        A dict with ``total`` and ``accepted`` (idea counts where
        ``consultation_id IS NULL``) and ``lastUpdated`` — the ISO-8601
        ``updated_at`` of the global synthesis, or ``None`` when there is no
        global synthesis yet.
    """
    with db_cursor() as cur:
        # Both counters come from one aggregate query so they are computed
        # from the same snapshot (and cost a single round trip).
        cur.execute(
            """
            SELECT COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE accepted = TRUE) AS accepted
            FROM ideas
            WHERE consultation_id IS NULL
            """
        )
        counts = cur.fetchone()
        cur.execute("SELECT updated_at FROM synthesis WHERE consultation_id IS NULL LIMIT 1")
        row = cur.fetchone()
        last_updated = row["updated_at"].isoformat() if row and row.get("updated_at") else None
        return {
            "total": counts["total"],
            "accepted": counts["accepted"],
            "lastUpdated": last_updated,
        }
# ─── Consultations (P5) ───────────────────────────────────────────────────────
def create_consultation(
    slug: str,
    title: str,
    subject: str,
    intro_message: str | None,
    organizer_name: str | None,
    organizer_logo_url: str | None,
    starts_at,
    ends_at,
    webhook_url: str | None,
) -> dict:
    """Insert a consultation and return the stored row as a plain dict.

    The arguments are written to the columns of the same name; the returned
    dict holds every column of the new row (``RETURNING *``).
    """
    values = (
        slug,
        title,
        subject,
        intro_message,
        organizer_name,
        organizer_logo_url,
        starts_at,
        ends_at,
        webhook_url,
    )
    with db_cursor() as cur:
        cur.execute(
            """
            INSERT INTO consultations
            (slug, title, subject, intro_message, organizer_name,
             organizer_logo_url, starts_at, ends_at, webhook_url)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING *
            """,
            values,
        )
        created = cur.fetchone()
        return dict(created)
def get_consultation_by_slug(slug: str) -> dict | None:
    """Return the consultation whose slug is ``slug``, or ``None`` if there is none."""
    query = "SELECT * FROM consultations WHERE slug = %s"
    with db_cursor() as cur:
        cur.execute(query, (slug,))
        found = cur.fetchone()
        if found:
            return dict(found)
        return None
def list_consultations(include_closed: bool = False) -> list[dict]:
    """List consultations, most recently created first.

    By default only consultations with ``closed_at IS NULL`` are returned;
    pass ``include_closed=True`` to get every consultation.
    """
    if include_closed:
        query = "SELECT * FROM consultations ORDER BY created_at DESC"
    else:
        query = "SELECT * FROM consultations WHERE closed_at IS NULL ORDER BY created_at DESC"
    with db_cursor() as cur:
        cur.execute(query)
        return list(map(dict, cur.fetchall()))
def close_consultation(consultation_id: int) -> dict | None:
    """Stamp ``closed_at = NOW()`` on a consultation that is still open.

    Returns the updated row as a dict, or ``None`` when the id is unknown or
    the consultation already has a ``closed_at`` value.
    """
    query = "UPDATE consultations SET closed_at = NOW() WHERE id = %s AND closed_at IS NULL RETURNING *"
    with db_cursor() as cur:
        cur.execute(query, (consultation_id,))
        closed = cur.fetchone()
        if closed:
            return dict(closed)
        return None
def get_consultations_to_autoclose() -> list[dict]:
    """Return the consultations whose end date has passed but that are not closed yet."""
    query = """
        SELECT * FROM consultations
        WHERE ends_at IS NOT NULL
        AND ends_at < NOW()
        AND closed_at IS NULL
        """
    with db_cursor() as cur:
        cur.execute(query)
        overdue = cur.fetchall()
        return [dict(record) for record in overdue]
def get_consultation_stats(consultation_id: int) -> dict:
    """Return the statistics of one consultation.

    Returns:
        A dict with ``total`` and ``accepted`` (idea counts for this
        consultation) and ``lastUpdated`` — the ISO-8601 ``updated_at`` of the
        consultation's synthesis, or ``None`` when it has no synthesis yet.
    """
    with db_cursor() as cur:
        # Both counters come from one aggregate query so they are computed
        # from the same snapshot (and cost a single round trip).
        cur.execute(
            """
            SELECT COUNT(*) AS total,
                   COUNT(*) FILTER (WHERE accepted = TRUE) AS accepted
            FROM ideas
            WHERE consultation_id = %s
            """,
            (consultation_id,),
        )
        counts = cur.fetchone()
        cur.execute(
            "SELECT updated_at FROM synthesis WHERE consultation_id = %s LIMIT 1",
            (consultation_id,),
        )
        row = cur.fetchone()
        last_updated = row["updated_at"].isoformat() if row and row.get("updated_at") else None
        return {
            "total": counts["total"],
            "accepted": counts["accepted"],
            "lastUpdated": last_updated,
        }
def get_consultation_contributions(
    consultation_id: int, page: int = 1, per_page: int = 20
) -> tuple[list[dict], int]:
    """Return one page of a consultation's accepted, non-flagged contributions.

    Args:
        consultation_id: id of the consultation whose ideas are listed.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size; negative values are treated as 0.

    Returns:
        ``(rows, total)`` — the page rows (``id``, ``content``, ``author``,
        ``created_at``; newest first) and the total number of matching rows.
    """
    # PostgreSQL rejects negative LIMIT/OFFSET values, so clamp instead of
    # letting an out-of-range page number surface as a database error.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page
    with db_cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) as total FROM ideas WHERE consultation_id = %s AND accepted = TRUE AND flagged = FALSE",
            (consultation_id,),
        )
        total = cur.fetchone()["total"]
        cur.execute(
            """
            SELECT id, content, author, created_at
            FROM ideas
            WHERE consultation_id = %s AND accepted = TRUE AND flagged = FALSE
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """,
            (consultation_id, per_page, offset),
        )
        rows = [dict(row) for row in cur.fetchall()]
    return rows, total
def delete_consultation(consultation_id: int) -> bool:
    """Delete a consultation together with its ideas and synthesis.

    The three deletes share one ``db_cursor()`` block. Returns True when a
    consultation row with this id existed and was removed.
    """
    target = (consultation_id,)
    with db_cursor() as cur:
        # Remove the ideas explicitly so that ON DELETE SET NULL does not turn
        # them into "global" contributions.
        cur.execute("DELETE FROM ideas WHERE consultation_id = %s", target)
        cur.execute("DELETE FROM synthesis WHERE consultation_id = %s", target)
        cur.execute("DELETE FROM consultations WHERE id = %s RETURNING id", target)
        removed = cur.fetchone()
        return removed is not None