""" La Voix du Peuple — Couche d'accès à la base de données PostgreSQL Copyright (C) 2026 billisdead — Licence EUPL-1.2 Utilise psycopg2 directement — pas d'ORM, code lisible et transparent. """ import os import logging import psycopg2 import psycopg2.extras from contextlib import contextmanager logger = logging.getLogger(__name__) def get_connection(): database_url = os.environ.get("DATABASE_URL") if not database_url: raise RuntimeError("DATABASE_URL est requis.") return psycopg2.connect(database_url, cursor_factory=psycopg2.extras.RealDictCursor) @contextmanager def db_cursor(): conn = get_connection() try: with conn.cursor() as cur: yield cur conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db() -> None: """Crée les tables si elles n'existent pas, et applique les migrations nécessaires.""" with db_cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS ideas ( id SERIAL PRIMARY KEY, content TEXT NOT NULL, author VARCHAR(100), accepted BOOLEAN NOT NULL DEFAULT FALSE, rejection_reason TEXT, legal_basis TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) """) # Migrations incrémentales — idempotentes cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS legal_basis TEXT") cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flagged BOOLEAN NOT NULL DEFAULT FALSE") cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flag_count INTEGER NOT NULL DEFAULT 0") cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS admin_note TEXT") cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS fingerprint_hash VARCHAR(64)") cur.execute(""" CREATE TABLE IF NOT EXISTS synthesis ( id SERIAL PRIMARY KEY, text TEXT NOT NULL, idea_count INTEGER NOT NULL DEFAULT 0, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) """) # Table de traçabilité des consentements RGPD (art. 7.1 — charge de la preuve) cur.execute(""" CREATE TABLE IF NOT EXISTS consents ( id SERIAL PRIMARY KEY, fingerprint_hash VARCHAR(64) NOT NULL, consent_version VARCHAR(20) NOT NULL, consented_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) """) cur.execute( "CREATE INDEX IF NOT EXISTS idx_consents_fingerprint ON consents(fingerprint_hash)" ) logger.info("Base de données initialisée.") def insert_idea( content: str, author: str | None, accepted: bool, rejection_reason: str | None, legal_basis: str | None, fingerprint_hash: str | None = None, ) -> dict: with db_cursor() as cur: cur.execute( """ INSERT INTO ideas (content, author, accepted, rejection_reason, legal_basis, fingerprint_hash) VALUES (%s, %s, %s, %s, %s, %s) RETURNING * """, (content, author, accepted, rejection_reason, legal_basis, fingerprint_hash), ) return dict(cur.fetchone()) def create_consent(fingerprint_hash: str, consent_version: str) -> dict: """Enregistre un consentement explicite (art. 7.1 RGPD — preuve).""" with db_cursor() as cur: cur.execute( """ INSERT INTO consents (fingerprint_hash, consent_version) VALUES (%s, %s) RETURNING * """, (fingerprint_hash, consent_version), ) return dict(cur.fetchone()) def get_public_contributions(page: int = 1, per_page: int = 20) -> tuple[list[dict], int]: """Retourne les contributions acceptées paginées pour la vue publique.""" offset = (page - 1) * per_page with db_cursor() as cur: cur.execute( "SELECT COUNT(*) as total FROM ideas WHERE accepted = TRUE AND flagged = FALSE" ) total = cur.fetchone()["total"] cur.execute( """ SELECT id, content, author, created_at FROM ideas WHERE accepted = TRUE AND flagged = FALSE ORDER BY created_at DESC LIMIT %s OFFSET %s """, (per_page, offset), ) rows = [dict(row) for row in cur.fetchall()] return rows, total def get_public_stats() -> dict: """Statistiques publiques — ne révèle pas les chiffres de rejet.""" with db_cursor() as cur: cur.execute("SELECT COUNT(*) as total FROM ideas") total = cur.fetchone()["total"] cur.execute("SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE") accepted = cur.fetchone()["accepted"] cur.execute("SELECT updated_at FROM synthesis LIMIT 1") row = cur.fetchone() last_updated = row["updated_at"].isoformat() if row and row.get("updated_at") else None return {"total": total, "accepted": accepted, "lastUpdated": last_updated} def get_accepted_ideas() -> list[dict]: with db_cursor() as cur: cur.execute( "SELECT * FROM ideas WHERE accepted = TRUE ORDER BY created_at ASC" ) return [dict(row) for row in cur.fetchall()] def get_all_ideas(limit: int = 50) -> list[dict]: with db_cursor() as cur: cur.execute( "SELECT * FROM ideas ORDER BY created_at DESC LIMIT %s", (limit,) ) return [dict(row) for row in cur.fetchall()] def get_ideas_admin( status: str = "all", page: int = 1, per_page: int = 50, search: str = "", ) -> tuple[list[dict], int]: offset = (page - 1) * per_page conditions = [] params: list = [] if status == "accepted": conditions.append("accepted = TRUE") elif status == "rejected": conditions.append("accepted = FALSE AND flagged = FALSE") elif status == "flagged": conditions.append("flagged = TRUE") if search: conditions.append("content ILIKE %s") params.append(f"%{search}%") where = ("WHERE " + " AND ".join(conditions)) if conditions else "" with db_cursor() as cur: cur.execute(f"SELECT COUNT(*) as total FROM ideas {where}", params) total = cur.fetchone()["total"] cur.execute( f"SELECT * FROM ideas {where} ORDER BY flagged DESC, created_at DESC LIMIT %s OFFSET %s", params + [per_page, offset], ) rows = [dict(row) for row in cur.fetchall()] return rows, total def delete_idea(idea_id: int) -> bool: with db_cursor() as cur: cur.execute("DELETE FROM ideas WHERE id = %s RETURNING id", (idea_id,)) return cur.fetchone() is not None def bulk_delete_ideas(idea_ids: list[int]) -> int: if not idea_ids: return 0 with db_cursor() as cur: cur.execute( "DELETE FROM ideas WHERE id = ANY(%s) RETURNING id", (idea_ids,), ) return len(cur.fetchall()) def override_idea( idea_id: int, accepted: bool, reason: str | None, note: str | None, ) -> dict | None: with db_cursor() as cur: cur.execute( """ UPDATE ideas SET accepted = %s, rejection_reason = %s, flagged = FALSE, admin_note = %s WHERE id = %s RETURNING * """, (accepted, reason, note, idea_id), ) row = cur.fetchone() return dict(row) if row else None def flag_idea(idea_id: int) -> dict | None: with db_cursor() as cur: cur.execute( """ UPDATE ideas SET flagged = TRUE, flag_count = flag_count + 1 WHERE id = %s AND accepted = TRUE RETURNING * """, (idea_id,), ) row = cur.fetchone() return dict(row) if row else None def unflag_idea(idea_id: int) -> dict | None: with db_cursor() as cur: cur.execute( "UPDATE ideas SET flagged = FALSE WHERE id = %s RETURNING *", (idea_id,), ) row = cur.fetchone() return dict(row) if row else None def get_stats() -> dict: with db_cursor() as cur: cur.execute("SELECT COUNT(*) as total FROM ideas") total = cur.fetchone()["total"] cur.execute("SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE") accepted = cur.fetchone()["accepted"] cur.execute("SELECT COUNT(*) as rejected FROM ideas WHERE accepted = FALSE") rejected = cur.fetchone()["rejected"] cur.execute("SELECT COUNT(*) as flagged FROM ideas WHERE flagged = TRUE") flagged = cur.fetchone()["flagged"] return {"total": total, "accepted": accepted, "rejected": rejected, "flagged": flagged} def upsert_synthesis(text: str, idea_count: int) -> dict: with db_cursor() as cur: cur.execute("SELECT id FROM synthesis LIMIT 1") row = cur.fetchone() if row: cur.execute( """ UPDATE synthesis SET text = %s, idea_count = %s, updated_at = NOW() WHERE id = %s RETURNING * """, (text, idea_count, row["id"]), ) else: cur.execute( """ INSERT INTO synthesis (text, idea_count) VALUES (%s, %s) RETURNING * """, (text, idea_count), ) return dict(cur.fetchone()) def get_synthesis() -> dict | None: with db_cursor() as cur: cur.execute("SELECT * FROM synthesis LIMIT 1") row = cur.fetchone() return dict(row) if row else None