""" Couche d'accès à la base de données PostgreSQL. Utilise psycopg2 directement — pas d'ORM, code lisible et transparent. """ import os import logging import psycopg2 import psycopg2.extras from contextlib import contextmanager logger = logging.getLogger(__name__) def get_connection(): database_url = os.environ.get("DATABASE_URL") if not database_url: raise RuntimeError("DATABASE_URL est requis.") return psycopg2.connect(database_url, cursor_factory=psycopg2.extras.RealDictCursor) @contextmanager def db_cursor(): conn = get_connection() try: with conn.cursor() as cur: yield cur conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db(): """Crée les tables si elles n'existent pas, et applique les migrations nécessaires.""" with db_cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS ideas ( id SERIAL PRIMARY KEY, content TEXT NOT NULL, author VARCHAR(100), accepted BOOLEAN NOT NULL DEFAULT FALSE, rejection_reason TEXT, legal_basis TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) """) cur.execute(""" ALTER TABLE ideas ADD COLUMN IF NOT EXISTS legal_basis TEXT """) cur.execute(""" CREATE TABLE IF NOT EXISTS synthesis ( id SERIAL PRIMARY KEY, text TEXT NOT NULL, idea_count INTEGER NOT NULL DEFAULT 0, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) """) logger.info("Base de données initialisée.") def insert_idea(content: str, author: str | None, accepted: bool, rejection_reason: str | None, legal_basis: str | None) -> dict: with db_cursor() as cur: cur.execute( """ INSERT INTO ideas (content, author, accepted, rejection_reason, legal_basis) VALUES (%s, %s, %s, %s, %s) RETURNING * """, (content, author, accepted, rejection_reason, legal_basis), ) return dict(cur.fetchone()) def get_accepted_ideas() -> list[dict]: with db_cursor() as cur: cur.execute( "SELECT * FROM ideas WHERE accepted = TRUE ORDER BY created_at ASC" ) return [dict(row) for row in cur.fetchall()] def get_all_ideas(limit: int = 50) -> list[dict]: with db_cursor() as cur: cur.execute( "SELECT * FROM ideas ORDER BY created_at DESC LIMIT %s", (limit,) ) return [dict(row) for row in cur.fetchall()] def get_stats() -> dict: with db_cursor() as cur: cur.execute("SELECT COUNT(*) as total FROM ideas") total = cur.fetchone()["total"] cur.execute("SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE") accepted = cur.fetchone()["accepted"] cur.execute("SELECT COUNT(*) as rejected FROM ideas WHERE accepted = FALSE") rejected = cur.fetchone()["rejected"] return {"total": total, "accepted": accepted, "rejected": rejected} def upsert_synthesis(text: str, idea_count: int) -> dict: with db_cursor() as cur: cur.execute("SELECT id FROM synthesis LIMIT 1") row = cur.fetchone() if row: cur.execute( """ UPDATE synthesis SET text = %s, idea_count = %s, updated_at = NOW() WHERE id = %s RETURNING * """, (text, idea_count, row["id"]), ) else: cur.execute( """ INSERT INTO synthesis (text, idea_count) VALUES (%s, %s) RETURNING * """, (text, idea_count), ) return dict(cur.fetchone()) def get_synthesis() -> dict | None: with db_cursor() as cur: cur.execute("SELECT * FROM synthesis LIMIT 1") row = cur.fetchone() return dict(row) if row else None