Files
vela-platform/backend/main.py

708 lines
22 KiB
Python

"""Vela Platform — FastAPI application with API endpoints and Jinja2 frontend."""
from datetime import datetime, timezone
from typing import Optional
from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from auth import (
create_access_token,
get_current_user,
get_page_user,
hash_password,
require_admin,
verify_password,
)
from database import Base, SessionLocal, engine, get_db
from models import Service, ServiceComponent, Transaction, User
# ---------------------------------------------------------------------------
# App setup
# ---------------------------------------------------------------------------
# Application object; the route decorators further down register handlers on it.
app = FastAPI(title="Vela Platform", version="1.0.0")
# Mount templates via Jinja2
from fastapi.templating import Jinja2Templates
# Template names given to TemplateResponse are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
# Fixed monthly operating expense that compute_pnl subtracts from gross profit.
HARDCODED_OPEX = 800.0 # MAD/month
# ---------------------------------------------------------------------------
# Helper: serialize a service with its components
# ---------------------------------------------------------------------------
def service_to_dict(svc: Service) -> dict:
    """Flatten a service and its components into a plain dict of built-in values.

    The top-level keys are the service's own columns followed by "components",
    a list holding one dict per entry of ``svc.components``.
    """
    service_fields = ("id", "name", "description", "sell_price", "cost_price", "active")
    component_fields = ("id", "name", "unit_cost", "unit_sell", "quantity", "notes")

    payload = {field: getattr(svc, field) for field in service_fields}
    payload["components"] = [
        {field: getattr(component, field) for field in component_fields}
        for component in svc.components
    ]
    return payload
# ---------------------------------------------------------------------------
# Startup: create tables + seed data + create component tables
# ---------------------------------------------------------------------------
# Component rows shared between the seeded services.  Every row is a tuple of
# (name, unit_cost, unit_sell, quantity, notes).
_HDD_1TB = ("HDD (1TB)", 600.0, 1000.0, 1, "Enterprise HDD")
_SSD_CACHE = ("SSD (256GB)", 400.0, 700.0, 1, "Cache SSD")
_SSD_NVME = ("SSD (500GB)", 300.0, 600.0, 1, "NVMe SSD")
_RAM_16GB = ("RAM (16GB)", 300.0, 500.0, 1, "ECC RAM")
_RAM_32GB = ("RAM (32GB)", 600.0, 1000.0, 1, "ECC RAM")
_CPU_BASE = ("CPU alloc", 200.0, 400.0, 1, "CPU core allocation")
_CPU_PLUS = ("CPU alloc", 300.0, 600.0, 1, "CPU core allocation")
_CASING = ("Casing & PSU", 200.0, 300.0, 1, "Chassis + power")
_BANDWIDTH = ("Bandwidth 1Gbps", 200.0, 500.0, 1, "Network port")
_BACKUP = ("Sauvegarde", 300.0, 500.0, 1, "Backup service")
_SETUP_BASE = ("Setup & config", 300.0, 600.0, 1, "One-time setup amortised")
_SETUP_PLUS = ("Setup & config", 200.0, 400.0, 1, "One-time setup amortised")
_SUPPORT = ("Support", 200.0, 700.0, 1, "Monthly support")

