Add MP4 upload support
This commit is contained in:
@@ -78,6 +78,8 @@ Open `http://127.0.0.1:7860` and submit a YouTube URL. Jobs run through the same
|
||||
|
||||
The OpenAI-compatible translation endpoint, API key, and model can be changed in the UI under **OpenAI-Compatible Settings**. Click **Save Settings** to persist them to `.cache/web_settings.json` for future web jobs. Unsaved values in the fields are still used for the next job you start.
|
||||
|
||||
You can also upload a local `.mp4` instead of entering a YouTube URL. Uploaded videos are staged under `.cache/uploads` and processed with the same transcription, translation, dubbing, and render pipeline.
|
||||
|
||||
### Docker
|
||||
|
||||
Build and run the Gradio UI in a container:
|
||||
@@ -120,11 +122,18 @@ Authentication options for restricted videos still work as before:
|
||||
.venv\Scripts\python.exe main.py "https://youtube.com/watch?v=VIDEO_ID" --lang de --cookies cookies.txt
|
||||
```
|
||||
|
||||
Process a local MP4:
|
||||
|
||||
```powershell
|
||||
.venv\Scripts\python.exe main.py --input-file "C:\path\to\video.mp4" --lang es
|
||||
```
|
||||
|
||||
## CLI Options
|
||||
|
||||
| Option | Description |
|
||||
| --- | --- |
|
||||
| `url` | YouTube video URL to process |
|
||||
| `--input-file` | Local MP4 file to process instead of a YouTube URL |
|
||||
| `--lang`, `-l` | Target language code |
|
||||
| `--browser`, `-b` | Browser name for cookie extraction |
|
||||
| `--cookies`, `-c` | Path to exported cookies file |
|
||||
|
||||
82
main.py
82
main.py
@@ -7,6 +7,7 @@ import argparse
|
||||
import asyncio
|
||||
import shutil
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from src.audio_separation import DEFAULT_MIX_MODE
|
||||
from src.core_utils import ConfigurationError
|
||||
@@ -28,7 +29,11 @@ Examples:
|
||||
""",
|
||||
)
|
||||
|
||||
parser.add_argument("url", help="YouTube video URL to subtitle")
|
||||
parser.add_argument("url", nargs="?", help="YouTube video URL to subtitle")
|
||||
parser.add_argument(
|
||||
"--input-file",
|
||||
help="Path to a local MP4 file to dub instead of downloading from YouTube.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--lang",
|
||||
"-l",
|
||||
@@ -148,6 +153,24 @@ def _build_translation_config(args: argparse.Namespace) -> TranslationConfig:
|
||||
)
|
||||
|
||||
|
||||
def _validate_source_args(args: argparse.Namespace) -> None:
|
||||
"""Ensure exactly one source input is configured."""
|
||||
if bool(args.url) == bool(args.input_file):
|
||||
raise SystemExit("Provide either a YouTube URL or --input-file, but not both.")
|
||||
|
||||
|
||||
def _prepare_local_video(input_file: str, media_module, cache_dir: Path) -> tuple[Path, Path]:
|
||||
"""Validate a local MP4 and extract its audio for the shared pipeline."""
|
||||
video_path = Path(input_file).expanduser().resolve()
|
||||
if not video_path.exists():
|
||||
raise FileNotFoundError(f"Input file not found: {video_path}")
|
||||
if video_path.suffix.lower() != ".mp4":
|
||||
raise ValueError("Only MP4 input files are supported.")
|
||||
|
||||
audio_path = cache_dir / f"{video_path.stem}_uploaded.wav"
|
||||
return video_path, media_module.extract_audio_from_video(video_path, audio_path)
|
||||
|
||||
|
||||
def _get_source_language_hint() -> str:
|
||||
"""Read an optional source language override from the environment."""
|
||||
import os
|
||||
@@ -190,6 +213,7 @@ def main() -> None:
|
||||
"""Run the full YouTube Auto Dub pipeline."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args()
|
||||
_validate_source_args(args)
|
||||
|
||||
import src.engines
|
||||
import src.media
|
||||
@@ -233,32 +257,42 @@ def main() -> None:
|
||||
)
|
||||
|
||||
print(f"\n{'=' * 60}")
|
||||
print("STEP 1: DOWNLOADING CONTENT")
|
||||
print("STEP 1: PREPARING CONTENT")
|
||||
print(f"{'=' * 60}")
|
||||
print(f"[*] Target URL: {args.url}")
|
||||
print(f"[*] Target Language: {args.lang.upper()}")
|
||||
|
||||
try:
|
||||
video_path = src.youtube.downloadVideo(
|
||||
args.url,
|
||||
browser=args.browser,
|
||||
cookies_file=args.cookies,
|
||||
)
|
||||
audio_path = src.youtube.downloadAudio(
|
||||
args.url,
|
||||
browser=args.browser,
|
||||
cookies_file=args.cookies,
|
||||
)
|
||||
print(f"[+] Video downloaded: {video_path}")
|
||||
print(f"[+] Audio extracted: {audio_path}")
|
||||
except Exception as exc:
|
||||
print(f"\n[!] DOWNLOAD FAILED: {exc}")
|
||||
print("\n[-] TROUBLESHOOTING TIPS:")
|
||||
print(" 1. Close all browser windows if using --browser")
|
||||
print(" 2. Export fresh cookies.txt and use --cookies")
|
||||
print(" 3. Check if video is private/region-restricted")
|
||||
print(" 4. Verify YouTube URL is correct")
|
||||
return
|
||||
if args.input_file:
|
||||
print(f"[*] Source MP4: {args.input_file}")
|
||||
try:
|
||||
video_path, audio_path = _prepare_local_video(args.input_file, src.media, src.engines.CACHE_DIR)
|
||||
print(f"[+] Local video ready: {video_path}")
|
||||
print(f"[+] Audio extracted: {audio_path}")
|
||||
except Exception as exc:
|
||||
print(f"\n[!] LOCAL INPUT FAILED: {exc}")
|
||||
return
|
||||
else:
|
||||
print(f"[*] Target URL: {args.url}")
|
||||
try:
|
||||
video_path = src.youtube.downloadVideo(
|
||||
args.url,
|
||||
browser=args.browser,
|
||||
cookies_file=args.cookies,
|
||||
)
|
||||
audio_path = src.youtube.downloadAudio(
|
||||
args.url,
|
||||
browser=args.browser,
|
||||
cookies_file=args.cookies,
|
||||
)
|
||||
print(f"[+] Video downloaded: {video_path}")
|
||||
print(f"[+] Audio extracted: {audio_path}")
|
||||
except Exception as exc:
|
||||
print(f"\n[!] DOWNLOAD FAILED: {exc}")
|
||||
print("\n[-] TROUBLESHOOTING TIPS:")
|
||||
print(" 1. Close all browser windows if using --browser")
|
||||
print(" 2. Export fresh cookies.txt and use --cookies")
|
||||
print(" 3. Check if video is private/region-restricted")
|
||||
print(" 4. Verify YouTube URL is correct")
|
||||
return
|
||||
|
||||
print(f"\n{'=' * 60}")
|
||||
print("STEP 2: SPEECH TRANSCRIPTION")
|
||||
|
||||
23
src/media.py
23
src/media.py
@@ -22,6 +22,29 @@ from src.engines import SAMPLE_RATE
|
||||
FINAL_MIX_CHANNELS = 2
|
||||
|
||||
|
||||
def extract_audio_from_video(video_path: Path, output_path: Path) -> Path:
    """Extract mono WAV audio from a local video file for transcription.

    Runs ``ffmpeg`` to drop the video stream and write 16-bit PCM audio with
    one channel at ``SAMPLE_RATE`` Hz. An existing file at ``output_path`` is
    overwritten.

    Args:
        video_path: Source video to read.
        output_path: Destination WAV file; its parent directory is created if
            it does not exist.

    Returns:
        ``output_path``, once the file has been written.

    Raises:
        FileNotFoundError: If ``video_path`` is not an existing regular file.
        subprocess.CalledProcessError: If ``ffmpeg`` exits with a non-zero status.
        RuntimeError: If ``ffmpeg`` succeeded but the output is missing or
            smaller than 1 KiB.
    """
    # is_file() instead of exists(): handing a directory to ffmpeg only
    # produces an opaque decoder error.
    if not video_path.is_file():
        raise FileNotFoundError(f"Source video is missing: {video_path}")

    output_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        'ffmpeg', '-y', '-v', 'error',
        # Never let ffmpeg read the parent's stdin: it would otherwise consume
        # piped input or wait on an interactive terminal.
        '-nostdin',
        '-i', str(video_path),
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', str(SAMPLE_RATE),
        '-ac', '1',
        str(output_path),
    ]
    subprocess.run(cmd, check=True, timeout=None)

    # A file under 1 KiB is treated as unusable output.
    if not output_path.exists() or output_path.stat().st_size < 1024:
        raise RuntimeError(f"Audio extraction did not create a usable WAV file: {output_path}")

    return output_path
|
||||
|
||||
|
||||
def _build_subtitle_filter(subtitle_path: Path) -> str:
|
||||
"""Build a Windows-safe FFmpeg subtitles filter expression."""
|
||||
escaped_path = str(subtitle_path.resolve()).replace("\\", "/").replace(":", "\\:")
|
||||
|
||||
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
from src.audio_separation import DEFAULT_MIX_MODE
|
||||
|
||||
from main import _build_translation_config, build_parser
|
||||
from main import _build_translation_config, _validate_source_args, build_parser
|
||||
|
||||
|
||||
def test_parser_accepts_lmstudio_flags():
|
||||
@@ -69,3 +69,37 @@ def test_parser_defaults_to_instrumental_only_mix_mode():
|
||||
args = parser.parse_args(["https://youtube.com/watch?v=demo"])
|
||||
|
||||
assert args.mix_mode == DEFAULT_MIX_MODE
|
||||
|
||||
|
||||
def test_parser_accepts_local_input_file_without_url():
    """Parsing only ``--input-file`` leaves the positional URL unset."""
    argv = ["--input-file", "demo.mp4", "--lang", "fr"]

    parsed = build_parser().parse_args(argv)

    assert parsed.url is None
    assert parsed.input_file == "demo.mp4"
    assert parsed.lang == "fr"
|
||||
|
||||
|
||||
def test_validate_source_args_rejects_missing_source():
    """Validation exits when neither a URL nor ``--input-file`` is given."""
    namespace = build_parser().parse_args([])

    raised = None
    try:
        _validate_source_args(namespace)
    except SystemExit as exc:
        raised = exc

    if raised is None:
        raise AssertionError("Expected SystemExit for missing source")
    assert "Provide either" in str(raised)
|
||||
|
||||
|
||||
def test_validate_source_args_rejects_two_sources():
    """Validation exits when a URL and ``--input-file`` are both given."""
    argv = ["https://youtube.com/watch?v=demo", "--input-file", "demo.mp4"]
    namespace = build_parser().parse_args(argv)

    raised = None
    try:
        _validate_source_args(namespace)
    except SystemExit as exc:
        raised = exc

    if raised is None:
        raise AssertionError("Expected SystemExit for two sources")
    assert "not both" in str(raised)
|
||||
|
||||
@@ -39,6 +39,20 @@ def test_build_pipeline_command_accepts_optional_settings():
|
||||
assert "--gpu" in command
|
||||
|
||||
|
||||
def test_build_pipeline_command_accepts_uploaded_mp4():
    """An uploaded MP4 is passed via ``--input-file`` with no URL argument."""
    form = {"input_file": "C:\\videos\\demo.mp4", "lang": "de"}

    command = build_pipeline_command(form)

    assert "https://youtube.com/watch?v=demo" not in command
    assert "--input-file" in command

    input_flag_at = command.index("--input-file")
    lang_flag_at = command.index("--lang")
    assert command[input_flag_at + 1] == "C:\\videos\\demo.mp4"
    assert command[lang_flag_at + 1] == "de"
|
||||
|
||||
|
||||
def test_create_app_builds_gradio_blocks():
|
||||
app = create_app()
|
||||
|
||||
|
||||
50
web_app.py
50
web_app.py
@@ -8,6 +8,7 @@ from datetime import datetime, timezone
|
||||
import json
|
||||
from pathlib import Path
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
@@ -28,6 +29,7 @@ from src.translation import (
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
LOG_DIR = BASE_DIR / "logs" / "gradio"
|
||||
SETTINGS_FILE = BASE_DIR / ".cache" / "web_settings.json"
|
||||
UPLOAD_DIR = BASE_DIR / ".cache" / "uploads"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -107,12 +109,15 @@ def build_pipeline_command(form: dict[str, str | bool]) -> list[str]:
|
||||
command = [
|
||||
sys.executable,
|
||||
str(BASE_DIR / "main.py"),
|
||||
args.url,
|
||||
"--lang",
|
||||
args.lang,
|
||||
"--mix-mode",
|
||||
args.mix_mode,
|
||||
]
|
||||
if args.url:
|
||||
command.insert(2, args.url)
|
||||
if args.input_file:
|
||||
command.extend(["--input-file", args.input_file])
|
||||
if args.translation_backend:
|
||||
command.extend(["--translation-backend", args.translation_backend])
|
||||
|
||||
@@ -135,10 +140,15 @@ def build_pipeline_command(form: dict[str, str | bool]) -> list[str]:
|
||||
|
||||
def _form_to_cli_args(form: dict[str, str | bool]) -> list[str]:
|
||||
url = (form.get("url") or "").strip()
|
||||
if not url:
|
||||
raise ValueError("A YouTube URL is required.")
|
||||
input_file = (form.get("input_file") or "").strip()
|
||||
if not url and not input_file:
|
||||
raise ValueError("A YouTube URL or uploaded MP4 is required.")
|
||||
if url and input_file:
|
||||
raise ValueError("Use either a YouTube URL or uploaded MP4, not both.")
|
||||
|
||||
cli_args = [url]
|
||||
cli_args = [url] if url else []
|
||||
if input_file:
|
||||
cli_args.extend(["--input-file", input_file])
|
||||
field_flags = {
|
||||
"lang": "--lang",
|
||||
"browser": "--browser",
|
||||
@@ -168,6 +178,24 @@ def _form_to_cli_args(form: dict[str, str | bool]) -> list[str]:
|
||||
return cli_args
|
||||
|
||||
|
||||
def _stage_uploaded_mp4(uploaded_file: str | None) -> str:
|
||||
if not uploaded_file:
|
||||
return ""
|
||||
|
||||
source_path = Path(uploaded_file)
|
||||
if source_path.suffix.lower() != ".mp4":
|
||||
raise ValueError("Only MP4 uploads are supported.")
|
||||
if not source_path.exists():
|
||||
raise FileNotFoundError(f"Uploaded file not found: {source_path}")
|
||||
|
||||
safe_stem = "".join(char if char.isalnum() or char in {"-", "_"} else "_" for char in source_path.stem)
|
||||
staged_name = f"{uuid.uuid4().hex[:12]}_{safe_stem or 'upload'}.mp4"
|
||||
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
||||
staged_path = UPLOAD_DIR / staged_name
|
||||
shutil.copy2(source_path, staged_path)
|
||||
return str(staged_path)
|
||||
|
||||
|
||||
def _format_job_status(job: DubJob | None) -> str:
|
||||
if job is None:
|
||||
return "Ready"
|
||||
@@ -234,6 +262,7 @@ def _output_choices() -> list[str]:
|
||||
|
||||
def _start_job(
|
||||
url: str,
|
||||
uploaded_mp4: str | None,
|
||||
lang: str,
|
||||
whisper_model: str,
|
||||
mix_mode: str,
|
||||
@@ -248,8 +277,15 @@ def _start_job(
|
||||
base_url = (lmstudio_base_url or "").strip() or saved_settings["base_url"]
|
||||
api_key = (lmstudio_api_key or "").strip() or saved_settings["api_key"]
|
||||
model = (lmstudio_model or "").strip() or saved_settings["model"]
|
||||
try:
|
||||
input_file = _stage_uploaded_mp4(uploaded_mp4)
|
||||
except (OSError, ValueError) as exc:
|
||||
message = str(exc) or "Invalid uploaded MP4."
|
||||
return "", message, message, gr.update(choices=_output_choices())
|
||||
|
||||
form = {
|
||||
"url": url,
|
||||
"input_file": input_file,
|
||||
"lang": lang,
|
||||
"whisper_model": whisper_model,
|
||||
"mix_mode": mix_mode,
|
||||
@@ -323,6 +359,11 @@ def create_app() -> gr.Blocks:
|
||||
with gr.Row():
|
||||
with gr.Column(scale=5):
|
||||
url = gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=...")
|
||||
uploaded_mp4 = gr.File(
|
||||
label="Upload MP4",
|
||||
file_types=[".mp4"],
|
||||
type="filepath",
|
||||
)
|
||||
with gr.Row():
|
||||
lang = gr.Textbox(label="Target Language", value="es", max_lines=1)
|
||||
whisper_model = gr.Dropdown(
|
||||
@@ -386,6 +427,7 @@ def create_app() -> gr.Blocks:
|
||||
|
||||
inputs = [
|
||||
url,
|
||||
uploaded_mp4,
|
||||
lang,
|
||||
whisper_model,
|
||||
mix_mode,
|
||||
|
||||
Reference in New Issue
Block a user