"""Form backend: FastAPI app storing forms and submissions in SQLite.

Exposes public form/submit endpoints, token-protected admin endpoints, and
two admin actions that call the DeepSeek chat-completions API (auto-tagging
and reply drafting). Static frontend files are mounted at ``/``.
"""

import sqlite3
import os
import json
import secrets
from contextlib import asynccontextmanager, closing
from typing import Optional

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

DB_PATH = os.environ.get("DB_PATH", "/srv/form-backend/submissions.db")

DEEPSEEK_URL = "https://api.deepseek.com/v1/chat/completions"

# Only this many submissions are sent to the model per /admin/tag call.
TAG_BATCH_LIMIT = 50


# ---- DB Helpers ----

def get_db():
    """Open a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` (dict-style access), WAL journaling
    is requested and foreign-key enforcement is switched on for this
    connection. The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
    except Exception:
        # Do not leak the handle if configuring the connection fails.
        conn.close()
        raise
    return conn


def init_schema(db):
    """Create all tables if missing, seed default settings and form, commit.

    NOTE(review): the seeded admin password is the literal 'admin123'; it
    should be changed through /admin/settings after first start.
    """
    db.executescript("""
        CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL DEFAULT ''
        );
        INSERT OR IGNORE INTO settings (key, value) VALUES ('admin_password', 'admin123');
        INSERT OR IGNORE INTO settings (key, value) VALUES ('deepseek_api_key', '');
        INSERT OR IGNORE INTO settings (key, value) VALUES ('deepseek_model', 'deepseek-chat');

        CREATE TABLE IF NOT EXISTS forms (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            slug TEXT NOT NULL UNIQUE,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        INSERT INTO forms (id, name, slug)
            SELECT 1, 'Customer Form', 'default'
            WHERE NOT EXISTS (SELECT 1 FROM forms WHERE id = 1);

        CREATE TABLE IF NOT EXISTS form_fields (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            form_id INTEGER NOT NULL REFERENCES forms(id) ON DELETE CASCADE,
            label TEXT NOT NULL,
            field_type TEXT NOT NULL DEFAULT 'text',
            required INTEGER NOT NULL DEFAULT 1,
            sort_order INTEGER NOT NULL DEFAULT 0
        );
        INSERT INTO form_fields (form_id, label, field_type, required, sort_order)
            SELECT 1, 'Email', 'email', 1, 1
            WHERE NOT EXISTS (SELECT 1 FROM form_fields WHERE form_id = 1 AND sort_order = 1);
        INSERT INTO form_fields (form_id, label, field_type, required, sort_order)
            SELECT 1, 'Name', 'text', 1, 2
            WHERE NOT EXISTS (SELECT 1 FROM form_fields WHERE form_id = 1 AND sort_order = 2);
        INSERT INTO form_fields (form_id, label, field_type, required, sort_order)
            SELECT 1, 'Domain', 'text', 1, 3
            WHERE NOT EXISTS (SELECT 1 FROM form_fields WHERE form_id = 1 AND sort_order = 3);

        CREATE TABLE IF NOT EXISTS submissions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            form_id INTEGER NOT NULL DEFAULT 1 REFERENCES forms(id),
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS submission_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            submission_id INTEGER NOT NULL REFERENCES submissions(id) ON DELETE CASCADE,
            field_label TEXT NOT NULL,
            field_value TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS submission_tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            submission_id INTEGER NOT NULL REFERENCES submissions(id) ON DELETE CASCADE,
            tag TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS reply_drafts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            submission_id INTEGER NOT NULL UNIQUE REFERENCES submissions(id) ON DELETE CASCADE,
            draft_text TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS admin_tokens (
            token TEXT PRIMARY KEY,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
    """)
    db.commit()


def get_setting(db, key, default=""):
    """Return the stored value for ``key``, or ``default`` if no row exists."""
    row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
    return row["value"] if row else default


def set_setting(db, key, value):
    """Insert or overwrite a settings row. Does not commit."""
    db.execute(
        "INSERT INTO settings (key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        (key, value),
    )


def _submission_fields(db, submission_id):
    """Return ``{field_label: field_value}`` for one submission."""
    rows = db.execute(
        "SELECT * FROM submission_data WHERE submission_id = ?", (submission_id,)
    ).fetchall()
    return {r["field_label"]: r["field_value"] for r in rows}


def _bearer_token(request: Request) -> str:
    """Extract the token from the ``Authorization`` header ('' if absent).

    Only a leading ``"Bearer "`` is removed; a header without that prefix is
    used as the token verbatim (same leniency as before).
    """
    header = request.headers.get("Authorization", "")
    return header.removeprefix("Bearer ").strip()


def check_auth(request: Request):
    """Raise ``HTTPException(401)`` unless the request carries a known token."""
    token = _bearer_token(request)
    if not token:
        raise HTTPException(401, "Missing token")
    with closing(get_db()) as db:
        row = db.execute(
            "SELECT 1 FROM admin_tokens WHERE token = ?", (token,)
        ).fetchone()
    if not row:
        raise HTTPException(401, "Invalid token")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Ensure the schema exists before the app starts serving."""
    with closing(get_db()) as db:
        init_schema(db)
    yield


app = FastAPI(title="Form Backend", lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---- Models ----

class FieldDef(BaseModel):
    # One field of a form definition.
    id: Optional[int] = None
    label: str
    field_type: str = "text"
    required: bool = True


class FormDef(BaseModel):
    # Request body for creating/updating a form.
    name: str
    fields: list[FieldDef] = []


class LoginBody(BaseModel):
    password: str


class AnalyzeBody(BaseModel):
    # Either an explicit list of submission ids, or all=True for every one.
    submission_ids: list[int] = []
    all: bool = False


# ---- Auth ----

@app.post("/admin/login")
def admin_login(body: LoginBody):
    """Exchange the admin password for a new bearer token.

    Raises 401 on a wrong password.
    NOTE(review): tokens are stored without expiry; they live until logout.
    """
    with closing(get_db()) as db:
        expected = get_setting(db, "admin_password", "admin123")
        # Constant-time comparison; bytes so non-ASCII passwords are accepted.
        if not secrets.compare_digest(
            body.password.encode("utf-8"), expected.encode("utf-8")
        ):
            raise HTTPException(401, "Wrong password")
        token = secrets.token_urlsafe(32)
        db.execute("INSERT INTO admin_tokens (token) VALUES (?)", (token,))
        db.commit()
    return {"token": token}


@app.post("/admin/logout")
def admin_logout(request: Request):
    """Invalidate the token used for this request."""
    check_auth(request)
    token = _bearer_token(request)
    with closing(get_db()) as db:
        db.execute("DELETE FROM admin_tokens WHERE token = ?", (token,))
        db.commit()
    return {"ok": True}


# ---- Settings ----

@app.get("/admin/settings")
def get_settings(request: Request):
    """Return the editable settings plus a masked copy of the API key.

    NOTE(review): the response still contains the raw ``admin_password`` and
    ``deepseek_api_key`` values, as before; dropping them would change the
    response shape clients may rely on.
    """
    check_auth(request)
    keys = ["deepseek_api_key", "deepseek_model", "admin_password"]
    with closing(get_db()) as db:
        result = {k: get_setting(db, k) for k in keys}

    api_key = result.get("deepseek_api_key", "")
    if api_key and len(api_key) > 8:
        masked = api_key[:4] + "***" + api_key[-4:]
    elif api_key:
        # Too short to reveal any characters safely.
        masked = "***"
    else:
        masked = ""
    result["deepseek_api_key_masked"] = masked
    return result


@app.post("/admin/settings")
def update_settings(body: dict, request: Request):
    """Store the allowed keys from ``body``; unknown keys are ignored.

    Raises 400 if ``admin_password`` is present but empty, since that would
    allow logging in with an empty password.
    """
    check_auth(request)
    allowed = {"deepseek_api_key", "deepseek_model", "admin_password"}
    updates = {k: str(v) for k, v in body.items() if k in allowed}
    if "admin_password" in updates and not updates["admin_password"]:
        raise HTTPException(400, "Admin password cannot be empty")
    with closing(get_db()) as db:
        for key, value in updates.items():
            set_setting(db, key, value)
        db.commit()
    return {"ok": True}


# ---- Stats ----

@app.get("/admin/stats")
def admin_stats(request: Request):
    """Return submission/form counters for the admin dashboard."""
    check_auth(request)

    with closing(get_db()) as db:
        def count(sql):
            return db.execute(sql).fetchone()["c"]

        total = count("SELECT COUNT(*) as c FROM submissions")
        today = count(
            "SELECT COUNT(*) as c FROM submissions "
            "WHERE date(created_at) = date('now')"
        )
        forms_count = count("SELECT COUNT(*) as c FROM forms")
        tagged = count(
            "SELECT COUNT(DISTINCT submission_id) as c FROM submission_tags"
        )
    return {
        "total_submissions": total,
        "today": today,
        "forms": forms_count,
        "tagged": tagged,
    }


# ---- Forms CRUD ----

def _insert_fields(db, form_id, fields):
    """Insert ``fields`` for ``form_id`` with sort_order 1..n. No commit."""
    db.executemany(
        "INSERT INTO form_fields (form_id, label, field_type, required, sort_order) "
        "VALUES (?, ?, ?, ?, ?)",
        [
            (form_id, f.label, f.field_type, 1 if f.required else 0, position)
            for position, f in enumerate(fields, start=1)
        ],
    )


@app.get("/forms")
def list_forms():
    """Return every form with its fields, ordered by form id."""
    with closing(get_db()) as db:
        forms = db.execute("SELECT * FROM forms ORDER BY id").fetchall()
        result = []
        for form in forms:
            fields = db.execute(
                "SELECT * FROM form_fields WHERE form_id = ? ORDER BY sort_order",
                (form["id"],),
            ).fetchall()
            result.append({
                "id": form["id"],
                "name": form["name"],
                "slug": form["slug"],
                "created_at": form["created_at"],
                "fields": [dict(f) for f in fields],
            })
    return result


@app.get("/forms/{slug}")
def get_form(slug: str):
    """Return one form and its fields by slug; 404 if unknown."""
    with closing(get_db()) as db:
        form = db.execute("SELECT * FROM forms WHERE slug = ?", (slug,)).fetchone()
        if not form:
            raise HTTPException(404, "Form not found")
        fields = db.execute(
            "SELECT * FROM form_fields WHERE form_id = ? ORDER BY sort_order",
            (form["id"],),
        ).fetchall()
    return {
        "id": form["id"],
        "name": form["name"],
        "slug": form["slug"],
        "fields": [dict(f) for f in fields],
    }


@app.post("/forms", status_code=201)
def create_form(body: FormDef, request: Request):
    """Create a form; the slug is the lower-cased name with spaces as '-'.

    Raises 409 when a form with the same slug already exists (the ``slug``
    column is UNIQUE), instead of surfacing an unhandled IntegrityError.
    """
    check_auth(request)
    slug = body.name.lower().replace(" ", "-")
    with closing(get_db()) as db:
        try:
            cur = db.execute(
                "INSERT INTO forms (name, slug) VALUES (?, ?)", (body.name, slug)
            )
        except sqlite3.IntegrityError as err:
            raise HTTPException(
                409, f"A form with slug '{slug}' already exists"
            ) from err
        form_id = cur.lastrowid
        _insert_fields(db, form_id, body.fields)
        db.commit()
    return {"ok": True, "id": form_id, "slug": slug}


@app.put("/forms/{form_id}")
def update_form(form_id: int, body: FormDef, request: Request):
    """Rename a form and replace all of its fields; 404 if it does not exist."""
    check_auth(request)
    with closing(get_db()) as db:
        cur = db.execute(
            "UPDATE forms SET name = ? WHERE id = ?", (body.name, form_id)
        )
        if cur.rowcount == 0:
            # Without this check, inserting fields would trip the foreign key.
            raise HTTPException(404, "Form not found")
        db.execute("DELETE FROM form_fields WHERE form_id = ?", (form_id,))
        _insert_fields(db, form_id, body.fields)
        db.commit()
    return {"ok": True}


@app.delete("/forms/{form_id}")
def delete_form(form_id: int, request: Request):
    """Delete a form (its fields cascade). The default form (id 1) is protected.

    Raises 409 if submissions still reference the form: ``submissions.form_id``
    has no ON DELETE action, so SQLite rejects the delete.
    """
    check_auth(request)
    if form_id == 1:
        raise HTTPException(400, "Cannot delete default form")
    with closing(get_db()) as db:
        try:
            db.execute("DELETE FROM forms WHERE id = ?", (form_id,))
        except sqlite3.IntegrityError as err:
            raise HTTPException(
                409, "Cannot delete a form that still has submissions"
            ) from err
        db.commit()
    return {"ok": True}


# ---- Public submit ----

@app.post("/submit/{slug}", status_code=201)
async def submit_form(slug: str, body: dict, request: Request):
    """Store a public submission for the form ``slug``; 404 if unknown.

    Every key/value pair in ``body`` is stored as-is (values stringified);
    the keys are not checked against the form's declared fields.
    """
    with closing(get_db()) as db:
        form = db.execute("SELECT id FROM forms WHERE slug = ?", (slug,)).fetchone()
        if not form:
            raise HTTPException(404, "Form not found")
        cur = db.execute(
            "INSERT INTO submissions (form_id) VALUES (?)", (form["id"],)
        )
        sub_id = cur.lastrowid
        db.executemany(
            "INSERT INTO submission_data (submission_id, field_label, field_value) "
            "VALUES (?, ?, ?)",
            [(sub_id, key, str(val)) for key, val in body.items()],
        )
        db.commit()
    return {"ok": True, "id": sub_id}


@app.post("/api/submit", status_code=201)
async def submit_legacy(body: dict, request: Request):
    """Legacy endpoint: submit to the form with slug 'default'."""
    return await submit_form("default", body, request)


# ---- Admin Submissions ----

@app.get("/admin/submissions")
def admin_submissions(
    request: Request,
    form_id: Optional[int] = None,
    page: int = 1,
    per_page: int = 50,
):
    """Return one page of submissions (newest first) with data, tags and reply.

    ``page`` and ``per_page`` are clamped to at least 1 so that zero or
    negative values cannot cause a division by zero or a negative OFFSET.
    """
    check_auth(request)
    page = max(1, page)
    per_page = max(1, per_page)
    offset = (page - 1) * per_page

    query = (
        "SELECT s.*, f.name as form_name FROM submissions s "
        "JOIN forms f ON s.form_id = f.id"
    )
    count_query = "SELECT COUNT(*) as c FROM submissions"
    params = []
    if form_id is not None:
        query += " WHERE s.form_id = ?"
        count_query += " WHERE form_id = ?"
        params.append(form_id)
    query += " ORDER BY s.id DESC LIMIT ? OFFSET ?"

    with closing(get_db()) as db:
        total = db.execute(count_query, params).fetchone()["c"]
        rows = db.execute(query, params + [per_page, offset]).fetchall()
        result = []
        for row in rows:
            sub_id = row["id"]
            data = db.execute(
                "SELECT * FROM submission_data WHERE submission_id = ?", (sub_id,)
            ).fetchall()
            tags = db.execute(
                "SELECT tag FROM submission_tags WHERE submission_id = ?", (sub_id,)
            ).fetchall()
            reply = db.execute(
                "SELECT draft_text, created_at FROM reply_drafts "
                "WHERE submission_id = ?",
                (sub_id,),
            ).fetchone()
            result.append({
                "id": sub_id,
                "form_id": row["form_id"],
                "form_name": row["form_name"],
                "created_at": row["created_at"],
                "data": [dict(d) for d in data],
                "tags": [t["tag"] for t in tags],
                "reply": dict(reply) if reply else None,
            })
    return {
        "submissions": result,
        "total": total,
        "page": page,
        "per_page": per_page,
        "pages": (total + per_page - 1) // per_page,
    }


# ---- DeepSeek helpers ----

def _deepseek_chat(api_key, model, prompt, *, temperature, max_tokens):
    """POST a single-user-message chat completion and return the reply text.

    Raises whatever httpx raises on transport/HTTP errors, and KeyError /
    IndexError / ValueError if the response body has an unexpected shape.
    """
    # Function-local import, as before: the module loads without httpx.
    import httpx

    resp = httpx.post(
        DEEPSEEK_URL,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json={
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature,
            "max_tokens": max_tokens,
        },
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["choices"][0]["message"]["content"].strip()


def _strip_code_fence(text):
    """Remove a surrounding Markdown ``` fence (with optional language tag)."""
    text = text.strip()
    if text.startswith("```"):
        _, newline, rest = text.partition("\n")
        # Drop the whole opening line ("```json"); if everything is on one
        # line, drop just the backticks.
        text = rest if newline else text[3:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _build_tag_prompt(lines):
    """Build the tagging prompt from pre-formatted submission summary lines."""
    return "\n".join([
        "Analyze these customer form submissions and assign 1-3 short tags to "
        "each (like enterprise, startup, personal, creative, tech, finance, "
        "education, spam, etc). Return ONLY a JSON object mapping submission "
        "IDs to arrays of tags.",
        "Submissions:",
        *lines,
        "Output format:",
        '{"1": ["tag1", "tag2"], "2": ["tag3"]}',
    ])


def _build_reply_prompt(name, email, domain, extra):
    """Build the reply-draft prompt for one customer."""
    return "\n".join([
        "Write a short, warm professional email reply to a customer who "
        "filled out our form.",
        f"Customer: {name} ({email})",
        f"Domain: {domain}",
        json.dumps(extra) if extra else "",
        "The reply should:",
        "- Thank them by name",
        "- Mention their domain name naturally",
        "- Say we will be in touch within 24 hours",
        "- Keep it under 4 sentences",
        '- Sign as "The Team"',
        f"Subject: Thanks for reaching out, {name}!",
        "Body:",
    ])


# ---- DeepSeek: Auto-tag ----

@app.post("/admin/tag")
def auto_tag(body: AnalyzeBody, request: Request):
    """Ask DeepSeek for 1-3 tags per submission and store them.

    Only the first ``TAG_BATCH_LIMIT`` requested submissions are sent to the
    model. Existing tags of each tagged submission are replaced. Entries the
    model returns for ids that were not sent, or with malformed tag values,
    are skipped rather than failing the whole request.

    Returns ``{"tagged": n, "tags_map": {...}}`` with the tags actually
    stored. Raises 400 if no API key is configured, 500 on API failure or
    an unparseable model response.
    """
    check_auth(request)
    with closing(get_db()) as db:
        api_key = get_setting(db, "deepseek_api_key")
        if not api_key:
            raise HTTPException(
                400, "DeepSeek API key not configured. Add it in Settings."
            )

        if body.all:
            ids = [r["id"] for r in db.execute("SELECT id FROM submissions").fetchall()]
        else:
            ids = body.submission_ids
        if not ids:
            return {"tagged": 0}

        batch = ids[:TAG_BATCH_LIMIT]
        lines = []
        for sid in batch:
            fields = _submission_fields(db, sid)
            email = fields.get("Email", fields.get("email", "unknown"))
            name = fields.get("Name", fields.get("name", "unknown"))
            domain = fields.get("Domain", fields.get("domain", ""))
            lines.append(f"ID={sid} | {name} | {email} | domain={domain}")

        model = get_setting(db, "deepseek_model", "deepseek-chat")
        try:
            raw = _deepseek_chat(
                api_key, model, _build_tag_prompt(lines),
                temperature=0.3, max_tokens=1000,
            )
        except Exception as e:
            raise HTTPException(500, f"DeepSeek API error: {str(e)}") from e

        content = _strip_code_fence(raw)
        try:
            tags_map = json.loads(content)
        except json.JSONDecodeError as e:
            raise HTTPException(
                500, f"DeepSeek returned unparseable response: {content[:300]}"
            ) from e
        if not isinstance(tags_map, dict):
            raise HTTPException(
                500, f"DeepSeek returned unparseable response: {content[:300]}"
            )

        sent_ids = set(batch)
        applied = {}
        for sid_key, tags in tags_map.items():
            try:
                sid = int(sid_key)
            except (TypeError, ValueError):
                continue
            if sid not in sent_ids:
                # The model invented an id; inserting would violate the FK.
                continue
            if isinstance(tags, str):
                tags = [tags]
            if not isinstance(tags, list):
                continue
            clean = [t.strip() for t in tags if isinstance(t, str) and t.strip()]
            db.execute("DELETE FROM submission_tags WHERE submission_id = ?", (sid,))
            db.executemany(
                "INSERT INTO submission_tags (submission_id, tag) VALUES (?, ?)",
                [(sid, tag) for tag in clean],
            )
            applied[sid_key] = clean
        db.commit()
    return {"tagged": len(applied), "tags_map": applied}


# ---- DeepSeek: Reply draft ----

@app.post("/admin/reply/{submission_id}")
def generate_reply(submission_id: int, request: Request):
    """Generate and store (replacing any previous) a reply draft.

    Raises 400 if no API key is configured, 404 if the submission does not
    exist (checked before spending an API call), 500 on API failure.
    """
    check_auth(request)
    with closing(get_db()) as db:
        api_key = get_setting(db, "deepseek_api_key")
        if not api_key:
            raise HTTPException(
                400, "DeepSeek API key not configured. Add it in Settings."
            )

        exists = db.execute(
            "SELECT 1 FROM submissions WHERE id = ?", (submission_id,)
        ).fetchone()
        if not exists:
            raise HTTPException(404, "Submission not found")

        fields = _submission_fields(db, submission_id)
        name = fields.get("Name", fields.get("name", "there"))
        email = fields.get("Email", fields.get("email", ""))
        domain = fields.get("Domain", fields.get("domain", ""))
        extra = {
            k: v for k, v in fields.items()
            if k.lower() not in ("name", "email", "domain")
        }

        model = get_setting(db, "deepseek_model", "deepseek-chat")
        try:
            draft = _deepseek_chat(
                api_key, model, _build_reply_prompt(name, email, domain, extra),
                temperature=0.7, max_tokens=500,
            )
        except Exception as e:
            raise HTTPException(500, f"DeepSeek API error: {str(e)}") from e

        # reply_drafts.submission_id is UNIQUE: replace any earlier draft.
        db.execute("DELETE FROM reply_drafts WHERE submission_id = ?", (submission_id,))
        db.execute(
            "INSERT INTO reply_drafts (submission_id, draft_text) VALUES (?, ?)",
            (submission_id, draft),
        )
        db.commit()
    return {"ok": True, "draft": draft}


@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}


# Static frontend (must be last)
app.mount("/", StaticFiles(directory="/srv/form-backend/static", html=True), name="static")