baseline: initial working version

This commit is contained in:
2026-03-30 18:18:41 +01:00
commit 27cfe2a3f5
19 changed files with 3878 additions and 0 deletions

4
src/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""YouTube Auto Dub - Automated Video Translation and Dubbing"""
__version__ = "1.0.0"
__author__ = "Nguyen Cong Thuan Huy (mangodxd)"

181
src/core_utils.py Normal file
View File

@@ -0,0 +1,181 @@
"""Core utilities and exceptions for YouTube Auto Sub.
This module consolidates shared utilities, exceptions, and helper functions
used across the entire pipeline to reduce code duplication.
Author: Nguyen Cong Thuan Huy (mangodxd)
Version: 1.0.0
"""
import subprocess
import time
import traceback
from pathlib import Path
from typing import Dict, List, Optional, Union
class YouTubeAutoSubError(Exception):
    """Common ancestor for every error raised by this project; catch this for any pipeline failure."""


class ModelLoadError(YouTubeAutoSubError):
    """An AI/ML model could not be loaded."""


class AudioProcessingError(YouTubeAutoSubError):
    """An audio processing operation failed."""


class TranscriptionError(YouTubeAutoSubError):
    """Speech-to-text transcription failed."""


class TranslationError(YouTubeAutoSubError):
    """Text translation failed."""


class TTSError(YouTubeAutoSubError):
    """Text-to-speech synthesis failed."""


class VideoProcessingError(YouTubeAutoSubError):
    """A video processing operation failed."""


class ConfigurationError(YouTubeAutoSubError):
    """Configuration is invalid or missing."""


class DependencyError(YouTubeAutoSubError):
    """A required dependency is unavailable."""


class ValidationError(YouTubeAutoSubError):
    """Input validation failed."""


class ResourceError(YouTubeAutoSubError):
    """System resources are insufficient for the operation."""
def _handleError(error: Exception, context: str = "") -> None:
"""Centralized error handling with context.
Args:
error: The exception that occurred.
context: Additional context about where the error occurred.
Returns:
None
"""
if context:
print(f"[!] ERROR in {context}: {error}")
else:
print(f"[!] ERROR: {error}")
print(f" Full traceback: {traceback.format_exc()}")
def _runFFmpegCmd(cmd: List[str], timeout: int = 300, description: str = "FFmpeg operation") -> None:
"""Run FFmpeg command with consistent error handling.
Args:
cmd: FFmpeg command to run.
timeout: Command timeout in seconds.
description: Description for error messages.
Raises:
RuntimeError: If FFmpeg command fails.
"""
try:
subprocess.run(cmd, check=True, timeout=timeout)
except subprocess.TimeoutExpired:
raise RuntimeError(f"{description} timed out")
except subprocess.CalledProcessError as e:
raise RuntimeError(f"{description} failed: {e}")
except Exception as e:
raise RuntimeError(f"Unexpected error during {description}: {e}")
def _validateAudioFile(file_path: Path, min_size: int = 1024) -> bool:
"""Validate that audio file exists and has minimum size.
Args:
file_path: Path to audio file.
min_size: Minimum file size in bytes.
Returns:
True if file is valid, False otherwise.
"""
if not file_path.exists():
return False
if file_path.stat().st_size < min_size:
return False
return True
def _safeFileDelete(file_path: Path) -> None:
"""Safely delete file with error handling.
Args:
file_path: Path to file to delete.
Returns:
None
"""
try:
if file_path.exists():
file_path.unlink()
except Exception as e:
print(f"[!] WARNING: Could not delete file {file_path}: {e}")
class ProgressTracker:
    """Console progress reporter for long-running batch operations."""

    def __init__(self, total: int, description: str = "Processing", update_interval: int = 10):
        """Set up the tracker.

        Args:
            total: Expected number of items to process.
            description: Label shown in progress lines.
            update_interval: Print a progress line every N processed items.
        """
        self.total = total
        self.description = description
        self.update_interval = update_interval
        self.current = 0

    def update(self, increment: int = 1) -> None:
        """Advance the counter and print progress at interval boundaries.

        Args:
            increment: Number of items just processed.

        Returns:
            None
        """
        self.current += increment
        finished = self.current >= self.total
        if finished or self.current % self.update_interval == 0:
            pct = (self.current / self.total) * 100
            print(f"[-] {self.description}: {self.current}/{self.total} ({pct:.1f}%)", end='\r')
            if finished:
                print()

547
src/engines.py Normal file
View File

@@ -0,0 +1,547 @@
"""
AI/ML Engines Module for YouTube Auto Dub.
This module provides the core AI/ML functionality including:
- Device and configuration management
- Whisper-based speech transcription
- LM Studio translation integration
- Edge TTS synthesis
- Pipeline orchestration and chunking
Author: Nguyen Cong Thuan Huy (mangodxd)
Version: 1.0.0
"""
import torch
import asyncio
import edge_tts
import gc
import json
import os
from abc import ABC
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional, Union, Any
# Local imports
from src.core_utils import (
ModelLoadError, TranscriptionError, TranslationError, TTSError,
AudioProcessingError, _handleError, _runFFmpegCmd, ProgressTracker,
_validateAudioFile, _safeFileDelete
)
from src.translation import LMStudioTranslator, TranslationConfig
# =============================================================================
# CONFIGURATION
# =============================================================================
# Base directory of the project (two levels above this file: src/ -> repo root)
BASE_DIR = Path(__file__).resolve().parent.parent
# Working directories
CACHE_DIR = BASE_DIR / ".cache"
OUTPUT_DIR = BASE_DIR / "output"
TEMP_DIR = BASE_DIR / "temp"
# Configuration files
LANG_MAP_FILE = BASE_DIR / "language_map.json"
# Ensure directories exist (created eagerly at import time)
for directory_path in [CACHE_DIR, OUTPUT_DIR, TEMP_DIR]:
    directory_path.mkdir(parents=True, exist_ok=True)
# Audio processing settings
# NOTE(review): 24 kHz mono — presumably chosen to match the TTS output format; confirm.
SAMPLE_RATE = 24000
AUDIO_CHANNELS = 1
def _select_optimal_whisper_model(device: str = "cpu") -> str:
"""Select optimal Whisper model based on available VRAM and device.
Args:
device: Device type ('cuda' or 'cpu').
Returns:
Optimal Whisper model name.
"""
if device == "cpu":
return "base" # CPU works best with base model
try:
import torch
if not torch.cuda.is_available():
return "base"
# Get VRAM information
gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3) # GB
if gpu_memory < 4:
return "tiny" # < 4GB VRAM
elif gpu_memory < 8:
return "base" # 4-8GB VRAM
elif gpu_memory < 12:
return "small" # 8-12GB VRAM
elif gpu_memory < 16:
return "medium" # 12-16GB VRAM
else:
return "large-v3" # > 16GB VRAM - use latest large model
except Exception:
return "base" # Fallback to base if detection fails
# Model and voice defaults, resolved once at import time.
ASR_MODEL = _select_optimal_whisper_model(device="cuda" if torch.cuda.is_available() else "cpu")
DEFAULT_VOICE = "en-US-AriaNeural"  # fallback Edge TTS voice when no pool matches
# Load language configuration (mapping of language code -> settings/voices).
try:
    with open(LANG_MAP_FILE, "r", encoding="utf-8") as f:
        LANG_DATA = json.load(f)
    print(f"[*] Loaded language configuration for {len(LANG_DATA)} languages")
except (FileNotFoundError, json.JSONDecodeError) as e:
    # NOTE(review): 'e' is captured but not included in the warning — consider adding it.
    print(f"[!] WARNING: Could not load language map from {LANG_MAP_FILE}")
    LANG_DATA = {}
class DeviceManager:
    """Detects the best available compute device and manages its memory."""

    def __init__(self, device: Optional[str] = None):
        """Resolve the target device.

        Args:
            device: Explicit device name ('cuda', 'mps', 'cpu'); when None,
                auto-detects preferring MPS (macOS), then CUDA, then CPU.
        """
        if device is None:
            device = (
                "mps" if torch.backends.mps.is_available()
                else "cuda" if torch.cuda.is_available()
                else "cpu"
            )
        self.device = device
        self._logDeviceInfo()

    def _logDeviceInfo(self) -> None:
        """Print a one-line summary of the selected device (plus GPU details on CUDA)."""
        print(f"[*] Device initialized: {self.device.upper()}")
        if self.device != "cuda":
            return
        gpu_name = torch.cuda.get_device_name(0)
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
        print(f" GPU: {gpu_name} | VRAM: {gpu_memory:.1f} GB")

    def getMemoryInfo(self) -> Dict[str, float]:
        """Report GPU memory usage in GB.

        Returns:
            Dict with 'allocated' and 'reserved' memory in GB (zeros on
            non-CUDA devices).
        """
        if self.device != "cuda":
            return {"allocated": 0.0, "reserved": 0.0}
        gib = 1024 ** 3
        return {
            "allocated": torch.cuda.memory_allocated(0) / gib,
            "reserved": torch.cuda.memory_reserved(0) / gib,
        }

    def clearCache(self) -> None:
        """Free the CUDA cache (when applicable) and run Python garbage collection."""
        if self.device == "cuda":
            torch.cuda.empty_cache()
        gc.collect()
class ConfigManager:
    """Read-only access to the language/voice configuration table."""

    def getLanguageConfig(self, lang_code: str) -> Dict[str, Any]:
        """Look up the configuration for a language.

        Args:
            lang_code: ISO language code.

        Returns:
            Configuration dict for the language (empty dict when unknown).
        """
        return LANG_DATA.get(lang_code, {})

    def extractVoice(self, voice_data, fallback_gender: str = "female") -> str:
        """Coerce a voice entry of any supported shape to a single voice id.

        Args:
            voice_data: Voice entry as a string, list of strings, or anything else.
            fallback_gender: Kept for interface compatibility (unused here).

        Returns:
            A single voice identifier suitable for TTS.
        """
        if isinstance(voice_data, str):
            return voice_data
        if isinstance(voice_data, list) and voice_data:
            return voice_data[0]
        return DEFAULT_VOICE

    def getVoicePool(self, lang_code: str, gender: str) -> list:
        """List candidate voices for a language and gender.

        Args:
            lang_code: ISO language code.
            gender: Voice gender ('male'/'female').

        Returns:
            List of voice identifiers (defaults to [DEFAULT_VOICE]).
        """
        pool = self.getLanguageConfig(lang_code).get('voices', {}).get(gender, [DEFAULT_VOICE])
        return [pool] if isinstance(pool, str) else pool
class PipelineComponent(ABC):
    """Common plumbing (device/config access, path checks) for pipeline stages."""

    def __init__(self, device_manager: DeviceManager, config_manager: ConfigManager):
        """Store the shared managers and cache the active device name.

        Args:
            device_manager: Device management instance.
            config_manager: Configuration management instance.
        """
        self.device_manager = device_manager
        self.config_manager = config_manager
        self.device = device_manager.device

    def _validateFileExists(self, file_path: Path, description: str = "File") -> None:
        """Raise when a required file is missing.

        Args:
            file_path: Path to check.
            description: Label used in the error message.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        if file_path.exists():
            return
        raise FileNotFoundError(f"{description} not found: {file_path}")

    def _ensureDirectory(self, directory: Path) -> None:
        """Create a directory (and parents) if it does not already exist.

        Args:
            directory: Directory path to ensure.
        """
        directory.mkdir(parents=True, exist_ok=True)
