diff --git a/main.py b/main.py index b66823b..e146037 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ import asyncio import shutil import time +from src.audio_separation import DEFAULT_MIX_MODE from src.core_utils import ConfigurationError from src.translation import TranslationConfig @@ -54,6 +55,15 @@ Examples: "-wm", help="Whisper model to use (tiny, base, small, medium, large-v3). Default: auto-select based on VRAM", ) + parser.add_argument( + "--mix-mode", + default=DEFAULT_MIX_MODE, + choices=[DEFAULT_MIX_MODE, "original-audio", "dub-only"], + help=( + "Final audio bed for the dubbed output. " + "Default 'instrumental-only' uses a no-vocals bed when separation succeeds." + ), + ) parser.add_argument( "--translation-backend", default="lmstudio", @@ -309,7 +319,36 @@ def main() -> None: print(f"[+] Subtitles generated: {subtitle_path}") print(f"\n{'=' * 60}") - print("STEP 7: FINAL VIDEO RENDERING") + print("STEP 7: AUDIO BED PREPARATION") + print(f"{'=' * 60}") + + effective_mix_mode = args.mix_mode + background_audio_path = None + background_volume = 0.10 + + if effective_mix_mode == DEFAULT_MIX_MODE: + print(f"[*] Preparing default {DEFAULT_MIX_MODE} mix bed...") + separation_result = engine.separate_audio(audio_path, src.engines.TEMP_DIR) + if separation_result.warning: + print(f"[!] WARNING: {separation_result.warning}") + + if separation_result.instrumental_path: + background_audio_path = separation_result.instrumental_path + background_volume = separation_result.recommended_bg_volume + print(f"[+] Background stem ready: {background_audio_path}") + if separation_result.vocals_path: + print(f"[*] Original vocal stem isolated at: {separation_result.vocals_path}") + else: + effective_mix_mode = "dub-only" + print("[!] WARNING: Could not build a safe instrumental bed. Falling back to dub-only output.") + elif effective_mix_mode == "original-audio": + background_audio_path = audio_path + print("[!] 
class AudioSeparator:
    """Produce a background bed without the original vocals when possible.

    Demucs is attempted first when the ``demucs`` package is importable.
    If it is missing, or every candidate model fails, an FFmpeg
    center-cancel pass is used instead. Separation failures are reported
    through ``SeparationResult.warning`` rather than raised, so callers can
    fall back to a dub-only mix.
    """

    def __init__(self) -> None:
        # Probe the optional tooling once; both checks are pure lookups.
        self._demucs_available = importlib.util.find_spec("demucs") is not None
        self._ffprobe_available = shutil.which("ffprobe") is not None

    def separate_audio(self, audio_path: Path, output_dir: Path) -> SeparationResult:
        """Create instrumental/no-vocals output for the final remix.

        Args:
            audio_path: Source audio file to separate.
            output_dir: Directory that receives the stems (created if needed).

        Returns:
            A ``SeparationResult``. When no strategy succeeds,
            ``instrumental_path`` is ``None``, ``recommended_bg_volume`` is
            ``0.0`` and ``warning`` describes every failure.

        Raises:
            FileNotFoundError: If ``audio_path`` does not exist.
        """
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found for separation: {audio_path}")

        output_dir.mkdir(parents=True, exist_ok=True)

        demucs_warning: Optional[str] = None
        if self._demucs_available:
            try:
                return self._run_demucs(audio_path, output_dir)
            except Exception as exc:  # pragma: no cover - exercised only when Demucs is installed
                # Deliberate best-effort boundary: remember why and try FFmpeg.
                demucs_warning = f"Demucs separation failed: {exc}"

        try:
            fallback_result = self._run_ffmpeg_center_cancel(audio_path, output_dir)
        except Exception as exc:
            # Both strategies failed; report everything and hand back no bed.
            warning_parts = []
            if demucs_warning:
                warning_parts.append(demucs_warning)
            warning_parts.append(f"Fallback separation failed: {exc}")
            return SeparationResult(
                instrumental_path=None,
                vocals_path=None,
                method=None,
                warning=" ".join(warning_parts),
                recommended_bg_volume=0.0,
            )

        if demucs_warning:
            # Keep the fallback stems but surface the Demucs failure instead
            # of the generic fallback notice.
            return SeparationResult(
                instrumental_path=fallback_result.instrumental_path,
                vocals_path=fallback_result.vocals_path,
                method=fallback_result.method,
                warning=f"{demucs_warning}. Falling back to FFmpeg center-cancel separation.",
                recommended_bg_volume=fallback_result.recommended_bg_volume,
            )
        return fallback_result

    @staticmethod
    def _describe_failure(exc: Exception) -> str:
        """Return ``str(exc)``, extended with the last captured stderr line.

        ``subprocess.CalledProcessError`` only reports the exit status in its
        message; the actual tool diagnostic is in ``exc.stderr``.
        """
        message = str(exc)
        stderr = getattr(exc, "stderr", None)
        if isinstance(stderr, str) and stderr.strip():
            return f"{message} ({stderr.strip().splitlines()[-1]})"
        return message

    def _run_demucs(self, audio_path: Path, output_dir: Path) -> SeparationResult:
        """Use Demucs when available to create clean stems.

        Each model in ``DEMUCS_MODEL_CANDIDATES`` is tried in order via
        ``python -m demucs.separate --two-stems vocals``. The first model that
        produces ``no_vocals.wav`` wins; its stems are copied into
        ``output_dir``.

        Raises:
            RuntimeError: If no candidate model produced an instrumental stem.
                The last underlying error is chained as ``__cause__``.
        """
        demucs_root = output_dir / "demucs"
        demucs_root.mkdir(parents=True, exist_ok=True)

        last_error: Optional[Exception] = None
        for model_name in DEMUCS_MODEL_CANDIDATES:
            cmd = [
                sys.executable,
                "-m",
                "demucs.separate",
                "-n",
                model_name,
                "--two-stems",
                "vocals",
                "-o",
                str(demucs_root),
                str(audio_path),
            ]
            try:
                subprocess.run(cmd, check=True, capture_output=True, text=True)
                stem_dir = demucs_root / model_name / audio_path.stem
                instrumental_src = stem_dir / "no_vocals.wav"
                vocals_src = stem_dir / "vocals.wav"

                if not instrumental_src.exists():
                    raise RuntimeError(f"Demucs did not create {instrumental_src}")

                instrumental_path = output_dir / f"{audio_path.stem}_instrumental.wav"
                vocals_path: Optional[Path] = output_dir / f"{audio_path.stem}_vocals.wav"
                shutil.copyfile(instrumental_src, instrumental_path)
                if vocals_src.exists():
                    shutil.copyfile(vocals_src, vocals_path)
                else:
                    # The vocal stem is optional; only the bed is required.
                    vocals_path = None

                return SeparationResult(
                    instrumental_path=instrumental_path,
                    vocals_path=vocals_path,
                    method=f"demucs:{model_name}",
                    warning=None,
                    recommended_bg_volume=0.75,
                )
            except Exception as exc:  # pragma: no cover - exercised only when Demucs is installed
                # Any failure just moves on to the next candidate model.
                last_error = exc

        if last_error is None:
            raise RuntimeError("Demucs separation failed")
        # Include the tool's own diagnostic and keep the original traceback.
        raise RuntimeError(self._describe_failure(last_error)) from last_error

    def _run_ffmpeg_center_cancel(self, audio_path: Path, output_dir: Path) -> SeparationResult:
        """Approximate a no-vocals bed by removing centered content.

        The instrumental stem is the left/right difference signal; the vocal
        guide is the mono sum with a 120 Hz high-pass.

        Raises:
            RuntimeError: If the channel count cannot be determined, the input
                has fewer than two channels, or FFmpeg did not write the
                instrumental stem.
        """
        channels = self._get_channel_count(audio_path)
        if channels is None:
            raise RuntimeError("could not determine channel layout")
        if channels < 2:
            raise RuntimeError("input is mono, so center-cancel separation is not possible")

        instrumental_path = output_dir / f"{audio_path.stem}_instrumental.wav"
        vocals_path = output_dir / f"{audio_path.stem}_vocals.wav"

        instrumental_cmd = [
            "ffmpeg",
            "-y",
            "-v",
            "error",
            "-i",
            str(audio_path),
            "-filter:a",
            "pan=stereo|c0=FL-FR|c1=FR-FL,volume=1.35,alimiter=limit=0.95",
            "-c:a",
            "pcm_s16le",
            str(instrumental_path),
        ]
        vocals_cmd = [
            "ffmpeg",
            "-y",
            "-v",
            "error",
            "-i",
            str(audio_path),
            "-filter:a",
            "pan=mono|c0=0.5*FL+0.5*FR,highpass=f=120,alimiter=limit=0.95",
            "-c:a",
            "pcm_s16le",
            str(vocals_path),
        ]

        _runFFmpegCmd(instrumental_cmd, timeout=300, description="FFmpeg instrumental separation")
        _runFFmpegCmd(vocals_cmd, timeout=300, description="FFmpeg vocal guide extraction")

        if not instrumental_path.exists():
            raise RuntimeError("FFmpeg fallback did not create an instrumental stem")

        return SeparationResult(
            instrumental_path=instrumental_path,
            vocals_path=vocals_path if vocals_path.exists() else None,
            method="ffmpeg-center-cancel",
            warning=(
                "Using FFmpeg center-cancel fallback instead of Demucs. "
                "Centered speech should be reduced, but some residual vocals may remain."
            ),
            recommended_bg_volume=0.35,
        )

    @staticmethod
    def _parse_channel_count(probe_output: str) -> Optional[int]:
        """Parse the channel count from ffprobe's ``nokey`` output.

        Only the first non-blank line is used, so output that repeats the
        value on several lines still parses.
        NOTE(review): ffprobe is believed to repeat the entry for containers
        that also list the stream under a program - confirm.
        """
        for line in probe_output.splitlines():
            token = line.strip()
            if not token:
                continue
            try:
                return int(token)
            except ValueError:
                return None
        return None

    def _get_channel_count(self, audio_path: Path) -> Optional[int]:
        """Inspect channel count so mono inputs can fail safely.

        Returns:
            The channel count of the first audio stream, or ``None`` when
            ffprobe is unavailable, fails, times out, or prints nothing usable.
        """
        if not self._ffprobe_available:
            return None

        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "a:0",
            "-show_entries",
            "stream=channels",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(audio_path),
        ]
        try:
            result = subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            # Non-zero exit, timeout, missing binary, or undecodable output:
            # the probe is best-effort, so report "unknown".
            return None

        return self._parse_channel_count(result.stdout)
def _build_audio_mix_filter(
    dub_input_label: str,
    background_input_label: Optional[str] = None,
    background_volume: float = 0.35,
) -> str:
    """Build the FFmpeg ``-filter_complex`` graph for the final dub mix.

    Args:
        dub_input_label: Input pad carrying the dubbed speech, e.g. ``"1:a"``.
        background_input_label: Input pad carrying the background bed, or
            ``None`` for a dub-only mix.
        background_volume: Linear gain applied to the bed before ducking.

    Returns:
        A filter graph string whose single output pad is labelled ``[outa]``.
        With a background bed, the bed is side-chain compressed by the dub and
        then mixed with it; without one, the dub is only resampled, levelled
        and loudness-normalised.
    """
    dub_prepare = (
        f"[{dub_input_label}]aresample={SAMPLE_RATE},"
        "aformat=channel_layouts=stereo,volume=1.10"
    )
    mastering = "loudnorm=I=-16:LRA=11:TP=-1.5,alimiter=limit=0.95[outa]"

    if background_input_label is None:
        # Dub-only: do not split the dub. A labelled output pad that nothing
        # consumes or maps makes FFmpeg abort with
        # "Filter asplit:output1 has an unconnected output".
        return f"{dub_prepare},{mastering}"

    return (
        f"[{background_input_label}]aresample={SAMPLE_RATE},"
        f"aformat=channel_layouts=stereo,volume={background_volume:.3f}[bgbed];"
        f"{dub_prepare}[dubpre];"
        # One copy of the dub is mixed, the other drives the ducking compressor.
        "[dubpre]asplit=2[dubmix][dubduck];"
        "[bgbed][dubduck]sidechaincompress="
        "threshold=0.015:ratio=10:attack=12:release=350:makeup=1[bgduck];"
        "[bgduck][dubmix]amix=inputs=2:duration=first:dropout_transition=0:normalize=0,"
        f"{mastering}"
    )
optimal background volume - original_loudness = _analyze_audio_loudness(video_path) - - if original_loudness is not None: - # Calculate background volume based on loudness analysis - # Target: voice should be 10-15dB louder than background - if original_loudness > -10: # Very loud audio - bg_volume = 0.08 # 8% - reduce more for loud content - elif original_loudness > -20: # Normal audio - bg_volume = 0.15 # 15% - standard reduction - else: # Quiet audio - bg_volume = 0.25 # 25% - reduce less for quiet content - - print(f"[*] Dynamic volume mixing: original={original_loudness:.1f}dB, bg_volume={bg_volume*100:.0f}%") + + resolved_mix_mode = (mix_mode or DEFAULT_MIX_MODE).strip().lower() + if resolved_mix_mode not in {DEFAULT_MIX_MODE, 'original-audio', 'dub-only'}: + raise ValueError(f"Unsupported mix mode: {mix_mode}") + + external_bed_path: Optional[Path] = None + background_input_label: Optional[str] = None + dub_input_label = '1:a' + + if resolved_mix_mode == DEFAULT_MIX_MODE: + if background_audio_path is None: + print("[!] WARNING: Instrumental bed unavailable. 
Falling back to dub-only mix.") + resolved_mix_mode = 'dub-only' + else: + external_bed_path = background_audio_path + background_input_label = '1:a' + dub_input_label = '2:a' + elif resolved_mix_mode == 'original-audio': + if background_audio_path is not None: + external_bed_path = background_audio_path + background_input_label = '1:a' + dub_input_label = '2:a' + else: + background_input_label = '0:a' else: - # Fallback to default if analysis fails - bg_volume = 0.15 - print(f"[*] Using default volume mixing: bg_volume={bg_volume*100:.0f}%") - - filter_complex = ( - f"[0:a]volume={bg_volume}[bg]; " - "[bg][1:a]amix=inputs=2:duration=first:dropout_transition=0[outa]" + resolved_mix_mode = 'dub-only' + + if resolved_mix_mode == 'dub-only': + print("[*] Mix mode: dub-only (no source audio bed)") + elif resolved_mix_mode == DEFAULT_MIX_MODE: + print( + f"[*] Mix mode: {DEFAULT_MIX_MODE} " + f"({background_volume * 100:.0f}% background bed from {external_bed_path.name})" + ) + else: + print(f"[*] Mix mode: original-audio ({background_volume * 100:.0f}% background bed)") + + filter_complex = _build_audio_mix_filter( + dub_input_label=dub_input_label, + background_input_label=background_input_label, + background_volume=background_volume, ) video_codec = 'copy' - cmd = [ - 'ffmpeg', '-y', '-v', 'error', - '-i', str(video_path), + cmd = ['ffmpeg', '-y', '-v', 'error', '-i', str(video_path)] + if external_bed_path is not None: + cmd.extend(['-i', str(external_bed_path)]) + cmd.extend([ '-f', 'concat', '-safe', '0', '-i', str(concat_file), '-filter_complex', filter_complex, - ] - - # Handle Hard Subtitles (Requires re-encoding) + ]) + if subtitle_path: video_codec = 'libx264' cmd.extend(['-vf', _build_subtitle_filter(subtitle_path)]) @@ -348,13 +412,12 @@ def render_video( '-c:v', video_codec, '-c:a', 'aac', '-b:a', '192k', '-ar', str(SAMPLE_RATE), - '-ac', str(AUDIO_CHANNELS), - '-shortest' + '-ac', str(FINAL_MIX_CHANNELS), + '-shortest', ]) - + cmd.append(str(output_path)) 
- - # Run rendering + try: subprocess.run(cmd, check=True, timeout=None, capture_output=True, text=True) except subprocess.CalledProcessError as exc: @@ -366,6 +429,7 @@ def render_video( output_path=output_path, subtitle_path=subtitle_path, filter_complex=filter_complex, + background_audio_path=external_bed_path, ) else: raise diff --git a/tests/test_audio_pipeline.py b/tests/test_audio_pipeline.py new file mode 100644 index 0000000..a95e28e --- /dev/null +++ b/tests/test_audio_pipeline.py @@ -0,0 +1,145 @@ +"""Focused tests for vocal-bleed reduction in the final dub mix.""" + +from __future__ import annotations + +import math +import shutil +import subprocess +from pathlib import Path + +import numpy as np +import pytest +import soundfile as sf + +from src.audio_separation import AudioSeparator, DEFAULT_MIX_MODE +from src import media + + +FFMPEG_READY = shutil.which("ffmpeg") is not None and shutil.which("ffprobe") is not None + + +def _sine_wave(frequency: float, duration: float, sample_rate: int, amplitude: float) -> np.ndarray: + t = np.linspace(0.0, duration, int(sample_rate * duration), endpoint=False) + return (amplitude * np.sin(2.0 * math.pi * frequency * t)).astype(np.float32) + + +def _run(cmd: list[str]) -> None: + subprocess.run(cmd, check=True, capture_output=True, text=True, timeout=None) + + +def _extract_peak(signal: np.ndarray, sample_rate: int, frequency: float) -> float: + window = np.hanning(len(signal)) + spectrum = np.fft.rfft(signal * window) + freqs = np.fft.rfftfreq(len(signal), d=1.0 / sample_rate) + index = int(np.argmin(np.abs(freqs - frequency))) + return float(np.abs(spectrum[index])) + + +@pytest.mark.skipif(not FFMPEG_READY, reason="FFmpeg is required for audio pipeline tests") +def test_default_mix_prefers_instrumental_bed_and_keeps_dub_prominent(tmp_path: Path): + sample_rate = 24_000 + duration = 2.0 + + centered_voice = _sine_wave(440.0, duration, sample_rate, amplitude=0.35) + ambience_left = _sine_wave(660.0, duration, 
sample_rate, amplitude=0.18) + ambience_right = -ambience_left + original_stereo = np.column_stack( + [centered_voice + ambience_left, centered_voice + ambience_right] + ) + + original_audio = tmp_path / "original.wav" + sf.write(original_audio, original_stereo, sample_rate) + + dub_audio = tmp_path / "dub.wav" + sf.write(dub_audio, _sine_wave(1000.0, duration, sample_rate, amplitude=0.30), sample_rate) + + manifest_path = tmp_path / "dub_manifest.txt" + manifest_path.write_text(f"file '{dub_audio.resolve().as_posix()}'\n", encoding="utf-8") + + video_path = tmp_path / "video.mp4" + _run( + [ + "ffmpeg", + "-y", + "-v", + "error", + "-f", + "lavfi", + "-i", + f"color=c=black:s=320x240:d={duration}", + "-i", + str(original_audio), + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-c:a", + "aac", + "-shortest", + str(video_path), + ] + ) + + separator = AudioSeparator() + separation = separator.separate_audio(original_audio, tmp_path) + + assert separation.instrumental_path is not None + assert separation.instrumental_path.exists() + + instrumental_audio, instrumental_rate = sf.read(separation.instrumental_path, always_2d=True) + original_audio_data, _ = sf.read(original_audio, always_2d=True) + + centered_before = _extract_peak(original_audio_data[:, 0], sample_rate, 440.0) + centered_after = _extract_peak(instrumental_audio[:, 0], instrumental_rate, 440.0) + ambience_after = _extract_peak(instrumental_audio[:, 0], instrumental_rate, 660.0) + + assert centered_after < centered_before * 0.15 + assert ambience_after > 0.01 + + output_video = tmp_path / "dubbed.mp4" + media.render_video( + video_path=video_path, + concat_file=manifest_path, + output_path=output_video, + background_audio_path=separation.instrumental_path, + mix_mode=DEFAULT_MIX_MODE, + background_volume=separation.recommended_bg_volume, + ) + + mixed_audio = tmp_path / "mixed.wav" + _run( + [ + "ffmpeg", + "-y", + "-v", + "error", + "-i", + str(output_video), + "-vn", + "-c:a", + "pcm_s16le", + 
@pytest.mark.skipif(not FFMPEG_READY, reason="FFmpeg is required for audio pipeline tests")
def test_separator_warns_and_returns_no_bed_for_mono_input(tmp_path: Path):
    """The FFmpeg fallback must refuse mono input and explain why."""
    mono_audio = tmp_path / "mono.wav"
    sf.write(mono_audio, _sine_wave(440.0, 1.0, 24_000, amplitude=0.30), 24_000)

    separator = AudioSeparator()
    # The mono guard lives only in the FFmpeg center-cancel path. Demucs is
    # tried first when installed and returns its own result without a
    # warning, so pin it off to keep this test independent of the environment.
    separator._demucs_available = False

    result = separator.separate_audio(mono_audio, tmp_path)

    assert result.instrumental_path is None
    assert result.warning is not None
    assert "mono" in result.warning.lower()