# Service name -> ordered list of component rows inserted when that service is seeded.
COMPONENT_TEMPLATES = {
    "Atlas": [
        _HDD_1TB,
        _RAM_16GB,
        _CPU_BASE,
        _CASING,
        _BANDWIDTH,
        _SETUP_BASE,
        _SUPPORT,
    ],
    "Atlas+": [
        _HDD_1TB,
        _SSD_CACHE,
        _RAM_32GB,
        _CPU_PLUS,
        _CASING,
        _BANDWIDTH,
        _BACKUP,
        _SETUP_PLUS,
        _SUPPORT,
    ],
    "Rif": [
        _SSD_NVME,
        _RAM_16GB,
        _CPU_BASE,
        _CASING,
        _BANDWIDTH,
        _SETUP_BASE,
        _SUPPORT,
    ],
    "Rif+": [
        _SSD_NVME,
        _HDD_1TB,
        _RAM_32GB,
        _CPU_PLUS,
        _CASING,
        _BANDWIDTH,
        _BACKUP,
        _SETUP_PLUS,
        _SUPPORT,
    ],
}
def seed_database():
    """Ensure the schema exists and, when no service is stored yet, insert the defaults.

    Users are not created here; the first-run setup flow takes care of them.
    """
    # (name, description, sell_price, cost_price) for each default service.
    default_services = (
        ("Atlas", "Core storage service", 4000.0, 2000.0),
        ("Atlas+", "Premium storage + backup service", 5000.0, 2800.0),
        ("Rif", "Core server service", 3500.0, 1700.0),
        ("Rif+", "Premium server + backup service", 4500.0, 2200.0),
    )

    Base.metadata.create_all(bind=engine)
    session = SessionLocal()
    try:
        # Any existing service row means the catalogue was already seeded.
        if session.query(Service).first():
            return

        for svc_name, svc_desc, sell_price, cost_price in default_services:
            service = Service(
                name=svc_name,
                description=svc_desc,
                sell_price=sell_price,
                cost_price=cost_price,
                active=True,
            )
            session.add(service)
            # Flush before the components below read service.id.
            session.flush()
            for comp_name, unit_cost, unit_sell, quantity, comp_notes in COMPONENT_TEMPLATES.get(
                svc_name, []
            ):
                component = ServiceComponent(
                    service_id=service.id,
                    name=comp_name,
                    unit_cost=unit_cost,
                    unit_sell=unit_sell,
                    quantity=quantity,
                    notes=comp_notes,
                )
                session.add(component)
        session.commit()
    finally:
        session.close()
# Module-level call: tables are created and seeded as soon as this module is imported.
seed_database()
# ---------------------------------------------------------------------------
# Helper: check if setup is complete
# ---------------------------------------------------------------------------
def is_setup_complete(db: Session) -> bool:
    """Tell whether first-run setup is done, i.e. a user with role "admin" exists."""
    first_admin = db.query(User).filter(User.role == "admin").first()
    return first_admin is not None
def require_setup(db: Session = Depends(get_db)):
    """Dependency returning the setup state as a bool.

    Nothing is raised here: the result is True once an admin user exists and
    False otherwise, leaving the reaction (e.g. a redirect) to the caller.
    """
    setup_done = is_setup_complete(db)
    return setup_done
# ---------------------------------------------------------------------------
# Helper: compute monthly P&L
# ---------------------------------------------------------------------------
def compute_pnl(month: str, db: Session) -> dict:
    """Build the profit-and-loss summary for one month, given as YYYY-MM.

    Revenue and cost are summed over the month's transactions; the fixed
    HARDCODED_OPEX is then subtracted from gross profit to get net profit.
    Both margins are percentages of revenue.
    """
    month_txns = db.query(Transaction).filter(Transaction.month == month).all()

    # The explicit 0.0 start keeps both totals floats even for an empty month.
    revenue = sum((txn.revenue for txn in month_txns), 0.0)
    cost = sum((txn.cost for txn in month_txns), 0.0)

    gross = revenue - cost
    net = gross - HARDCODED_OPEX

    def pct_of_revenue(amount):
        # Margins fall back to 0.0 when there is no positive revenue to divide by.
        return (amount / revenue * 100) if revenue > 0 else 0.0

    return {
        "month": month,
        "total_revenue": round(revenue, 2),
        "total_cost": round(cost, 2),
        "gross_profit": round(gross, 2),
        "gross_margin_pct": round(pct_of_revenue(gross), 2),
        "opex": HARDCODED_OPEX,
        "net_profit": round(net, 2),
        "net_margin_pct": round(pct_of_revenue(net), 2),
    }
def get_trend(db: Session, months: int = 12) -> list:
    """Return P&L summaries, oldest first, for the latest `months` months that have transactions."""
    newest_first = (
        db.query(Transaction.month)
        .distinct()
        .order_by(Transaction.month.desc())
        .limit(months)
        .all()
    )
    trend = []
    # The query yields newest-first rows; walk them backwards for chronological order.
    for row in reversed(newest_first):
        trend.append(compute_pnl(row[0], db))
    return trend