# =============================================================================
# MAIN AI/ML ENGINE
# =============================================================================
class Engine(PipelineComponent):
    """Central AI/ML engine for YouTube Auto Dub pipeline.

    Facade over the three AI stages: Whisper transcription (lazy-loaded),
    LM Studio translation, and Edge TTS synthesis.
    """

    def __init__(
        self,
        device: Optional[str] = None,
        translation_config: Optional[TranslationConfig] = None,
        source_language_hint: Optional[str] = None,
    ):
        """Initialize the engine.

        Args:
            device: Compute device ('cuda', 'mps', 'cpu'); auto-detected when None.
            translation_config: Translator settings; read from env when omitted.
            source_language_hint: Optional ISO code forcing the source language
                (also read from the SOURCE_LANGUAGE_HINT environment variable).
        """
        device_manager = DeviceManager(device)
        config_manager = ConfigManager()
        super().__init__(device_manager, config_manager)
        self._asr = None  # lazily created by the asrModel property
        self.source_language_hint = (source_language_hint or os.getenv("SOURCE_LANGUAGE_HINT") or "").strip()
        self.detected_source_lang = self.source_language_hint or "auto"
        self.translation_config = translation_config or TranslationConfig.from_env()
        self.translator = LMStudioTranslator(self.translation_config)
        print("[+] AI Engine initialized successfully")

    @property
    def asrModel(self):
        """Lazy-load the Whisper ASR model.

        Returns:
            Loaded faster-whisper WhisperModel instance.

        Raises:
            ModelLoadError: If the model fails to load.
        """
        if self._asr is None:
            print(f"[*] Loading Whisper model ({ASR_MODEL}) on {self.device}...")
            try:
                from faster_whisper import WhisperModel
                # float16 requires CUDA; int8 keeps CPU/MPS memory usage modest.
                compute_type = "float16" if self.device == "cuda" else "int8"
                self._asr = WhisperModel(ASR_MODEL, device=self.device, compute_type=compute_type)
                print("[+] Whisper model loaded successfully")
            except Exception as e:
                raise ModelLoadError(f"Failed to load Whisper model: {e}") from e
        return self._asr

    def _getLangConfig(self, lang: str) -> Dict:
        """Return the configuration dict for language code `lang`."""
        return self.config_manager.getLanguageConfig(lang)

    def _extractVoiceString(self, voice_data: Union[str, List[str], None]) -> str:
        """Coerce a voice entry of any supported shape to a single voice id."""
        return self.config_manager.extractVoice(voice_data)

    def releaseMemory(self, component: Optional[str] = None) -> None:
        """Release VRAM and clean up GPU memory.

        Args:
            component: Specific component to release ('asr').
                If None, releases all components.

        Returns:
            None
        """
        if component in (None, 'asr') and self._asr:
            del self._asr
            self._asr = None
            print("[*] ASR VRAM cleared")
        self.device_manager.clearCache()

    def transcribeSafe(self, audio_path: Path) -> List[Dict]:
        """Transcribe audio, releasing ASR memory after a successful run.

        Args:
            audio_path: Path to audio file.

        Returns:
            List of transcription segments with timing.

        Raises:
            TranscriptionError: If transcription fails.
        """
        try:
            res = self.transcribe(audio_path)
            self.releaseMemory('asr')
            return res
        except Exception as e:
            _handleError(e, "transcription")
            raise TranscriptionError(f"Transcription failed: {e}") from e

    def translateSafe(self, texts: List[str], target_lang: str) -> List[str]:
        """Translate texts after freeing GPU memory held by other components.

        Args:
            texts: List of text strings to translate.
            target_lang: Target language code.

        Returns:
            List of translated text strings.
        """
        self.releaseMemory()
        return self.translate(texts, target_lang)

    def transcribe(self, audio_path: Path) -> List[Dict]:
        """Transcribe audio using the Whisper model.

        Args:
            audio_path: Path to audio file.

        Returns:
            List of dicts with 'start'/'end' seconds and stripped 'text'.
        """
        # BUG FIX: honor the explicit source-language hint during decoding.
        # Previously the hint only relabeled detected_source_lang while Whisper
        # still auto-detected (language=None was always passed).
        forced_lang = self.source_language_hint or None
        segments, info = self.asrModel.transcribe(str(audio_path), word_timestamps=False, language=forced_lang)
        detected = getattr(info, "language", "auto") or "auto"
        self.detected_source_lang = self.source_language_hint or detected
        print(f"[*] Detected source language: {self.detected_source_lang}")
        return [{'start': s.start, 'end': s.end, 'text': s.text.strip()} for s in segments]

    def translate(self, texts: List[str], target_lang: str) -> List[str]:
        """Translate texts to the target language via LM Studio.

        Args:
            texts: List of text strings to translate.
            target_lang: Target language code.

        Returns:
            List of translated text strings (empty list for empty input).

        Raises:
            TranslationError: If translation fails.
        """
        if not texts:
            return []
        print(f"[*] Translating {len(texts)} segments to '{target_lang}'...")
        source_lang = self.detected_source_lang or "auto"
        try:
            return self.translator.translate_segments(
                texts=texts,
                target_language=target_lang,
                source_language=source_lang,
            )
        except Exception as e:
            _handleError(e, "translation")
            raise TranslationError(f"Translation failed: {e}") from e

    def calcRate(self, text: str, target_dur: float, original_text: str = "") -> str:
        """Calculate speech rate adjustment for TTS with dynamic limits.

        Args:
            text: Text to be synthesized (translated text).
            target_dur: Target duration in seconds.
            original_text: Original text for length comparison (optional).

        Returns:
            Rate adjustment string (e.g., '+10%', '-5%').
        """
        words = len(text.split())
        if words == 0 or target_dur <= 0:
            return "+0%"
        # BUG FIX: the previous code derived the rate from the target duration
        # itself (wps = words / target_dur; estimated = words / wps), which
        # made the estimate always equal target_dur, so this method always
        # returned "+0%". Estimate from a nominal speaking rate instead.
        # NOTE(review): 2.5 words/sec approximates a neutral TTS pace — tune
        # per-voice if needed.
        nominal_wps = 2.5
        estimated_time = words / nominal_wps
        if estimated_time <= target_dur:
            return "+0%"
        ratio = estimated_time / target_dur
        speed_percent = int((ratio - 1) * 100)
        # Dynamic speed limits based on text length comparison
        if original_text:
            orig_len = len(original_text.split())
            trans_len = words
            if trans_len > orig_len * 1.5:
                # Translation much longer: allow up to -25% slowdown
                speed_percent = max(-25, min(speed_percent, 90))
            elif trans_len < orig_len * 0.7:
                # Translation much shorter: be conservative with speedup
                speed_percent = max(-15, min(speed_percent, 50))
            else:
                # Normal case: clamp to -10% .. 90%
                speed_percent = max(-10, min(speed_percent, 90))
        else:
            # No reference text: original clamp range
            speed_percent = max(-10, min(speed_percent, 90))
        return f"{speed_percent:+d}%"

    async def synthesize(
        self,
        text: str,
        target_lang: str,
        out_path: Path,
        gender: str = "female",
        rate: str = "+0%"
    ) -> None:
        """Synthesize speech for `text` into `out_path` using Edge TTS.

        Args:
            text: Text to speak (must be non-empty).
            target_lang: Target language code used to pick a voice.
            out_path: Destination audio file.
            gender: Preferred voice gender ('male'/'female').
            rate: Edge TTS rate adjustment string (e.g. '+10%').

        Raises:
            ValueError: If `text` is empty.
            TTSError: If synthesis fails or the output file is invalid.
        """
        if not text.strip():
            raise ValueError("Text empty")
        out_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # (Removed unused `lang_cfg` lookup present in the original.)
            voice_pool = self.config_manager.getVoicePool(target_lang, gender)
            voice = voice_pool[0] if voice_pool else DEFAULT_VOICE
            communicate = edge_tts.Communicate(text, voice=voice, rate=rate)
            await communicate.save(str(out_path))
            # Reject suspiciously small files (failed/truncated synthesis).
            if not out_path.exists() or out_path.stat().st_size < 1024:
                raise RuntimeError("TTS file invalid")
        except Exception as e:
            if out_path.exists():
                out_path.unlink(missing_ok=True)
            _handleError(e, "TTS synthesis")
            raise TTSError(f"TTS failed: {e}") from e
