"""
La Voix du Peuple — PostgreSQL database access layer
Copyright (C) 2026 billisdead — EUPL-1.2 licence

Uses psycopg2 directly — no ORM, readable and transparent code.
"""

import logging
import os
from contextlib import contextmanager

import psycopg2
import psycopg2.extras

logger = logging.getLogger(__name__)


def get_connection():
    """Open a new PostgreSQL connection from the ``DATABASE_URL`` env variable.

    The connection uses ``RealDictCursor`` as its cursor factory, so every row
    fetched through it is a dict-like object keyed by column name.

    Raises:
        RuntimeError: if ``DATABASE_URL`` is unset or empty.
    """
    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        raise RuntimeError("DATABASE_URL est requis.")
    return psycopg2.connect(
        database_url,
        cursor_factory=psycopg2.extras.RealDictCursor,
    )


@contextmanager
def db_cursor():
    """Yield a cursor bound to a fresh connection, as one transaction.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back (the exception is re-raised) when it fails. The connection is
    closed in every case.
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            yield cur
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _limit_offset(page: int, per_page: int) -> tuple[int, int]:
    """Turn 1-based pagination arguments into a safe ``(limit, offset)`` pair.

    PostgreSQL rejects negative LIMIT/OFFSET values, so ``page`` is clamped to
    at least 1 and ``per_page`` to at least 0.
    """
    limit = max(per_page, 0)
    offset = (max(page, 1) - 1) * limit
    return limit, offset


def _escape_like(term: str) -> str:
    """Escape LIKE/ILIKE metacharacters so that *term* is matched literally.

    Backslash is the default escape character of LIKE in PostgreSQL, so it is
    escaped first, followed by the ``%`` and ``_`` wildcards.
    """
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def init_db() -> None:
    """Create the tables if they do not exist and apply the needed migrations.

    Every statement is idempotent (``IF NOT EXISTS``), so calling this function
    repeatedly is safe. All statements run in a single transaction.
    """
    with db_cursor() as cur:
        # consultations is created BEFORE ideas because of the FK constraint.
        cur.execute("""
            CREATE TABLE IF NOT EXISTS consultations (
                id SERIAL PRIMARY KEY,
                slug VARCHAR(100) UNIQUE NOT NULL,
                title VARCHAR(200) NOT NULL,
                subject TEXT NOT NULL,
                intro_message TEXT,
                organizer_name VARCHAR(200),
                organizer_logo_url TEXT,
                starts_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                ends_at TIMESTAMPTZ,
                closed_at TIMESTAMPTZ,
                webhook_url TEXT,
                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS ideas (
                id SERIAL PRIMARY KEY,
                content TEXT NOT NULL,
                author VARCHAR(100),
                accepted BOOLEAN NOT NULL DEFAULT FALSE,
                rejection_reason TEXT,
                legal_basis TEXT,
                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """)

        # Incremental migrations — idempotent.
        cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS legal_basis TEXT")
        cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flagged BOOLEAN NOT NULL DEFAULT FALSE")
        cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flag_count INTEGER NOT NULL DEFAULT 0")
        cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS admin_note TEXT")
        cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS fingerprint_hash VARCHAR(64)")

        # P5 — consultation_id: NULL = global contribution (home page).
        cur.execute(
            "ALTER TABLE ideas ADD COLUMN IF NOT EXISTS consultation_id INTEGER REFERENCES consultations(id) ON DELETE SET NULL"
        )

        cur.execute("""
            CREATE TABLE IF NOT EXISTS synthesis (
                id SERIAL PRIMARY KEY,
                text TEXT NOT NULL,
                idea_count INTEGER NOT NULL DEFAULT 0,
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """)

        # P5 — one synthesis per context (NULL = global synthesis).
        cur.execute(
            "ALTER TABLE synthesis ADD COLUMN IF NOT EXISTS consultation_id INTEGER REFERENCES consultations(id) ON DELETE CASCADE"
        )

        # GDPR consent audit table (art. 7.1 — burden of proof).
        cur.execute("""
            CREATE TABLE IF NOT EXISTS consents (
                id SERIAL PRIMARY KEY,
                fingerprint_hash VARCHAR(64) NOT NULL,
                consent_version VARCHAR(20) NOT NULL,
                consented_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """)

        cur.execute(
            "CREATE INDEX IF NOT EXISTS idx_consents_fingerprint ON consents(fingerprint_hash)"
        )
        cur.execute("CREATE INDEX IF NOT EXISTS idx_ideas_consultation ON ideas(consultation_id)")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_synthesis_consultation ON synthesis(consultation_id)")

    # Logged after the ``with`` block so the message is only emitted once the
    # schema transaction has actually been committed.
    logger.info("Base de données initialisée.")


# ─── Ideas ────────────────────────────────────────────────────────────────────

def insert_idea(
    content: str,
    author: str | None,
    accepted: bool,
    rejection_reason: str | None,
    legal_basis: str | None,
    fingerprint_hash: str | None = None,
    consultation_id: int | None = None,
) -> dict:
    """Insert one idea and return the stored row (all columns) as a dict.

    ``consultation_id`` left to ``None`` records a global contribution.
    """
    with db_cursor() as cur:
        cur.execute(
            """
            INSERT INTO ideas
                (content, author, accepted, rejection_reason, legal_basis,
                 fingerprint_hash, consultation_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            RETURNING *
            """,
            (
                content,
                author,
                accepted,
                rejection_reason,
                legal_basis,
                fingerprint_hash,
                consultation_id,
            ),
        )
        return dict(cur.fetchone())


def get_accepted_ideas(consultation_id: int | None = None) -> list[dict]:
    """Return the accepted ideas of a given context, oldest first.

    ``consultation_id=None`` selects the global context
    (``consultation_id IS NULL``).
    """
    with db_cursor() as cur:
        if consultation_id is None:
            cur.execute(
                "SELECT * FROM ideas WHERE accepted = TRUE AND consultation_id IS NULL ORDER BY created_at ASC"
            )
        else:
            cur.execute(
                "SELECT * FROM ideas WHERE accepted = TRUE AND consultation_id = %s ORDER BY created_at ASC",
                (consultation_id,),
            )
        return [dict(row) for row in cur.fetchall()]


def get_all_ideas(limit: int = 50) -> list[dict]:
    """Return the most recent ideas (any status, any context), newest first.

    A negative ``limit`` is treated as 0 because PostgreSQL rejects a negative
    LIMIT.
    """
    with db_cursor() as cur:
        cur.execute(
            "SELECT * FROM ideas ORDER BY created_at DESC LIMIT %s",
            (max(limit, 0),),
        )
        return [dict(row) for row in cur.fetchall()]


def get_ideas_admin(
    status: str = "all",
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    consultation_id: int | None = None,
    global_only: bool = False,
) -> tuple[list[dict], int]:
    """Return one page of ideas for the admin view plus the total match count.

    Args:
        status: ``"accepted"``, ``"rejected"`` (not accepted and not flagged)
            or ``"flagged"``; any other value applies no status filter.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size; negative values are treated as 0.
        search: case-insensitive substring looked up in ``content``. LIKE
            wildcards in the term are escaped and matched literally.
        consultation_id: restrict to one consultation (ignored when
            ``global_only`` is true).
        global_only: restrict to global ideas (``consultation_id IS NULL``).

    Returns:
        ``(rows, total)`` — flagged ideas first, then newest first.
    """
    limit, offset = _limit_offset(page, per_page)

    conditions: list[str] = []
    params: list = []

    if global_only:
        conditions.append("consultation_id IS NULL")
    elif consultation_id is not None:
        conditions.append("consultation_id = %s")
        params.append(consultation_id)

    if status == "accepted":
        conditions.append("accepted = TRUE")
    elif status == "rejected":
        conditions.append("accepted = FALSE AND flagged = FALSE")
    elif status == "flagged":
        conditions.append("flagged = TRUE")

    if search:
        conditions.append("content ILIKE %s")
        params.append(f"%{_escape_like(search)}%")

    # Only the fixed fragments above are interpolated into the SQL text; every
    # user-supplied value travels as a bound parameter.
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    with db_cursor() as cur:
        cur.execute(f"SELECT COUNT(*) as total FROM ideas {where}", params)
        total = cur.fetchone()["total"]

        cur.execute(
            f"SELECT * FROM ideas {where} ORDER BY flagged DESC, created_at DESC LIMIT %s OFFSET %s",
            params + [limit, offset],
        )
        rows = [dict(row) for row in cur.fetchall()]

    return rows, total


def delete_idea(idea_id: int) -> bool:
    """Delete one idea; return ``True`` if a row was actually removed."""
    with db_cursor() as cur:
        cur.execute("DELETE FROM ideas WHERE id = %s RETURNING id", (idea_id,))
        return cur.fetchone() is not None


def bulk_delete_ideas(idea_ids: list[int]) -> int:
    """Delete several ideas at once and return how many rows were removed.

    An empty ``idea_ids`` returns 0 without opening a connection.
    """
    if not idea_ids:
        return 0
    with db_cursor() as cur:
        cur.execute(
            "DELETE FROM ideas WHERE id = ANY(%s) RETURNING id",
            # psycopg2 adapts a list to an SQL ARRAY but a tuple to "(a, b)",
            # which is invalid inside ANY(...); normalise to a list.
            (list(idea_ids),),
        )
        return len(cur.fetchall())


def override_idea(
    idea_id: int,
    accepted: bool,
    reason: str | None,
    note: str | None,
) -> dict | None:
    """Apply an admin decision to an idea and clear its flag.

    Sets ``accepted``, ``rejection_reason`` and ``admin_note`` and resets
    ``flagged`` to FALSE. Returns the updated row, or ``None`` when no idea has
    this id.
    """
    with db_cursor() as cur:
        cur.execute(
            """
            UPDATE ideas
            SET accepted = %s,
                rejection_reason = %s,
                flagged = FALSE,
                admin_note = %s
            WHERE id = %s
            RETURNING *
            """,
            (accepted, reason, note, idea_id),
        )
        row = cur.fetchone()
        return dict(row) if row else None


def flag_idea(idea_id: int) -> dict | None:
    """Flag an accepted idea and increment its flag counter.

    Returns the updated row, or ``None`` when the idea does not exist or is not
    accepted (only accepted ideas can be flagged).
    """
    with db_cursor() as cur:
        cur.execute(
            """
            UPDATE ideas
            SET flagged = TRUE,
                flag_count = flag_count + 1
            WHERE id = %s AND accepted = TRUE
            RETURNING *
            """,
            (idea_id,),
        )
        row = cur.fetchone()
        return dict(row) if row else None


def unflag_idea(idea_id: int) -> dict | None:
    """Clear the flag of an idea (``flag_count`` is left untouched).

    Returns the updated row, or ``None`` when no idea has this id.
    """
    with db_cursor() as cur:
        cur.execute(
            "UPDATE ideas SET flagged = FALSE WHERE id = %s RETURNING *",
            (idea_id,),
        )
        row = cur.fetchone()
        return dict(row) if row else None


def get_stats() -> dict:
    """Return idea counters over all contexts.

    Keys: ``total``, ``accepted``, ``rejected`` (``accepted = FALSE``) and
    ``flagged``. The four counters come from a single statement so they
    describe the same snapshot of the table.
    """
    with db_cursor() as cur:
        cur.execute(
            """
            SELECT
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE accepted = TRUE) AS accepted,
                COUNT(*) FILTER (WHERE accepted = FALSE) AS rejected,
                COUNT(*) FILTER (WHERE flagged = TRUE) AS flagged
            FROM ideas
            """
        )
        counts = cur.fetchone()
        return {
            "total": counts["total"],
            "accepted": counts["accepted"],
            "rejected": counts["rejected"],
            "flagged": counts["flagged"],
        }


# ─── Synthesis ────────────────────────────────────────────────────────────────

def upsert_synthesis(text: str, idea_count: int, consultation_id: int | None = None) -> dict:
    """Create or update the synthesis of a context (None = global, int = consultation).

    Returns the inserted or updated row as a dict.
    """
    # NOTE(review): the select-then-write below is not atomic and init_db()
    # creates no UNIQUE constraint on synthesis.consultation_id, so two
    # concurrent first-time callers could presumably both insert a row.
    with db_cursor() as cur:
        if consultation_id is None:
            cur.execute("SELECT id FROM synthesis WHERE consultation_id IS NULL LIMIT 1")
        else:
            cur.execute(
                "SELECT id FROM synthesis WHERE consultation_id = %s LIMIT 1",
                (consultation_id,),
            )
        existing = cur.fetchone()

        if existing:
            cur.execute(
                """
                UPDATE synthesis
                SET text = %s, idea_count = %s, updated_at = NOW()
                WHERE id = %s
                RETURNING *
                """,
                (text, idea_count, existing["id"]),
            )
        else:
            cur.execute(
                """
                INSERT INTO synthesis (text, idea_count, consultation_id)
                VALUES (%s, %s, %s)
                RETURNING *
                """,
                (text, idea_count, consultation_id),
            )
        return dict(cur.fetchone())


def get_synthesis(consultation_id: int | None = None) -> dict | None:
    """Return the synthesis of a context (None = global), or ``None`` if absent."""
    with db_cursor() as cur:
        if consultation_id is None:
            cur.execute("SELECT * FROM synthesis WHERE consultation_id IS NULL LIMIT 1")
        else:
            cur.execute(
                "SELECT * FROM synthesis WHERE consultation_id = %s LIMIT 1",
                (consultation_id,),
            )
        row = cur.fetchone()
        return dict(row) if row else None


# ─── GDPR — Consents ──────────────────────────────────────────────────────────

def create_consent(fingerprint_hash: str, consent_version: str) -> dict:
    """Record an explicit consent (GDPR art. 7.1 — proof) and return the row."""
    with db_cursor() as cur:
        cur.execute(
            """
            INSERT INTO consents (fingerprint_hash, consent_version)
            VALUES (%s, %s)
            RETURNING *
            """,
            (fingerprint_hash, consent_version),
        )
        return dict(cur.fetchone())


# ─── Public contributions (global context) ────────────────────────────────────

def get_public_contributions(page: int = 1, per_page: int = 20) -> tuple[list[dict], int]:
    """Return one page of accepted, unflagged global contributions.

    Global means ``consultation_id IS NULL``. ``page`` below 1 is treated as 1
    and a negative ``per_page`` as 0.

    Returns:
        ``(rows, total)`` — rows expose ``id``, ``content``, ``author`` and
        ``created_at`` only, newest first; ``total`` counts every match.
    """
    limit, offset = _limit_offset(page, per_page)
    with db_cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) as total FROM ideas WHERE accepted = TRUE AND flagged = FALSE AND consultation_id IS NULL"
        )
        total = cur.fetchone()["total"]

        cur.execute(
            """
            SELECT id, content, author, created_at
            FROM ideas
            WHERE accepted = TRUE AND flagged = FALSE AND consultation_id IS NULL
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """,
            (limit, offset),
        )
        rows = [dict(row) for row in cur.fetchall()]

    return rows, total


def get_public_stats() -> dict:
    """Return global public statistics (ideas outside any consultation).

    Keys: ``total``, ``accepted`` and ``lastUpdated`` — the ISO-8601
    ``updated_at`` of the global synthesis, or ``None`` when there is none.
    """
    with db_cursor() as cur:
        cur.execute("SELECT COUNT(*) as total FROM ideas WHERE consultation_id IS NULL")
        total = cur.fetchone()["total"]

        cur.execute(
            "SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE AND consultation_id IS NULL"
        )
        accepted = cur.fetchone()["accepted"]

        cur.execute("SELECT updated_at FROM synthesis WHERE consultation_id IS NULL LIMIT 1")
        row = cur.fetchone()
        last_updated = row["updated_at"].isoformat() if row and row.get("updated_at") else None

    return {"total": total, "accepted": accepted, "lastUpdated": last_updated}


# ─── Consultations (P5) ───────────────────────────────────────────────────────

def create_consultation(
    slug: str,
    title: str,
    subject: str,
    intro_message: str | None,
    organizer_name: str | None,
    organizer_logo_url: str | None,
    starts_at,
    ends_at,
    webhook_url: str | None,
) -> dict:
    """Insert a consultation and return the stored row as a dict.

    ``starts_at`` and ``ends_at`` are handed to the driver unchanged
    (presumably datetime values or ``None`` — confirm against callers).
    """
    with db_cursor() as cur:
        cur.execute(
            """
            INSERT INTO consultations
                (slug, title, subject, intro_message, organizer_name,
                 organizer_logo_url, starts_at, ends_at, webhook_url)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING *
            """,
            (
                slug,
                title,
                subject,
                intro_message,
                organizer_name,
                organizer_logo_url,
                starts_at,
                ends_at,
                webhook_url,
            ),
        )
        return dict(cur.fetchone())


def get_consultation_by_slug(slug: str) -> dict | None:
    """Return the consultation with this slug, or ``None`` if there is none."""
    with db_cursor() as cur:
        cur.execute("SELECT * FROM consultations WHERE slug = %s", (slug,))
        row = cur.fetchone()
        return dict(row) if row else None


def list_consultations(include_closed: bool = False) -> list[dict]:
    """List consultations, newest first.

    Closed consultations (``closed_at`` set) are left out unless
    ``include_closed`` is true.
    """
    with db_cursor() as cur:
        if include_closed:
            cur.execute("SELECT * FROM consultations ORDER BY created_at DESC")
        else:
            cur.execute(
                "SELECT * FROM consultations WHERE closed_at IS NULL ORDER BY created_at DESC"
            )
        return [dict(row) for row in cur.fetchall()]


def close_consultation(consultation_id: int) -> dict | None:
    """Mark an open consultation as closed now.

    Returns the updated row, or ``None`` when the consultation does not exist
    or is already closed.
    """
    with db_cursor() as cur:
        cur.execute(
            "UPDATE consultations SET closed_at = NOW() WHERE id = %s AND closed_at IS NULL RETURNING *",
            (consultation_id,),
        )
        row = cur.fetchone()
        return dict(row) if row else None


def get_consultations_to_autoclose() -> list[dict]:
    """Return consultations whose end date has passed but are not closed yet."""
    with db_cursor() as cur:
        cur.execute(
            """
            SELECT * FROM consultations
            WHERE ends_at IS NOT NULL
              AND ends_at < NOW()
              AND closed_at IS NULL
            """
        )
        return [dict(row) for row in cur.fetchall()]


def get_consultation_stats(consultation_id: int) -> dict:
    """Return the statistics of one consultation.

    Keys: ``total``, ``accepted`` and ``lastUpdated`` — the ISO-8601
    ``updated_at`` of the consultation's synthesis, or ``None`` when there is
    none.
    """
    with db_cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) as total FROM ideas WHERE consultation_id = %s",
            (consultation_id,),
        )
        total = cur.fetchone()["total"]

        cur.execute(
            "SELECT COUNT(*) as accepted FROM ideas WHERE consultation_id = %s AND accepted = TRUE",
            (consultation_id,),
        )
        accepted = cur.fetchone()["accepted"]

        cur.execute(
            "SELECT updated_at FROM synthesis WHERE consultation_id = %s LIMIT 1",
            (consultation_id,),
        )
        row = cur.fetchone()
        last_updated = row["updated_at"].isoformat() if row and row.get("updated_at") else None

    return {"total": total, "accepted": accepted, "lastUpdated": last_updated}


def get_consultation_contributions(
    consultation_id: int, page: int = 1, per_page: int = 20
) -> tuple[list[dict], int]:
    """Return one page of accepted, unflagged contributions of a consultation.

    ``page`` below 1 is treated as 1 and a negative ``per_page`` as 0.

    Returns:
        ``(rows, total)`` — rows expose ``id``, ``content``, ``author`` and
        ``created_at`` only, newest first; ``total`` counts every match.
    """
    limit, offset = _limit_offset(page, per_page)
    with db_cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) as total FROM ideas WHERE consultation_id = %s AND accepted = TRUE AND flagged = FALSE",
            (consultation_id,),
        )
        total = cur.fetchone()["total"]

        cur.execute(
            """
            SELECT id, content, author, created_at
            FROM ideas
            WHERE consultation_id = %s AND accepted = TRUE AND flagged = FALSE
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """,
            (consultation_id, limit, offset),
        )
        rows = [dict(row) for row in cur.fetchall()]

    return rows, total


def delete_consultation(consultation_id: int) -> bool:
    """Delete a consultation and all its data (ideas, synthesis) in one transaction.

    Returns ``True`` if the consultation row existed and was removed.
    """
    with db_cursor() as cur:
        # Delete the ideas explicitly: the FK is ON DELETE SET NULL, which
        # would otherwise turn them into "global" contributions.
        cur.execute("DELETE FROM ideas WHERE consultation_id = %s", (consultation_id,))
        cur.execute("DELETE FROM synthesis WHERE consultation_id = %s", (consultation_id,))
        cur.execute(
            "DELETE FROM consultations WHERE id = %s RETURNING id",
            (consultation_id,),
        )
        return cur.fetchone() is not None