Files
vela-platform/backend/main.py
oimwiodev d1160673a7 🎉 v1.0.0 — Vela Platform launch
Self-hosted P&L tracking app with component-level pricing.
Offers: Atlas/Atlas+/Rif/Rif+ with granular cost breakdown.
API + MCP + multi-user auth.
2026-06-15 23:05:59 +01:00

641 lines
20 KiB
Python

"""Vela Platform — FastAPI application with API endpoints and Jinja2 frontend."""
from datetime import datetime, timezone
from typing import Optional
from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from auth import (
create_access_token,
get_current_user,
get_page_user,
hash_password,
require_admin,
verify_password,
)
from database import Base, SessionLocal, engine, get_db
from models import Service, ServiceComponent, Transaction, User
# ---------------------------------------------------------------------------
# App setup
# ---------------------------------------------------------------------------
app = FastAPI(title="Vela Platform", version="1.0.0")
# Mount templates via Jinja2
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")
HARDCODED_OPEX = 800.0 # MAD/month
# ---------------------------------------------------------------------------
# Helper: serialize a service with its components
# ---------------------------------------------------------------------------
def service_to_dict(svc: Service) -> dict:
return {
"id": svc.id,
"name": svc.name,
"description": svc.description,
"sell_price": svc.sell_price,
"cost_price": svc.cost_price,
"active": svc.active,
"components": [
{
"id": c.id,
"name": c.name,
"unit_cost": c.unit_cost,
"unit_sell": c.unit_sell,
"quantity": c.quantity,
"notes": c.notes,
}
for c in svc.components
],
}
# ---------------------------------------------------------------------------
# Startup: create tables + seed data + create component tables
# ---------------------------------------------------------------------------
COMPONENT_TEMPLATES = {
"Atlas": [
("HDD (1TB)", 600.0, 1000.0, 1, "Enterprise HDD"),
("RAM (16GB)", 300.0, 500.0, 1, "ECC RAM"),
("CPU alloc", 200.0, 400.0, 1, "CPU core allocation"),
("Casing & PSU", 200.0, 300.0, 1, "Chassis + power"),
("Bandwidth 1Gbps", 200.0, 500.0, 1, "Network port"),
("Setup & config", 300.0, 600.0, 1, "One-time setup amortised"),
("Support", 200.0, 700.0, 1, "Monthly support"),
],
"Atlas+": [
("HDD (1TB)", 600.0, 1000.0, 1, "Enterprise HDD"),
("SSD (256GB)", 400.0, 700.0, 1, "Cache SSD"),
("RAM (32GB)", 600.0, 1000.0, 1, "ECC RAM"),
("CPU alloc", 300.0, 600.0, 1, "CPU core allocation"),
("Casing & PSU", 200.0, 300.0, 1, "Chassis + power"),
("Bandwidth 1Gbps", 200.0, 500.0, 1, "Network port"),
("Sauvegarde", 300.0, 500.0, 1, "Backup service"),
("Setup & config", 200.0, 400.0, 1, "One-time setup amortised"),
("Support", 200.0, 700.0, 1, "Monthly support"),
],
"Rif": [
("SSD (500GB)", 300.0, 600.0, 1, "NVMe SSD"),
("RAM (16GB)", 300.0, 500.0, 1, "ECC RAM"),
("CPU alloc", 200.0, 400.0, 1, "CPU core allocation"),
("Casing & PSU", 200.0, 300.0, 1, "Chassis + power"),
("Bandwidth 1Gbps", 200.0, 500.0, 1, "Network port"),
("Setup & config", 300.0, 600.0, 1, "One-time setup amortised"),
("Support", 200.0, 700.0, 1, "Monthly support"),
],
"Rif+": [
("SSD (500GB)", 300.0, 600.0, 1, "NVMe SSD"),
("HDD (1TB)", 600.0, 1000.0, 1, "Enterprise HDD"),
("RAM (32GB)", 600.0, 1000.0, 1, "ECC RAM"),
("CPU alloc", 300.0, 600.0, 1, "CPU core allocation"),
("Casing & PSU", 200.0, 300.0, 1, "Chassis + power"),
("Bandwidth 1Gbps", 200.0, 500.0, 1, "Network port"),
("Sauvegarde", 300.0, 500.0, 1, "Backup service"),
("Setup & config", 200.0, 400.0, 1, "One-time setup amortised"),
("Support", 200.0, 700.0, 1, "Monthly support"),
],
}
def seed_database():
"""Create tables and insert preseed data if missing."""
Base.metadata.create_all(bind=engine)
db = SessionLocal()
try:
# Seed users
if not db.query(User).filter(User.username == "admin").first():
db.add(
User(
username="admin",
password_hash=hash_password("admin123"),
display_name="Administrator",
role="admin",
)
)
if not db.query(User).filter(User.username == "viewer").first():
db.add(
User(
username="viewer",
password_hash=hash_password("viewer123"),
display_name="Viewer",
role="viewer",
)
)
# Seed services + components
for name, desc, sell, cost in [
("Atlas", "Core storage service", 4000.0, 2000.0),
("Atlas+", "Premium storage + backup service", 5000.0, 2800.0),
("Rif", "Core server service", 3500.0, 1700.0),
("Rif+", "Premium server + backup service", 4500.0, 2200.0),
]:
existing = db.query(Service).filter(Service.name == name).first()
if not existing:
svc = Service(
name=name,
description=desc,
sell_price=sell,
cost_price=cost,
active=True,
)
db.add(svc)
db.flush() # get the id
# Add component templates
for cname, ucost, usell, qty, notes in COMPONENT_TEMPLATES.get(name, []):
db.add(
ServiceComponent(
service_id=svc.id,
name=cname,
unit_cost=ucost,
unit_sell=usell,
quantity=qty,
notes=notes,
)
)
db.commit()
finally:
db.close()
seed_database()
# ---------------------------------------------------------------------------
# Helper: compute monthly P&L
# ---------------------------------------------------------------------------
def compute_pnl(month: str, db: Session) -> dict:
"""Compute P&L for a given YYYY-MM month."""
transactions = db.query(Transaction).filter(Transaction.month == month).all()
total_revenue = 0.0
total_cost = 0.0
for txn in transactions:
total_revenue += txn.revenue
total_cost += txn.cost
gross_profit = total_revenue - total_cost
gross_margin = (gross_profit / total_revenue * 100) if total_revenue > 0 else 0.0
opex = HARDCODED_OPEX
net_profit = gross_profit - opex
net_margin = (net_profit / total_revenue * 100) if total_revenue > 0 else 0.0
return {
"month": month,
"total_revenue": round(total_revenue, 2),
"total_cost": round(total_cost, 2),
"gross_profit": round(gross_profit, 2),
"gross_margin_pct": round(gross_margin, 2),
"opex": opex,
"net_profit": round(net_profit, 2),
"net_margin_pct": round(net_margin, 2),
}
def get_trend(db: Session, months: int = 12) -> list:
"""Return P&L for the last N months (those with transactions)."""
month_rows = (
db.query(Transaction.month)
.distinct()
.order_by(Transaction.month.desc())
.limit(months)
.all()
)
month_list = [row[0] for row in reversed(month_rows)]
return [compute_pnl(m, db) for m in month_list]
# ---------------------------------------------------------------------------
# API: Auth
# ---------------------------------------------------------------------------
@app.post("/api/auth/login")
def api_login(
username: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db),
):
user = db.query(User).filter(User.username == username).first()
if not user or not verify_password(password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
)
token = create_access_token(user.id, user.role)
response = JSONResponse({
"token": token,
"user": {
"id": user.id,
"username": user.username,
"display_name": user.display_name,
"role": user.role,
},
})
response.set_cookie(
key="token",
value=token,
httponly=True,
max_age=86400,
samesite="lax",
secure=False,
path="/",
)
return response
@app.get("/api/auth/me")
def api_me(current_user: User = Depends(get_current_user)):
return {
"id": current_user.id,
"username": current_user.username,
"display_name": current_user.display_name,
"role": current_user.role,
}
# ---------------------------------------------------------------------------
# API: Services
# ---------------------------------------------------------------------------
@app.get("/api/services")
def api_list_services(db: Session = Depends(get_db)):
return [service_to_dict(s) for s in db.query(Service).order_by(Service.name).all()]
@app.get("/api/services/{service_id}")
def api_get_service(service_id: int, db: Session = Depends(get_db)):
svc = db.query(Service).filter(Service.id == service_id).first()
if not svc:
raise HTTPException(status_code=404, detail="Service not found")
return service_to_dict(svc)
@app.post("/api/services")
def api_create_service(
name: str = Form(...),
description: str = Form(""),
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
svc = Service(
name=name,
description=description,
sell_price=0.0,
cost_price=0.0,
active=True,
)
db.add(svc)
db.commit()
db.refresh(svc)
return service_to_dict(svc)
@app.put("/api/services/{service_id}")
def api_update_service(
service_id: int,
name: str = Form(...),
description: str = Form(""),
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
svc = db.query(Service).filter(Service.id == service_id).first()
if not svc:
raise HTTPException(status_code=404, detail="Service not found")
svc.name = name
svc.description = description
svc.recompute_prices()
db.commit()
return service_to_dict(svc)
@app.delete("/api/services/{service_id}")
def api_deactivate_service(
service_id: int,
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
svc = db.query(Service).filter(Service.id == service_id).first()
if not svc:
raise HTTPException(status_code=404, detail="Service not found")
svc.active = False
db.commit()
return {"ok": True}
# ---------------------------------------------------------------------------
# API: Service Components
# ---------------------------------------------------------------------------
@app.get("/api/services/{service_id}/components")
def api_list_components(service_id: int, db: Session = Depends(get_db)):
svc = db.query(Service).filter(Service.id == service_id).first()
if not svc:
raise HTTPException(status_code=404, detail="Service not found")
return [
{
"id": c.id,
"name": c.name,
"unit_cost": c.unit_cost,
"unit_sell": c.unit_sell,
"quantity": c.quantity,
"notes": c.notes,
}
for c in svc.components
]
class ComponentInput(BaseModel):
name: str
unit_cost: float = 0.0
unit_sell: float = 0.0
quantity: int = 1
notes: str = ""
@app.post("/api/services/{service_id}/components")
def api_create_component(
service_id: int,
input: ComponentInput,
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
svc = db.query(Service).filter(Service.id == service_id).first()
if not svc:
raise HTTPException(status_code=404, detail="Service not found")
comp = ServiceComponent(
service_id=service_id,
name=input.name,
unit_cost=input.unit_cost,
unit_sell=input.unit_sell,
quantity=input.quantity,
notes=input.notes,
)
db.add(comp)
db.flush()
svc.recompute_prices()
db.commit()
db.refresh(comp)
return {
"id": comp.id,
"name": comp.name,
"unit_cost": comp.unit_cost,
"unit_sell": comp.unit_sell,
"quantity": comp.quantity,
"notes": comp.notes,
}
@app.put("/api/components/{component_id}")
def api_update_component(
component_id: int,
input: ComponentInput,
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
comp = db.query(ServiceComponent).filter(ServiceComponent.id == component_id).first()
if not comp:
raise HTTPException(status_code=404, detail="Component not found")
comp.name = input.name
comp.unit_cost = input.unit_cost
comp.unit_sell = input.unit_sell
comp.quantity = input.quantity
comp.notes = input.notes
db.flush()
svc = db.query(Service).filter(Service.id == comp.service_id).first()
if svc:
svc.recompute_prices()
db.commit()
return {
"id": comp.id,
"name": comp.name,
"unit_cost": comp.unit_cost,
"unit_sell": comp.unit_sell,
"quantity": comp.quantity,
"notes": comp.notes,
}
@app.delete("/api/components/{component_id}")
def api_delete_component(
component_id: int,
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
comp = db.query(ServiceComponent).filter(ServiceComponent.id == component_id).first()
if not comp:
raise HTTPException(status_code=404, detail="Component not found")
svc = db.query(Service).filter(Service.id == comp.service_id).first()
db.delete(comp)
db.flush() # <-- ensures the deleted component is removed before recompute
if svc:
svc.recompute_prices()
db.commit()
return {"ok": True}
# ---------------------------------------------------------------------------
# API: Transactions
# ---------------------------------------------------------------------------
@app.get("/api/transactions")
def api_list_transactions(
month: Optional[str] = None,
db: Session = Depends(get_db),
):
query = db.query(Transaction)
if month:
query = query.filter(Transaction.month == month)
transactions = query.order_by(Transaction.created_at.desc()).all()
return [
{
"id": t.id,
"service_id": t.service_id,
"service_name": t.service.name,
"quantity": t.quantity,
"revenue": t.revenue,
"cost": t.cost,
"month": t.month,
"notes": t.notes,
"created_by": t.created_by,
"creator_name": t.creator.display_name,
"created_at": t.created_at.isoformat() if t.created_at else None,
}
for t in transactions
]
class CreateTransactionInput(BaseModel):
service_id: int
quantity: int
month: str
notes: str = ""
@app.post("/api/transactions")
def api_create_transaction(
input: CreateTransactionInput,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
service = db.query(Service).filter(Service.id == input.service_id).first()
if not service or not service.active:
raise HTTPException(status_code=400, detail="Invalid or inactive service")
txn = Transaction(
service_id=input.service_id,
quantity=input.quantity,
month=input.month,
notes=input.notes,
created_by=current_user.id,
)
db.add(txn)
db.commit()
db.refresh(txn)
return {
"id": txn.id,
"service_id": txn.service_id,
"service_name": txn.service.name,
"quantity": txn.quantity,
"revenue": txn.revenue,
"cost": txn.cost,
"month": txn.month,
"notes": txn.notes,
"created_by": txn.created_by,
"created_at": txn.created_at.isoformat() if txn.created_at else None,
}
@app.delete("/api/transactions/{transaction_id}")
def api_delete_transaction(
transaction_id: int,
db: Session = Depends(get_db),
_admin: User = Depends(require_admin),
):
txn = db.query(Transaction).filter(Transaction.id == transaction_id).first()
if not txn:
raise HTTPException(status_code=404, detail="Transaction not found")
db.delete(txn)
db.commit()
return {"ok": True}
# ---------------------------------------------------------------------------
# API: P&L
# ---------------------------------------------------------------------------
@app.get("/api/pnl")
def api_pnl(month: str, db: Session = Depends(get_db)):
return compute_pnl(month, db)
# ---------------------------------------------------------------------------
# API: Dashboard
# ---------------------------------------------------------------------------
@app.get("/api/dashboard")
def api_dashboard(db: Session = Depends(get_db)):
current_month = datetime.now(timezone.utc).strftime("%Y-%m")
pnl = compute_pnl(current_month, db)
trend = get_trend(db, months=12)
return {
"current_month": pnl,
"trend": trend,
}
# ---------------------------------------------------------------------------
# Frontend Routes
# ---------------------------------------------------------------------------
@app.get("/login", response_class=HTMLResponse)
def login_page(request: Request):
"""Serve the login page."""
return templates.TemplateResponse(request, "login.html")
@app.get("/", response_class=HTMLResponse)
def dashboard_page(
request: Request,
current_user: Optional[User] = Depends(get_page_user),
db: Session = Depends(get_db),
):
"""Serve the dashboard page."""
if not current_user:
return RedirectResponse(url="/login")
current_month = datetime.now(timezone.utc).strftime("%Y-%m")
pnl = compute_pnl(current_month, db)
trend = get_trend(db, months=12)
services = db.query(Service).filter(Service.active == True).all()
return templates.TemplateResponse(
request, "dashboard.html",
{
"current_user": current_user,
"pnl": pnl,
"trend": trend,
"services": services,
},
)
@app.get("/transactions", response_class=HTMLResponse)
def transactions_page(
request: Request,
current_user: Optional[User] = Depends(get_page_user),
db: Session = Depends(get_db),
):
"""Serve the transactions page."""
if not current_user:
return RedirectResponse(url="/login")
current_month = datetime.now(timezone.utc).strftime("%Y-%m")
services = db.query(Service).filter(Service.active == True).all()
return templates.TemplateResponse(
request, "transactions.html",
{
"current_user": current_user,
"services": services,
"current_month": current_month,
},
)
@app.get("/services", response_class=HTMLResponse)
def services_page(
request: Request,
current_user: Optional[User] = Depends(get_page_user),
db: Session = Depends(get_db),
):
"""Serve the services management page."""
if not current_user:
return RedirectResponse(url="/login")
services = db.query(Service).order_by(Service.name).all()
services_json = [service_to_dict(s) for s in services]
return templates.TemplateResponse(
request, "services.html",
{
"services": services,
"services_json": services_json,
"current_user": current_user,
},
)
# ---------------------------------------------------------------------------
# Logout
# ---------------------------------------------------------------------------
@app.get("/logout")
def logout():
return RedirectResponse(url="/login")
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8788, reload=True)