def smartChunk(segments: List[Dict]) -> List[Dict]:
    """Merge raw ASR segments into TTS-friendly chunks.

    Splits at natural pauses (gaps above 1.5x the average gap, floored at
    0.4 s) or when a chunk would exceed a content-derived maximum duration.

    Args:
        segments: Dicts with 'start', 'end' (seconds) and 'text'.

    Returns:
        Merged chunk dicts with combined text; empty list for empty input.
    """
    count = len(segments)
    if count == 0:
        return []
    durations = [seg['end'] - seg['start'] for seg in segments]
    gaps = [segments[idx]['start'] - segments[idx - 1]['end'] for idx in range(1, count)]
    avg_seg_dur = sum(durations) / count
    avg_gap = sum(gaps) / len(gaps) if gaps else 0.5
    # Content-derived duration bounds.
    min_dur = max(1.0, avg_seg_dur * 0.5)
    max_dur = np.percentile(durations, 90) if count > 5 else min(15.0, avg_seg_dur * 3)
    max_dur = max(5.0, min(30.0, max_dur))
    # Split on pauses clearly longer than typical (floored at 0.4 s).
    gap_threshold = max(0.4, avg_gap * 1.5)

    def _close(group):
        # Collapse a run of segments into one chunk.
        return {
            'start': group[0]['start'],
            'end': group[-1]['end'],
            'text': " ".join(seg['text'] for seg in group).strip()
        }

    chunks = []
    pending = [segments[0]]
    for idx in range(1, count):
        curr = segments[idx]
        pause = curr['start'] - segments[idx - 1]['end']
        running_dur = curr['end'] - pending[0]['start']
        if pause > gap_threshold or running_dur > max_dur:
            chunks.append(_close(pending))
            pending = [curr]
        else:
            pending.append(curr)
    if pending:
        chunks.append(_close(pending))
    print(f"[+] Smart chunking: {len(chunks)} chunks (Dynamic: min={min_dur:.1f}s, max={max_dur:.1f}s, gap_thr={gap_threshold:.2f}s)")
    return chunks

410
src/media.py Normal file
View File

@@ -0,0 +1,410 @@
"""Media Processing Module for YouTube Auto Dub.
This module handles all audio/video processing operations using FFmpeg.
It provides functionality for:
- Audio duration detection and analysis
- Silence generation for gap filling
- Audio time-stretching and duration fitting (PADDING logic added)
- Video concatenation and rendering (Volume Mixing fixed)
- Audio synchronization and mixing
Author: Nguyen Cong Thuan Huy (mangodxd)
Version: 1.1.0 (Patched)
"""
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
from src.engines import SAMPLE_RATE, AUDIO_CHANNELS
def _build_subtitle_filter(subtitle_path: Path) -> str:
"""Build a Windows-safe FFmpeg subtitles filter expression."""
escaped_path = str(subtitle_path.resolve()).replace("\\", "/").replace(":", "\\:")
return f"subtitles=filename='{escaped_path}'"
def _render_with_soft_subtitles(video_path: Path, output_path: Path, subtitle_path: Path) -> None:
    """Fallback render path that muxes subtitles instead of hard-burning them.

    Streams are copied untouched; the subtitle file is added as a mov_text track.
    """
    cmd = ['ffmpeg', '-y', '-v', 'error', '-i', str(video_path), '-i', str(subtitle_path)]
    cmd += ['-map', '0:v', '-map', '0:a?', '-map', '1:0']
    cmd += ['-c:v', 'copy', '-c:a', 'copy', '-c:s', 'mov_text']
    cmd.append(str(output_path))
    subprocess.run(cmd, check=True, timeout=None)