# ---------------------------------------------------------------------------
# API: Auth
# ---------------------------------------------------------------------------
@app.post("/api/auth/login")
def api_login(
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    # Form login: checks the credentials, then returns the access token in the
    # JSON body and mirrors it into an HTTP-only "token" cookie.
    account = db.query(User).filter(User.username == username).first()
    if not (account and verify_password(password, account.password_hash)):
        # Unknown user and wrong password produce the same 401 response.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password",
        )

    access_token = create_access_token(account.id, account.role)
    body = {
        "token": access_token,
        "user": {
            "id": account.id,
            "username": account.username,
            "display_name": account.display_name,
            "role": account.role,
        },
    }
    reply = JSONResponse(body)

    # max_age is 86400 seconds (24 hours).
    # NOTE(review): secure=False lets the cookie travel over plain HTTP — confirm
    # this is intended outside local development.
    cookie_options = {
        "httponly": True,
        "max_age": 86400,
        "samesite": "lax",
        "secure": False,
        "path": "/",
    }
    reply.set_cookie(key="token", value=access_token, **cookie_options)
    return reply
@app.get("/api/auth/me")
def api_me(current_user: User = Depends(get_current_user)):
    # Echo the public profile fields of whoever get_current_user resolved.
    profile_fields = ("id", "username", "display_name", "role")
    return {field: getattr(current_user, field) for field in profile_fields}
# ---------------------------------------------------------------------------
# API: Services
# ---------------------------------------------------------------------------
@app.get("/api/services")
def api_list_services(db: Session = Depends(get_db)):
    # Every service (active or not), sorted by name, each with its components.
    # NOTE(review): no auth dependency is declared on this route — confirm that is intended.
    ordered_services = db.query(Service).order_by(Service.name).all()
    return list(map(service_to_dict, ordered_services))
@app.get("/api/services/{service_id}")
def api_get_service(service_id: int, db: Session = Depends(get_db)):
    # Single service with its components; 404 when the id is unknown.
    match = db.query(Service).filter(Service.id == service_id).first()
    if match:
        return service_to_dict(match)
    raise HTTPException(status_code=404, detail="Service not found")
