529 lines
18 KiB
Python
529 lines
18 KiB
Python
import sqlite3
|
|
import os
|
|
import json
|
|
import secrets
|
|
from contextlib import asynccontextmanager
|
|
from typing import Optional
|
|
|
|
from fastapi import FastAPI, HTTPException, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
# Path of the SQLite database file. Taken from the DB_PATH environment
# variable when it is set; otherwise the hard-coded path below is used.
DB_PATH = os.environ.get("DB_PATH", "/srv/form-backend/submissions.db")
|
|
|
|
# ---- DB Helpers ----
|
|
|
|
def get_db():
    """Open and return a new SQLite connection to ``DB_PATH``.

    The connection yields ``sqlite3.Row`` rows (so columns can be read by
    name) and has WAL journaling and foreign-key enforcement switched on.
    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
|
|
|
|
def init_schema(db):
    """Create every table used by the app and seed first-run defaults.

    Safe to call on each startup: tables are created with ``IF NOT EXISTS``
    and default settings are inserted with ``INSERT OR IGNORE``.

    The default form (id 1, slug ``default``) and its three fields are seeded
    together, and only when form 1 does not exist yet. Seeding the fields
    independently (one guard per ``sort_order``) would re-insert fields that
    were deliberately removed from form 1 the next time the app starts.

    Args:
        db: An open ``sqlite3`` connection. The changes are committed before
            returning.
    """
    db.executescript("""
        CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL DEFAULT ''
        );

        INSERT OR IGNORE INTO settings (key, value) VALUES ('admin_password', 'admin123');
        INSERT OR IGNORE INTO settings (key, value) VALUES ('deepseek_api_key', '');
        INSERT OR IGNORE INTO settings (key, value) VALUES ('deepseek_model', 'deepseek-chat');

        CREATE TABLE IF NOT EXISTS forms (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            slug TEXT NOT NULL UNIQUE,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS form_fields (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            form_id INTEGER NOT NULL REFERENCES forms(id) ON DELETE CASCADE,
            label TEXT NOT NULL,
            field_type TEXT NOT NULL DEFAULT 'text',
            required INTEGER NOT NULL DEFAULT 1,
            sort_order INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS submissions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            form_id INTEGER NOT NULL DEFAULT 1 REFERENCES forms(id),
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS submission_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            submission_id INTEGER NOT NULL REFERENCES submissions(id) ON DELETE CASCADE,
            field_label TEXT NOT NULL,
            field_value TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS submission_tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            submission_id INTEGER NOT NULL REFERENCES submissions(id) ON DELETE CASCADE,
            tag TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS reply_drafts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            submission_id INTEGER NOT NULL UNIQUE REFERENCES submissions(id) ON DELETE CASCADE,
            draft_text TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS admin_tokens (
            token TEXT PRIMARY KEY,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
    """)

    # Seed the default form and its fields as one unit, only on first run.
    default_form_missing = (
        db.execute("SELECT 1 FROM forms WHERE id = 1").fetchone() is None
    )
    if default_form_missing:
        db.execute(
            "INSERT INTO forms (id, name, slug) VALUES (1, 'Customer Form', 'default')"
        )
        db.executemany(
            "INSERT INTO form_fields (form_id, label, field_type, required, sort_order) "
            "VALUES (1, ?, ?, 1, ?)",
            [("Email", "email", 1), ("Name", "text", 2), ("Domain", "text", 3)],
        )
    db.commit()
|
|
|
|
def get_setting(db, key, default=""):
    """Look up one value in the ``settings`` table.

    Args:
        db: Open connection whose rows support access by column name.
        key: Setting name to look up.
        default: Value returned when no row exists for ``key``.

    Returns:
        The stored value, or ``default`` if the key is absent.
    """
    cursor = db.execute("SELECT value FROM settings WHERE key = ?", (key,))
    match = cursor.fetchone()
    if match:
        return match["value"]
    return default
|
|
|
|
def set_setting(db, key, value):
    """Insert or overwrite one row of the ``settings`` table.

    The statement is executed on ``db`` but not committed here; committing
    is left to the caller.
    """
    upsert_sql = "INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    db.execute(upsert_sql, (key, value))
|
|
|
|
def check_auth(request: Request):
    """Reject the request unless it carries a known admin token.

    The token is read from the ``Authorization`` header. A leading
    ``"Bearer "`` prefix is removed; a header without that prefix is used
    as the token verbatim.

    Raises:
        HTTPException: 401 when the header is missing/empty or the token is
            not present in ``admin_tokens``.
    """
    header = request.headers.get("Authorization", "")
    # Strip only a leading scheme prefix; str.replace would also remove the
    # text "Bearer " from the middle of a header value.
    token = header.removeprefix("Bearer ")
    if not token:
        raise HTTPException(401, "Missing token")
    db = get_db()
    try:
        row = db.execute(
            "SELECT 1 FROM admin_tokens WHERE token = ?", (token,)
        ).fetchone()
    finally:
        # Close the connection even when the lookup itself fails.
        db.close()
    if not row:
        raise HTTPException(401, "Invalid token")
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: prepare the database schema before serving.

    Opens a connection, runs ``init_schema`` on it and closes it again (also
    when schema initialisation fails), then yields control to the
    application. Nothing is done on shutdown.
    """
    db = get_db()
    try:
        init_schema(db)
    finally:
        db.close()
    yield
|
|
|
|
# The ASGI application; `lifespan` runs the schema setup at startup.
app = FastAPI(title="Form Backend", lifespan=lifespan)

# CORS is left fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
# ---- Models ----
|
|
|
|
class FieldDef(BaseModel):
    """Request model describing a single field of a form."""

    # Optional identifier; the create/update handlers visible in this file
    # do not read it.
    id: Optional[int] = None
    # Text stored as the field's label.
    label: str
    # Stored as-is in form_fields.field_type; defaults to "text".
    field_type: str = "text"
    # Persisted as 1/0 in form_fields.required.
    required: bool = True
|
|
|
|
class FormDef(BaseModel):
    """Request model for creating or replacing a form definition."""

    # Display name of the form.
    name: str
    # Field definitions; their list position determines the stored sort_order.
    fields: list[FieldDef] = []
|
|
|
|
class LoginBody(BaseModel):
    """Request body of the admin login endpoint."""

    # Password to be checked against the stored admin password.
    password: str
|
|
|
|
class AnalyzeBody(BaseModel):
    """Request body selecting which submissions the tagging endpoint handles."""

    # Explicit list of submission ids to process.
    submission_ids: list[int] = []
    # When true, every submission is selected and submission_ids is ignored.
    all: bool = False
|
|
|
|
# ---- Auth ----
|
|
|
|
@app.post("/admin/login")
def admin_login(body: LoginBody):
    """Exchange the admin password for a new bearer token.

    The supplied password is compared with the ``admin_password`` setting
    (falling back to ``"admin123"`` when the setting row is missing). On
    success a random token is stored in ``admin_tokens`` and returned.

    Returns:
        ``{"token": <str>}``.

    Raises:
        HTTPException: 401 when the password does not match.
    """
    db = get_db()
    try:
        expected = get_setting(db, "admin_password", "admin123")
        # Constant-time comparison so response timing does not reveal how
        # much of the password matched. Bytes are compared because
        # compare_digest only accepts ASCII-only str arguments.
        password_ok = secrets.compare_digest(
            body.password.encode("utf-8"), expected.encode("utf-8")
        )
        if not password_ok:
            raise HTTPException(401, "Wrong password")
        token = secrets.token_urlsafe(32)
        db.execute("INSERT INTO admin_tokens (token) VALUES (?)", (token,))
        db.commit()
    finally:
        # Runs on success, on the 401 path and on database errors alike.
        db.close()
    return {"token": token}
|
|
|
|
@app.post("/admin/logout")
def admin_logout(request: Request):
    """Invalidate the bearer token that authenticated this request.

    Returns:
        ``{"ok": True}`` once the token row has been deleted.

    Raises:
        HTTPException: 401 (from ``check_auth``) when the token is missing
            or unknown.
    """
    check_auth(request)
    # Strip only a leading "Bearer " prefix; str.replace would also remove
    # that text from the middle of the header value.
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    db = get_db()
    try:
        db.execute("DELETE FROM admin_tokens WHERE token = ?", (token,))
        db.commit()
    finally:
        db.close()
    return {"ok": True}
|
|
|
|
# ---- Settings ----
|
|
|
|
@app.get("/admin/settings")
def get_settings(request: Request):
    """Return the admin-editable settings plus a masked copy of the API key.

    Returns:
        A dict with ``deepseek_api_key``, ``deepseek_model``,
        ``admin_password`` and ``deepseek_api_key_masked``. The masked form
        keeps the first and last four characters for keys longer than eight
        characters, is ``"***"`` for shorter non-empty keys and ``""`` when
        no key is stored.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls.
    """
    check_auth(request)
    db = get_db()
    try:
        result = {
            k: get_setting(db, k)
            for k in ("deepseek_api_key", "deepseek_model", "admin_password")
        }
    finally:
        db.close()

    # NOTE(review): the unmasked API key and the admin password are returned
    # in plaintext alongside the masked key. The response shape is kept for
    # compatibility with existing clients; consider dropping both fields.
    ak = result["deepseek_api_key"]
    if len(ak) > 8:
        masked = ak[:4] + "***" + ak[-4:]
    elif ak:
        masked = "***"
    else:
        masked = ""
    result["deepseek_api_key_masked"] = masked
    return result
|
|
|
|
@app.post("/admin/settings")
def update_settings(body: dict, request: Request):
    """Store new values for the admin-editable settings.

    Only the keys ``deepseek_api_key``, ``deepseek_model`` and
    ``admin_password`` are accepted; any other key in the body is ignored.
    Values are stored as their ``str()`` form.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls;
            400 when ``admin_password`` is present but null or empty.
    """
    check_auth(request)
    allowed = {"deepseek_api_key", "deepseek_model", "admin_password"}
    updates = {k: v for k, v in body.items() if k in allowed}

    # An empty stored password would make the empty string a valid login,
    # and a JSON null would be stored as the literal text "None".
    if "admin_password" in updates:
        new_password = updates["admin_password"]
        if new_password is None or str(new_password) == "":
            raise HTTPException(400, "Admin password must not be empty")

    db = get_db()
    try:
        for k, v in updates.items():
            set_setting(db, k, str(v))
        db.commit()
    finally:
        db.close()
    return {"ok": True}
|
|
|
|
# ---- Stats ----
|
|
|
|
@app.get("/admin/stats")
def admin_stats(request: Request):
    """Return headline counters for the admin dashboard.

    Returns:
        A dict with ``total_submissions``, ``today`` (submissions whose
        ``created_at`` date equals SQLite's ``date('now')``), ``forms`` and
        ``tagged`` (number of distinct submissions having at least one tag).

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls.
    """
    check_auth(request)
    counters = {
        "total_submissions": "SELECT COUNT(*) as c FROM submissions",
        "today": "SELECT COUNT(*) as c FROM submissions WHERE date(created_at) = date('now')",
        "forms": "SELECT COUNT(*) as c FROM forms",
        "tagged": "SELECT COUNT(DISTINCT submission_id) as c FROM submission_tags",
    }
    db = get_db()
    try:
        return {
            name: db.execute(sql).fetchone()["c"] for name, sql in counters.items()
        }
    finally:
        # Ensures the connection is released even when a query fails.
        db.close()
|
|
|
|
# ---- Forms CRUD ----
|
|
|
|
@app.get("/forms")
def list_forms():
    """List every form together with its fields.

    Returns:
        A list (ordered by form id) of dicts with ``id``, ``name``, ``slug``,
        ``created_at`` and ``fields``; ``fields`` holds the form's
        ``form_fields`` rows as dicts, ordered by ``sort_order``.
    """
    db = get_db()
    try:
        forms = db.execute("SELECT * FROM forms ORDER BY id").fetchall()
        # One query for all fields instead of one query per form.
        field_rows = db.execute(
            "SELECT * FROM form_fields ORDER BY form_id, sort_order"
        ).fetchall()
    finally:
        db.close()

    fields_by_form = {}
    for row in field_rows:
        fields_by_form.setdefault(row["form_id"], []).append(dict(row))

    return [
        {
            "id": f["id"], "name": f["name"], "slug": f["slug"],
            "created_at": f["created_at"],
            "fields": fields_by_form.get(f["id"], []),
        }
        for f in forms
    ]
|
|
|
|
@app.get("/forms/{slug}")
def get_form(slug: str):
    """Return one form, looked up by slug, together with its fields.

    Returns:
        A dict with ``id``, ``name``, ``slug`` and ``fields`` (the form's
        ``form_fields`` rows as dicts, ordered by ``sort_order``).

    Raises:
        HTTPException: 404 when no form has the given slug.
    """
    db = get_db()
    try:
        form = db.execute("SELECT * FROM forms WHERE slug = ?", (slug,)).fetchone()
        if not form:
            raise HTTPException(404, "Form not found")
        fields = db.execute(
            "SELECT * FROM form_fields WHERE form_id = ? ORDER BY sort_order",
            (form["id"],),
        ).fetchall()
    finally:
        # Covers the 404 path and query failures as well as success.
        db.close()
    return {
        "id": form["id"], "name": form["name"], "slug": form["slug"],
        "fields": [dict(f) for f in fields],
    }
|
|
|
|
@app.post("/forms", status_code=201)
def create_form(body: FormDef, request: Request):
    """Create a form and its fields.

    The slug is the lower-cased name with spaces replaced by hyphens. Fields
    are stored with ``sort_order`` 1..n following their order in the body.

    Returns:
        ``{"ok": True, "id": <new form id>, "slug": <slug>}``.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls;
            400 when the name is blank; 409 when the insert violates a
            constraint, e.g. the slug is already used by another form.
    """
    check_auth(request)
    if not body.name.strip():
        raise HTTPException(400, "Form name must not be empty")
    slug = body.name.lower().replace(" ", "-")
    db = get_db()
    try:
        try:
            cur = db.execute(
                "INSERT INTO forms (name, slug) VALUES (?, ?)", (body.name, slug)
            )
        except sqlite3.IntegrityError:
            # forms.slug is UNIQUE; report the clash instead of a bare 500.
            raise HTTPException(409, "A form with this name already exists") from None
        form_id = cur.lastrowid
        db.executemany(
            "INSERT INTO form_fields (form_id, label, field_type, required, sort_order) VALUES (?, ?, ?, ?, ?)",
            [
                (form_id, f.label, f.field_type, 1 if f.required else 0, position)
                for position, f in enumerate(body.fields, start=1)
            ],
        )
        db.commit()
    finally:
        db.close()
    return {"ok": True, "id": form_id, "slug": slug}
|
|
|
|
@app.put("/forms/{form_id}")
def update_form(form_id: int, body: FormDef, request: Request):
    """Rename a form and replace its whole field list.

    All existing ``form_fields`` rows of the form are deleted and the fields
    from the body are inserted with ``sort_order`` 1..n. The slug is not
    changed.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls;
            404 when no form has the given id.
    """
    check_auth(request)
    db = get_db()
    try:
        cur = db.execute(
            "UPDATE forms SET name = ? WHERE id = ?", (body.name, form_id)
        )
        # Without this check, inserting fields for a missing form fails on
        # the foreign key (or silently "succeeds" when no fields are sent).
        if cur.rowcount == 0:
            raise HTTPException(404, "Form not found")
        db.execute("DELETE FROM form_fields WHERE form_id = ?", (form_id,))
        db.executemany(
            "INSERT INTO form_fields (form_id, label, field_type, required, sort_order) VALUES (?, ?, ?, ?, ?)",
            [
                (form_id, f.label, f.field_type, 1 if f.required else 0, position)
                for position, f in enumerate(body.fields, start=1)
            ],
        )
        db.commit()
    finally:
        db.close()
    return {"ok": True}
|
|
|
|
@app.delete("/forms/{form_id}")
def delete_form(form_id: int, request: Request):
    """Delete a form; the default form (id 1) is protected.

    Deleting an id that does not exist is not treated as an error.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls;
            400 for the default form; 409 when the delete violates a
            constraint, e.g. submissions still reference the form.
    """
    check_auth(request)
    if form_id == 1:
        raise HTTPException(400, "Cannot delete default form")
    db = get_db()
    try:
        try:
            db.execute("DELETE FROM forms WHERE id = ?", (form_id,))
        except sqlite3.IntegrityError:
            # submissions.form_id references forms(id) without a cascade, so
            # with foreign keys enforced a form that has submissions cannot
            # be removed.
            raise HTTPException(
                409, "Form still has submissions and cannot be deleted"
            ) from None
        db.commit()
    finally:
        db.close()
    return {"ok": True}
|
|
|
|
# ---- Public submit ----
|
|
|
|
@app.post("/submit/{slug}", status_code=201)
async def submit_form(slug: str, body: dict, request: Request):
    """Public endpoint: record a submission for the form with the given slug.

    Every key/value pair of the JSON body becomes one ``submission_data``
    row; values are stored as their ``str()`` form. The body is not checked
    against the form's field definitions.

    Returns:
        ``{"ok": True, "id": <new submission id>}``.

    Raises:
        HTTPException: 404 when no form has the given slug.
    """
    db = get_db()
    try:
        form = db.execute("SELECT id FROM forms WHERE slug = ?", (slug,)).fetchone()
        if not form:
            raise HTTPException(404, "Form not found")
        cur = db.execute(
            "INSERT INTO submissions (form_id) VALUES (?)", (form["id"],)
        )
        sub_id = cur.lastrowid
        db.executemany(
            "INSERT INTO submission_data (submission_id, field_label, field_value) VALUES (?, ?, ?)",
            [(sub_id, key, str(val)) for key, val in body.items()],
        )
        db.commit()
    finally:
        # Closing without a commit discards a partially written submission.
        db.close()
    return {"ok": True, "id": sub_id}
|
|
|
|
@app.post("/api/submit", status_code=201)
async def submit_legacy(body: dict, request: Request):
    """Legacy submit route: forwards the body to the form with slug "default"."""
    outcome = await submit_form("default", body, request)
    return outcome
|
|
|
|
# ---- Admin Submissions ----
|
|
|
|
@app.get("/admin/submissions")
def admin_submissions(
    request: Request,
    form_id: Optional[int] = None,
    page: int = 1,
    per_page: int = 50
):
    """Return one page of submissions, newest first, with data, tags and draft.

    Args:
        request: Incoming request; must carry a valid admin token.
        form_id: When given (and non-zero), only that form's submissions.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.

    Returns:
        A dict with ``submissions`` (each having ``id``, ``form_id``,
        ``form_name``, ``created_at``, ``data``, ``tags`` and ``reply``),
        ``total``, ``page``, ``per_page`` and ``pages``.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls.
    """
    check_auth(request)
    # per_page=0 would divide by zero below; a negative LIMIT means "no
    # limit" in SQLite and a negative page gives a negative OFFSET.
    page = max(page, 1)
    per_page = max(per_page, 1)
    offset = (page - 1) * per_page

    query = "SELECT s.*, f.name as form_name FROM submissions s JOIN forms f ON s.form_id = f.id"
    count_query = "SELECT COUNT(*) as c FROM submissions"
    params = []
    if form_id:
        query += " WHERE s.form_id = ?"
        count_query += " WHERE form_id = ?"
        params.append(form_id)
    query += " ORDER BY s.id DESC LIMIT ? OFFSET ?"

    db = get_db()
    try:
        total = db.execute(count_query, params).fetchone()["c"]
        rows = db.execute(query, params + [per_page, offset]).fetchall()
        result = []
        for r in rows:
            data = db.execute(
                "SELECT * FROM submission_data WHERE submission_id = ?", (r["id"],)
            ).fetchall()
            tags = db.execute(
                "SELECT tag FROM submission_tags WHERE submission_id = ?", (r["id"],)
            ).fetchall()
            reply = db.execute(
                "SELECT draft_text, created_at FROM reply_drafts WHERE submission_id = ?", (r["id"],)
            ).fetchone()
            result.append({
                "id": r["id"], "form_id": r["form_id"], "form_name": r["form_name"],
                "created_at": r["created_at"],
                "data": [dict(d) for d in data],
                "tags": [t["tag"] for t in tags],
                "reply": dict(reply) if reply else None,
            })
    finally:
        db.close()

    return {
        "submissions": result, "total": total, "page": page,
        # Ceiling division: number of pages needed for `total` rows.
        "per_page": per_page, "pages": (total + per_page - 1) // per_page,
    }
|
|
|
|
# ---- DeepSeek: Auto-tag ----
|
|
|
|
def _strip_code_fences(text):
    """Drop a leading and/or trailing Markdown code-fence line from *text*."""
    if text.startswith("```"):
        text = "\n".join(text.split("\n")[1:])
    if text.endswith("```"):
        text = "\n".join(text.split("\n")[:-1])
    return text.strip()


@app.post("/admin/tag")
def auto_tag(body: AnalyzeBody, request: Request):
    """Ask the DeepSeek chat API for tags and store them per submission.

    The selected submissions (``body.submission_ids``, or every submission
    when ``body.all`` is true) are summarised one line each and sent in a
    single prompt. Only the first 50 selected ids are sent per call. The
    reply must be a JSON object mapping submission ids to tag lists; for
    every id that was actually sent, the stored tags are replaced by the
    returned ones. Entries for other ids, or with malformed tag lists, are
    ignored.

    Returns:
        ``{"tagged": <count>, "tags_map": <parsed model reply>}``, or
        ``{"tagged": 0}`` when there is nothing to tag.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls;
            400 when no API key is configured; 500 when the API call fails
            or its reply cannot be used.
    """
    check_auth(request)
    db = get_db()
    try:
        api_key = get_setting(db, "deepseek_api_key")
        if not api_key:
            raise HTTPException(400, "DeepSeek API key not configured. Add it in Settings.")

        ids = body.submission_ids
        if body.all:
            rows = db.execute("SELECT id FROM submissions").fetchall()
            ids = [r["id"] for r in rows]

        # Only 50 submissions fit into one prompt, so only those are loaded.
        lines = []
        sent_ids = set()
        for sid in ids[:50]:
            exists = db.execute(
                "SELECT 1 FROM submissions WHERE id = ?", (sid,)
            ).fetchone()
            if not exists:
                # Tag rows reference submissions(id); unknown ids cannot be tagged.
                continue
            data = db.execute(
                "SELECT * FROM submission_data WHERE submission_id = ?", (sid,)
            ).fetchall()
            fields = {d["field_label"]: d["field_value"] for d in data}
            email = fields.get("Email", fields.get("email", "unknown"))
            name = fields.get("Name", fields.get("name", "unknown"))
            domain = fields.get("Domain", fields.get("domain", ""))
            lines.append(f"ID={sid} | {name} | {email} | domain={domain}")
            sent_ids.add(sid)

        if not lines:
            return {"tagged": 0}

        joiner = "\n"
        prompt = f"""Analyze these customer form submissions and assign 1-3 short tags to each (like enterprise, startup, personal, creative, tech, finance, education, spam, etc). Return ONLY a JSON object mapping submission IDs to arrays of tags.

Submissions:
{joiner.join(lines)}

Output format: {{"1": ["tag1", "tag2"], "2": ["tag3"]}}"""

        # Pre-bound so the JSONDecodeError handler can always quote it, even
        # when decoding the HTTP response body itself is what failed.
        content = ""
        try:
            import httpx
            model = get_setting(db, "deepseek_model", "deepseek-chat")
            resp = httpx.post(
                "https://api.deepseek.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.3,
                    "max_tokens": 1000
                },
                timeout=30
            )
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"].strip()
            content = _strip_code_fences(content)

            tags_map = json.loads(content)
            if not isinstance(tags_map, dict):
                raise HTTPException(500, f"DeepSeek returned unparseable response: {content[:300]}")

            tagged = 0
            for sid_str, tags in tags_map.items():
                try:
                    sid = int(sid_str)
                except (TypeError, ValueError):
                    continue
                # Never touch submissions that were not part of the prompt.
                if sid not in sent_ids or not isinstance(tags, list):
                    continue
                clean_tags = [
                    t.strip() for t in tags if isinstance(t, str) and t.strip()
                ]
                db.execute("DELETE FROM submission_tags WHERE submission_id = ?", (sid,))
                db.executemany(
                    "INSERT INTO submission_tags (submission_id, tag) VALUES (?, ?)",
                    [(sid, t) for t in clean_tags],
                )
                tagged += 1
            db.commit()
            return {"tagged": tagged, "tags_map": tags_map}
        except HTTPException:
            # Already a well-formed API error; do not re-wrap it below.
            raise
        except json.JSONDecodeError:
            raise HTTPException(500, f"DeepSeek returned unparseable response: {content[:300]}")
        except Exception as e:
            raise HTTPException(500, f"DeepSeek API error: {str(e)}")
    finally:
        db.close()
|
|
|
|
# ---- DeepSeek: Reply draft ----
|
|
|
|
@app.post("/admin/reply/{submission_id}")
def generate_reply(submission_id: int, request: Request):
    """Generate a reply draft for one submission via the DeepSeek chat API.

    The submission's Name/Email/Domain values (either capitalisation) and
    any remaining fields are placed into a prompt. The returned text
    replaces any existing ``reply_drafts`` row for the submission.

    Returns:
        ``{"ok": True, "draft": <generated text>}``.

    Raises:
        HTTPException: 401 (from ``check_auth``) for unauthenticated calls;
            400 when no API key is configured; 404 when the submission does
            not exist; 500 when the API call or storing the draft fails.
    """
    check_auth(request)
    db = get_db()
    try:
        api_key = get_setting(db, "deepseek_api_key")
        if not api_key:
            raise HTTPException(400, "DeepSeek API key not configured. Add it in Settings.")

        # Check first: reply_drafts references submissions(id), so a draft
        # for an unknown id could only fail after the API call was made.
        known = db.execute(
            "SELECT 1 FROM submissions WHERE id = ?", (submission_id,)
        ).fetchone()
        if not known:
            raise HTTPException(404, "Submission not found")

        data = db.execute(
            "SELECT * FROM submission_data WHERE submission_id = ?", (submission_id,)
        ).fetchall()
        fields = {d["field_label"]: d["field_value"] for d in data}
        name = fields.get("Name", fields.get("name", "there"))
        email = fields.get("Email", fields.get("email", ""))
        domain = fields.get("Domain", fields.get("domain", ""))
        # Everything other than the three well-known fields is passed along as JSON.
        extra = {k: v for k, v in fields.items() if k.lower() not in ("name", "email", "domain")}

        prompt = f"""Write a short, warm professional email reply to a customer who filled out our form.

Customer: {name} ({email})
Domain: {domain}
{json.dumps(extra) if extra else ''}

The reply should:
- Thank them by name
- Mention their domain name naturally
- Say we will be in touch within 24 hours
- Keep it under 4 sentences
- Sign as "The Team"

Subject: Thanks for reaching out, {name}!

Body:"""

        try:
            import httpx
            model = get_setting(db, "deepseek_model", "deepseek-chat")
            resp = httpx.post(
                "https://api.deepseek.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.7,
                    "max_tokens": 500
                },
                timeout=30
            )
            resp.raise_for_status()
            draft = resp.json()["choices"][0]["message"]["content"].strip()

            # reply_drafts.submission_id is UNIQUE: replace any earlier draft.
            db.execute("DELETE FROM reply_drafts WHERE submission_id = ?", (submission_id,))
            db.execute(
                "INSERT INTO reply_drafts (submission_id, draft_text) VALUES (?, ?)",
                (submission_id, draft)
            )
            db.commit()
        except Exception as e:
            raise HTTPException(500, f"DeepSeek API error: {str(e)}") from e
        return {"ok": True, "draft": draft}
    finally:
        db.close()
|
|
|
|
@app.get("/health")
def health():
    """Liveness probe: always answers with a fixed ``status`` payload."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
# Static frontend (must be last)
|
|
# Serves the files under /srv/form-backend/static at the root path, with
# html=True. NOTE(review): presumably kept last so this catch-all mount does
# not take precedence over the API routes declared above; confirm against
# the framework's route-matching order.
app.mount("/", StaticFiles(directory="/srv/form-backend/static", html=True), name="static")
|