def _render_mixed_with_soft_subtitles(
    video_path: Path,
    concat_file: Path,
    output_path: Path,
    subtitle_path: Path,
    filter_complex: str,
) -> None:
    """Fallback render path that muxes subtitles while preserving mixed dubbed audio.

    Re-encodes the mixed audio ([outa] from `filter_complex`) to AAC and adds
    the subtitle file as a mov_text track; video is stream-copied.
    """
    cmd = ['ffmpeg', '-y', '-v', 'error', '-i', str(video_path)]
    cmd += ['-f', 'concat', '-safe', '0', '-i', str(concat_file)]
    cmd += ['-i', str(subtitle_path)]
    cmd += ['-filter_complex', filter_complex]
    cmd += ['-map', '0:v', '-map', '[outa]', '-map', '2:0']
    cmd += ['-c:v', 'copy']
    cmd += ['-c:a', 'aac', '-b:a', '192k', '-ar', str(SAMPLE_RATE), '-ac', str(AUDIO_CHANNELS)]
    cmd += ['-c:s', 'mov_text', '-shortest']
    cmd.append(str(output_path))
    subprocess.run(cmd, check=True, timeout=None)
def _get_duration(path: Path) -> float:
"""Get the duration of an audio/video file using FFprobe."""
if not path.exists():
print(f"[!] ERROR: Media file not found: {path}")
return 0.0
try:
cmd = [
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1',
str(path)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
timeout=60 # Increased from 30s to 60s for better reliability
)
duration_str = result.stdout.strip()
if duration_str:
return float(duration_str)
else:
return 0.0
except Exception as e:
print(f"[!] ERROR: Getting duration failed for {path}: {e}")
return 0.0
def _generate_silence_segment(duration: float, silence_ref: Path) -> Optional[Path]:
"""Generate a small silence segment for the concat list."""
if duration <= 0:
return None
# Use the parent folder of the reference silence file
output_path = silence_ref.parent / f"gap_{duration:.4f}.wav"
if output_path.exists():
return output_path
try:
cmd = [
'ffmpeg', '-y', '-v', 'error',
'-f', 'lavfi', '-i', f'anullsrc=r={SAMPLE_RATE}:cl=mono',
'-t', f"{duration:.4f}",
'-c:a', 'pcm_s16le',
str(output_path)
]
subprocess.run(cmd, check=True)
return output_path
except Exception:
return None
def _analyze_audio_loudness(audio_path: Path) -> Optional[float]:
"""Analyze audio loudness using FFmpeg volumedetect filter.
Args:
audio_path: Path to audio file to analyze.
Returns:
Mean volume in dB, or None if analysis fails.
"""
if not audio_path.exists():
return None
try:
cmd = [
'ffmpeg', '-y', '-v', 'error',
'-i', str(audio_path),
'-filter:a', 'volumedetect',
'-f', 'null', '-'
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=30)
# Parse mean volume from output
for line in result.stderr.split('\n'):
if 'mean_volume:' in line:
# Extract dB value from line like: "mean_volume: -15.2 dB"
parts = line.split()
if len(parts) >= 2:
try:
return float(parts[1])
except ValueError:
continue
return None
except Exception:
return None
def fit_audio(audio_path: Path, target_dur: float) -> Path:
    """Stretch or pad an audio file so its duration matches `target_dur`.

    Audio longer than the target (beyond a 0.15 s tolerance) is sped up with
    chained `atempo` filters (capped at 1.5x per stage to avoid artifacts);
    shorter or near-target audio is padded with silence. The output is always
    truncated to the target duration.

    Args:
        audio_path: Source audio file.
        target_dur: Desired duration in seconds.

    Returns:
        Path to the fitted '*_fit.wav' file, or the original path when fitting
        is impossible or fails (missing file, zero duration, FFmpeg error).
    """
    if not audio_path.exists() or target_dur <= 0:
        return audio_path
    actual_dur = _get_duration(audio_path)
    if actual_dur == 0.0:
        return audio_path
    out_path = audio_path.parent / f"{audio_path.stem}_fit.wav"
    # 0.15 s tolerance keeps audio natural instead of micro-adjusting speed.
    if actual_dur > target_dur + 0.15:
        ratio = actual_dur / target_dur
        filter_chain = []
        current_ratio = ratio
        # Cap each atempo stage at 1.5x to avoid a "chipmunk" voice.
        max_speed_ratio = 1.5
        while current_ratio > max_speed_ratio:
            filter_chain.append(f"atempo={max_speed_ratio}")
            current_ratio /= max_speed_ratio
        if current_ratio > 1.0:
            filter_chain.append(f"atempo={current_ratio:.4f}")
        filter_complex = ",".join(filter_chain)
        # CONSISTENCY FIX: resample with the shared SAMPLE_RATE constant
        # instead of a hard-coded 24000 so this stays in sync with the pipeline.
        audio_filter = f"{filter_complex},aresample={SAMPLE_RATE}"
    else:
        # Shorter (or near-target) audio: pad with silence, then truncate.
        audio_filter = f"apad,aresample={SAMPLE_RATE}"
    cmd = [
        'ffmpeg', '-y', '-v', 'error',
        '-i', str(audio_path),
        '-filter:a', audio_filter,
        '-t', f"{target_dur:.4f}",
        '-c:a', 'pcm_s16le',
        str(out_path)
    ]
    print(f"Fitting {actual_dur:.4f}s to {target_dur:.4f}s")  # typo fix: was "Fiting"
    try:
        subprocess.run(cmd, check=True, timeout=120)
        return out_path
    except Exception:
        return audio_path
def create_concat_file(segments: List[Dict], silence_ref: Path, output_txt: Path) -> None:
    """Write an FFmpeg concat manifest covering `segments`, gap-filled with silence.

    Gaps between the running timeline and the next segment (over 10 ms) get a
    generated silence entry; segments without a usable 'processed_audio' file
    are replaced by silence of the same span.

    Args:
        segments: Dicts with 'start', 'end' and optional 'processed_audio' Path.
        silence_ref: Reference path whose folder holds generated silence files.
        output_txt: Destination manifest file.

    Raises:
        RuntimeError: If the manifest cannot be written.
    """
    if not segments:
        return
    try:
        with open(output_txt, 'w', encoding='utf-8') as manifest:
            timeline = 0.0
            for seg in segments:
                seg_start, seg_end = seg['start'], seg['end']
                clip = seg.get('processed_audio')
                lead_gap = seg_start - timeline
                if lead_gap > 0.01:
                    filler = _generate_silence_segment(lead_gap, silence_ref)
                    if filler:
                        manifest.write(f"file '{filler.resolve().as_posix()}'\n")
                    timeline += lead_gap
                if clip and clip.exists():
                    manifest.write(f"file '{clip.resolve().as_posix()}'\n")
                    timeline += (seg_end - seg_start)
                else:
                    span = seg_end - seg_start
                    filler = _generate_silence_segment(span, silence_ref)
                    if filler:
                        manifest.write(f"file '{filler.resolve().as_posix()}'\n")
                    timeline += span
    except Exception as exc:
        raise RuntimeError(f"Failed to create concat manifest: {exc}")
