diff --git a/README.md b/README.md index 8504c16..70fc155 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Basic example: ### Gradio Web UI -Gradio provides a local browser UI for starting dub jobs, watching logs, and downloading finished videos: +Gradio provides a local browser UI for starting dub jobs, watching progress, and downloading finished videos: ```powershell .venv\Scripts\python.exe web_app.py @@ -80,6 +80,8 @@ The OpenAI-compatible translation endpoint, API key, and model can be changed in You can also upload a local `.mp4` instead of entering a YouTube URL. Uploaded videos are staged under `.cache/uploads` and processed with the same transcription, translation, dubbing, and render pipeline. Restricted YouTube videos can use the **Upload Cookies File** control instead of typing a local cookies path. +The web UI automatically refreshes job status, progress, steps, and output choices every few seconds while it is open. The manual **Refresh** button is still available. + ### Docker Build and run the Gradio UI in a container: diff --git a/tests/test_web_app.py b/tests/test_web_app.py index 6189c83..4ee6a0e 100644 --- a/tests/test_web_app.py +++ b/tests/test_web_app.py @@ -6,6 +6,8 @@ import sys import web_app from web_app import ( + DubJob, + _job_progress, _stage_uploaded_cookies, build_pipeline_command, create_app, @@ -123,3 +125,27 @@ def test_stage_uploaded_cookies_rejects_unsupported_extension(tmp_path): assert "Expected one of" in str(exc) else: raise AssertionError("Expected ValueError for unsupported cookie upload") + + +def test_job_progress_tracks_pipeline_steps(tmp_path): + log_path = tmp_path / "job.log" + log_path.write_text("STEP 1: PREPARING CONTENT\nSTEP 2: SPEECH TRANSCRIPTION\n", encoding="utf-8") + job = DubJob(id="demo", command=[], log_path=log_path, status="running") + + progress, steps_html = _job_progress(job) + + assert progress == 25 + assert "[done]" in steps_html + assert "[active]" in steps_html + assert "Speech transcription" in steps_html + + +def test_job_progress_marks_succeeded_complete(tmp_path): + log_path = tmp_path / "job.log" + log_path.write_text("", encoding="utf-8") + job = DubJob(id="demo", command=[], log_path=log_path, status="succeeded") + + progress, steps_html = _job_progress(job) + + assert progress == 100 + assert "[todo]" not in steps_html diff --git a/web_app.py b/web_app.py index 757ecb3..f7f41b4 100644 --- a/web_app.py +++ b/web_app.py @@ -5,6 +5,7 @@ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone +import html import json from pathlib import Path import os @@ -30,6 +31,16 @@ BASE_DIR = Path(__file__).resolve().parent LOG_DIR = BASE_DIR / "logs" / "gradio" SETTINGS_FILE = BASE_DIR / ".cache" / "web_settings.json" UPLOAD_DIR = BASE_DIR / ".cache" / "uploads" +PIPELINE_STEPS = [ + ("STEP 1", "Preparing content"), + ("STEP 2", "Speech transcription"), + ("STEP 3", "Intelligent chunking"), + ("STEP 4", "Translation"), + ("STEP 5", "Dub audio synthesis"), + ("STEP 6", "Subtitle generation"), + ("STEP 7", "Audio bed preparation"), + ("STEP 8", "Final video rendering"), +] @dataclass @@ -237,6 +248,60 @@ def _read_log_tail(log_path: Path, max_chars: int = 20000) -> str: return text[-max_chars:] +def _job_progress(job: DubJob | None) -> tuple[int, str]: + """Return a coarse progress percentage and HTML step summary.""" + if job is None: + return 0, _render_steps_html(0, "queued") + + log_text = _read_log_tail(job.log_path) + current_step = 0 + for index, (marker, _) in enumerate(PIPELINE_STEPS, start=1): + if marker in log_text: + current_step = index + + if job.status == "succeeded": + return 100, _render_steps_html(len(PIPELINE_STEPS), job.status) + + progress = int((current_step / len(PIPELINE_STEPS)) * 100) + if job.status == "running" and progress == 0: + progress = 3 + return progress, _render_steps_html(current_step, job.status) + + +def _render_steps_html(current_step: int, status: str) -> str: + rows = [] + failed = status == "failed" + for index, (_, label) in enumerate(PIPELINE_STEPS, start=1): + if failed and index == max(current_step, 1): + state = "failed" + elif index < current_step or status == "succeeded": + state = "done" + elif index == current_step and status in {"queued", "running"}: + state = "active" + else: + state = "todo" + + rows.append( + "