2a792cbbb5
Adds an admin interface with authentication for manual content deletion and flagging. Implements a flagging system for user contributions and secures the admin panel with a secret token. Replit-Commit-Author: Agent Replit-Commit-Session-Id: 923ae0e3-a363-4db8-b04a-e8baca2a1330 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 7e5834b1-796d-4a9e-bbde-cd91012292de Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/8af7d2ec-2cc3-4ece-8af3-9f071488d072/923ae0e3-a363-4db8-b04a-e8baca2a1330/nghZcOj Replit-Helium-Checkpoint-Created: true
229 lines
7.1 KiB
Python
229 lines
7.1 KiB
Python
"""
|
|
Couche d'accès à la base de données PostgreSQL.
|
|
Utilise psycopg2 directement — pas d'ORM, code lisible et transparent.
|
|
"""
|
|
import os
|
|
import logging
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from contextlib import contextmanager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_connection():
|
|
database_url = os.environ.get("DATABASE_URL")
|
|
if not database_url:
|
|
raise RuntimeError("DATABASE_URL est requis.")
|
|
return psycopg2.connect(database_url, cursor_factory=psycopg2.extras.RealDictCursor)
|
|
|
|
|
|
@contextmanager
|
|
def db_cursor():
|
|
conn = get_connection()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
yield cur
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def init_db():
|
|
"""Crée les tables si elles n'existent pas, et applique les migrations nécessaires."""
|
|
with db_cursor() as cur:
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS ideas (
|
|
id SERIAL PRIMARY KEY,
|
|
content TEXT NOT NULL,
|
|
author VARCHAR(100),
|
|
accepted BOOLEAN NOT NULL DEFAULT FALSE,
|
|
rejection_reason TEXT,
|
|
legal_basis TEXT,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)
|
|
""")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS legal_basis TEXT")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flagged BOOLEAN NOT NULL DEFAULT FALSE")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS flag_count INTEGER NOT NULL DEFAULT 0")
|
|
cur.execute("ALTER TABLE ideas ADD COLUMN IF NOT EXISTS admin_note TEXT")
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS synthesis (
|
|
id SERIAL PRIMARY KEY,
|
|
text TEXT NOT NULL,
|
|
idea_count INTEGER NOT NULL DEFAULT 0,
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)
|
|
""")
|
|
logger.info("Base de données initialisée.")
|
|
|
|
|
|
def insert_idea(content: str, author: str | None, accepted: bool,
|
|
rejection_reason: str | None, legal_basis: str | None) -> dict:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO ideas (content, author, accepted, rejection_reason, legal_basis)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
RETURNING *
|
|
""",
|
|
(content, author, accepted, rejection_reason, legal_basis),
|
|
)
|
|
return dict(cur.fetchone())
|
|
|
|
|
|
def get_accepted_ideas() -> list[dict]:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"SELECT * FROM ideas WHERE accepted = TRUE ORDER BY created_at ASC"
|
|
)
|
|
return [dict(row) for row in cur.fetchall()]
|
|
|
|
|
|
def get_all_ideas(limit: int = 50) -> list[dict]:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"SELECT * FROM ideas ORDER BY created_at DESC LIMIT %s", (limit,)
|
|
)
|
|
return [dict(row) for row in cur.fetchall()]
|
|
|
|
|
|
def get_ideas_admin(status: str = "all", page: int = 1,
|
|
per_page: int = 50, search: str = "") -> tuple[list[dict], int]:
|
|
offset = (page - 1) * per_page
|
|
conditions = []
|
|
params: list = []
|
|
|
|
if status == "accepted":
|
|
conditions.append("accepted = TRUE")
|
|
elif status == "rejected":
|
|
conditions.append("accepted = FALSE AND flagged = FALSE")
|
|
elif status == "flagged":
|
|
conditions.append("flagged = TRUE")
|
|
|
|
if search:
|
|
conditions.append("content ILIKE %s")
|
|
params.append(f"%{search}%")
|
|
|
|
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
|
|
|
|
with db_cursor() as cur:
|
|
cur.execute(f"SELECT COUNT(*) as total FROM ideas {where}", params)
|
|
total = cur.fetchone()["total"]
|
|
cur.execute(
|
|
f"SELECT * FROM ideas {where} ORDER BY flagged DESC, created_at DESC LIMIT %s OFFSET %s",
|
|
params + [per_page, offset],
|
|
)
|
|
rows = [dict(row) for row in cur.fetchall()]
|
|
return rows, total
|
|
|
|
|
|
def delete_idea(idea_id: int) -> bool:
|
|
with db_cursor() as cur:
|
|
cur.execute("DELETE FROM ideas WHERE id = %s RETURNING id", (idea_id,))
|
|
return cur.fetchone() is not None
|
|
|
|
|
|
def bulk_delete_ideas(idea_ids: list[int]) -> int:
|
|
if not idea_ids:
|
|
return 0
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"DELETE FROM ideas WHERE id = ANY(%s) RETURNING id",
|
|
(idea_ids,),
|
|
)
|
|
return len(cur.fetchall())
|
|
|
|
|
|
def override_idea(idea_id: int, accepted: bool,
|
|
reason: str | None, note: str | None) -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE ideas
|
|
SET accepted = %s,
|
|
rejection_reason = %s,
|
|
flagged = FALSE,
|
|
admin_note = %s
|
|
WHERE id = %s
|
|
RETURNING *
|
|
""",
|
|
(accepted, reason, note, idea_id),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def flag_idea(idea_id: int) -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE ideas
|
|
SET flagged = TRUE, flag_count = flag_count + 1
|
|
WHERE id = %s AND accepted = TRUE
|
|
RETURNING *
|
|
""",
|
|
(idea_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def unflag_idea(idea_id: int) -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute(
|
|
"UPDATE ideas SET flagged = FALSE WHERE id = %s RETURNING *",
|
|
(idea_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def get_stats() -> dict:
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT COUNT(*) as total FROM ideas")
|
|
total = cur.fetchone()["total"]
|
|
cur.execute("SELECT COUNT(*) as accepted FROM ideas WHERE accepted = TRUE")
|
|
accepted = cur.fetchone()["accepted"]
|
|
cur.execute("SELECT COUNT(*) as rejected FROM ideas WHERE accepted = FALSE")
|
|
rejected = cur.fetchone()["rejected"]
|
|
cur.execute("SELECT COUNT(*) as flagged FROM ideas WHERE flagged = TRUE")
|
|
flagged = cur.fetchone()["flagged"]
|
|
return {"total": total, "accepted": accepted, "rejected": rejected, "flagged": flagged}
|
|
|
|
|
|
def upsert_synthesis(text: str, idea_count: int) -> dict:
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT id FROM synthesis LIMIT 1")
|
|
row = cur.fetchone()
|
|
if row:
|
|
cur.execute(
|
|
"""
|
|
UPDATE synthesis
|
|
SET text = %s, idea_count = %s, updated_at = NOW()
|
|
WHERE id = %s
|
|
RETURNING *
|
|
""",
|
|
(text, idea_count, row["id"]),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO synthesis (text, idea_count)
|
|
VALUES (%s, %s)
|
|
RETURNING *
|
|
""",
|
|
(text, idea_count),
|
|
)
|
|
return dict(cur.fetchone())
|
|
|
|
|
|
def get_synthesis() -> dict | None:
|
|
with db_cursor() as cur:
|
|
cur.execute("SELECT * FROM synthesis LIMIT 1")
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|