def render_video(
    video_path: Path,
    concat_file: Optional[Path],
    output_path: Path,
    subtitle_path: Optional[Path] = None,
) -> None:
    """Render final video with Dynamic Volume Mixing.

    Two paths: when `concat_file` is None the original audio is kept (streams
    copied, subtitles optionally hard-burned); otherwise the dubbed concat
    audio is mixed over an attenuated original track. If hard-burning fails
    because FFmpeg lacks the 'subtitles' filter, falls back to soft (muxed)
    subtitles via the helper renderers.

    Args:
        video_path: Source video file.
        concat_file: FFmpeg concat manifest of dubbed audio, or None to keep
            the original audio untouched.
        output_path: Destination file (parent directories are created).
        subtitle_path: Optional subtitle file to hard-burn.

    Raises:
        FileNotFoundError: If the source video or concat manifest is missing.
        RuntimeError: If FFmpeg fails or no output file is produced.
    """
    if not video_path.exists():
        raise FileNotFoundError("Source video for rendering is missing")
    if concat_file is not None and not concat_file.exists():
        raise FileNotFoundError("Concat audio manifest for rendering is missing")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        print(f"[*] Rendering final video...")
        if concat_file is None:
            # No dubbed audio: copy streams; '0:a?' maps audio only if present.
            video_codec = 'copy'
            cmd = [
                'ffmpeg', '-y', '-v', 'error',
                '-i', str(video_path),
                '-map', '0:v',
                '-map', '0:a?',
            ]
            if subtitle_path:
                # Hard subtitles force a video re-encode.
                video_codec = 'libx264'
                cmd.extend(['-vf', _build_subtitle_filter(subtitle_path)])
            cmd.extend([
                '-c:v', video_codec,
                '-c:a', 'copy',
            ])
            cmd.append(str(output_path))
            try:
                subprocess.run(cmd, check=True, timeout=None, capture_output=True, text=True)
            except subprocess.CalledProcessError as exc:
                # Builds without libass report exactly this filter error.
                if subtitle_path and "No such filter: 'subtitles'" in (exc.stderr or ""):
                    print("[!] FFmpeg subtitles filter is unavailable. Falling back to soft subtitles.")
                    _render_with_soft_subtitles(video_path, output_path, subtitle_path)
                else:
                    raise
            if not output_path.exists():
                raise RuntimeError("Output file not created")
            print(f"[+] Video rendered successfully: {output_path}")
            return
        # DYNAMIC VOLUME MIXING STRATEGY:
        # Analyze original audio loudness to determine optimal background volume
        original_loudness = _analyze_audio_loudness(video_path)
        if original_loudness is not None:
            # Calculate background volume based on loudness analysis
            # Target: voice should be 10-15dB louder than background
            if original_loudness > -10:  # Very loud audio
                bg_volume = 0.08  # 8% - reduce more for loud content
            elif original_loudness > -20:  # Normal audio
                bg_volume = 0.15  # 15% - standard reduction
            else:  # Quiet audio
                bg_volume = 0.25  # 25% - reduce less for quiet content
            print(f"[*] Dynamic volume mixing: original={original_loudness:.1f}dB, bg_volume={bg_volume*100:.0f}%")
        else:
            # Fallback to default if analysis fails
            bg_volume = 0.15
            print(f"[*] Using default volume mixing: bg_volume={bg_volume*100:.0f}%")
        # Attenuated original ([bg]) mixed with dubbed track (input 1) -> [outa].
        filter_complex = (
            f"[0:a]volume={bg_volume}[bg]; "
            "[bg][1:a]amix=inputs=2:duration=first:dropout_transition=0[outa]"
        )
        video_codec = 'copy'
        cmd = [
            'ffmpeg', '-y', '-v', 'error',
            '-i', str(video_path),
            '-f', 'concat', '-safe', '0', '-i', str(concat_file),
            '-filter_complex', filter_complex,
        ]
        # Handle Hard Subtitles (Requires re-encoding)
        if subtitle_path:
            video_codec = 'libx264'
            cmd.extend(['-vf', _build_subtitle_filter(subtitle_path)])
        cmd.extend([
            '-map', '0:v',
            '-map', '[outa]',
            '-c:v', video_codec,
            '-c:a', 'aac', '-b:a', '192k',
            '-ar', str(SAMPLE_RATE),
            '-ac', str(AUDIO_CHANNELS),
            '-shortest'
        ])
        cmd.append(str(output_path))
        # Run rendering
        try:
            subprocess.run(cmd, check=True, timeout=None, capture_output=True, text=True)
        except subprocess.CalledProcessError as exc:
            if subtitle_path and "No such filter: 'subtitles'" in (exc.stderr or ""):
                print("[!] FFmpeg subtitles filter is unavailable. Falling back to soft subtitles.")
                _render_mixed_with_soft_subtitles(
                    video_path=video_path,
                    concat_file=concat_file,
                    output_path=output_path,
                    subtitle_path=subtitle_path,
                    filter_complex=filter_complex,
                )
            else:
                raise
        if not output_path.exists():
            raise RuntimeError("Output file not created")
        print(f"[+] Video rendered successfully: {output_path}")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"FFmpeg rendering failed: {e}")
    except Exception as e:
        # NOTE(review): this also re-wraps the RuntimeErrors raised above,
        # e.g. "Output file not created" becomes "Rendering error: ..." — confirm intended.
        raise RuntimeError(f"Rendering error: {e}")
def generate_srt(segments: List[Dict], output_path: Path) -> None:
    """Write segments (using their 'trans_text') to an SRT subtitle file.

    Failures are logged as warnings rather than raised; empty input is a no-op.

    Args:
        segments: Dicts with 'start', 'end' and optional 'trans_text'.
        output_path: Destination .srt file (parents are created).
    """
    if not segments:
        return
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with open(output_path, 'w', encoding='utf-8') as srt:
            for idx, seg in enumerate(segments, 1):
                window = f"{_format_timestamp_srt(seg['start'])} --> {_format_timestamp_srt(seg['end'])}"
                body = seg.get('trans_text', '').strip()
                srt.write(f"{idx}\n{window}\n{body}\n\n")
        print(f"[+] SRT subtitles generated")
    except Exception as e:
        print(f"[!] Warning: SRT generation failed: {e}")