@app.post("/api/services")
def api_create_service(
    name: str = Form(...),
    description: str = Form(""),
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only: create an active service with zero prices and no components yet.
    new_service = Service(
        name=name,
        description=description,
        sell_price=0.0,
        cost_price=0.0,
        active=True,
    )
    db.add(new_service)
    db.commit()
    # Reload the row so the response carries the database-assigned values.
    db.refresh(new_service)
    return service_to_dict(new_service)
@app.put("/api/services/{service_id}")
def api_update_service(
    service_id: int,
    name: str = Form(...),
    description: str = Form(""),
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only: rename / re-describe a service, then refresh its prices.
    target = db.query(Service).filter(Service.id == service_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Service not found")

    target.name, target.description = name, description
    target.recompute_prices()
    db.commit()
    return service_to_dict(target)
@app.delete("/api/services/{service_id}")
def api_deactivate_service(
    service_id: int,
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only soft delete: the row is kept and only its `active` flag is cleared.
    target = db.query(Service).filter(Service.id == service_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Service not found")

    target.active = False
    db.commit()
    return {"ok": True}
# ---------------------------------------------------------------------------
# API: Service Components
# ---------------------------------------------------------------------------
@app.get("/api/services/{service_id}/components")
def api_list_components(service_id: int, db: Session = Depends(get_db)):
    # Components of one service as plain dicts; 404 when the service id is unknown.
    parent = db.query(Service).filter(Service.id == service_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="Service not found")

    exposed_fields = ("id", "name", "unit_cost", "unit_sell", "quantity", "notes")
    return [
        {field: getattr(component, field) for field in exposed_fields}
        for component in parent.components
    ]
class ComponentInput(BaseModel):
    # JSON body accepted by the component create and update endpoints.
    # Only `name` is required; the remaining fields fall back to the defaults below.
    name: str
    unit_cost: float = 0.0
    unit_sell: float = 0.0
    quantity: int = 1
    notes: str = ""
@app.post("/api/services/{service_id}/components")
def api_create_component(
    service_id: int,
    input: ComponentInput,
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only: attach a new component to a service and refresh the service prices.
    parent = db.query(Service).filter(Service.id == service_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="Service not found")

    body_fields = ("name", "unit_cost", "unit_sell", "quantity", "notes")
    component = ServiceComponent(
        service_id=service_id,
        **{field: getattr(input, field) for field in body_fields},
    )
    db.add(component)
    # Flush before recomputing so the pending component has been written first.
    db.flush()
    parent.recompute_prices()
    db.commit()
    db.refresh(component)

    created = {"id": component.id}
    created.update((field, getattr(component, field)) for field in body_fields)
    return created
@app.put("/api/components/{component_id}")
def api_update_component(
    component_id: int,
    input: ComponentInput,
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only: overwrite every editable field of a component, then refresh
    # the prices of the service that owns it.
    component = (
        db.query(ServiceComponent).filter(ServiceComponent.id == component_id).first()
    )
    if not component:
        raise HTTPException(status_code=404, detail="Component not found")

    body_fields = ("name", "unit_cost", "unit_sell", "quantity", "notes")
    for field in body_fields:
        setattr(component, field, getattr(input, field))
    # Write the new values out before the owning service recomputes from them.
    db.flush()

    owner = db.query(Service).filter(Service.id == component.service_id).first()
    if owner:
        owner.recompute_prices()
    db.commit()

    updated = {"id": component.id}
    updated.update((field, getattr(component, field)) for field in body_fields)
    return updated
@app.delete("/api/components/{component_id}")
def api_delete_component(
    component_id: int,
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only: remove a component and refresh the prices of its service.
    doomed = (
        db.query(ServiceComponent).filter(ServiceComponent.id == component_id).first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="Component not found")

    # Look the owner up first, while the component row is still readable.
    owner = db.query(Service).filter(Service.id == doomed.service_id).first()

    db.delete(doomed)
    # Flush so the deleted component is gone before the recompute runs.
    db.flush()
    if owner:
        owner.recompute_prices()

    db.commit()
    return {"ok": True}
# ---------------------------------------------------------------------------
# API: Transactions
# ---------------------------------------------------------------------------
@app.get("/api/transactions")
def api_list_transactions(
    month: Optional[str] = None,
    db: Session = Depends(get_db),
):
    # Newest-first transaction listing, optionally restricted to a single month.
    # NOTE(review): no auth dependency is declared on this route — confirm that is intended.

    def serialize(txn):
        # One transaction as a plain dict, including its service and creator names.
        created = txn.created_at.isoformat() if txn.created_at else None
        return {
            "id": txn.id,
            "service_id": txn.service_id,
            "service_name": txn.service.name,
            "quantity": txn.quantity,
            "revenue": txn.revenue,
            "cost": txn.cost,
            "month": txn.month,
            "notes": txn.notes,
            "created_by": txn.created_by,
            "creator_name": txn.creator.display_name,
            "created_at": created,
        }

    everything = db.query(Transaction)
    # An omitted or empty month means "no filter".
    selected = everything.filter(Transaction.month == month) if month else everything
    rows = selected.order_by(Transaction.created_at.desc()).all()
    return [serialize(txn) for txn in rows]
class CreateTransactionInput(BaseModel):
    # JSON body accepted by the transaction create endpoint.
    service_id: int  # id of the service being sold
    quantity: int
    month: str  # month the transaction is filed under; P&L is computed per YYYY-MM month
    notes: str = ""
def _is_canonical_month(value: str) -> bool:
    """Return True when `value` is a real calendar month written exactly as YYYY-MM."""
    try:
        parsed = datetime.strptime(value, "%Y-%m")
    except ValueError:
        return False
    # strptime also accepts unpadded months such as "2024-1"; the round trip
    # rejects anything that is not the zero-padded form used for filtering.
    return parsed.strftime("%Y-%m") == value


@app.post("/api/transactions")
def api_create_transaction(
    input: CreateTransactionInput,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    # Record a sale of an active service for a given month, on behalf of the
    # authenticated user.  Responds 400 for a malformed month or for an
    # unknown / inactive service.

    # Monthly P&L matches on the exact month string and the trend sorts those
    # strings, so anything but zero-padded YYYY-MM would be silently misfiled.
    if not _is_canonical_month(input.month):
        raise HTTPException(status_code=400, detail="Invalid month, expected YYYY-MM")

    service = db.query(Service).filter(Service.id == input.service_id).first()
    if not service or not service.active:
        raise HTTPException(status_code=400, detail="Invalid or inactive service")

    # NOTE(review): revenue and cost are not passed to the constructor, yet they
    # are read back below — presumably the model derives them; verify in models.
    txn = Transaction(
        service_id=input.service_id,
        quantity=input.quantity,
        month=input.month,
        notes=input.notes,
        created_by=current_user.id,
    )
    db.add(txn)
    db.commit()
    db.refresh(txn)
    return {
        "id": txn.id,
        "service_id": txn.service_id,
        "service_name": txn.service.name,
        "quantity": txn.quantity,
        "revenue": txn.revenue,
        "cost": txn.cost,
        "month": txn.month,
        "notes": txn.notes,
        "created_by": txn.created_by,
        "created_at": txn.created_at.isoformat() if txn.created_at else None,
    }
@app.delete("/api/transactions/{transaction_id}")
def api_delete_transaction(
    transaction_id: int,
    db: Session = Depends(get_db),
    _admin: User = Depends(require_admin),
):
    # Admin-only: permanently remove one transaction row; 404 when the id is unknown.
    doomed = db.query(Transaction).filter(Transaction.id == transaction_id).first()
    if doomed:
        db.delete(doomed)
        db.commit()
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Transaction not found")
# ---------------------------------------------------------------------------
# API: P&L
# ---------------------------------------------------------------------------
@app.get("/api/pnl")
def api_pnl(month: str, db: Session = Depends(get_db)):
    # P&L summary for the month given in the required `month` query parameter.
    # NOTE(review): no auth dependency is declared on this route — confirm that is intended.
    summary = compute_pnl(month, db)
    return summary
# ---------------------------------------------------------------------------
# API: Dashboard
# ---------------------------------------------------------------------------
@app.get("/api/dashboard")
def api_dashboard(db: Session = Depends(get_db)):
    # Current-month P&L (month taken from the UTC clock) plus the 12-month trend.
    # NOTE(review): no auth dependency is declared on this route — confirm that is intended.
    this_month = datetime.now(timezone.utc).strftime("%Y-%m")
    return {
        "current_month": compute_pnl(this_month, db),
        "trend": get_trend(db, months=12),
    }
# ---------------------------------------------------------------------------
# Frontend Routes
# ---------------------------------------------------------------------------
@app.get("/setup", response_class=HTMLResponse)
def setup_page(request: Request, db: Session = Depends(get_db)):
    """Show the first-run setup form, or redirect to /login once an admin exists."""
    if not is_setup_complete(db):
        return templates.TemplateResponse(request, "setup.html")
    return RedirectResponse(url="/login")
class SetupInput(BaseModel):
    # JSON body accepted by the first-run setup endpoint.
    admin_username: str
    admin_password: str
    admin_display: str = "Administrator"  # display name shown for the admin account
@app.post("/api/setup")
def api_setup(
    input: SetupInput,
    db: Session = Depends(get_db),
):
    """Create the first admin user (only works if no admin exists yet).

    A second account named "viewer" (role "viewer") is created alongside it with
    a random password, which is returned in the response body. The new admin is
    logged in immediately through the "token" cookie.

    Raises:
        HTTPException: 400 when setup was already completed, when the username
            is blank or reserved, or when the password is shorter than 4 characters.
    """
    import secrets

    if is_setup_complete(db):
        raise HTTPException(status_code=400, detail="Setup already completed")

    username = input.admin_username.strip()
    if not username or len(input.admin_password) < 4:
        raise HTTPException(status_code=400, detail="Username required, password min 4 chars")
    # The guest account below is always named "viewer"; letting the admin take
    # that name would create two users sharing one username.
    if username.casefold() == "viewer":
        raise HTTPException(status_code=400, detail='Username "viewer" is reserved')

    # Create admin
    admin = User(
        username=username,
        password_hash=hash_password(input.admin_password),
        display_name=input.admin_display.strip() or username,
        role="admin",
    )
    db.add(admin)

    # Create guest viewer with a random 12-hex-character password.
    guest_pass = secrets.token_hex(6)
    viewer = User(
        username="viewer",
        password_hash=hash_password(guest_pass),
        display_name="Viewer",
        role="viewer",
    )
    db.add(viewer)
    db.commit()
    db.refresh(admin)
    db.refresh(viewer)

    # Auto-login as admin
    token = create_access_token(admin.id, admin.role)
    response = JSONResponse({
        "token": token,
        # Only the hash is stored, so this response carries the plain guest password.
        "guest_password": guest_pass,
        "user": {
            "id": admin.id,
            "username": admin.username,
            "display_name": admin.display_name,
            "role": admin.role,
        },
    })
    response.set_cookie(
        key="token",
        value=token,
        httponly=True,
        max_age=86400,
        samesite="lax",
        secure=False,
        path="/",
    )
    return response
@app.get("/login", response_class=HTMLResponse)
def login_page(request: Request, db: Session = Depends(get_db)):
    """Show the login form; while no admin exists yet, send the visitor to /setup instead."""
    if is_setup_complete(db):
        return templates.TemplateResponse(request, "login.html")
    return RedirectResponse(url="/setup")
@app.get("/", response_class=HTMLResponse)
def dashboard_page(
    request: Request,
    current_user: Optional[User] = Depends(get_page_user),
    db: Session = Depends(get_db),
):
    """Render the dashboard, or redirect while setup or login is still pending."""
    redirect_to = None
    if not is_setup_complete(db):
        redirect_to = "/setup"
    elif not current_user:
        redirect_to = "/login"
    if redirect_to:
        return RedirectResponse(url=redirect_to)

    # The "current" month follows the UTC clock.
    this_month = datetime.now(timezone.utc).strftime("%Y-%m")
    context = {
        "current_user": current_user,
        "pnl": compute_pnl(this_month, db),
        "trend": get_trend(db, months=12),
        "services": db.query(Service).filter(Service.active == True).all(),  # noqa: E712
    }
    return templates.TemplateResponse(request, "dashboard.html", context)
@app.get("/transactions", response_class=HTMLResponse)
def transactions_page(
    request: Request,
    current_user: Optional[User] = Depends(get_page_user),
    db: Session = Depends(get_db),
):
    """Render the transactions page, or redirect while setup or login is still pending."""
    redirect_to = None
    if not is_setup_complete(db):
        redirect_to = "/setup"
    elif not current_user:
        redirect_to = "/login"
    if redirect_to:
        return RedirectResponse(url=redirect_to)

    this_month = datetime.now(timezone.utc).strftime("%Y-%m")
    # Only active services are offered to the template.
    active_services = db.query(Service).filter(Service.active == True).all()  # noqa: E712
    context = {
        "current_user": current_user,
        "services": active_services,
        "current_month": this_month,
    }
    return templates.TemplateResponse(request, "transactions.html", context)
@app.get("/services", response_class=HTMLResponse)
def services_page(
    request: Request,
    current_user: Optional[User] = Depends(get_page_user),
    db: Session = Depends(get_db),
):
    """Render the service management page, or redirect while setup or login is still pending."""
    redirect_to = None
    if not is_setup_complete(db):
        redirect_to = "/setup"
    elif not current_user:
        redirect_to = "/login"
    if redirect_to:
        return RedirectResponse(url=redirect_to)

    # All services, inactive ones included, sorted by name.
    all_services = db.query(Service).order_by(Service.name).all()
    context = {
        "services": all_services,
        # Plain-dict copy of the same services, passed alongside the ORM objects.
        "services_json": list(map(service_to_dict, all_services)),
        "current_user": current_user,
    }
    return templates.TemplateResponse(request, "services.html", context)
# ---------------------------------------------------------------------------
# Logout
# ---------------------------------------------------------------------------
@app.get("/logout")
def logout():
    # Login stores the token in an HTTP-only "token" cookie that page scripts
    # cannot remove, so it must be expired here; a bare redirect would leave
    # the browser still sending the token on every request.
    response = RedirectResponse(url="/login")
    # Same key and path as the cookie set at login, so the browser drops it.
    response.delete_cookie(key="token", path="/")
    return response
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Direct-run entrypoint: serve "main:app" with auto-reload enabled,
    # bound to all interfaces on port 8788.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8788,
        reload=True,
    )