# Source file: youtube-auto-dub/main.py (365 lines, 12 KiB, Python)
#!/usr/bin/env python3
"""YouTube Auto Dub command-line entrypoint."""
from __future__ import annotations
import argparse
import asyncio
import shutil
import time
from src.core_utils import ConfigurationError
from src.translation import TranslationConfig
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser."""
    # Shown verbatim under --help via RawDescriptionHelpFormatter.
    usage_examples = """\
Examples:
python main.py "https://youtube.com/watch?v=VIDEO_ID" --lang es
python main.py "https://youtube.com/watch?v=VIDEO_ID" --lang fr --gpu
python main.py "https://youtube.com/watch?v=VIDEO_ID" --lang ja --browser chrome
python main.py "https://youtube.com/watch?v=VIDEO_ID" --whisper_model large-v3
python main.py "https://youtube.com/watch?v=VIDEO_ID" --lmstudio-model gemma-3-4b-it
"""
    cli = argparse.ArgumentParser(
        description="YouTube Auto Dub - Automated Video Subtitling",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    # Positional: the video to process.
    cli.add_argument("url", help="YouTube video URL to subtitle")
    cli.add_argument(
        "--lang",
        "-l",
        default="es",
        help="Target language ISO code (e.g., es, fr, ja, vi).",
    )
    # Two mutually usable authentication paths: browser cookie jar or cookies.txt.
    cli.add_argument(
        "--browser",
        "-b",
        help="Browser to extract cookies from (chrome, edge, firefox). Close browser first!",
    )
    cli.add_argument(
        "--cookies",
        "-c",
        help="Path to cookies.txt file (Netscape format) for YouTube authentication",
    )
    cli.add_argument(
        "--gpu",
        action="store_true",
        help="Use GPU acceleration for Whisper when CUDA is available.",
    )
    cli.add_argument(
        "--whisper_model",
        "-wm",
        help="Whisper model to use (tiny, base, small, medium, large-v3). Default: auto-select based on VRAM",
    )
    # Translation backend options; only LM Studio is wired up today.
    cli.add_argument(
        "--translation-backend",
        default="lmstudio",
        choices=["lmstudio"],
        help="Translation backend to use. Currently only 'lmstudio' is supported.",
    )
    cli.add_argument(
        "--lmstudio-base-url",
        help="Override the LM Studio OpenAI-compatible base URL (default: env or http://127.0.0.1:1234/v1).",
    )
    cli.add_argument(
        "--lmstudio-model",
        help="Override the LM Studio model name (default: env or gemma-3-4b-it).",
    )
    return cli
def _check_deps() -> None:
    """Verify critical runtime dependencies."""
    from shutil import which

    # FFmpeg binaries must be resolvable on PATH for download/render steps.
    missing = [tool for tool in ("ffmpeg", "ffprobe") if not which(tool)]
    if missing:
        print(f"[!] CRITICAL: Missing dependencies: {', '.join(missing)}")
        print(" Please install FFmpeg and add it to your System PATH.")
        print(" Download: https://ffmpeg.org/download.html")
        raise SystemExit(1)
    try:
        import torch
    except ImportError:
        print("[!] CRITICAL: PyTorch not installed.")
        print(" Install with your UV env, for example:")
        print(" uv pip install --python .venv\\Scripts\\python.exe -r requirements.txt")
        raise SystemExit(1)
    print(f"[*] PyTorch {torch.__version__} | CUDA Available: {torch.cuda.is_available()}")
def _cleanup() -> None:
    """Clean up the temp directory with retries for Windows file locks."""
    import src.engines

    attempts = 5
    for attempt in range(attempts):
        try:
            if src.engines.TEMP_DIR.exists():
                shutil.rmtree(src.engines.TEMP_DIR)
            src.engines.TEMP_DIR.mkdir(parents=True, exist_ok=True)
            return
        except PermissionError:
            # Exponential backoff: 0.5s, 1s, 2s, 4s, 8s.
            delay = 0.5 * (2 ** attempt)
            print(f"[-] File locked (attempt {attempt + 1}/{attempts}). Retrying in {delay}s...")
            time.sleep(delay)
    # Best effort only: leave leftovers in place rather than crashing.
    print(f"[!] WARNING: Could not fully clean temp directory after {attempts} attempts.")
    print(f" Files may persist in: {src.engines.TEMP_DIR}")
def _detect_device() -> str:
"""Detect the best available inference device."""
import torch
if torch.backends.mps.is_available():
return "mps"
if torch.cuda.is_available():
return "cuda"
return "cpu"
def _build_translation_config(args: argparse.Namespace) -> TranslationConfig:
    """Resolve translation configuration from env vars plus CLI overrides."""
    # CLI values are forwarded as-is; presumably from_env treats None as
    # "fall back to the environment default" — verify in src.translation.
    return TranslationConfig.from_env(
        backend=args.translation_backend,
        base_url=args.lmstudio_base_url,
        model=args.lmstudio_model,
    )
def _get_source_language_hint() -> str:
"""Read an optional source language override from the environment."""
import os
return (os.getenv("SOURCE_LANGUAGE_HINT") or "").strip()
async def _synthesize_dub_audio(engine, chunks, target_lang: str, media_module, temp_dir) -> None:
    """Generate and fit dubbed audio clips for each translated chunk."""
    total = len(chunks)
    for index, chunk in enumerate(chunks, start=1):
        text = chunk.get("trans_text", "").strip()
        slot_duration = max(0.0, chunk["end"] - chunk["start"])
        # Nothing to speak, or a degenerate time span: mark and move on.
        if not text or slot_duration <= 0:
            chunk["processed_audio"] = None
            continue
        clip_path = temp_dir / f"tts_{index:04d}.mp3"
        # Speaking rate chosen so the synthesized clip fits the original slot.
        speech_rate = engine.calcRate(
            text=text,
            target_dur=slot_duration,
            original_text=chunk.get("text", ""),
        )
        await engine.synthesize(
            text=text,
            target_lang=target_lang,
            out_path=clip_path,
            rate=speech_rate,
        )
        chunk["processed_audio"] = media_module.fit_audio(clip_path, slot_duration)
        # Progress on the first, last, and every 10th chunk.
        if index in (1, total) or index % 10 == 0:
            print(f"[-] Dub synthesis progress: {index}/{total}")
def main() -> None:
    """Run the full YouTube Auto Dub pipeline.

    Steps: dependency checks, download, transcription, chunking,
    translation, dub synthesis, subtitle generation, final render.
    Prints troubleshooting hints and returns early on download or
    render failure; the translator client is always closed on exit.
    """
    parser = build_parser()
    args = parser.parse_args()
    # Heavy project modules are imported lazily so `--help` stays fast.
    import src.engines
    import src.media
    import src.youtube
    print("\n" + "=" * 60)
    print("YOUTUBE AUTO SUB - INITIALIZING")
    print("=" * 60)
    _check_deps()
    try:
        translation_config = _build_translation_config(args)
    except ConfigurationError as exc:
        print(f"[!] INVALID TRANSLATION CONFIG: {exc}")
        raise SystemExit(1) from exc
    _cleanup()
    device = _detect_device()
    print(f"[*] Using device: {device.upper()}")
    print(f"[*] Translation backend: {translation_config.backend}")
    print(f"[*] LM Studio endpoint: {translation_config.base_url}")
    print(f"[*] LM Studio model: {translation_config.model}")
    if args.whisper_model:
        src.engines.ASR_MODEL = args.whisper_model
        print(f"[*] Using specified Whisper model: {args.whisper_model}")
    else:
        print(f"[*] Auto-selected Whisper model: {src.engines.ASR_MODEL} (based on VRAM)")
    try:
        source_language_hint = _get_source_language_hint()
        if source_language_hint:
            print(f"[*] Source language hint: {source_language_hint}")
        engine = src.engines.Engine(
            device,
            translation_config=translation_config,
            source_language_hint=source_language_hint,
        )

        print(f"\n{'=' * 60}")
        print("STEP 1: DOWNLOADING CONTENT")
        print(f"{'=' * 60}")
        print(f"[*] Target URL: {args.url}")
        print(f"[*] Target Language: {args.lang.upper()}")
        try:
            video_path = src.youtube.downloadVideo(
                args.url,
                browser=args.browser,
                cookies_file=args.cookies,
            )
            audio_path = src.youtube.downloadAudio(
                args.url,
                browser=args.browser,
                cookies_file=args.cookies,
            )
            print(f"[+] Video downloaded: {video_path}")
            print(f"[+] Audio extracted: {audio_path}")
        except Exception as exc:
            # Broad on purpose: yt-dlp raises many exception types.
            print(f"\n[!] DOWNLOAD FAILED: {exc}")
            print("\n[-] TROUBLESHOOTING TIPS:")
            print(" 1. Close all browser windows if using --browser")
            print(" 2. Export fresh cookies.txt and use --cookies")
            print(" 3. Check if video is private/region-restricted")
            print(" 4. Verify YouTube URL is correct")
            return

        print(f"\n{'=' * 60}")
        print("STEP 2: SPEECH TRANSCRIPTION")
        print(f"{'=' * 60}")
        print(f"[*] Transcribing audio with Whisper ({src.engines.ASR_MODEL})...")
        raw_segments = engine.transcribeSafe(audio_path)
        print(f"[+] Transcription complete: {len(raw_segments)} segments")
        if raw_segments:
            print(f"[*] Sample segment: '{raw_segments[0]['text'][:50]}...'")

        print(f"\n{'=' * 60}")
        print("STEP 3: INTELLIGENT CHUNKING")
        print(f"{'=' * 60}")
        chunks = src.engines.smartChunk(raw_segments)
        print(f"[+] Optimized {len(raw_segments)} raw segments into {len(chunks)} chunks")
        # BUG FIX: the average previously divided by len(chunks) unconditionally,
        # raising ZeroDivisionError on silent videos / empty transcriptions.
        # With no chunks there is nothing to translate, dub, or render.
        if not chunks:
            print("[!] No speech detected - nothing to translate or dub. Aborting.")
            return
        print(f"[*] Average chunk duration: {sum(c['end'] - c['start'] for c in chunks) / len(chunks):.2f}s")

        print(f"\n{'=' * 60}")
        print(f"STEP 4: TRANSLATION ({args.lang.upper()})")
        print(f"{'=' * 60}")
        texts = [chunk["text"] for chunk in chunks]
        print(f"[*] Translating {len(texts)} text segments...")
        translated_texts = engine.translateSafe(texts, args.lang)
        for index, chunk in enumerate(chunks):
            chunk["trans_text"] = translated_texts[index]
        print("[+] Translation complete")
        if chunks:
            original = chunks[0]["text"][:50]
            translated = chunks[0]["trans_text"][:50]
            print(f"[*] Sample: '{original}' -> '{translated}'")

        print(f"\n{'=' * 60}")
        print("STEP 5: DUB AUDIO SYNTHESIS")
        print(f"{'=' * 60}")
        print(f"[*] Synthesizing dubbed speech for {len(chunks)} translated chunks...")
        asyncio.run(_synthesize_dub_audio(engine, chunks, args.lang, src.media, src.engines.TEMP_DIR))
        concat_manifest_path = src.engines.TEMP_DIR / "dub_audio_manifest.txt"
        silence_ref_path = src.engines.TEMP_DIR / "silence_ref.wav"
        src.media.create_concat_file(chunks, silence_ref_path, concat_manifest_path)
        print(f"[+] Dub audio manifest generated: {concat_manifest_path}")

        print(f"\n{'=' * 60}")
        print("STEP 6: SUBTITLE GENERATION")
        print(f"{'=' * 60}")
        subtitle_path = src.engines.TEMP_DIR / "subtitles.srt"
        src.media.generate_srt(chunks, subtitle_path)
        print(f"[+] Subtitles generated: {subtitle_path}")

        print(f"\n{'=' * 60}")
        print("STEP 7: FINAL VIDEO RENDERING")
        print(f"{'=' * 60}")
        try:
            video_name = video_path.stem
            output_name = f"dubbed_{args.lang}_{video_name}.mp4"
            final_output = src.engines.OUTPUT_DIR / output_name
            print("[*] Rendering final video with dubbed audio and subtitles...")
            print(f" Source: {video_path}")
            print(f" Output: {final_output}")
            print(f" Dub audio manifest: {concat_manifest_path}")
            print(f" Subtitles: {subtitle_path}")
            src.media.render_video(
                video_path,
                concat_manifest_path,
                final_output,
                subtitle_path=subtitle_path,
            )
            if final_output.exists():
                file_size = final_output.stat().st_size / (1024 * 1024)
                print("\n[+] SUCCESS! Video rendered successfully.")
                print(f" Output: {final_output}")
                print(f" Size: {file_size:.1f} MB")
            else:
                print(f"\n[!] ERROR: Output file not created at {final_output}")
        except Exception as exc:
            print(f"\n[!] RENDERING FAILED: {exc}")
            print("[-] This may be due to:")
            print(" 1. Corrupted audio chunks")
            print(" 2. FFmpeg compatibility issues")
            print(" 3. Insufficient disk space")
            return
    finally:
        # Release the LM Studio HTTP client only if the engine was created.
        if "engine" in locals():
            engine.translator.close()
    print(f"\n{'=' * 60}")
    print("YOUTUBE AUTO SUB - PIPELINE COMPLETE")
    print(f"{'=' * 60}")
if __name__ == "__main__":
    # Top-level boundary: translate Ctrl-C and unexpected crashes into
    # a nonzero exit code with a readable message.
    try:
        main()
    except KeyboardInterrupt:
        print("\n[!] Process interrupted by user")
        raise SystemExit(1)
    except Exception as exc:
        print(f"\n[!] UNEXPECTED ERROR: {exc}")
        print("[-] Please report this issue with the full error message")
        raise SystemExit(1) from exc