def _format_timestamp_srt(seconds: float) -> str:
"""Convert seconds to HH:MM:SS,mmm."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

358
src/translation.py Normal file
View File

@@ -0,0 +1,358 @@
"""LM Studio translation client for YouTube Auto Dub."""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
import httpx
from src.core_utils import ConfigurationError, TranslationError
# LM Studio defaults; each is overridable via environment variables
# (see TranslationConfig.from_env).
DEFAULT_LM_STUDIO_BASE_URL = "http://127.0.0.1:1234/v1"
# NOTE(review): presumably a placeholder — LM Studio typically does not enforce keys; confirm.
DEFAULT_LM_STUDIO_API_KEY = "lm-studio"
DEFAULT_LM_STUDIO_MODEL = "gemma-3-4b-it"
DEFAULT_TRANSLATION_BACKEND = "lmstudio"
def _normalize_base_url(base_url: str) -> str:
"""Normalize LM Studio base URLs to the OpenAI-compatible /v1 root."""
if not base_url or not isinstance(base_url, str):
raise ConfigurationError("LM Studio base URL must be a non-empty string.")
normalized = base_url.strip().rstrip("/")
if normalized.endswith("/chat/completions"):
normalized = normalized[: -len("/chat/completions")]
if not normalized.endswith("/v1"):
normalized = f"{normalized}/v1"
parsed = urlparse(normalized)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ConfigurationError(
"LM Studio base URL must be a valid http(s) URL, for example "
"'http://127.0.0.1:1234/v1'."
)
return normalized
@dataclass(frozen=True)
class TranslationConfig:
    """Runtime configuration for the translation backend.

    Immutable (frozen dataclass). Prefer building instances via ``from_env``
    so values can be supplied through environment variables.
    """
    # Backend identifier; only 'lmstudio' is accepted (see validate()).
    backend: str = DEFAULT_TRANSLATION_BACKEND
    # OpenAI-compatible /v1 root of the LM Studio server.
    base_url: str = DEFAULT_LM_STUDIO_BASE_URL
    api_key: str = DEFAULT_LM_STUDIO_API_KEY
    model: str = DEFAULT_LM_STUDIO_MODEL
    timeout_seconds: float = 45.0
    max_retries: int = 3
    retry_backoff_seconds: float = 1.0

    @classmethod
    def from_env(
        cls,
        backend: Optional[str] = None,
        base_url: Optional[str] = None,
        model: Optional[str] = None,
        api_key: Optional[str] = None,
    ) -> "TranslationConfig":
        """Build config from environment variables plus optional overrides.

        Precedence per field: explicit argument, then environment variable
        (TRANSLATION_BACKEND / LM_STUDIO_BASE_URL / LM_STUDIO_API_KEY /
        LM_STUDIO_MODEL), then the module default. The result is validated
        before being returned.
        """
        config = cls(
            backend=(backend or os.getenv("TRANSLATION_BACKEND") or DEFAULT_TRANSLATION_BACKEND).strip().lower(),
            base_url=_normalize_base_url(base_url or os.getenv("LM_STUDIO_BASE_URL") or DEFAULT_LM_STUDIO_BASE_URL),
            api_key=api_key or os.getenv("LM_STUDIO_API_KEY") or DEFAULT_LM_STUDIO_API_KEY,
            model=model or os.getenv("LM_STUDIO_MODEL") or DEFAULT_LM_STUDIO_MODEL,
        )
        config.validate()
        return config

    @property
    def chat_completions_url(self) -> str:
        # Re-normalizes on every access so hand-constructed instances with a
        # raw base_url still produce a correct endpoint.
        return f"{_normalize_base_url(self.base_url)}/chat/completions"

    def validate(self) -> None:
        """Validate the translation configuration.

        Raises:
            ConfigurationError: If any field is unsupported or out of range.
        """
        if self.backend != DEFAULT_TRANSLATION_BACKEND:
            raise ConfigurationError(
                f"Unsupported translation backend '{self.backend}'. "
                f"Only '{DEFAULT_TRANSLATION_BACKEND}' is supported."
            )
        if not self.model or not isinstance(self.model, str):
            raise ConfigurationError("LM Studio model must be a non-empty string.")
        if not self.api_key or not isinstance(self.api_key, str):
            raise ConfigurationError("LM Studio API key must be a non-empty string.")
        if self.timeout_seconds <= 0:
            raise ConfigurationError("LM Studio timeout must be greater than zero.")
        if self.max_retries < 1:
            raise ConfigurationError("LM Studio max retries must be at least 1.")
        if self.retry_backoff_seconds < 0:
            raise ConfigurationError("LM Studio retry backoff cannot be negative.")
        # Raises ConfigurationError when the URL itself is malformed.
        _normalize_base_url(self.base_url)
def _build_system_prompt(source_language: str, target_language: str) -> str:
source_descriptor = source_language or "auto"
return (
"You are a professional audiovisual translator.\n"
f"Translate the user-provided text from {source_descriptor} to {target_language}.\n"
"Preserve meaning, tone, style, and intent as closely as possible.\n"
"Keep punctuation natural and keep subtitle-like lines concise when the source is concise.\n"
"Return only the translation.\n"
"Do not explain anything.\n"
"Do not add notes, headings, metadata, or commentary.\n"
"Do not add quotation marks unless they are part of the source.\n"
"Preserve line breaks and segment boundaries exactly.\n"
"Keep names, brands, URLs, emails, code, and proper nouns unchanged unless transliteration "
"is clearly appropriate.\n"
"Expand abbreviations only when needed for a natural translation.\n"
"Do not censor, summarize, or omit content."
)
class LMStudioTranslator:
    """OpenAI-style chat completions client for LM Studio.

    Sends translation requests over an httpx.Client with bounded retries and
    two fallback payload shapes for servers/models that reject the default
    system+user prompt layout.
    """

    def __init__(
        self,
        config: TranslationConfig,
        client: Optional[httpx.Client] = None,
        sleeper=time.sleep,
    ) -> None:
        """Initialize the translator.

        Args:
            config: Translation configuration; re-validated here.
            client: Optional pre-built httpx client (injectable for tests).
            sleeper: Callable used to wait between retries (injectable for tests).
        """
        self.config = config
        self.config.validate()
        # Build our own client only when the caller did not supply one.
        self._client = client or httpx.Client(timeout=httpx.Timeout(self.config.timeout_seconds))
        # close() only closes clients created here, never caller-owned ones.
        self._owns_client = client is None
        self._sleeper = sleeper

    def build_payload(self, text: str, source_language: str, target_language: str) -> Dict[str, Any]:
        """Build the OpenAI-compatible chat completions payload.

        System message carries the translation instructions; the raw text is
        the user turn. Low temperature keeps output near-deterministic.
        """
        return {
            "model": self.config.model,
            "messages": [
                {"role": "system", "content": _build_system_prompt(source_language, target_language)},
                {"role": "user", "content": text},
            ],
            "temperature": 0.1,
            "top_p": 1,
            "stream": False,
        }

    def build_user_only_payload(
        self,
        text: str,
        source_language: str,
        target_language: str,
    ) -> Dict[str, Any]:
        """Build a fallback payload for models that require the first turn to be user.

        The system instructions are folded into a single user message.
        """
        instructions = _build_system_prompt(source_language, target_language)
        merged_prompt = f"{instructions}\n\nText to translate:\n{text}"
        return {
            "model": self.config.model,
            "messages": [
                {"role": "user", "content": merged_prompt},
            ],
            "temperature": 0.1,
            "top_p": 1,
            "stream": False,
        }

    def build_structured_translation_payload(
        self,
        text: str,
        source_language: str,
        target_language: str,
    ) -> Dict[str, Any]:
        """Build a payload for custom translation models with structured user content.

        Some translation-specific models expect source_lang_code /
        target_lang_code fields inside a structured content part instead of
        prose instructions (see _should_retry_with_structured_translation_prompt).
        """
        return {
            "model": self.config.model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "source_lang_code": source_language or "auto",
                            "target_lang_code": target_language,
                            "text": text,
                            "image": None,
                        }
                    ],
                }
            ],
            "temperature": 0.1,
            "top_p": 1,
            "stream": False,
        }

    @staticmethod
    def parse_response_content(payload: Dict[str, Any]) -> str:
        """Extract translated text from an OpenAI-compatible response payload.

        Args:
            payload: Decoded JSON body of the chat-completions response.

        Returns:
            The stripped translation text.

        Raises:
            TranslationError: If the payload has no message, the content is
                not textual, or the translation is empty.
        """
        try:
            content = payload["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as exc:
            raise TranslationError("LM Studio response did not contain a chat completion message.") from exc
        # Some servers return content as a list of parts; keep plain strings
        # and the "text" field of dict parts, concatenated in order.
        if isinstance(content, list):
            parts = []
            for item in content:
                if isinstance(item, str):
                    parts.append(item)
                elif isinstance(item, dict) and item.get("type") == "text":
                    parts.append(str(item.get("text", "")))
            content = "".join(parts)
        if not isinstance(content, str):
            raise TranslationError("LM Studio response content was not a text string.")
        translated = content.strip()
        if not translated:
            raise TranslationError("LM Studio returned an empty translation.")
        return translated

    def _headers(self) -> Dict[str, str]:
        """Standard request headers (bearer auth + JSON body)."""
        return {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }

    def _should_retry(self, exc: Exception) -> bool:
        """Return True for transient failures worth another attempt.

        Connection/timeout errors are always retryable; HTTP errors only for
        the usual transient status codes.
        """
        if isinstance(exc, (httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout, httpx.PoolTimeout)):
            return True
        if isinstance(exc, httpx.HTTPStatusError):
            return exc.response.status_code in {408, 409, 429, 500, 502, 503, 504}
        return False

    @staticmethod
    def _should_retry_with_user_only_prompt(exc: Exception) -> bool:
        """Detect the 400 error some models return for a leading system message."""
        if not isinstance(exc, httpx.HTTPStatusError):
            return False
        if exc.response.status_code != 400:
            return False
        response_text = exc.response.text.lower()
        return "conversations must start with a user prompt" in response_text

    @staticmethod
    def _should_retry_with_structured_translation_prompt(exc: Exception) -> bool:
        """Detect the 400 error from models that want structured lang-code content."""
        if not isinstance(exc, httpx.HTTPStatusError):
            return False
        if exc.response.status_code != 400:
            return False
        response_text = exc.response.text.lower()
        return "source_lang_code" in response_text and "target_lang_code" in response_text

    def _post_chat_completion(self, payload: Dict[str, Any]) -> str:
        """POST one payload and return the parsed translation.

        Raises httpx.HTTPStatusError on non-2xx, ValueError on non-JSON
        bodies, TranslationError on malformed/empty completions.
        """
        response = self._client.post(
            self.config.chat_completions_url,
            headers=self._headers(),
            json=payload,
        )
        response.raise_for_status()
        return self.parse_response_content(response.json())

    def translate_text(
        self,
        text: str,
        target_language: str,
        source_language: str = "auto",
    ) -> str:
        """Translate a single text segment.

        Tries the standard payload up to max_retries times with linear
        backoff; on specific 400 responses it additionally tries the
        user-only and structured fallback payloads within the same attempt.

        Raises:
            TranslationError: If every attempt and fallback fails.
        """
        # Whitespace-only input needs no round-trip.
        if not text.strip():
            return ""
        payload = self.build_payload(text, source_language, target_language)
        last_error: Optional[Exception] = None
        for attempt in range(1, self.config.max_retries + 1):
            try:
                return self._post_chat_completion(payload)
            except (httpx.HTTPError, ValueError, TranslationError) as exc:
                last_error = exc
                # Fallback 1: merge instructions into a single user turn for
                # models that reject a leading system message.
                if self._should_retry_with_user_only_prompt(exc):
                    try:
                        fallback_payload = self.build_user_only_payload(text, source_language, target_language)
                        return self._post_chat_completion(fallback_payload)
                    except (httpx.HTTPError, ValueError, TranslationError) as fallback_exc:
                        last_error = fallback_exc
                # Fallback 2: structured lang-code content. Checked against
                # last_error (not exc) so it also fires when fallback 1 failed
                # with this signature.
                if self._should_retry_with_structured_translation_prompt(last_error):
                    try:
                        structured_payload = self.build_structured_translation_payload(
                            text,
                            source_language,
                            target_language,
                        )
                        return self._post_chat_completion(structured_payload)
                    except (httpx.HTTPError, ValueError, TranslationError) as structured_exc:
                        last_error = structured_exc
                # Retry decision is based on the attempt's ORIGINAL error.
                if attempt >= self.config.max_retries or not self._should_retry(exc):
                    break
                # Linear backoff: base * attempt number.
                self._sleeper(self.config.retry_backoff_seconds * attempt)
        # Re-raise/wrap the most recent failure as a TranslationError.
        if isinstance(last_error, TranslationError):
            raise last_error
        if isinstance(last_error, ValueError):
            raise TranslationError("LM Studio returned a non-JSON response.") from last_error
        raise TranslationError(f"LM Studio request failed: {last_error}") from last_error

    def translate_segments(
        self,
        texts: List[str],
        target_language: str,
        source_language: str = "auto",
    ) -> List[str]:
        """Translate an ordered list of subtitle-like segments.

        Segments are translated one at a time; output order matches input.
        A failure on any segment aborts the whole batch.
        """
        results: List[str] = []
        for text in texts:
            results.append(
                self.translate_text(
                    text=text,
                    target_language=target_language,
                    source_language=source_language,
                )
            )
        return results

    def close(self) -> None:
        """Release the underlying HTTP client if this instance created it."""
        if self._owns_client:
            self._client.close()
def translate_text(
    text: str,
    target_language: str,
    source_language: str = "auto",
    config: Optional[TranslationConfig] = None,
    client: Optional[httpx.Client] = None,
) -> str:
    """Translate a single text string using LM Studio.

    Convenience wrapper that builds a short-lived LMStudioTranslator,
    performs one translation, and always releases the translator afterwards.

    Args:
        text: The text to translate.
        target_language: Target language code.
        source_language: Source language code, "auto" by default.
        config: Optional pre-built configuration; env-derived if omitted.
        client: Optional httpx client to reuse.

    Returns:
        The translated text.
    """
    worker = LMStudioTranslator(config or TranslationConfig.from_env(), client=client)
    try:
        result = worker.translate_text(text, target_language, source_language)
    finally:
        worker.close()
    return result
def translate_segments(
    texts: List[str],
    target_language: str,
    source_language: str = "auto",
    config: Optional[TranslationConfig] = None,
    client: Optional[httpx.Client] = None,
) -> List[str]:
    """Translate a list of text strings using LM Studio.

    Convenience wrapper around LMStudioTranslator.translate_segments that
    guarantees the translator is closed even when translation fails.

    Args:
        texts: Ordered segments to translate.
        target_language: Target language code.
        source_language: Source language code, "auto" by default.
        config: Optional pre-built configuration; env-derived if omitted.
        client: Optional httpx client to reuse.

    Returns:
        Translations in the same order as the input segments.
    """
    worker = LMStudioTranslator(config or TranslationConfig.from_env(), client=client)
    try:
        translated = worker.translate_segments(texts, target_language, source_language)
    finally:
        worker.close()
    return translated

329
src/youtube.py Normal file
View File

@@ -0,0 +1,329 @@
"""YouTube Content Download Module for YouTube Auto Dub.
This module provides a robust interface for downloading YouTube content
using yt-dlp. It handles:
- Video and audio extraction from YouTube URLs
- Authentication via cookies or browser integration
- Format selection and quality optimization
- Error handling and retry logic
- Metadata extraction and validation
Author: Nguyen Cong Thuan Huy (mangodxd)
Version: 1.0.0
"""
import yt_dlp
from pathlib import Path
from typing import Optional, Dict, Any
from src.engines import CACHE_DIR
def _format_minutes_seconds(total_seconds: float) -> str:
"""Format seconds as M:SS for logging."""
seconds = int(round(total_seconds))
minutes, remaining_seconds = divmod(seconds, 60)
return f"{minutes}:{remaining_seconds:02d}"
def _getOpts(browser: Optional[str] = None,
cookies_file: Optional[str] = None,
quiet: bool = True) -> Dict[str, Any]:
"""Generate common yt-dlp options with authentication configuration.
Args:
browser: Browser name for cookie extraction (chrome, edge, firefox).
If provided, cookies will be extracted from this browser.
cookies_file: Path to cookies.txt file in Netscape format.
Takes priority over browser extraction if both provided.
quiet: Whether to suppress yt-dlp output messages.
Returns:
Dictionary of yt-dlp options.
Raises:
ValueError: If invalid browser name is provided.
Note:
Priority order: cookies_file > browser > no authentication.
"""
opts = {
'quiet': quiet,
'no_warnings': True,
'extract_flat': False,
}
if cookies_file:
cookies_path = Path(cookies_file)
if not cookies_path.exists():
raise FileNotFoundError(f"Cookies file not found: {cookies_file}")
opts['cookiefile'] = str(cookies_path)
print(f"[*] Using cookies file: {cookies_file}")
elif browser:
valid_browsers = ['chrome', 'firefox', 'edge', 'safari', 'opera', 'brave']
browser_lower = browser.lower()
if browser_lower not in valid_browsers:
raise ValueError(f"Invalid browser '{browser}'. Supported: {', '.join(valid_browsers)}")
opts['cookiesfrombrowser'] = (browser_lower,)
print(f"[*] Extracting cookies from browser: {browser}")
else:
print(f"[*] No authentication configured (public videos only)")
return opts
def getId(url: str,
          browser: Optional[str] = None,
          cookies_file: Optional[str] = None) -> str:
    """Resolve a YouTube URL to its video ID without downloading media.

    Args:
        url: YouTube video URL to extract ID from.
        browser: Browser name for cookie extraction.
        cookies_file: Path to cookies.txt file.

    Returns:
        YouTube video ID as string.

    Raises:
        ValueError: If URL is invalid or video ID cannot be extracted.
        RuntimeError: If yt-dlp fails to extract information.

    Note:
        Metadata is fetched with download=False; nothing is written to disk.
    """
    if not url or not isinstance(url, str):
        raise ValueError("URL must be a non-empty string")
    lowered = url.lower()
    if 'youtube.com' not in lowered and 'youtu.be' not in lowered:
        raise ValueError(f"Invalid YouTube URL: {url}")
    try:
        print(f"[*] Extracting video ID from: {url[:50]}...")
        options = _getOpts(browser=browser, cookies_file=cookies_file)
        with yt_dlp.YoutubeDL(options) as downloader:
            try:
                metadata = downloader.extract_info(url, download=False)
                video_id = metadata.get('id')
                if not video_id:
                    raise RuntimeError("No video ID found in extracted information")
                title = metadata.get('title', 'Unknown')
                duration = metadata.get('duration', 0)
                uploader = metadata.get('uploader', 'Unknown')
                print(f"[+] Video ID extracted: {video_id}")
                print(f"    Title: {title[:50]}{'...' if len(title) > 50 else ''}")
                print(f"    Duration: {duration}s ({_format_minutes_seconds(duration)})")
                print(f"    Uploader: {uploader}")
                return video_id
            except yt_dlp.DownloadError as e:
                message = str(e)
                if "Sign in to confirm" in message or "private video" in message.lower():
                    raise ValueError(f"Authentication required for this video. Please use --browser or --cookies. Original error: {e}")
                raise RuntimeError(f"yt-dlp extraction failed: {e}")
    except Exception as e:
        # Our own ValueError/RuntimeError pass through untouched; anything
        # else is wrapped so callers see a single failure type.
        if isinstance(e, (ValueError, RuntimeError)):
            raise
        raise RuntimeError(f"Failed to extract video ID: {e}") from e
def downloadVideo(url: str,
                  browser: Optional[str] = None,
                  cookies_file: Optional[str] = None) -> Path:
    """Download the best quality video with audio from YouTube.

    Args:
        url: YouTube video URL to download.
        browser: Browser name for cookie extraction.
        cookies_file: Path to cookies.txt file.

    Returns:
        Path to the downloaded video file.

    Raises:
        ValueError: If URL is invalid or authentication is required.
        RuntimeError: If download fails or file is corrupted.

    Note:
        This function downloads both video and audio in a single file.
        If the video already exists in cache, it returns the existing file.
    """
    # Validate the URL/auth up front; any failure is surfaced as ValueError.
    try:
        video_id = getId(url, browser=browser, cookies_file=cookies_file)
    except Exception as e:
        raise ValueError(f"Failed to validate video URL: {e}") from e
    out_path = CACHE_DIR / f"{video_id}.mp4"
    if out_path.exists():
        file_size = out_path.stat().st_size
        # Anything under 1 MiB is treated as a truncated/failed download.
        if file_size > 1024 * 1024:
            print(f"[*] Video already cached: {out_path}")
            return out_path
        else:
            print(f"[!] WARNING: Cached video seems too small ({file_size} bytes), re-downloading")
            out_path.unlink()
    try:
        print(f"[*] Downloading video: {video_id}")
        opts = _getOpts(browser=browser, cookies_file=cookies_file)
        opts.update({
            # Prefer H.264 MP4 + M4A audio (widest playback compatibility),
            # then progressively less specific fallbacks.
            'format': (
                'bestvideo[ext=mp4][vcodec^=avc]+bestaudio[ext=m4a]/'
                'best[ext=mp4]/'
                'best'
            ),
            'outtmpl': str(out_path),
            'merge_output_format': 'mp4',
            'postprocessors': [],
        })
        with yt_dlp.YoutubeDL(opts) as ydl:
            ydl.download([url])
        # Sanity-check the produced file before declaring success.
        if not out_path.exists():
            raise RuntimeError(f"Video file not created after download: {out_path}")
        file_size = out_path.stat().st_size
        if file_size < 1024 * 1024:
            raise RuntimeError(f"Downloaded video file is too small: {file_size} bytes")
        print(f"[+] Video downloaded successfully:")
        print(f"    File: {out_path}")
        print(f"    Size: {file_size / (1024*1024):.1f} MB")
        return out_path
    except yt_dlp.DownloadError as e:
        error_msg = str(e).lower()
        # Auth problems get an actionable ValueError; other download errors
        # become RuntimeError.
        if "sign in to confirm" in error_msg or "private video" in error_msg:
            raise ValueError(
                f"Authentication required for this video. Please try:\n"
                f"1. Close all browser windows and use --browser\n"
                f"2. Export fresh cookies.txt and use --cookies\n"
                f"3. Check if video is public/accessible\n"
                f"Original error: {e}"
            )
        else:
            raise RuntimeError(f"Video download failed: {e}")
    except Exception as e:
        # Remove the partial output so the next attempt starts clean.
        if out_path.exists():
            out_path.unlink()
        raise RuntimeError(f"Video download failed: {e}") from e
def downloadAudio(url: str,
                  browser: Optional[str] = None,
                  cookies_file: Optional[str] = None) -> Path:
    """Download audio-only from YouTube for transcription processing.

    Args:
        url: YouTube video URL to extract audio from.
        browser: Browser name for cookie extraction.
        cookies_file: Path to cookies.txt file.

    Returns:
        Path to the downloaded WAV audio file.

    Raises:
        ValueError: If URL is invalid or authentication is required.
        RuntimeError: If audio download or conversion fails.

    Note:
        The output is always in WAV format at the project's sample rate
        for consistency with the transcription pipeline.
    """
    # Validate the URL/auth up front; any failure is surfaced as ValueError.
    try:
        video_id = getId(url, browser=browser, cookies_file=cookies_file)
    except Exception as e:
        raise ValueError(f"Failed to validate video URL: {e}") from e
    # The FFmpeg post-processor appends the codec extension itself, so the
    # output template is extension-less and the final file should be <id>.wav.
    temp_path = CACHE_DIR / f"{video_id}"
    final_path = CACHE_DIR / f"{video_id}.wav"
    if final_path.exists():
        file_size = final_path.stat().st_size
        # Anything under 100 KiB is treated as a truncated/failed conversion.
        if file_size > 1024 * 100:
            print(f"[*] Audio already cached: {final_path}")
            return final_path
        else:
            print(f"[!] WARNING: Cached audio seems too small ({file_size} bytes), re-downloading")
            final_path.unlink()
    try:
        print(f"[*] Downloading audio: {video_id}")
        opts = _getOpts(browser=browser, cookies_file=cookies_file)
        opts.update({
            'format': 'bestaudio/best',
            'outtmpl': str(temp_path),
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
                'preferredquality': '192',
            }],
        })
        with yt_dlp.YoutubeDL(opts) as ydl:
            ydl.download([url])
        if not final_path.exists():
            # Fallback: accept any "<id>.*" file if the post-processor
            # produced an unexpected extension, rather than failing outright.
            temp_files = list(CACHE_DIR.glob(f"{video_id}.*"))
            if temp_files:
                print(f"[!] WARNING: Expected {final_path} but found {temp_files[0]}")
                final_path = temp_files[0]
            else:
                raise RuntimeError(f"Audio file not created after download: {final_path}")
        file_size = final_path.stat().st_size
        if file_size < 1024 * 100:
            raise RuntimeError(f"Downloaded audio file is too small: {file_size} bytes")
        print(f"[+] Audio downloaded successfully:")
        print(f"    File: {final_path}")
        print(f"    Size: {file_size / (1024*1024):.1f} MB")
        # Best-effort duration probe; failures only warn because the file is
        # already usable by the pipeline.
        try:
            from src.media import _get_duration
            duration = _get_duration(final_path)
            if duration > 0:
                print(f"    Duration: {duration:.1f}s ({_format_minutes_seconds(duration)})")
            else:
                print(f"[!] WARNING: Could not determine audio duration")
        except Exception as e:
            print(f"[!] WARNING: Audio validation failed: {e}")
        return final_path
    except yt_dlp.DownloadError as e:
        error_msg = str(e).lower()
        # Auth problems get an actionable ValueError; other download errors
        # become RuntimeError.
        if "sign in to confirm" in error_msg or "private video" in error_msg:
            raise ValueError(
                f"Authentication required for this video. Please try:\n"
                f"1. Close all browser windows and use --browser\n"
                f"2. Export fresh cookies.txt and use --cookies\n"
                f"3. Check if video is public/accessible\n"
                f"Original error: {e}"
            )
        else:
            raise RuntimeError(f"Audio download failed: {e}")
    except Exception as e:
        # Clean up partial artifacts so the next run starts from scratch.
        for path in [temp_path, final_path]:
            if path.exists():
                path.unlink()
        raise RuntimeError(f"Audio download failed: {e}") from e