#!/usr/bin/env python3 """Gradio web UI for launching YouTube Auto Dub jobs.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone import html import json from pathlib import Path import os import shutil import subprocess import sys import threading import uuid import gradio as gr from main import build_parser from src.audio_separation import DEFAULT_MIX_MODE from src.engines import OUTPUT_DIR from src.translation import ( DEFAULT_LM_STUDIO_API_KEY, DEFAULT_LM_STUDIO_BASE_URL, DEFAULT_LM_STUDIO_MODEL, ) BASE_DIR = Path(__file__).resolve().parent LOG_DIR = BASE_DIR / "logs" / "gradio" SETTINGS_FILE = BASE_DIR / ".cache" / "web_settings.json" UPLOAD_DIR = BASE_DIR / ".cache" / "uploads" PIPELINE_STEPS = [ ("STEP 1", "Preparing content"), ("STEP 2", "Speech transcription"), ("STEP 3", "Intelligent chunking"), ("STEP 4", "Translation"), ("STEP 5", "Dub audio synthesis"), ("STEP 6", "Subtitle generation"), ("STEP 7", "Audio bed preparation"), ("STEP 8", "Final video rendering"), ] @dataclass class DubJob: """Runtime state for a web-launched dub job.""" id: str command: list[str] log_path: Path env_overrides: dict[str, str] = field(default_factory=dict) created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) status: str = "queued" returncode: int | None = None completed_at: datetime | None = None JOBS: dict[str, DubJob] = {} JOBS_LOCK = threading.Lock() def _default_translation_settings() -> dict[str, str]: return { "base_url": os.getenv("LM_STUDIO_BASE_URL") or DEFAULT_LM_STUDIO_BASE_URL, "api_key": os.getenv("LM_STUDIO_API_KEY") or DEFAULT_LM_STUDIO_API_KEY, "model": os.getenv("LM_STUDIO_MODEL") or DEFAULT_LM_STUDIO_MODEL, } def load_translation_settings() -> dict[str, str]: """Load saved OpenAI-compatible translation settings.""" settings = _default_translation_settings() if not SETTINGS_FILE.exists(): return settings try: payload = json.loads(SETTINGS_FILE.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return settings if not isinstance(payload, dict): return settings for key in settings: value = payload.get(key) if isinstance(value, str) and value.strip(): settings[key] = value.strip() return settings def save_translation_settings(base_url: str, api_key: str, model: str) -> tuple[str, str, str, str]: """Persist OpenAI-compatible endpoint settings for future web jobs.""" settings = { "base_url": (base_url or "").strip() or DEFAULT_LM_STUDIO_BASE_URL, "api_key": (api_key or "").strip() or DEFAULT_LM_STUDIO_API_KEY, "model": (model or "").strip() or DEFAULT_LM_STUDIO_MODEL, } SETTINGS_FILE.parent.mkdir(parents=True, exist_ok=True) SETTINGS_FILE.write_text(json.dumps(settings, indent=2), encoding="utf-8") return ( settings["base_url"], settings["api_key"], settings["model"], f"Saved settings to {SETTINGS_FILE}", ) def _utc_iso(value: datetime | None) -> str | None: if value is None: return None return value.astimezone(timezone.utc).isoformat() def build_pipeline_command(form: dict[str, str | bool]) -> list[str]: """Build a validated command for the existing CLI pipeline.""" parser = build_parser() args = parser.parse_args(_form_to_cli_args(form)) command = [ sys.executable, str(BASE_DIR / "main.py"), "--lang", args.lang, "--mix-mode", args.mix_mode, ] if args.url: command.insert(2, args.url) if args.input_file: command.extend(["--input-file", args.input_file]) if args.translation_backend: command.extend(["--translation-backend", args.translation_backend]) optional_flags = { "--browser": args.browser, "--cookies": args.cookies, "--whisper_model": args.whisper_model, "--lmstudio-base-url": args.lmstudio_base_url, "--lmstudio-model": args.lmstudio_model, } for flag, value in optional_flags.items(): if value: command.extend([flag, value]) if args.gpu: command.append("--gpu") return command def _form_to_cli_args(form: dict[str, str | bool]) -> list[str]: url = (form.get("url") or "").strip() input_file = (form.get("input_file") or "").strip() if not url and not input_file: raise ValueError("A YouTube URL or uploaded MP4 is required.") if url and input_file: raise ValueError("Use either a YouTube URL or uploaded MP4, not both.") cli_args = [url] if url else [] if input_file: cli_args.extend(["--input-file", input_file]) field_flags = { "lang": "--lang", "browser": "--browser", "cookies": "--cookies", "whisper_model": "--whisper_model", "mix_mode": "--mix-mode", "translation_backend": "--translation-backend", "lmstudio_base_url": "--lmstudio-base-url", "lmstudio_model": "--lmstudio-model", } defaults = { "lang": "es", "mix_mode": DEFAULT_MIX_MODE, "translation_backend": "lmstudio", } for field_name, flag in field_flags.items(): value = (form.get(field_name) or defaults.get(field_name) or "").strip() if value: cli_args.extend([flag, value]) gpu_value = form.get("gpu") if gpu_value is True or str(gpu_value).lower() in {"1", "true", "on", "yes"}: cli_args.append("--gpu") return cli_args def _stage_uploaded_mp4(uploaded_file: str | None) -> str: return _stage_uploaded_file(uploaded_file, allowed_suffixes={".mp4"}, fallback_name="upload") def _stage_uploaded_cookies(uploaded_file: str | None) -> str: return _stage_uploaded_file( uploaded_file, allowed_suffixes={".txt", ".cookies", ".cookie"}, fallback_name="cookies", ) def _stage_uploaded_file( uploaded_file: str | None, allowed_suffixes: set[str], fallback_name: str, ) -> str: if not uploaded_file: return "" source_path = Path(uploaded_file) suffix = source_path.suffix.lower() if suffix not in allowed_suffixes: expected = ", ".join(sorted(allowed_suffixes)) raise ValueError(f"Unsupported upload type. Expected one of: {expected}.") if not source_path.exists(): raise FileNotFoundError(f"Uploaded file not found: {source_path}") safe_stem = "".join(char if char.isalnum() or char in {"-", "_"} else "_" for char in source_path.stem) staged_name = f"{uuid.uuid4().hex[:12]}_{safe_stem or fallback_name}{suffix}" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) staged_path = UPLOAD_DIR / staged_name shutil.copy2(source_path, staged_path) return str(staged_path) def _format_job_status(job: DubJob | None) -> str: if job is None: return "Ready" lines = [ f"Job: {job.id}", f"Status: {job.status}", f"Created: {_utc_iso(job.created_at)}", ] if job.completed_at: lines.append(f"Completed: {_utc_iso(job.completed_at)}") if job.returncode is not None: lines.append(f"Return code: {job.returncode}") return "\n".join(lines) def _read_log_tail(log_path: Path, max_chars: int = 20000) -> str: if not log_path.exists(): return "" text = log_path.read_text(encoding="utf-8", errors="replace") return text[-max_chars:] def _job_progress(job: DubJob | None) -> tuple[int, str]: """Return a coarse progress percentage and HTML step summary.""" if job is None: return 0, _render_steps_html(0, "queued") log_text = _read_log_tail(job.log_path) current_step = 0 for index, (marker, _) in enumerate(PIPELINE_STEPS, start=1): if marker in log_text: current_step = index if job.status == "succeeded": return 100, _render_steps_html(len(PIPELINE_STEPS), job.status) progress = int((current_step / len(PIPELINE_STEPS)) * 100) if job.status == "running" and progress == 0: progress = 3 return progress, _render_steps_html(current_step, job.status) def _render_steps_html(current_step: int, status: str) -> str: rows = [] failed = status == "failed" for index, (_, label) in enumerate(PIPELINE_STEPS, start=1): if failed and index == max(current_step, 1): state = "failed" elif index < current_step or status == "succeeded": state = "done" elif index == current_step and status in {"queued", "running"}: state = "active" else: state = "todo" rows.append( "