a7b7684e87
P3 — RGPD :
- Table `consents` + `POST /api/consent` (art. 7.1 — preuve du consentement)
- Dialogue de consentement explicite avant la première contribution (art. 9.2.a)
- Pages `/mentions-legales` et `/politique-confidentialite`
- `docs/RGPD.md` — registre des traitements, bases légales, sous-traitants
- `getVisitorId()` exporté depuis l'API client React
P4 — Transparence éditoriale :
- Page `/contributions-brutes` avec pagination et export JSON/CSV
- `GET /api/contributions`, `GET /api/contributions/export/{json,csv}`
- `GET /api/stats/public` — stats publiques sans données de rejet
- Label de transparence IA sur la colonne de synthèse
- Compteurs (acceptées / soumises) dans le bandeau d'intro
- `docs/PROMPTS_IA.md` — prompts intégraux publiés + analyse des biais
- Pied de page avec liens légaux et transparence
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
308 lines
9.8 KiB
Python
308 lines
9.8 KiB
Python
"""
|
|
La Voix du Peuple — Couche d'accès à la base de données PostgreSQL
|
|
Copyright (C) 2026 billisdead — Licence EUPL-1.2
|
|
|
|
Utilise psycopg2 directement — pas d'ORM, code lisible et transparent.
|
|
"""
|
|
import os
|
|
import logging
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from contextlib import contextmanager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_connection():
|
|
database_url = os.environ.get("DATABASE_URL")
|
|
if not database_url:
|
|
raise RuntimeError("DATABASE_URL est requis.")
|
|
return psycopg2.connect(database_url, cursor_factory=psycopg2.extras.RealDictCursor)
|
|
|
|
|
|
@contextmanager
|
|
def db_cursor():
|
|
conn = get_connection()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
yield cur
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def init_db() -> None:
|
|
"""Crée les tables si elles n'existent pas, et applique les migrations nécessaires."""
|
|
with db_cursor() as cur:
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS ideas (
|
|
id SERIAL PRIMARY KEY,
|
|
content TEXT NOT NULL,
|
|
author VARCHAR(100),
|
|
accepted BOOLEAN NOT NULL DEFAULT FALSE,
|
|
rejection_reason TEXT,
|
|
legal_basis TEXT,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)
|
|
""")
|
|
# Migrations incrémentales — idempotentes
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS legal_basis TEXT")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flagged BOOLEAN NOT NULL DEFAULT FALSE")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flag_count INTEGER NOT NULL DEFAULT 0")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS admin_note TEXT")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS fingerprint_hash VARCHAR(64)")
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS synthesis (
|
|
id SERIAL PRIMARY KEY,
|
|
text TEXT NOT NULL,
|
|
idea_count INTEGER NOT NULL DEFAULT 0,
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)
|
|
""")
|
|
# Table de traçabilité des consentements RGPD (art. 7.1 — charge de la preuve)
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS consents (
|
|
id SERIAL PRIMARY KEY,
|
|
fingerprint_hash VARCHAR(64) NOT NULL,
|
|
consent_version VARCHAR(20) NOT NULL,
|
|
consented_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)
|
|
""")
|
|
cur.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_consents_fingerprint ON consents(fingerprint_hash)"
|
|
)
|
|
logger.info("Base de données initialisée.")
|
|
|
|
|
|
def insert_idea(
|
|
content: str,
|
|
author: str | None,
|
|
accepted: bool,
|
|
rejection_reason: str | None,
|
|
legal_basis: str | None,
|
|
fingerprint_hash: str | None = None,
|
|
) -> dict:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO ideas (content, author, accepted, rejection_reason, legal_basis, fingerprint_hash)
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
RETURNING *
|
|
""",
|
|
(content, author, accepted, rejection_reason, legal_basis, fingerprint_hash),
|
|
)
|
|
return dict(cur.fetchone())
|
|
|
|
|
|
def create_consent(fingerprint_hash: str, consent_version: str) -> dict:
|
|
"""Enregistre un consentement explicite (art. 7.1 RGPD — preuve)."""
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO consents (fingerprint_hash, consent_version)
|
|
VALUES (%s, %s)
|
|
RETURNING *
|
|
""",
|
|
(fingerprint_hash, consent_version),
|
|
)
|
|
return dict(cur.fetchone())
|
|
|
|
|
|
def get_public_contributions(page: int = 1, per_page: int = 20) -> tuple[list[dict], int]:
|
|
"""Retourne les contributions acceptées paginées pour la vue publique."""
|
|
offset = (page - 1) * per_page
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"SELECT COUNT(*) as total FROM ideas WHERE accepted = TRUE AND flagged = FALSE"
|
|
)
|
|
total = cur.fetchone()["total"]
|
|
cur.execute(
|
|
"""
|
|
SELECT id, content, author, created_at
|
|
FROM ideas
|
|
WHERE accepted = TRUE AND flagged = FALSE
|
|
ORDER BY created_at DESC
|
|
LIMIT %s OFFSET %s
|
|
""",
|
|
(per_page, offset),
|
|
)
|
|
rows = [dict(row) for row in cur.fetchall()]
|
|
return rows, total
|
|
|
|
|
|
def get_public_stats() -> dict:
|
|
"""Statistiques publiques — ne révèle pas les chiffres de rejet."""
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT COUNT(*) as total FROM ideas")
|
|
total = cur.fetchone()["total"]
|
|
cur.execute("SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE")
|
|
accepted = cur.fetchone()["accepted"]
|
|
cur.execute("SELECT updated_at FROM synthesis LIMIT 1")
|
|
row = cur.fetchone()
|
|
last_updated = row["updated_at"].isoformat() if row and row.get("updated_at") else None
|
|
return {"total": total, "accepted": accepted, "lastUpdated": last_updated}
|
|
|
|
|
|
def get_accepted_ideas() -> list[dict]:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"SELECT * FROM ideas WHERE accepted = TRUE ORDER BY created_at ASC"
|
|
)
|
|
return [dict(row) for row in cur.fetchall()]
|
|
|
|
|
|
def get_all_ideas(limit: int = 50) -> list[dict]:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"SELECT * FROM ideas ORDER BY created_at DESC LIMIT %s", (limit,)
|
|
)
|
|
return [dict(row) for row in cur.fetchall()]
|
|
|
|
|
|
def get_ideas_admin(
|
|
status: str = "all",
|
|
page: int = 1,
|
|
per_page: int = 50,
|
|
search: str = "",
|
|
) -> tuple[list[dict], int]:
|
|
offset = (page - 1) * per_page
|
|
conditions = []
|
|
params: list = []
|
|
|
|
if status == "accepted":
|
|
conditions.append("accepted = TRUE")
|
|
elif status == "rejected":
|
|
conditions.append("accepted = FALSE AND flagged = FALSE")
|
|
elif status == "flagged":
|
|
conditions.append("flagged = TRUE")
|
|
|
|
if search:
|
|
conditions.append("content ILIKE %s")
|
|
params.append(f"%{search}%")
|
|
|
|
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
|
|
|
|
with db_cursor() as cur:
|
|
cur.execute(f"SELECT COUNT(*) as total FROM ideas {where}", params)
|
|
total = cur.fetchone()["total"]
|
|
cur.execute(
|
|
f"SELECT * FROM ideas {where} ORDER BY flagged DESC, created_at DESC LIMIT %s OFFSET %s",
|
|
params + [per_page, offset],
|
|
)
|
|
rows = [dict(row) for row in cur.fetchall()]
|
|
return rows, total
|
|
|
|
|
|
def delete_idea(idea_id: int) -> bool:
|
|
with db_cursor() as cur:
|
|
cur.execute("DELETE FROM ideas WHERE id = %s RETURNING id", (idea_id,))
|
|
return cur.fetchone() is not None
|
|
|
|
|
|
def bulk_delete_ideas(idea_ids: list[int]) -> int:
|
|
if not idea_ids:
|
|
return 0
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"DELETE FROM ideas WHERE id = ANY(%s) RETURNING id",
|
|
(idea_ids,),
|
|
)
|
|
return len(cur.fetchall())
|
|
|
|
|
|
def override_idea(
|
|
idea_id: int,
|
|
accepted: bool,
|
|
reason: str | None,
|
|
note: str | None,
|
|
) -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE ideas
|
|
SET accepted = %s,
|
|
rejection_reason = %s,
|
|
flagged = FALSE,
|
|
admin_note = %s
|
|
WHERE id = %s
|
|
RETURNING *
|
|
""",
|
|
(accepted, reason, note, idea_id),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def flag_idea(idea_id: int) -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE ideas
|
|
SET flagged = TRUE, flag_count = flag_count + 1
|
|
WHERE id = %s AND accepted = TRUE
|
|
RETURNING *
|
|
""",
|
|
(idea_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def unflag_idea(idea_id: int) -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"UPDATE ideas SET flagged = FALSE WHERE id = %s RETURNING *",
|
|
(idea_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def get_stats() -> dict:
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT COUNT(*) as total FROM ideas")
|
|
total = cur.fetchone()["total"]
|
|
cur.execute("SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE")
|
|
accepted = cur.fetchone()["accepted"]
|
|
cur.execute("SELECT COUNT(*) as rejected FROM ideas WHERE accepted = FALSE")
|
|
rejected = cur.fetchone()["rejected"]
|
|
cur.execute("SELECT COUNT(*) as flagged FROM ideas WHERE flagged = TRUE")
|
|
flagged = cur.fetchone()["flagged"]
|
|
return {"total": total, "accepted": accepted, "rejected": rejected, "flagged": flagged}
|
|
|
|
|
|
def upsert_synthesis(text: str, idea_count: int) -> dict:
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT id FROM synthesis LIMIT 1")
|
|
row = cur.fetchone()
|
|
if row:
|
|
cur.execute(
|
|
"""
|
|
UPDATE synthesis
|
|
SET text = %s, idea_count = %s, updated_at = NOW()
|
|
WHERE id = %s
|
|
RETURNING *
|
|
""",
|
|
(text, idea_count, row["id"]),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO synthesis (text, idea_count)
|
|
VALUES (%s, %s)
|
|
RETURNING *
|
|
""",
|
|
(text, idea_count),
|
|
)
|
|
return dict(cur.fetchone())
|
|
|
|
|
|
def get_synthesis() -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT * FROM synthesis LIMIT 1")
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|