From 27cfe2a3f54b3b8dfd19abb1c835d5e1b61c92d8 Mon Sep 17 00:00:00 2001 From: oimwiodev Date: Mon, 30 Mar 2026 18:18:41 +0100 Subject: [PATCH] baseline: initial working version --- .gitignore | 8 + LICENSE | 21 + LM_STUDIO_MIGRATION.md | 45 ++ README.md | 167 +++++ language_map.json | 999 ++++++++++++++++++++++++++++++ latest_langmap_generate.py | 98 +++ logs/auto-dub-20260329-225711.log | Bin 0 -> 1356 bytes main.py | 364 +++++++++++ requirements.txt | 12 + run-auto-dub.ps1 | 127 ++++ src/__init__.py | 4 + src/core_utils.py | 181 ++++++ src/engines.py | 547 ++++++++++++++++ src/media.py | 410 ++++++++++++ src/translation.py | 358 +++++++++++ src/youtube.py | 329 ++++++++++ tests/conftest.py | 11 + tests/test_main_cli.py | 61 ++ tests/test_translation.py | 136 ++++ 19 files changed, 3878 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 LM_STUDIO_MIGRATION.md create mode 100644 README.md create mode 100644 language_map.json create mode 100644 latest_langmap_generate.py create mode 100644 logs/auto-dub-20260329-225711.log create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 run-auto-dub.ps1 create mode 100644 src/__init__.py create mode 100644 src/core_utils.py create mode 100644 src/engines.py create mode 100644 src/media.py create mode 100644 src/translation.py create mode 100644 src/youtube.py create mode 100644 tests/conftest.py create mode 100644 tests/test_main_cli.py create mode 100644 tests/test_translation.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e28916e --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.pyc +.cache/ +temp/ +output/ +*.mp4 +*.wav +*.mp3 \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fbd0ff5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Nguyen Cong Thuan Huy (mangodxd) + +Permission is hereby granted, free of charge, to any person obtaining a copy 
+of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/LM_STUDIO_MIGRATION.md b/LM_STUDIO_MIGRATION.md new file mode 100644 index 0000000..af7cff0 --- /dev/null +++ b/LM_STUDIO_MIGRATION.md @@ -0,0 +1,45 @@ +# LM Studio Migration Notes + +## Summary + +This repo originally translated subtitle chunks through a Google Translate scraper wired directly into `src/engines.py`. The translation backend is now replaced with a dedicated LM Studio client that talks to an OpenAI-compatible `/v1/chat/completions` endpoint. 
+ +## New Runtime Defaults + +- `LM_STUDIO_BASE_URL=http://127.0.0.1:1234/v1` +- `LM_STUDIO_API_KEY=lm-studio` +- `LM_STUDIO_MODEL=gemma-3-4b-it` +- `--translation-backend lmstudio` + +## Commands Used In This Checkout + +```powershell +uv venv --clear --python "C:\pinokio\bin\miniconda\python.exe" .venv +uv pip install --python .venv\Scripts\python.exe -r requirements.txt pytest +``` + +Validation commands: + +```powershell +.venv\Scripts\python.exe -m pytest +.venv\Scripts\python.exe main.py --help +.venv\Scripts\python.exe -c "from src.translation import TranslationConfig, LMStudioTranslator; print(TranslationConfig.from_env().model)" +``` + +## Files Touched + +- `main.py` +- `requirements.txt` +- `README.md` +- `src/engines.py` +- `src/translation.py` +- `tests/conftest.py` +- `tests/test_main_cli.py` +- `tests/test_translation.py` + +## Notes + +- Translation remains segment-by-segment for deterministic subtitle ordering. +- The CLI now supports `--lmstudio-base-url` and `--lmstudio-model`. +- Parser/help now loads before heavy runtime imports, which makes `main.py --help` more reliable. +- `src/googlev4.py` was removed from the active codebase because LM Studio is now the only supported translation backend. diff --git a/README.md b/README.md new file mode 100644 index 0000000..32ac8d5 --- /dev/null +++ b/README.md @@ -0,0 +1,167 @@ +# YouTube Auto Dub + +YouTube Auto Dub is a Python pipeline that downloads a YouTube video, transcribes its speech with Whisper, translates the subtitle text through a local LM Studio server, and renders a subtitled output video. + +## What Changed + +- Translation now uses an LM Studio OpenAI-compatible `/v1/chat/completions` endpoint. +- Google Translate scraping has been removed from the active runtime path. +- LM Studio is now the default and only supported translation backend. +- Translation settings can be configured with environment variables or CLI flags. 
+ +## Requirements + +- Python 3.10+ +- [uv](https://docs.astral.sh/uv/) +- FFmpeg and FFprobe available on `PATH` +- LM Studio running locally with an OpenAI-compatible server enabled + +## Setup + +Create a UV-managed virtual environment in a repo subfolder and install dependencies: + +```powershell +uv venv --python "C:\pinokio\bin\miniconda\python.exe" .venv +uv pip install --python .venv\Scripts\python.exe -r requirements.txt +``` + +Verify the local toolchain: + +```powershell +.venv\Scripts\python.exe --version +ffmpeg -version +ffprobe -version +.venv\Scripts\python.exe main.py --help +``` + +## LM Studio Configuration + +Start LM Studio's local server and load a translation-capable model. The default model name in this repo is: + +```text +gemma-3-4b-it +``` + +If your local LM Studio model name differs, set it with an environment variable or `--lmstudio-model`. + +### Environment Variables + +```powershell +$env:LM_STUDIO_BASE_URL="http://127.0.0.1:1234/v1" +$env:LM_STUDIO_API_KEY="lm-studio" +$env:LM_STUDIO_MODEL="gemma-3-4b-it" +``` + +Defaults if unset: + +- `LM_STUDIO_BASE_URL=http://127.0.0.1:1234/v1` +- `LM_STUDIO_API_KEY=lm-studio` +- `LM_STUDIO_MODEL=gemma-3-4b-it` + +## Usage + +Basic example: + +```powershell +.venv\Scripts\python.exe main.py "https://youtube.com/watch?v=VIDEO_ID" --lang es +``` + +Override the LM Studio endpoint or model from the CLI: + +```powershell +.venv\Scripts\python.exe main.py "https://youtube.com/watch?v=VIDEO_ID" ` + --lang fr ` + --translation-backend lmstudio ` + --lmstudio-base-url http://127.0.0.1:1234/v1 ` + --lmstudio-model gemma-3-4b-it +``` + +Authentication options for restricted videos still work as before: + +```powershell +.venv\Scripts\python.exe main.py "https://youtube.com/watch?v=VIDEO_ID" --lang ja --browser chrome +.venv\Scripts\python.exe main.py "https://youtube.com/watch?v=VIDEO_ID" --lang de --cookies cookies.txt +``` + +## CLI Options + +| Option | Description | +| --- | --- | +| `url` | YouTube 
video URL to process | +| `--lang`, `-l` | Target language code | +| `--browser`, `-b` | Browser name for cookie extraction | +| `--cookies`, `-c` | Path to exported cookies file | +| `--gpu` | Prefer GPU acceleration when CUDA is available | +| `--whisper_model`, `-wm` | Override Whisper model | +| `--translation-backend` | Translation backend, currently `lmstudio` | +| `--lmstudio-base-url` | Override LM Studio base URL | +| `--lmstudio-model` | Override LM Studio model name | + +## Translation Behavior + +The LM Studio translator is tuned for subtitle-like text: + +- preserves meaning, tone, and intent +- keeps punctuation natural +- returns translation text only +- preserves line and segment boundaries +- leaves names, brands, URLs, emails, code, and proper nouns unchanged unless transliteration is clearly needed +- avoids commentary, summarization, and censorship + +Translation is currently performed segment-by-segment to keep subtitle ordering deterministic and reduce the risk of malformed batched output corrupting timing alignment. + +## Testing + +Run the focused validation suite: + +```powershell +.venv\Scripts\python.exe -m pytest +.venv\Scripts\python.exe main.py --help +``` + +The tests cover: + +- LM Studio request payload construction +- response parsing +- retry handling for transient HTTP failures +- empty or malformed response handling +- CLI and environment config precedence + +## Troubleshooting + +### LM Studio connection errors + +- Make sure LM Studio's local server is running. +- Confirm the base URL ends in `/v1`. +- Check that the loaded model name matches `LM_STUDIO_MODEL` or `--lmstudio-model`. + +### Empty or malformed translations + +- Try a stronger local instruction-tuned model if your current model ignores formatting. +- Keep LM Studio in non-streaming OpenAI-compatible mode. +- Review the server logs for model-side failures. 
+ +### FFmpeg missing + +If startup reports missing `ffmpeg` or `ffprobe`, install FFmpeg and add it to your system `PATH`. + +## Project Layout + +```text +youtube-auto-dub/ +|-- main.py +|-- requirements.txt +|-- language_map.json +|-- README.md +|-- LM_STUDIO_MIGRATION.md +|-- src/ +| |-- core_utils.py +| |-- engines.py +| |-- media.py +| |-- translation.py +| `-- youtube.py +`-- tests/ + |-- conftest.py + |-- test_main_cli.py + `-- test_translation.py +``` diff --git a/language_map.json b/language_map.json new file mode 100644 index 0000000..b6abb78 --- /dev/null +++ b/language_map.json @@ -0,0 +1,999 @@ +{ + "af": { + "name": "af-ZA", + "voices": { + "male": [ + "af-ZA-WillemNeural" + ], + "female": [ + "af-ZA-AdriNeural" + ] + } + }, + "sq": { + "name": "sq-AL", + "voices": { + "male": [ + "sq-AL-IlirNeural" + ], + "female": [ + "sq-AL-AnilaNeural" + ] + } + }, + "am": { + "name": "am-ET", + "voices": { + "male": [ + "am-ET-AmehaNeural" + ], + "female": [ + "am-ET-MekdesNeural" + ] + } + }, + "ar": { + "name": "ar-DZ", + "voices": { + "male": [ + "ar-DZ-IsmaelNeural", + "ar-BH-AliNeural", + "ar-EG-ShakirNeural", + "ar-IQ-BasselNeural", + "ar-JO-TaimNeural", + "ar-KW-FahedNeural", + "ar-LB-RamiNeural", + "ar-LY-OmarNeural", + "ar-MA-JamalNeural", + "ar-OM-AbdullahNeural", + "ar-QA-MoazNeural", + "ar-SA-HamedNeural", + "ar-SY-LaithNeural", + "ar-TN-HediNeural", + "ar-AE-HamdanNeural", + "ar-YE-SalehNeural" + ], + "female": [ + "ar-DZ-AminaNeural", + "ar-BH-LailaNeural", + "ar-EG-SalmaNeural", + "ar-IQ-RanaNeural", + "ar-JO-SanaNeural", + "ar-KW-NouraNeural", + "ar-LB-LaylaNeural", + "ar-LY-ImanNeural", + "ar-MA-MounaNeural", + "ar-OM-AyshaNeural", + "ar-QA-AmalNeural", + "ar-SA-ZariyahNeural", + "ar-SY-AmanyNeural", + "ar-TN-ReemNeural", + "ar-AE-FatimaNeural", + "ar-YE-MaryamNeural" + ] + } + }, + "az": { + "name": "az-AZ", + "voices": { + "male": [ + "az-AZ-BabekNeural" + ], + "female": [ + "az-AZ-BanuNeural" + ] + } + }, + "bn": { + "name": "bn-BD", + 
"voices": { + "male": [ + "bn-BD-PradeepNeural", + "bn-IN-BashkarNeural" + ], + "female": [ + "bn-BD-NabanitaNeural", + "bn-IN-TanishaaNeural" + ] + } + }, + "bs": { + "name": "bs-BA", + "voices": { + "male": [ + "bs-BA-GoranNeural" + ], + "female": [ + "bs-BA-VesnaNeural" + ] + } + }, + "bg": { + "name": "bg-BG", + "voices": { + "male": [ + "bg-BG-BorislavNeural" + ], + "female": [ + "bg-BG-KalinaNeural" + ] + } + }, + "my": { + "name": "my-MM", + "voices": { + "male": [ + "my-MM-ThihaNeural" + ], + "female": [ + "my-MM-NilarNeural" + ] + } + }, + "ca": { + "name": "ca-ES", + "voices": { + "male": [ + "ca-ES-EnricNeural" + ], + "female": [ + "ca-ES-JoanaNeural" + ] + } + }, + "zh": { + "name": "zh-HK", + "voices": { + "male": [ + "zh-HK-WanLungNeural", + "zh-CN-YunjianNeural", + "zh-CN-YunxiNeural", + "zh-CN-YunxiaNeural", + "zh-CN-YunyangNeural", + "zh-TW-YunJheNeural" + ], + "female": [ + "zh-HK-HiuGaaiNeural", + "zh-HK-HiuMaanNeural", + "zh-CN-XiaoxiaoNeural", + "zh-CN-XiaoyiNeural", + "zh-CN-liaoning-XiaobeiNeural", + "zh-TW-HsiaoChenNeural", + "zh-TW-HsiaoYuNeural", + "zh-CN-shaanxi-XiaoniNeural" + ] + } + }, + "hr": { + "name": "hr-HR", + "voices": { + "male": [ + "hr-HR-SreckoNeural" + ], + "female": [ + "hr-HR-GabrijelaNeural" + ] + } + }, + "cs": { + "name": "cs-CZ", + "voices": { + "male": [ + "cs-CZ-AntoninNeural" + ], + "female": [ + "cs-CZ-VlastaNeural" + ] + } + }, + "da": { + "name": "da-DK", + "voices": { + "male": [ + "da-DK-JeppeNeural" + ], + "female": [ + "da-DK-ChristelNeural" + ] + } + }, + "nl": { + "name": "nl-BE", + "voices": { + "male": [ + "nl-BE-ArnaudNeural", + "nl-NL-MaartenNeural" + ], + "female": [ + "nl-BE-DenaNeural", + "nl-NL-ColetteNeural", + "nl-NL-FennaNeural" + ] + } + }, + "en": { + "name": "en-AU", + "voices": { + "male": [ + "en-AU-WilliamMultilingualNeural", + "en-CA-LiamNeural", + "en-HK-SamNeural", + "en-IN-PrabhatNeural", + "en-IE-ConnorNeural", + "en-KE-ChilembaNeural", + "en-NZ-MitchellNeural", + "en-NG-AbeoNeural", 
+ "en-PH-JamesNeural", + "en-US-AndrewNeural", + "en-US-BrianNeural", + "en-SG-WayneNeural", + "en-ZA-LukeNeural", + "en-TZ-ElimuNeural", + "en-GB-RyanNeural", + "en-GB-ThomasNeural", + "en-US-AndrewMultilingualNeural", + "en-US-BrianMultilingualNeural", + "en-US-ChristopherNeural", + "en-US-EricNeural", + "en-US-GuyNeural", + "en-US-RogerNeural", + "en-US-SteffanNeural" + ], + "female": [ + "en-AU-NatashaNeural", + "en-CA-ClaraNeural", + "en-HK-YanNeural", + "en-IN-NeerjaExpressiveNeural", + "en-IN-NeerjaNeural", + "en-IE-EmilyNeural", + "en-KE-AsiliaNeural", + "en-NZ-MollyNeural", + "en-NG-EzinneNeural", + "en-PH-RosaNeural", + "en-US-AvaNeural", + "en-US-EmmaNeural", + "en-SG-LunaNeural", + "en-ZA-LeahNeural", + "en-TZ-ImaniNeural", + "en-GB-LibbyNeural", + "en-GB-MaisieNeural", + "en-GB-SoniaNeural", + "en-US-AnaNeural", + "en-US-AriaNeural", + "en-US-AvaMultilingualNeural", + "en-US-EmmaMultilingualNeural", + "en-US-JennyNeural", + "en-US-MichelleNeural" + ] + } + }, + "et": { + "name": "et-EE", + "voices": { + "male": [ + "et-EE-KertNeural" + ], + "female": [ + "et-EE-AnuNeural" + ] + } + }, + "fil": { + "name": "fil-PH", + "voices": { + "male": [ + "fil-PH-AngeloNeural" + ], + "female": [ + "fil-PH-BlessicaNeural" + ] + } + }, + "fi": { + "name": "fi-FI", + "voices": { + "male": [ + "fi-FI-HarriNeural" + ], + "female": [ + "fi-FI-NooraNeural" + ] + } + }, + "fr": { + "name": "fr-BE", + "voices": { + "male": [ + "fr-BE-GerardNeural", + "fr-CA-ThierryNeural", + "fr-CA-AntoineNeural", + "fr-CA-JeanNeural", + "fr-FR-RemyMultilingualNeural", + "fr-FR-HenriNeural", + "fr-CH-FabriceNeural" + ], + "female": [ + "fr-BE-CharlineNeural", + "fr-CA-SylvieNeural", + "fr-FR-VivienneMultilingualNeural", + "fr-FR-DeniseNeural", + "fr-FR-EloiseNeural", + "fr-CH-ArianeNeural" + ] + } + }, + "gl": { + "name": "gl-ES", + "voices": { + "male": [ + "gl-ES-RoiNeural" + ], + "female": [ + "gl-ES-SabelaNeural" + ] + } + }, + "ka": { + "name": "ka-GE", + "voices": { + "male": [ + 
"ka-GE-GiorgiNeural" + ], + "female": [ + "ka-GE-EkaNeural" + ] + } + }, + "de": { + "name": "de-AT", + "voices": { + "male": [ + "de-AT-JonasNeural", + "de-DE-FlorianMultilingualNeural", + "de-DE-ConradNeural", + "de-DE-KillianNeural", + "de-CH-JanNeural" + ], + "female": [ + "de-AT-IngridNeural", + "de-DE-SeraphinaMultilingualNeural", + "de-DE-AmalaNeural", + "de-DE-KatjaNeural", + "de-CH-LeniNeural" + ] + } + }, + "el": { + "name": "el-GR", + "voices": { + "male": [ + "el-GR-NestorasNeural" + ], + "female": [ + "el-GR-AthinaNeural" + ] + } + }, + "gu": { + "name": "gu-IN", + "voices": { + "male": [ + "gu-IN-NiranjanNeural" + ], + "female": [ + "gu-IN-DhwaniNeural" + ] + } + }, + "he": { + "name": "he-IL", + "voices": { + "male": [ + "he-IL-AvriNeural" + ], + "female": [ + "he-IL-HilaNeural" + ] + } + }, + "hi": { + "name": "hi-IN", + "voices": { + "male": [ + "hi-IN-MadhurNeural" + ], + "female": [ + "hi-IN-SwaraNeural" + ] + } + }, + "hu": { + "name": "hu-HU", + "voices": { + "male": [ + "hu-HU-TamasNeural" + ], + "female": [ + "hu-HU-NoemiNeural" + ] + } + }, + "is": { + "name": "is-IS", + "voices": { + "male": [ + "is-IS-GunnarNeural" + ], + "female": [ + "is-IS-GudrunNeural" + ] + } + }, + "id": { + "name": "id-ID", + "voices": { + "male": [ + "id-ID-ArdiNeural" + ], + "female": [ + "id-ID-GadisNeural" + ] + } + }, + "iu": { + "name": "iu-Latn-CA", + "voices": { + "male": [ + "iu-Latn-CA-TaqqiqNeural", + "iu-Cans-CA-TaqqiqNeural" + ], + "female": [ + "iu-Latn-CA-SiqiniqNeural", + "iu-Cans-CA-SiqiniqNeural" + ] + } + }, + "ga": { + "name": "ga-IE", + "voices": { + "male": [ + "ga-IE-ColmNeural" + ], + "female": [ + "ga-IE-OrlaNeural" + ] + } + }, + "it": { + "name": "it-IT", + "voices": { + "male": [ + "it-IT-GiuseppeMultilingualNeural", + "it-IT-DiegoNeural" + ], + "female": [ + "it-IT-ElsaNeural", + "it-IT-IsabellaNeural" + ] + } + }, + "ja": { + "name": "ja-JP", + "voices": { + "male": [ + "ja-JP-KeitaNeural" + ], + "female": [ + "ja-JP-NanamiNeural" + ] + 
} + }, + "jv": { + "name": "jv-ID", + "voices": { + "male": [ + "jv-ID-DimasNeural" + ], + "female": [ + "jv-ID-SitiNeural" + ] + } + }, + "kn": { + "name": "kn-IN", + "voices": { + "male": [ + "kn-IN-GaganNeural" + ], + "female": [ + "kn-IN-SapnaNeural" + ] + } + }, + "kk": { + "name": "kk-KZ", + "voices": { + "male": [ + "kk-KZ-DauletNeural" + ], + "female": [ + "kk-KZ-AigulNeural" + ] + } + }, + "km": { + "name": "km-KH", + "voices": { + "male": [ + "km-KH-PisethNeural" + ], + "female": [ + "km-KH-SreymomNeural" + ] + } + }, + "ko": { + "name": "ko-KR", + "voices": { + "male": [ + "ko-KR-HyunsuMultilingualNeural", + "ko-KR-InJoonNeural" + ], + "female": [ + "ko-KR-SunHiNeural" + ] + } + }, + "lo": { + "name": "lo-LA", + "voices": { + "male": [ + "lo-LA-ChanthavongNeural" + ], + "female": [ + "lo-LA-KeomanyNeural" + ] + } + }, + "lv": { + "name": "lv-LV", + "voices": { + "male": [ + "lv-LV-NilsNeural" + ], + "female": [ + "lv-LV-EveritaNeural" + ] + } + }, + "lt": { + "name": "lt-LT", + "voices": { + "male": [ + "lt-LT-LeonasNeural" + ], + "female": [ + "lt-LT-OnaNeural" + ] + } + }, + "mk": { + "name": "mk-MK", + "voices": { + "male": [ + "mk-MK-AleksandarNeural" + ], + "female": [ + "mk-MK-MarijaNeural" + ] + } + }, + "ms": { + "name": "ms-MY", + "voices": { + "male": [ + "ms-MY-OsmanNeural" + ], + "female": [ + "ms-MY-YasminNeural" + ] + } + }, + "ml": { + "name": "ml-IN", + "voices": { + "male": [ + "ml-IN-MidhunNeural" + ], + "female": [ + "ml-IN-SobhanaNeural" + ] + } + }, + "mt": { + "name": "mt-MT", + "voices": { + "male": [ + "mt-MT-JosephNeural" + ], + "female": [ + "mt-MT-GraceNeural" + ] + } + }, + "mr": { + "name": "mr-IN", + "voices": { + "male": [ + "mr-IN-ManoharNeural" + ], + "female": [ + "mr-IN-AarohiNeural" + ] + } + }, + "mn": { + "name": "mn-MN", + "voices": { + "male": [ + "mn-MN-BataaNeural" + ], + "female": [ + "mn-MN-YesuiNeural" + ] + } + }, + "ne": { + "name": "ne-NP", + "voices": { + "male": [ + "ne-NP-SagarNeural" + ], + "female": [ 
+ "ne-NP-HemkalaNeural" + ] + } + }, + "nb": { + "name": "nb-NO", + "voices": { + "male": [ + "nb-NO-FinnNeural" + ], + "female": [ + "nb-NO-PernilleNeural" + ] + } + }, + "ps": { + "name": "ps-AF", + "voices": { + "male": [ + "ps-AF-GulNawazNeural" + ], + "female": [ + "ps-AF-LatifaNeural" + ] + } + }, + "fa": { + "name": "fa-IR", + "voices": { + "male": [ + "fa-IR-FaridNeural" + ], + "female": [ + "fa-IR-DilaraNeural" + ] + } + }, + "pl": { + "name": "pl-PL", + "voices": { + "male": [ + "pl-PL-MarekNeural" + ], + "female": [ + "pl-PL-ZofiaNeural" + ] + } + }, + "pt": { + "name": "pt-BR", + "voices": { + "male": [ + "pt-BR-AntonioNeural", + "pt-PT-DuarteNeural" + ], + "female": [ + "pt-BR-ThalitaMultilingualNeural", + "pt-BR-FranciscaNeural", + "pt-PT-RaquelNeural" + ] + } + }, + "ro": { + "name": "ro-RO", + "voices": { + "male": [ + "ro-RO-EmilNeural" + ], + "female": [ + "ro-RO-AlinaNeural" + ] + } + }, + "ru": { + "name": "ru-RU", + "voices": { + "male": [ + "ru-RU-DmitryNeural" + ], + "female": [ + "ru-RU-SvetlanaNeural" + ] + } + }, + "sr": { + "name": "sr-RS", + "voices": { + "male": [ + "sr-RS-NicholasNeural" + ], + "female": [ + "sr-RS-SophieNeural" + ] + } + }, + "si": { + "name": "si-LK", + "voices": { + "male": [ + "si-LK-SameeraNeural" + ], + "female": [ + "si-LK-ThiliniNeural" + ] + } + }, + "sk": { + "name": "sk-SK", + "voices": { + "male": [ + "sk-SK-LukasNeural" + ], + "female": [ + "sk-SK-ViktoriaNeural" + ] + } + }, + "sl": { + "name": "sl-SI", + "voices": { + "male": [ + "sl-SI-RokNeural" + ], + "female": [ + "sl-SI-PetraNeural" + ] + } + }, + "so": { + "name": "so-SO", + "voices": { + "male": [ + "so-SO-MuuseNeural" + ], + "female": [ + "so-SO-UbaxNeural" + ] + } + }, + "es": { + "name": "es-AR", + "voices": { + "male": [ + "es-AR-TomasNeural", + "es-BO-MarceloNeural", + "es-CL-LorenzoNeural", + "es-CO-GonzaloNeural", + "es-CR-JuanNeural", + "es-CU-ManuelNeural", + "es-DO-EmilioNeural", + "es-EC-LuisNeural", + "es-SV-RodrigoNeural", + 
"es-GQ-JavierNeural", + "es-GT-AndresNeural", + "es-HN-CarlosNeural", + "es-MX-JorgeNeural", + "es-NI-FedericoNeural", + "es-PA-RobertoNeural", + "es-PY-MarioNeural", + "es-PE-AlexNeural", + "es-PR-VictorNeural", + "es-ES-AlvaroNeural", + "es-US-AlonsoNeural", + "es-UY-MateoNeural", + "es-VE-SebastianNeural" + ], + "female": [ + "es-AR-ElenaNeural", + "es-BO-SofiaNeural", + "es-CL-CatalinaNeural", + "es-CO-SalomeNeural", + "es-ES-XimenaNeural", + "es-CR-MariaNeural", + "es-CU-BelkysNeural", + "es-DO-RamonaNeural", + "es-EC-AndreaNeural", + "es-SV-LorenaNeural", + "es-GQ-TeresaNeural", + "es-GT-MartaNeural", + "es-HN-KarlaNeural", + "es-MX-DaliaNeural", + "es-NI-YolandaNeural", + "es-PA-MargaritaNeural", + "es-PY-TaniaNeural", + "es-PE-CamilaNeural", + "es-PR-KarinaNeural", + "es-ES-ElviraNeural", + "es-US-PalomaNeural", + "es-UY-ValentinaNeural", + "es-VE-PaolaNeural" + ] + } + }, + "su": { + "name": "su-ID", + "voices": { + "male": [ + "su-ID-JajangNeural" + ], + "female": [ + "su-ID-TutiNeural" + ] + } + }, + "sw": { + "name": "sw-KE", + "voices": { + "male": [ + "sw-KE-RafikiNeural", + "sw-TZ-DaudiNeural" + ], + "female": [ + "sw-KE-ZuriNeural", + "sw-TZ-RehemaNeural" + ] + } + }, + "sv": { + "name": "sv-SE", + "voices": { + "male": [ + "sv-SE-MattiasNeural" + ], + "female": [ + "sv-SE-SofieNeural" + ] + } + }, + "ta": { + "name": "ta-IN", + "voices": { + "male": [ + "ta-IN-ValluvarNeural", + "ta-MY-SuryaNeural", + "ta-SG-AnbuNeural", + "ta-LK-KumarNeural" + ], + "female": [ + "ta-IN-PallaviNeural", + "ta-MY-KaniNeural", + "ta-SG-VenbaNeural", + "ta-LK-SaranyaNeural" + ] + } + }, + "te": { + "name": "te-IN", + "voices": { + "male": [ + "te-IN-MohanNeural" + ], + "female": [ + "te-IN-ShrutiNeural" + ] + } + }, + "th": { + "name": "th-TH", + "voices": { + "male": [ + "th-TH-NiwatNeural" + ], + "female": [ + "th-TH-PremwadeeNeural" + ] + } + }, + "tr": { + "name": "tr-TR", + "voices": { + "male": [ + "tr-TR-AhmetNeural" + ], + "female": [ + "tr-TR-EmelNeural" + ] + 
} + }, + "uk": { + "name": "uk-UA", + "voices": { + "male": [ + "uk-UA-OstapNeural" + ], + "female": [ + "uk-UA-PolinaNeural" + ] + } + }, + "ur": { + "name": "ur-IN", + "voices": { + "male": [ + "ur-IN-SalmanNeural", + "ur-PK-AsadNeural" + ], + "female": [ + "ur-IN-GulNeural", + "ur-PK-UzmaNeural" + ] + } + }, + "uz": { + "name": "uz-UZ", + "voices": { + "male": [ + "uz-UZ-SardorNeural" + ], + "female": [ + "uz-UZ-MadinaNeural" + ] + } + }, + "vi": { + "name": "vi-VN", + "voices": { + "male": [ + "vi-VN-NamMinhNeural" + ], + "female": [ + "vi-VN-HoaiMyNeural" + ] + } + }, + "cy": { + "name": "cy-GB", + "voices": { + "male": [ + "cy-GB-AledNeural" + ], + "female": [ + "cy-GB-NiaNeural" + ] + } + }, + "zu": { + "name": "zu-ZA", + "voices": { + "male": [ + "zu-ZA-ThembaNeural" + ], + "female": [ + "zu-ZA-ThandoNeural" + ] + } + } +} \ No newline at end of file diff --git a/latest_langmap_generate.py b/latest_langmap_generate.py new file mode 100644 index 0000000..54f4c72 --- /dev/null +++ b/latest_langmap_generate.py @@ -0,0 +1,98 @@ +""" +Language Map Generator for YouTube Auto Dub. + +This script fetches the latest available voices from Microsoft Edge TTS +and generates a `language_map.json` file compatible with the +Multi-Speaker Diarization system. + +It groups voices into 'male' and 'female' lists (pools) for every language, +enabling the engine to rotate voices for different speakers automatically. + +Usage: python latest_langmap_generate.py +""" + +import asyncio +import json +import edge_tts +from pathlib import Path +from typing import Dict, List, Any + +# Define path relative to project root (assuming this script is in root or src) +# Adjust BASE_DIR if you move this script. 
+BASE_DIR = Path(__file__).resolve().parent +LANG_MAP_FILE = BASE_DIR / "language_map.json" + +async def generate_lang_map() -> None: + print("[*] Connecting to Microsoft Edge TTS API...") + + try: + # Fetch all available voices + voices = await edge_tts.list_voices() + except Exception as e: + print(f"[!] CRITICAL: Failed to fetch voices: {e}") + return + + print(f"[*] Processing {len(voices)} raw voice entries...") + + # Structure: { "vi": { "name": "vi-VN", "voices": { "male": [], "female": [] } } } + lang_map: Dict[str, Any] = {} + + for v in voices: + # 1. FILTER: Strict quality control - Neural voices only + if "Neural" not in v["ShortName"]: + continue + + # 2. EXTRACT: Parse metadata + short_name = v["ShortName"] # e.g., "vi-VN-NamMinhNeural" + locale = v["Locale"] # e.g., "vi-VN" + gender = v["Gender"].lower() # "male" or "female" + + # ISO Language Code (e.g., 'vi' from 'vi-VN') + lang_code = locale.split('-')[0] + + # 3. INITIALIZE: Create structure if language not seen before + if lang_code not in lang_map: + lang_map[lang_code] = { + "name": locale, # Store locale as a friendly name reference + "voices": { + "male": [], + "female": [] + } + } + + # 4. POPULATE: Add voice to the specific gender pool + # This creates the "List" structure required by engines.py + target_list = lang_map[lang_code]["voices"].get(gender) + + # Handle case where gender might be undefined or new + if target_list is None: + lang_map[lang_code]["voices"][gender] = [] + target_list = lang_map[lang_code]["voices"][gender] + + if short_name not in target_list: + target_list.append(short_name) + + # 5. OPTIMIZE: Remove languages with empty voice lists (optional cleanup) + final_map = { + k: v for k, v in lang_map.items() + if v["voices"]["male"] or v["voices"]["female"] + } + + # 6. SAVE: Write to JSON + try: + with open(LANG_MAP_FILE, "w", encoding="utf-8") as f: + json.dump(final_map, f, ensure_ascii=False, indent=2) + + print(f"\n[+] SUCCESS! 
Generated configuration for {len(final_map)} languages.") + print(f" File saved to: {LANG_MAP_FILE}") + + # Preview a specific language (e.g., Vietnamese) + if "vi" in final_map: + print("\n[*] Preview (Vietnamese):") + print(json.dumps(final_map["vi"], indent=2)) + + except Exception as e: + print(f"[!] ERROR: Failed to write JSON file: {e}") + +if __name__ == "__main__": + asyncio.run(generate_lang_map()) \ No newline at end of file diff --git a/logs/auto-dub-20260329-225711.log b/logs/auto-dub-20260329-225711.log new file mode 100644 index 0000000000000000000000000000000000000000..811044a67891356afef597351dfcea202240631e GIT binary patch literal 1356 zcmchXTW{Jx5QWckrTz!M@noAzXqr@!N+7*RkV;MHOQ|9^KoSYC1ty_?zU_CG0Fl~< zAXU|9$GfvTvuDoE_}9;`s;kO)rI>F6UDuH=(RCfGsl!M8O_45CD$^W&rVdNpz8N0g|G|6-lBK@m6Jz%( zXEqGBIK*FIbHsrX-7}k^2jp>$?M?-|j#Ut?)GgjK&W*HU{0~fKz)U6qK?H&lDZq5B z6xrpNge+5?AuIKGCucd*Cq@sfF)|5!`c=kcx9^*DiE{-rm%KtpABlXWQ?;<1G3)EI zx;oh+T;r=zBZr``K8_l=ZENTN^!LpF1&SV6I-v0VH{`pnoH*3#0ldU;Snhq(1yP+< zJLK0T({sJY(n34q4{&e}8z5JZl zwX2{6hpW%kxCcq={IUgg^CyM` zoBui$?K#DAtCyK4shY6~!$)m?_}a&t0f$Bdb~X|R|NM=Mxevpc5y W%;)C(Q@+gG3=|nLCY(z`_5B7BsoUNF literal 0 HcmV?d00001 diff --git a/main.py b/main.py new file mode 100644 index 0000000..b66823b --- /dev/null +++ b/main.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +"""YouTube Auto Dub command-line entrypoint.""" + +from __future__ import annotations + +import argparse +import asyncio +import shutil +import time + +from src.core_utils import ConfigurationError +from src.translation import TranslationConfig + + +def build_parser() -> argparse.ArgumentParser: + """Build the command-line parser.""" + parser = argparse.ArgumentParser( + description="YouTube Auto Dub - Automated Video Subtitling", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +Examples: + python main.py "https://youtube.com/watch?v=VIDEO_ID" --lang es + python main.py "https://youtube.com/watch?v=VIDEO_ID" --lang fr --gpu + python main.py 
"https://youtube.com/watch?v=VIDEO_ID" --lang ja --browser chrome + python main.py "https://youtube.com/watch?v=VIDEO_ID" --whisper_model large-v3 + python main.py "https://youtube.com/watch?v=VIDEO_ID" --lmstudio-model gemma-3-4b-it + """, + ) + + parser.add_argument("url", help="YouTube video URL to subtitle") + parser.add_argument( + "--lang", + "-l", + default="es", + help="Target language ISO code (e.g., es, fr, ja, vi).", + ) + parser.add_argument( + "--browser", + "-b", + help="Browser to extract cookies from (chrome, edge, firefox). Close browser first!", + ) + parser.add_argument( + "--cookies", + "-c", + help="Path to cookies.txt file (Netscape format) for YouTube authentication", + ) + parser.add_argument( + "--gpu", + action="store_true", + help="Use GPU acceleration for Whisper when CUDA is available.", + ) + parser.add_argument( + "--whisper_model", + "-wm", + help="Whisper model to use (tiny, base, small, medium, large-v3). Default: auto-select based on VRAM", + ) + parser.add_argument( + "--translation-backend", + default="lmstudio", + choices=["lmstudio"], + help="Translation backend to use. Currently only 'lmstudio' is supported.", + ) + parser.add_argument( + "--lmstudio-base-url", + help="Override the LM Studio OpenAI-compatible base URL (default: env or http://127.0.0.1:1234/v1).", + ) + parser.add_argument( + "--lmstudio-model", + help="Override the LM Studio model name (default: env or gemma-3-4b-it).", + ) + return parser + + +def _check_deps() -> None: + """Verify critical runtime dependencies.""" + from shutil import which + + missing = [] + if not which("ffmpeg"): + missing.append("ffmpeg") + if not which("ffprobe"): + missing.append("ffprobe") + + if missing: + print(f"[!] 
CRITICAL: Missing dependencies: {', '.join(missing)}") + print(" Please install FFmpeg and add it to your System PATH.") + print(" Download: https://ffmpeg.org/download.html") + raise SystemExit(1) + + try: + import torch + + print(f"[*] PyTorch {torch.__version__} | CUDA Available: {torch.cuda.is_available()}") + except ImportError: + print("[!] CRITICAL: PyTorch not installed.") + print(" Install with your UV env, for example:") + print(" uv pip install --python .venv\\Scripts\\python.exe -r requirements.txt") + raise SystemExit(1) + + +def _cleanup() -> None: + """Clean up the temp directory with retries for Windows file locks.""" + import src.engines + + max_retries = 5 + for attempt in range(max_retries): + try: + if src.engines.TEMP_DIR.exists(): + shutil.rmtree(src.engines.TEMP_DIR) + src.engines.TEMP_DIR.mkdir(parents=True, exist_ok=True) + return + except PermissionError: + wait_time = 0.5 * (2 ** attempt) + print(f"[-] File locked (attempt {attempt + 1}/{max_retries}). Retrying in {wait_time}s...") + time.sleep(wait_time) + + print(f"[!] 
WARNING: Could not fully clean temp directory after {max_retries} attempts.") + print(f" Files may persist in: {src.engines.TEMP_DIR}") + + +def _detect_device() -> str: + """Detect the best available inference device.""" + import torch + + if torch.backends.mps.is_available(): + return "mps" + if torch.cuda.is_available(): + return "cuda" + return "cpu" + + +def _build_translation_config(args: argparse.Namespace) -> TranslationConfig: + """Resolve translation configuration from env vars plus CLI overrides.""" + return TranslationConfig.from_env( + backend=args.translation_backend, + base_url=args.lmstudio_base_url, + model=args.lmstudio_model, + ) + + +def _get_source_language_hint() -> str: + """Read an optional source language override from the environment.""" + import os + + return (os.getenv("SOURCE_LANGUAGE_HINT") or "").strip() + + +async def _synthesize_dub_audio(engine, chunks, target_lang: str, media_module, temp_dir) -> None: + """Generate and fit dubbed audio clips for each translated chunk.""" + total = len(chunks) + for index, chunk in enumerate(chunks, start=1): + translated_text = chunk.get("trans_text", "").strip() + target_duration = max(0.0, chunk["end"] - chunk["start"]) + + if not translated_text or target_duration <= 0: + chunk["processed_audio"] = None + continue + + raw_audio_path = temp_dir / f"tts_{index:04d}.mp3" + rate = engine.calcRate( + text=translated_text, + target_dur=target_duration, + original_text=chunk.get("text", ""), + ) + + await engine.synthesize( + text=translated_text, + target_lang=target_lang, + out_path=raw_audio_path, + rate=rate, + ) + + chunk["processed_audio"] = media_module.fit_audio(raw_audio_path, target_duration) + + if index == 1 or index % 10 == 0 or index == total: + print(f"[-] Dub synthesis progress: {index}/{total}") + + +def main() -> None: + """Run the full YouTube Auto Dub pipeline.""" + parser = build_parser() + args = parser.parse_args() + + import src.engines + import src.media + import src.youtube 
+ + print("\n" + "=" * 60) + print("YOUTUBE AUTO SUB - INITIALIZING") + print("=" * 60) + + _check_deps() + + try: + translation_config = _build_translation_config(args) + except ConfigurationError as exc: + print(f"[!] INVALID TRANSLATION CONFIG: {exc}") + raise SystemExit(1) from exc + + _cleanup() + + device = _detect_device() + print(f"[*] Using device: {device.upper()}") + print(f"[*] Translation backend: {translation_config.backend}") + print(f"[*] LM Studio endpoint: {translation_config.base_url}") + print(f"[*] LM Studio model: {translation_config.model}") + + if args.whisper_model: + src.engines.ASR_MODEL = args.whisper_model + print(f"[*] Using specified Whisper model: {args.whisper_model}") + else: + print(f"[*] Auto-selected Whisper model: {src.engines.ASR_MODEL} (based on VRAM)") + + try: + source_language_hint = _get_source_language_hint() + if source_language_hint: + print(f"[*] Source language hint: {source_language_hint}") + + engine = src.engines.Engine( + device, + translation_config=translation_config, + source_language_hint=source_language_hint, + ) + + print(f"\n{'=' * 60}") + print("STEP 1: DOWNLOADING CONTENT") + print(f"{'=' * 60}") + print(f"[*] Target URL: {args.url}") + print(f"[*] Target Language: {args.lang.upper()}") + + try: + video_path = src.youtube.downloadVideo( + args.url, + browser=args.browser, + cookies_file=args.cookies, + ) + audio_path = src.youtube.downloadAudio( + args.url, + browser=args.browser, + cookies_file=args.cookies, + ) + print(f"[+] Video downloaded: {video_path}") + print(f"[+] Audio extracted: {audio_path}") + except Exception as exc: + print(f"\n[!] DOWNLOAD FAILED: {exc}") + print("\n[-] TROUBLESHOOTING TIPS:") + print(" 1. Close all browser windows if using --browser") + print(" 2. Export fresh cookies.txt and use --cookies") + print(" 3. Check if video is private/region-restricted") + print(" 4. 
Verify YouTube URL is correct") + return + + print(f"\n{'=' * 60}") + print("STEP 2: SPEECH TRANSCRIPTION") + print(f"{'=' * 60}") + print(f"[*] Transcribing audio with Whisper ({src.engines.ASR_MODEL})...") + + raw_segments = engine.transcribeSafe(audio_path) + print(f"[+] Transcription complete: {len(raw_segments)} segments") + + if raw_segments: + print(f"[*] Sample segment: '{raw_segments[0]['text'][:50]}...'") + + print(f"\n{'=' * 60}") + print("STEP 3: INTELLIGENT CHUNKING") + print(f"{'=' * 60}") + + chunks = src.engines.smartChunk(raw_segments) + print(f"[+] Optimized {len(raw_segments)} raw segments into {len(chunks)} chunks") + print(f"[*] Average chunk duration: {sum(c['end'] - c['start'] for c in chunks) / len(chunks):.2f}s") + + print(f"\n{'=' * 60}") + print(f"STEP 4: TRANSLATION ({args.lang.upper()})") + print(f"{'=' * 60}") + + texts = [chunk["text"] for chunk in chunks] + print(f"[*] Translating {len(texts)} text segments...") + + translated_texts = engine.translateSafe(texts, args.lang) + + for index, chunk in enumerate(chunks): + chunk["trans_text"] = translated_texts[index] + + print("[+] Translation complete") + + if chunks: + original = chunks[0]["text"][:50] + translated = chunks[0]["trans_text"][:50] + print(f"[*] Sample: '{original}' -> '{translated}'") + + print(f"\n{'=' * 60}") + print("STEP 5: DUB AUDIO SYNTHESIS") + print(f"{'=' * 60}") + + print(f"[*] Synthesizing dubbed speech for {len(chunks)} translated chunks...") + asyncio.run(_synthesize_dub_audio(engine, chunks, args.lang, src.media, src.engines.TEMP_DIR)) + + concat_manifest_path = src.engines.TEMP_DIR / "dub_audio_manifest.txt" + silence_ref_path = src.engines.TEMP_DIR / "silence_ref.wav" + src.media.create_concat_file(chunks, silence_ref_path, concat_manifest_path) + print(f"[+] Dub audio manifest generated: {concat_manifest_path}") + + print(f"\n{'=' * 60}") + print("STEP 6: SUBTITLE GENERATION") + print(f"{'=' * 60}") + + subtitle_path = src.engines.TEMP_DIR / 
"subtitles.srt" + src.media.generate_srt(chunks, subtitle_path) + print(f"[+] Subtitles generated: {subtitle_path}") + + print(f"\n{'=' * 60}") + print("STEP 7: FINAL VIDEO RENDERING") + print(f"{'=' * 60}") + + try: + video_name = video_path.stem + output_name = f"dubbed_{args.lang}_{video_name}.mp4" + final_output = src.engines.OUTPUT_DIR / output_name + + print("[*] Rendering final video with dubbed audio and subtitles...") + print(f" Source: {video_path}") + print(f" Output: {final_output}") + print(f" Dub audio manifest: {concat_manifest_path}") + print(f" Subtitles: {subtitle_path}") + + src.media.render_video( + video_path, + concat_manifest_path, + final_output, + subtitle_path=subtitle_path, + ) + + if final_output.exists(): + file_size = final_output.stat().st_size / (1024 * 1024) + print("\n[+] SUCCESS! Video rendered successfully.") + print(f" Output: {final_output}") + print(f" Size: {file_size:.1f} MB") + else: + print(f"\n[!] ERROR: Output file not created at {final_output}") + except Exception as exc: + print(f"\n[!] RENDERING FAILED: {exc}") + print("[-] This may be due to:") + print(" 1. Corrupted audio chunks") + print(" 2. FFmpeg compatibility issues") + print(" 3. Insufficient disk space") + return + finally: + if "engine" in locals(): + engine.translator.close() + print(f"\n{'=' * 60}") + print("YOUTUBE AUTO SUB - PIPELINE COMPLETE") + print(f"{'=' * 60}") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n[!] Process interrupted by user") + raise SystemExit(1) + except Exception as exc: + print(f"\n[!] 
function Read-Value {
    <#
    .SYNOPSIS
        Prompt the user for a value, falling back to a default and optionally
        requiring a non-blank answer.
    #>
    param(
        [Parameter(Mandatory = $true)]
        [string]$Prompt,
        [string]$DefaultValue = "",
        [switch]$Required
    )

    # Show the default (when present) inside the prompt itself.
    $answer = if ($DefaultValue) {
        Read-Host "$Prompt [$DefaultValue]"
    }
    else {
        Read-Host $Prompt
    }

    # Blank input falls back to the default, when one was supplied.
    if ($DefaultValue -and [string]::IsNullOrWhiteSpace($answer)) {
        $answer = $DefaultValue
    }

    if ($Required -and [string]::IsNullOrWhiteSpace($answer)) {
        throw "A value is required for: $Prompt"
    }

    return $answer.Trim()
}
"[$(Get-Date -Format s)] Command: $($commandLine -join ' ')" | Tee-Object -FilePath $logFile -Append | Out-Null + "[$(Get-Date -Format s)] LM_STUDIO_BASE_URL=$lmStudioBaseUrl" | Tee-Object -FilePath $logFile -Append | Out-Null + "[$(Get-Date -Format s)] LM_STUDIO_MODEL=$lmStudioModel" | Tee-Object -FilePath $logFile -Append | Out-Null + if ($inputLanguage) { + "[$(Get-Date -Format s)] SOURCE_LANGUAGE_HINT=$inputLanguage" | Tee-Object -FilePath $logFile -Append | Out-Null + } + + & $pythonExe @commandArgs 2>&1 | Tee-Object -FilePath $logFile -Append +} +catch { + Write-Host "" + Write-Host "The run failed." -ForegroundColor Red + Write-Host $_.Exception.Message -ForegroundColor Red + "[$(Get-Date -Format s)] Launcher error: $($_.Exception.Message)" | Tee-Object -FilePath $logFile -Append | Out-Null +} +finally { + Pop-Location + Write-Host "" + Write-Host "Run log saved to: $logFile" -ForegroundColor Cyan + Read-Host "Press Enter to close" +} diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..354cc89 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,4 @@ +"""YouTube Auto Dub - Automated Video Translation and Dubbing""" + +__version__ = "1.0.0" +__author__ = "Nguyen Cong Thuan Huy (mangodxd)" diff --git a/src/core_utils.py b/src/core_utils.py new file mode 100644 index 0000000..b3e25f4 --- /dev/null +++ b/src/core_utils.py @@ -0,0 +1,181 @@ +"""Core utilities and exceptions for YouTube Auto Sub. + +This module consolidates shared utilities, exceptions, and helper functions +used across the entire pipeline to reduce code duplication. 
+ +Author: Nguyen Cong Thuan Huy (mangodxd) +Version: 1.0.0 +""" + +import subprocess +import time +import traceback +from pathlib import Path +from typing import Dict, List, Optional, Union + + +class YouTubeAutoSubError(Exception): + """Base exception for all YouTube Auto Sub errors.""" + pass + + +class ModelLoadError(YouTubeAutoSubError): + """Raised when AI/ML model fails to load.""" + pass + + +class AudioProcessingError(YouTubeAutoSubError): + """Raised when audio processing operations fail.""" + pass + + +class TranscriptionError(YouTubeAutoSubError): + """Raised when speech transcription fails.""" + pass + + +class TranslationError(YouTubeAutoSubError): + """Raised when text translation fails.""" + pass + + +class TTSError(YouTubeAutoSubError): + """Raised when text-to-speech synthesis fails.""" + pass + + +class VideoProcessingError(YouTubeAutoSubError): + """Raised when video processing operations fail.""" + pass + + +class ConfigurationError(YouTubeAutoSubError): + """Raised when configuration is invalid or missing.""" + pass + + +class DependencyError(YouTubeAutoSubError): + """Raised when required dependencies are missing.""" + pass + + +class ValidationError(YouTubeAutoSubError): + """Raised when input validation fails.""" + pass + + +class ResourceError(YouTubeAutoSubError): + """Raised when system resources are insufficient.""" + pass + + +def _handleError(error: Exception, context: str = "") -> None: + """Centralized error handling with context. + + Args: + error: The exception that occurred. + context: Additional context about where the error occurred. + + Returns: + None + """ + if context: + print(f"[!] ERROR in {context}: {error}") + else: + print(f"[!] ERROR: {error}") + + print(f" Full traceback: {traceback.format_exc()}") + + + + +def _runFFmpegCmd(cmd: List[str], timeout: int = 300, description: str = "FFmpeg operation") -> None: + """Run FFmpeg command with consistent error handling. + + Args: + cmd: FFmpeg command to run. 
+ timeout: Command timeout in seconds. + description: Description for error messages. + + Raises: + RuntimeError: If FFmpeg command fails. + """ + try: + subprocess.run(cmd, check=True, timeout=timeout) + except subprocess.TimeoutExpired: + raise RuntimeError(f"{description} timed out") + except subprocess.CalledProcessError as e: + raise RuntimeError(f"{description} failed: {e}") + except Exception as e: + raise RuntimeError(f"Unexpected error during {description}: {e}") + + +def _validateAudioFile(file_path: Path, min_size: int = 1024) -> bool: + """Validate that audio file exists and has minimum size. + + Args: + file_path: Path to audio file. + min_size: Minimum file size in bytes. + + Returns: + True if file is valid, False otherwise. + """ + if not file_path.exists(): + return False + + if file_path.stat().st_size < min_size: + return False + + return True + + +def _safeFileDelete(file_path: Path) -> None: + """Safely delete file with error handling. + + Args: + file_path: Path to file to delete. + + Returns: + None + """ + try: + if file_path.exists(): + file_path.unlink() + except Exception as e: + print(f"[!] WARNING: Could not delete file {file_path}: {e}") + + + +class ProgressTracker: + """Simple progress tracking for long operations.""" + + def __init__(self, total: int, description: str = "Processing", update_interval: int = 10): + """Initialize progress tracker. + + Args: + total: Total number of items to process. + description: Description for progress messages. + update_interval: How often to update progress (every N items). + """ + self.total = total + self.description = description + self.update_interval = update_interval + self.current = 0 + + def update(self, increment: int = 1) -> None: + """Update progress counter. + + Args: + increment: Number of items processed. 
+ + Returns: + None + """ + self.current += increment + + if self.current % self.update_interval == 0 or self.current >= self.total: + progress = (self.current / self.total) * 100 + print(f"[-] {self.description}: {self.current}/{self.total} ({progress:.1f}%)", end='\r') + + if self.current >= self.total: + print() diff --git a/src/engines.py b/src/engines.py new file mode 100644 index 0000000..ff2084e --- /dev/null +++ b/src/engines.py @@ -0,0 +1,547 @@ +""" +AI/ML Engines Module for YouTube Auto Dub. + +This module provides the core AI/ML functionality including: +- Device and configuration management +- Whisper-based speech transcription +- LM Studio translation integration +- Edge TTS synthesis +- Pipeline orchestration and chunking + +Author: Nguyen Cong Thuan Huy (mangodxd) +Version: 1.0.0 +""" + +import torch +import asyncio +import edge_tts +import gc +import json +import os +from abc import ABC +import numpy as np +from pathlib import Path +from typing import List, Dict, Optional, Union, Any + +# Local imports +from src.core_utils import ( + ModelLoadError, TranscriptionError, TranslationError, TTSError, + AudioProcessingError, _handleError, _runFFmpegCmd, ProgressTracker, + _validateAudioFile, _safeFileDelete +) +from src.translation import LMStudioTranslator, TranslationConfig + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +# Base directory of the project +BASE_DIR = Path(__file__).resolve().parent.parent + +# Working directories +CACHE_DIR = BASE_DIR / ".cache" +OUTPUT_DIR = BASE_DIR / "output" +TEMP_DIR = BASE_DIR / "temp" + +# Configuration files +LANG_MAP_FILE = BASE_DIR / "language_map.json" + +# Ensure directories exist +for directory_path in [CACHE_DIR, OUTPUT_DIR, TEMP_DIR]: + directory_path.mkdir(parents=True, exist_ok=True) + +# Audio processing settings +SAMPLE_RATE = 24000 +AUDIO_CHANNELS = 1 + +def 
_select_optimal_whisper_model(device: str = "cpu") -> str: + """Select optimal Whisper model based on available VRAM and device. + + Args: + device: Device type ('cuda' or 'cpu'). + + Returns: + Optimal Whisper model name. + """ + if device == "cpu": + return "base" # CPU works best with base model + + try: + import torch + if not torch.cuda.is_available(): + return "base" + + # Get VRAM information + gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3) # GB + + if gpu_memory < 4: + return "tiny" # < 4GB VRAM + elif gpu_memory < 8: + return "base" # 4-8GB VRAM + elif gpu_memory < 12: + return "small" # 8-12GB VRAM + elif gpu_memory < 16: + return "medium" # 12-16GB VRAM + else: + return "large-v3" # > 16GB VRAM - use latest large model + + except Exception: + return "base" # Fallback to base if detection fails + +ASR_MODEL = _select_optimal_whisper_model(device="cuda" if torch.cuda.is_available() else "cpu") +DEFAULT_VOICE = "en-US-AriaNeural" + + +# Load language configuration +try: + with open(LANG_MAP_FILE, "r", encoding="utf-8") as f: + LANG_DATA = json.load(f) + print(f"[*] Loaded language configuration for {len(LANG_DATA)} languages") +except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"[!] WARNING: Could not load language map from {LANG_MAP_FILE}") + LANG_DATA = {} + + +class DeviceManager: + """Centralized device detection and management.""" + + def __init__(self, device: Optional[str] = None): + """Initialize device manager. + + Args: + device: Device type ('cuda' or 'cpu'). If None, auto-detects. + """ + if device is None: + if torch.backends.mps.is_available(): #macOS + device = "mps" + elif torch.cuda.is_available(): + device = "cuda" + else: + device = "cpu" + + self.device = device + self._logDeviceInfo() + + def _logDeviceInfo(self) -> None: + """Log device information to console. 
+ + Args: + None + + Returns: + None + """ + print(f"[*] Device initialized: {self.device.upper()}") + + if self.device == "cuda": + gpu_name = torch.cuda.get_device_name(0) + gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3) + print(f" GPU: {gpu_name} | VRAM: {gpu_memory:.1f} GB") + + def getMemoryInfo(self) -> Dict[str, float]: + """Get GPU memory usage information. + + Args: + None + + Returns: + Dictionary with allocated and reserved memory in GB. + """ + if self.device != "cuda": + return {"allocated": 0.0, "reserved": 0.0} + + return { + "allocated": torch.cuda.memory_allocated(0) / (1024**3), + "reserved": torch.cuda.memory_reserved(0) / (1024**3) + } + + def clearCache(self) -> None: + """Clear GPU cache and run garbage collection. + + Args: + None + + Returns: + None + """ + if self.device == "cuda": + torch.cuda.empty_cache() + gc.collect() + + +class ConfigManager: + """Centralized configuration access with validation.""" + + def getLanguageConfig(self, lang_code: str) -> Dict[str, Any]: + """Get language configuration by language code. + + Args: + lang_code: ISO language code. + + Returns: + Language configuration dictionary. + """ + return LANG_DATA.get(lang_code, {}) + + def extractVoice(self, voice_data, fallback_gender: str = "female") -> str: + """Extract voice string from various data formats. + + Args: + voice_data: Voice data in list, string, or other format. + fallback_gender: Default gender to use if extraction fails. + + Returns: + Voice string for TTS. + """ + if isinstance(voice_data, list): + return voice_data[0] if voice_data else DEFAULT_VOICE + if isinstance(voice_data, str): + return voice_data + return DEFAULT_VOICE + + def getVoicePool(self, lang_code: str, gender: str) -> list: + """Get pool of available voices for language and gender. + + Args: + lang_code: ISO language code. + gender: Voice gender (male/female). + + Returns: + List of available voice strings. 
+ """ + lang_config = self.getLanguageConfig(lang_code) + voices = lang_config.get('voices', {}) + pool = voices.get(gender, [DEFAULT_VOICE]) + + if isinstance(pool, str): + pool = [pool] + + return pool + + +class PipelineComponent(ABC): + """Base class for pipeline components with shared utilities.""" + + def __init__(self, device_manager: DeviceManager, config_manager: ConfigManager): + """Initialize pipeline component. + + Args: + device_manager: Device management instance. + config_manager: Configuration management instance. + """ + self.device_manager = device_manager + self.config_manager = config_manager + self.device = device_manager.device + + def _validateFileExists(self, file_path: Path, description: str = "File") -> None: + """Validate that a file exists. + + Args: + file_path: Path to validate. + description: Description for error messages. + + Raises: + FileNotFoundError: If file doesn't exist. + """ + if not file_path.exists(): + raise FileNotFoundError(f"{description} not found: {file_path}") + + def _ensureDirectory(self, directory: Path) -> None: + """Ensure directory exists, create if necessary. + + Args: + directory: Directory path to ensure exists. 
+ + Returns: + None + """ + directory.mkdir(parents=True, exist_ok=True) + + +# ============================================================================= +# MAIN AI/ML ENGINE +# ============================================================================= + +class Engine(PipelineComponent): + """Central AI/ML engine for YouTube Auto Dub pipeline.""" + + def __init__( + self, + device: Optional[str] = None, + translation_config: Optional[TranslationConfig] = None, + source_language_hint: Optional[str] = None, + ): + device_manager = DeviceManager(device) + config_manager = ConfigManager() + super().__init__(device_manager, config_manager) + + self._asr = None + self.source_language_hint = (source_language_hint or os.getenv("SOURCE_LANGUAGE_HINT") or "").strip() + self.detected_source_lang = self.source_language_hint or "auto" + self.translation_config = translation_config or TranslationConfig.from_env() + self.translator = LMStudioTranslator(self.translation_config) + + print(f"[+] AI Engine initialized successfully") + + @property + def asrModel(self): + """Lazy-load Whisper ASR model. + + Returns: + Loaded Whisper model instance. + + Raises: + ModelLoadError: If model fails to load. + """ + if not self._asr: + print(f"[*] Loading Whisper model ({ASR_MODEL}) on {self.device}...") + try: + from faster_whisper import WhisperModel + compute_type = "float16" if self.device == "cuda" else "int8" + self._asr = WhisperModel(ASR_MODEL, device=self.device, compute_type=compute_type) + print(f"[+] Whisper model loaded successfully") + except Exception as e: + raise ModelLoadError(f"Failed to load Whisper model: {e}") from e + return self._asr + + def _getLangConfig(self, lang: str) -> Dict: + """Get language configuration. + + Args: + lang: Language code. + + Returns: + Language configuration dictionary. 
+ """ + return self.config_manager.getLanguageConfig(lang) + + def _extractVoiceString(self, voice_data: Union[str, List[str], None]) -> str: + """Extract voice string from data. + + Args: + voice_data: Voice data in various formats. + + Returns: + Voice string for TTS. + """ + return self.config_manager.extractVoice(voice_data) + + def releaseMemory(self, component: Optional[str] = None) -> None: + """Release VRAM and clean up GPU memory. + + Args: + component: Specific component to release ('asr'). + If None, releases all components. + + Returns: + None + """ + if component in [None, 'asr'] and self._asr: + del self._asr + self._asr = None + print("[*] ASR VRAM cleared") + self.device_manager.clearCache() + + def transcribeSafe(self, audio_path: Path) -> List[Dict]: + """Transcribe audio with automatic memory management. + + Args: + audio_path: Path to audio file. + + Returns: + List of transcription segments with timing. + + Raises: + TranscriptionError: If transcription fails. + """ + try: + res = self.transcribe(audio_path) + self.releaseMemory('asr') + return res + except Exception as e: + _handleError(e, "transcription") + raise TranscriptionError(f"Transcription failed: {e}") from e + + def translateSafe(self, texts: List[str], target_lang: str) -> List[str]: + """Translate texts safely with memory management. + + Args: + texts: List of text strings to translate. + target_lang: Target language code. + + Returns: + List of translated text strings. + """ + self.releaseMemory() + return self.translate(texts, target_lang) + + def transcribe(self, audio_path: Path) -> List[Dict]: + """Transcribe audio using Whisper model. + + Args: + audio_path: Path to audio file. + + Returns: + List of transcription segments with start/end times and text. 
+ """ + segments, info = self.asrModel.transcribe(str(audio_path), word_timestamps=False, language=None) + detected = getattr(info, "language", "auto") or "auto" + self.detected_source_lang = self.source_language_hint or detected + print(f"[*] Detected source language: {self.detected_source_lang}") + return [{'start': s.start, 'end': s.end, 'text': s.text.strip()} for s in segments] + + def translate(self, texts: List[str], target_lang: str) -> List[str]: + """Translate texts to target language. + + Args: + texts: List of text strings to translate. + target_lang: Target language code. + + Returns: + List of translated text strings. + + Raises: + TranslationError: If translation fails. + """ + if not texts: return [] + print(f"[*] Translating {len(texts)} segments to '{target_lang}'...") + source_lang = self.detected_source_lang or "auto" + + try: + return self.translator.translate_segments( + texts=texts, + target_language=target_lang, + source_language=source_lang, + ) + except Exception as e: + _handleError(e, "translation") + raise TranslationError(f"Translation failed: {e}") from e + + def calcRate(self, text: str, target_dur: float, original_text: str = "") -> str: + """Calculate speech rate adjustment for TTS with dynamic limits. + + Args: + text: Text to be synthesized (translated text). + target_dur: Target duration in seconds. + original_text: Original text for length comparison (optional). + + Returns: + Rate adjustment string (e.g., '+10%', '-5%'). 
+ """ + words = len(text.split()) + if words == 0 or target_dur <= 0: return "+0%" + + # Base calculation + wps = words / target_dur + estimated_time = words / wps + + if estimated_time <= target_dur: + return "+0%" + + ratio = estimated_time / target_dur + speed_percent = int((ratio - 1) * 100) + + # Dynamic speed limits based on text length comparison + if original_text: + orig_len = len(original_text.split()) + trans_len = words + + # If translated text is significantly longer, allow more slowdown + if trans_len > orig_len * 1.5: + # Allow up to -25% slowdown for longer translations + speed_percent = max(-25, min(speed_percent, 90)) + elif trans_len < orig_len * 0.7: + # If translation is shorter, be more conservative with speedup + speed_percent = max(-15, min(speed_percent, 50)) + else: + # Normal case: -10% to 90% + speed_percent = max(-10, min(speed_percent, 90)) + else: + # Fallback to original limits + speed_percent = max(-10, min(speed_percent, 90)) + + return f"{speed_percent:+d}%" + + async def synthesize( + self, + text: str, + target_lang: str, + out_path: Path, + gender: str = "female", + rate: str = "+0%" + ) -> None: + if not text.strip(): raise ValueError("Text empty") + out_path.parent.mkdir(parents=True, exist_ok=True) + + try: + lang_cfg = self._getLangConfig(target_lang) + voice_pool = self.config_manager.getVoicePool(target_lang, gender) + voice = voice_pool[0] if voice_pool else DEFAULT_VOICE + + communicate = edge_tts.Communicate(text, voice=voice, rate=rate) + await communicate.save(str(out_path)) + + if not out_path.exists() or out_path.stat().st_size < 1024: + raise RuntimeError("TTS file invalid") + + except Exception as e: + if out_path.exists(): out_path.unlink(missing_ok=True) + _handleError(e, "TTS synthesis") + raise TTSError(f"TTS failed: {e}") from e + + +def smartChunk(segments: List[Dict]) -> List[Dict]: + n = len(segments) + if n == 0: return [] + + # Calculate segment durations and gaps for dynamic analysis + durations = 
[s['end'] - s['start'] for s in segments] + gaps = [segments[i]['start'] - segments[i-1]['end'] for i in range(1, n)] + + # Dynamic parameters based on actual video content + avg_seg_dur = sum(durations) / n + avg_gap = sum(gaps) / len(gaps) if gaps else 0.5 + + # Dynamic min/max duration based on content characteristics + min_dur = max(1.0, avg_seg_dur * 0.5) # Minimum 1s, or 50% of average + max_dur = np.percentile(durations, 90) if n > 5 else min(15.0, avg_seg_dur * 3) + max_dur = max(5.0, min(30.0, max_dur)) # Clamp between 5-30 seconds + + # Hard threshold for gap-based splitting (1.5x average gap) + gap_threshold = max(0.4, avg_gap * 1.5) + + path = [] + curr_chunk_segs = [segments[0]] + + for i in range(1, n): + prev = segments[i-1] + curr = segments[i] + gap = curr['start'] - prev['end'] + + # Dynamic splitting criteria: + # 1. Gap exceeds threshold (natural pause) + # 2. Current chunk exceeds safe duration + # 3. Dynamic lookback: consider context but don't go too far back + current_dur = curr['end'] - curr_chunk_segs[0]['start'] + + if gap > gap_threshold or current_dur > max_dur: + # Close current chunk + path.append({ + 'start': curr_chunk_segs[0]['start'], + 'end': curr_chunk_segs[-1]['end'], + 'text': " ".join(s['text'] for s in curr_chunk_segs).strip() + }) + curr_chunk_segs = [curr] + else: + curr_chunk_segs.append(curr) + + # Add final chunk + if curr_chunk_segs: + path.append({ + 'start': curr_chunk_segs[0]['start'], + 'end': curr_chunk_segs[-1]['end'], + 'text': " ".join(s['text'] for s in curr_chunk_segs).strip() + }) + + print(f"[+] Smart chunking: {len(path)} chunks (Dynamic: min={min_dur:.1f}s, max={max_dur:.1f}s, gap_thr={gap_threshold:.2f}s)") + return path diff --git a/src/media.py b/src/media.py new file mode 100644 index 0000000..35afabb --- /dev/null +++ b/src/media.py @@ -0,0 +1,410 @@ +"""Media Processing Module for YouTube Auto Dub. + +This module handles all audio/video processing operations using FFmpeg. 
+It provides functionality for: +- Audio duration detection and analysis +- Silence generation for gap filling +- Audio time-stretching and duration fitting (PADDING logic added) +- Video concatenation and rendering (Volume Mixing fixed) +- Audio synchronization and mixing + +Author: Nguyen Cong Thuan Huy (mangodxd) +Version: 1.1.0 (Patched) +""" + +import subprocess +from pathlib import Path +from typing import List, Dict, Optional + +from src.engines import SAMPLE_RATE, AUDIO_CHANNELS + + +def _build_subtitle_filter(subtitle_path: Path) -> str: + """Build a Windows-safe FFmpeg subtitles filter expression.""" + escaped_path = str(subtitle_path.resolve()).replace("\\", "/").replace(":", "\\:") + return f"subtitles=filename='{escaped_path}'" + + +def _render_with_soft_subtitles(video_path: Path, output_path: Path, subtitle_path: Path) -> None: + """Fallback render path that muxes subtitles instead of hard-burning them.""" + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(video_path), + '-i', str(subtitle_path), + '-map', '0:v', + '-map', '0:a?', + '-map', '1:0', + '-c:v', 'copy', + '-c:a', 'copy', + '-c:s', 'mov_text', + str(output_path) + ] + subprocess.run(cmd, check=True, timeout=None) + + +def _render_mixed_with_soft_subtitles( + video_path: Path, + concat_file: Path, + output_path: Path, + subtitle_path: Path, + filter_complex: str, +) -> None: + """Fallback render path that muxes subtitles while preserving mixed dubbed audio.""" + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(video_path), + '-f', 'concat', '-safe', '0', '-i', str(concat_file), + '-i', str(subtitle_path), + '-filter_complex', filter_complex, + '-map', '0:v', + '-map', '[outa]', + '-map', '2:0', + '-c:v', 'copy', + '-c:a', 'aac', '-b:a', '192k', + '-ar', str(SAMPLE_RATE), + '-ac', str(AUDIO_CHANNELS), + '-c:s', 'mov_text', + '-shortest', + str(output_path), + ] + subprocess.run(cmd, check=True, timeout=None) + + +def _get_duration(path: Path) -> float: + """Get the duration of an 
audio/video file using FFprobe.""" + if not path.exists(): + print(f"[!] ERROR: Media file not found: {path}") + return 0.0 + + try: + cmd = [ + 'ffprobe', '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + str(path) + ] + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + timeout=60 # Increased from 30s to 60s for better reliability + ) + + duration_str = result.stdout.strip() + if duration_str: + return float(duration_str) + else: + return 0.0 + + except Exception as e: + print(f"[!] ERROR: Getting duration failed for {path}: {e}") + return 0.0 + + +def _generate_silence_segment(duration: float, silence_ref: Path) -> Optional[Path]: + """Generate a small silence segment for the concat list.""" + if duration <= 0: + return None + + # Use the parent folder of the reference silence file + output_path = silence_ref.parent / f"gap_{duration:.4f}.wav" + + if output_path.exists(): + return output_path + + try: + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-f', 'lavfi', '-i', f'anullsrc=r={SAMPLE_RATE}:cl=mono', + '-t', f"{duration:.4f}", + '-c:a', 'pcm_s16le', + str(output_path) + ] + subprocess.run(cmd, check=True) + return output_path + except Exception: + return None + +def _analyze_audio_loudness(audio_path: Path) -> Optional[float]: + """Analyze audio loudness using FFmpeg volumedetect filter. + + Args: + audio_path: Path to audio file to analyze. + + Returns: + Mean volume in dB, or None if analysis fails. 
+ """ + if not audio_path.exists(): + return None + + try: + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(audio_path), + '-filter:a', 'volumedetect', + '-f', 'null', '-' + ] + + result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=30) + + # Parse mean volume from output + for line in result.stderr.split('\n'): + if 'mean_volume:' in line: + # Extract dB value from line like: "mean_volume: -15.2 dB" + parts = line.split() + if len(parts) >= 2: + try: + return float(parts[1]) + except ValueError: + continue + + return None + except Exception: + return None + + +def fit_audio(audio_path: Path, target_dur: float) -> Path: + if not audio_path.exists() or target_dur <= 0: + return audio_path + + actual_dur = _get_duration(audio_path) + if actual_dur == 0.0: + return audio_path + + out_path = audio_path.parent / f"{audio_path.stem}_fit.wav" + + # Increased tolerance from 0.05s to 0.15s for more natural audio + if actual_dur > target_dur + 0.15: + ratio = actual_dur / target_dur + filter_chain = [] + current_ratio = ratio + + # Dynamic speed limit: max 1.5x instead of 2.0x to avoid chipmunk effect + max_speed_ratio = 1.5 + + while current_ratio > max_speed_ratio: + filter_chain.append(f"atempo={max_speed_ratio}") + current_ratio /= max_speed_ratio + + if current_ratio > 1.0: + filter_chain.append(f"atempo={current_ratio:.4f}") + + filter_complex = ",".join(filter_chain) + + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(audio_path), + '-filter:a', f"{filter_complex},aresample=24000", + '-t', f"{target_dur:.4f}", + '-c:a', 'pcm_s16le', + str(out_path) + ] + else: + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(audio_path), + '-filter:a', f"apad,aresample=24000", + '-t', f"{target_dur:.4f}", + '-c:a', 'pcm_s16le', + str(out_path) + ] + print(f"Fiting {actual_dur:.4f}s to {target_dur:.4f}s") + + try: + subprocess.run(cmd, check=True, timeout=120) + return out_path + except Exception: + return audio_path + +def 
create_concat_file(segments: List[Dict], silence_ref: Path, output_txt: Path) -> None: + if not segments: + return + + try: + with open(output_txt, 'w', encoding='utf-8') as f: + current_timeline = 0.0 + + for segment in segments: + start_time = segment['start'] + end_time = segment['end'] + audio_path = segment.get('processed_audio') + + gap = start_time - current_timeline + if gap > 0.01: + silence_gap = _generate_silence_segment(gap, silence_ref) + if silence_gap: + f.write(f"file '{silence_gap.resolve().as_posix()}'\n") + current_timeline += gap + + if audio_path and audio_path.exists(): + f.write(f"file '{audio_path.resolve().as_posix()}'\n") + current_timeline += (end_time - start_time) + else: + dur = end_time - start_time + silence_err = _generate_silence_segment(dur, silence_ref) + if silence_err: + f.write(f"file '{silence_err.resolve().as_posix()}'\n") + current_timeline += dur + + except Exception as e: + raise RuntimeError(f"Failed to create concat manifest: {e}") + + +def render_video( + video_path: Path, + concat_file: Optional[Path], + output_path: Path, + subtitle_path: Optional[Path] = None, +) -> None: + """Render final video with Dynamic Volume Mixing.""" + if not video_path.exists(): + raise FileNotFoundError("Source video for rendering is missing") + + if concat_file is not None and not concat_file.exists(): + raise FileNotFoundError("Concat audio manifest for rendering is missing") + + output_path.parent.mkdir(parents=True, exist_ok=True) + + try: + print(f"[*] Rendering final video...") + + if concat_file is None: + video_codec = 'copy' + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(video_path), + '-map', '0:v', + '-map', '0:a?', + ] + + if subtitle_path: + video_codec = 'libx264' + cmd.extend(['-vf', _build_subtitle_filter(subtitle_path)]) + + cmd.extend([ + '-c:v', video_codec, + '-c:a', 'copy', + ]) + + cmd.append(str(output_path)) + try: + subprocess.run(cmd, check=True, timeout=None, capture_output=True, text=True) + except 
subprocess.CalledProcessError as exc: + if subtitle_path and "No such filter: 'subtitles'" in (exc.stderr or ""): + print("[!] FFmpeg subtitles filter is unavailable. Falling back to soft subtitles.") + _render_with_soft_subtitles(video_path, output_path, subtitle_path) + else: + raise + + if not output_path.exists(): + raise RuntimeError("Output file not created") + + print(f"[+] Video rendered successfully: {output_path}") + return + + # DYNAMIC VOLUME MIXING STRATEGY: + # Analyze original audio loudness to determine optimal background volume + original_loudness = _analyze_audio_loudness(video_path) + + if original_loudness is not None: + # Calculate background volume based on loudness analysis + # Target: voice should be 10-15dB louder than background + if original_loudness > -10: # Very loud audio + bg_volume = 0.08 # 8% - reduce more for loud content + elif original_loudness > -20: # Normal audio + bg_volume = 0.15 # 15% - standard reduction + else: # Quiet audio + bg_volume = 0.25 # 25% - reduce less for quiet content + + print(f"[*] Dynamic volume mixing: original={original_loudness:.1f}dB, bg_volume={bg_volume*100:.0f}%") + else: + # Fallback to default if analysis fails + bg_volume = 0.15 + print(f"[*] Using default volume mixing: bg_volume={bg_volume*100:.0f}%") + + filter_complex = ( + f"[0:a]volume={bg_volume}[bg]; " + "[bg][1:a]amix=inputs=2:duration=first:dropout_transition=0[outa]" + ) + video_codec = 'copy' + + cmd = [ + 'ffmpeg', '-y', '-v', 'error', + '-i', str(video_path), + '-f', 'concat', '-safe', '0', '-i', str(concat_file), + '-filter_complex', filter_complex, + ] + + # Handle Hard Subtitles (Requires re-encoding) + if subtitle_path: + video_codec = 'libx264' + cmd.extend(['-vf', _build_subtitle_filter(subtitle_path)]) + + cmd.extend([ + '-map', '0:v', + '-map', '[outa]', + '-c:v', video_codec, + '-c:a', 'aac', '-b:a', '192k', + '-ar', str(SAMPLE_RATE), + '-ac', str(AUDIO_CHANNELS), + '-shortest' + ]) + + cmd.append(str(output_path)) + + # 
Run rendering + try: + subprocess.run(cmd, check=True, timeout=None, capture_output=True, text=True) + except subprocess.CalledProcessError as exc: + if subtitle_path and "No such filter: 'subtitles'" in (exc.stderr or ""): + print("[!] FFmpeg subtitles filter is unavailable. Falling back to soft subtitles.") + _render_mixed_with_soft_subtitles( + video_path=video_path, + concat_file=concat_file, + output_path=output_path, + subtitle_path=subtitle_path, + filter_complex=filter_complex, + ) + else: + raise + + if not output_path.exists(): + raise RuntimeError("Output file not created") + + print(f"[+] Video rendered successfully: {output_path}") + + except subprocess.CalledProcessError as e: + raise RuntimeError(f"FFmpeg rendering failed: {e}") + except Exception as e: + raise RuntimeError(f"Rendering error: {e}") + + +def generate_srt(segments: List[Dict], output_path: Path) -> None: + """Generate SRT subtitle file.""" + if not segments: return + + output_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with open(output_path, 'w', encoding='utf-8') as f: + for i, segment in enumerate(segments, 1): + start = _format_timestamp_srt(segment['start']) + end = _format_timestamp_srt(segment['end']) + text = segment.get('trans_text', '').strip() + + f.write(f"{i}\n{start} --> {end}\n{text}\n\n") + + print(f"[+] SRT subtitles generated") + except Exception as e: + print(f"[!] 
Warning: SRT generation failed: {e}") + + +def _format_timestamp_srt(seconds: float) -> str: + """Convert seconds to HH:MM:SS,mmm.""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + millis = int((seconds % 1) * 1000) + return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}" diff --git a/src/translation.py b/src/translation.py new file mode 100644 index 0000000..2fb21c6 --- /dev/null +++ b/src/translation.py @@ -0,0 +1,358 @@ +"""LM Studio translation client for YouTube Auto Dub.""" + +from __future__ import annotations + +import os +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +import httpx + +from src.core_utils import ConfigurationError, TranslationError + +DEFAULT_LM_STUDIO_BASE_URL = "http://127.0.0.1:1234/v1" +DEFAULT_LM_STUDIO_API_KEY = "lm-studio" +DEFAULT_LM_STUDIO_MODEL = "gemma-3-4b-it" +DEFAULT_TRANSLATION_BACKEND = "lmstudio" + + +def _normalize_base_url(base_url: str) -> str: + """Normalize LM Studio base URLs to the OpenAI-compatible /v1 root.""" + if not base_url or not isinstance(base_url, str): + raise ConfigurationError("LM Studio base URL must be a non-empty string.") + + normalized = base_url.strip().rstrip("/") + if normalized.endswith("/chat/completions"): + normalized = normalized[: -len("/chat/completions")] + if not normalized.endswith("/v1"): + normalized = f"{normalized}/v1" + + parsed = urlparse(normalized) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ConfigurationError( + "LM Studio base URL must be a valid http(s) URL, for example " + "'http://127.0.0.1:1234/v1'." 
+ ) + + return normalized + + +@dataclass(frozen=True) +class TranslationConfig: + """Runtime configuration for the translation backend.""" + + backend: str = DEFAULT_TRANSLATION_BACKEND + base_url: str = DEFAULT_LM_STUDIO_BASE_URL + api_key: str = DEFAULT_LM_STUDIO_API_KEY + model: str = DEFAULT_LM_STUDIO_MODEL + timeout_seconds: float = 45.0 + max_retries: int = 3 + retry_backoff_seconds: float = 1.0 + + @classmethod + def from_env( + cls, + backend: Optional[str] = None, + base_url: Optional[str] = None, + model: Optional[str] = None, + api_key: Optional[str] = None, + ) -> "TranslationConfig": + """Build config from environment variables plus optional overrides.""" + config = cls( + backend=(backend or os.getenv("TRANSLATION_BACKEND") or DEFAULT_TRANSLATION_BACKEND).strip().lower(), + base_url=_normalize_base_url(base_url or os.getenv("LM_STUDIO_BASE_URL") or DEFAULT_LM_STUDIO_BASE_URL), + api_key=api_key or os.getenv("LM_STUDIO_API_KEY") or DEFAULT_LM_STUDIO_API_KEY, + model=model or os.getenv("LM_STUDIO_MODEL") or DEFAULT_LM_STUDIO_MODEL, + ) + config.validate() + return config + + @property + def chat_completions_url(self) -> str: + return f"{_normalize_base_url(self.base_url)}/chat/completions" + + def validate(self) -> None: + """Validate the translation configuration.""" + if self.backend != DEFAULT_TRANSLATION_BACKEND: + raise ConfigurationError( + f"Unsupported translation backend '{self.backend}'. " + f"Only '{DEFAULT_TRANSLATION_BACKEND}' is supported." 
+ ) + + if not self.model or not isinstance(self.model, str): + raise ConfigurationError("LM Studio model must be a non-empty string.") + + if not self.api_key or not isinstance(self.api_key, str): + raise ConfigurationError("LM Studio API key must be a non-empty string.") + + if self.timeout_seconds <= 0: + raise ConfigurationError("LM Studio timeout must be greater than zero.") + + if self.max_retries < 1: + raise ConfigurationError("LM Studio max retries must be at least 1.") + + if self.retry_backoff_seconds < 0: + raise ConfigurationError("LM Studio retry backoff cannot be negative.") + + _normalize_base_url(self.base_url) + + +def _build_system_prompt(source_language: str, target_language: str) -> str: + source_descriptor = source_language or "auto" + return ( + "You are a professional audiovisual translator.\n" + f"Translate the user-provided text from {source_descriptor} to {target_language}.\n" + "Preserve meaning, tone, style, and intent as closely as possible.\n" + "Keep punctuation natural and keep subtitle-like lines concise when the source is concise.\n" + "Return only the translation.\n" + "Do not explain anything.\n" + "Do not add notes, headings, metadata, or commentary.\n" + "Do not add quotation marks unless they are part of the source.\n" + "Preserve line breaks and segment boundaries exactly.\n" + "Keep names, brands, URLs, emails, code, and proper nouns unchanged unless transliteration " + "is clearly appropriate.\n" + "Expand abbreviations only when needed for a natural translation.\n" + "Do not censor, summarize, or omit content." 
+ ) + + +class LMStudioTranslator: + """OpenAI-style chat completions client for LM Studio.""" + + def __init__( + self, + config: TranslationConfig, + client: Optional[httpx.Client] = None, + sleeper=time.sleep, + ) -> None: + self.config = config + self.config.validate() + self._client = client or httpx.Client(timeout=httpx.Timeout(self.config.timeout_seconds)) + self._owns_client = client is None + self._sleeper = sleeper + + def build_payload(self, text: str, source_language: str, target_language: str) -> Dict[str, Any]: + """Build the OpenAI-compatible chat completions payload.""" + return { + "model": self.config.model, + "messages": [ + {"role": "system", "content": _build_system_prompt(source_language, target_language)}, + {"role": "user", "content": text}, + ], + "temperature": 0.1, + "top_p": 1, + "stream": False, + } + + def build_user_only_payload( + self, + text: str, + source_language: str, + target_language: str, + ) -> Dict[str, Any]: + """Build a fallback payload for models that require the first turn to be user.""" + instructions = _build_system_prompt(source_language, target_language) + merged_prompt = f"{instructions}\n\nText to translate:\n{text}" + return { + "model": self.config.model, + "messages": [ + {"role": "user", "content": merged_prompt}, + ], + "temperature": 0.1, + "top_p": 1, + "stream": False, + } + + def build_structured_translation_payload( + self, + text: str, + source_language: str, + target_language: str, + ) -> Dict[str, Any]: + """Build a payload for custom translation models with structured user content.""" + return { + "model": self.config.model, + "messages": [ + { + "role": "user", + "content": [ + { + "type": "text", + "source_lang_code": source_language or "auto", + "target_lang_code": target_language, + "text": text, + "image": None, + } + ], + } + ], + "temperature": 0.1, + "top_p": 1, + "stream": False, + } + + @staticmethod + def parse_response_content(payload: Dict[str, Any]) -> str: + """Extract translated text 
from an OpenAI-compatible response payload.""" + try: + content = payload["choices"][0]["message"]["content"] + except (KeyError, IndexError, TypeError) as exc: + raise TranslationError("LM Studio response did not contain a chat completion message.") from exc + + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict) and item.get("type") == "text": + parts.append(str(item.get("text", ""))) + content = "".join(parts) + + if not isinstance(content, str): + raise TranslationError("LM Studio response content was not a text string.") + + translated = content.strip() + if not translated: + raise TranslationError("LM Studio returned an empty translation.") + + return translated + + def _headers(self) -> Dict[str, str]: + return { + "Authorization": f"Bearer {self.config.api_key}", + "Content-Type": "application/json", + } + + def _should_retry(self, exc: Exception) -> bool: + if isinstance(exc, (httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout, httpx.PoolTimeout)): + return True + if isinstance(exc, httpx.HTTPStatusError): + return exc.response.status_code in {408, 409, 429, 500, 502, 503, 504} + return False + + @staticmethod + def _should_retry_with_user_only_prompt(exc: Exception) -> bool: + if not isinstance(exc, httpx.HTTPStatusError): + return False + if exc.response.status_code != 400: + return False + + response_text = exc.response.text.lower() + return "conversations must start with a user prompt" in response_text + + @staticmethod + def _should_retry_with_structured_translation_prompt(exc: Exception) -> bool: + if not isinstance(exc, httpx.HTTPStatusError): + return False + if exc.response.status_code != 400: + return False + + response_text = exc.response.text.lower() + return "source_lang_code" in response_text and "target_lang_code" in response_text + + def _post_chat_completion(self, payload: Dict[str, Any]) -> str: + response = self._client.post( + 
self.config.chat_completions_url, + headers=self._headers(), + json=payload, + ) + response.raise_for_status() + return self.parse_response_content(response.json()) + + def translate_text( + self, + text: str, + target_language: str, + source_language: str = "auto", + ) -> str: + """Translate a single text segment.""" + if not text.strip(): + return "" + + payload = self.build_payload(text, source_language, target_language) + last_error: Optional[Exception] = None + + for attempt in range(1, self.config.max_retries + 1): + try: + return self._post_chat_completion(payload) + except (httpx.HTTPError, ValueError, TranslationError) as exc: + last_error = exc + if self._should_retry_with_user_only_prompt(exc): + try: + fallback_payload = self.build_user_only_payload(text, source_language, target_language) + return self._post_chat_completion(fallback_payload) + except (httpx.HTTPError, ValueError, TranslationError) as fallback_exc: + last_error = fallback_exc + if self._should_retry_with_structured_translation_prompt(last_error): + try: + structured_payload = self.build_structured_translation_payload( + text, + source_language, + target_language, + ) + return self._post_chat_completion(structured_payload) + except (httpx.HTTPError, ValueError, TranslationError) as structured_exc: + last_error = structured_exc + if attempt >= self.config.max_retries or not self._should_retry(exc): + break + self._sleeper(self.config.retry_backoff_seconds * attempt) + + if isinstance(last_error, TranslationError): + raise last_error + if isinstance(last_error, ValueError): + raise TranslationError("LM Studio returned a non-JSON response.") from last_error + raise TranslationError(f"LM Studio request failed: {last_error}") from last_error + + def translate_segments( + self, + texts: List[str], + target_language: str, + source_language: str = "auto", + ) -> List[str]: + """Translate an ordered list of subtitle-like segments.""" + results: List[str] = [] + for text in texts: + results.append( 
+ self.translate_text( + text=text, + target_language=target_language, + source_language=source_language, + ) + ) + return results + + def close(self) -> None: + if self._owns_client: + self._client.close() + + +def translate_text( + text: str, + target_language: str, + source_language: str = "auto", + config: Optional[TranslationConfig] = None, + client: Optional[httpx.Client] = None, +) -> str: + """Translate a single text string using LM Studio.""" + translator = LMStudioTranslator(config or TranslationConfig.from_env(), client=client) + try: + return translator.translate_text(text, target_language, source_language) + finally: + translator.close() + + +def translate_segments( + texts: List[str], + target_language: str, + source_language: str = "auto", + config: Optional[TranslationConfig] = None, + client: Optional[httpx.Client] = None, +) -> List[str]: + """Translate a list of text strings using LM Studio.""" + translator = LMStudioTranslator(config or TranslationConfig.from_env(), client=client) + try: + return translator.translate_segments(texts, target_language, source_language) + finally: + translator.close() diff --git a/src/youtube.py b/src/youtube.py new file mode 100644 index 0000000..b280834 --- /dev/null +++ b/src/youtube.py @@ -0,0 +1,329 @@ +"""YouTube Content Download Module for YouTube Auto Dub. + +This module provides a robust interface for downloading YouTube content +using yt-dlp. 
It handles: +- Video and audio extraction from YouTube URLs +- Authentication via cookies or browser integration +- Format selection and quality optimization +- Error handling and retry logic +- Metadata extraction and validation + +Author: Nguyen Cong Thuan Huy (mangodxd) +Version: 1.0.0 +""" + +import yt_dlp +from pathlib import Path +from typing import Optional, Dict, Any +from src.engines import CACHE_DIR + + +def _format_minutes_seconds(total_seconds: float) -> str: + """Format seconds as M:SS for logging.""" + seconds = int(round(total_seconds)) + minutes, remaining_seconds = divmod(seconds, 60) + return f"{minutes}:{remaining_seconds:02d}" + + +def _getOpts(browser: Optional[str] = None, + cookies_file: Optional[str] = None, + quiet: bool = True) -> Dict[str, Any]: + """Generate common yt-dlp options with authentication configuration. + + Args: + browser: Browser name for cookie extraction (chrome, edge, firefox). + If provided, cookies will be extracted from this browser. + cookies_file: Path to cookies.txt file in Netscape format. + Takes priority over browser extraction if both provided. + quiet: Whether to suppress yt-dlp output messages. + + Returns: + Dictionary of yt-dlp options. + + Raises: + ValueError: If invalid browser name is provided. + + Note: + Priority order: cookies_file > browser > no authentication. + """ + opts = { + 'quiet': quiet, + 'no_warnings': True, + 'extract_flat': False, + } + + if cookies_file: + cookies_path = Path(cookies_file) + if not cookies_path.exists(): + raise FileNotFoundError(f"Cookies file not found: {cookies_file}") + + opts['cookiefile'] = str(cookies_path) + print(f"[*] Using cookies file: {cookies_file}") + + elif browser: + valid_browsers = ['chrome', 'firefox', 'edge', 'safari', 'opera', 'brave'] + browser_lower = browser.lower() + + if browser_lower not in valid_browsers: + raise ValueError(f"Invalid browser '{browser}'. 
Supported: {', '.join(valid_browsers)}") + + opts['cookiesfrombrowser'] = (browser_lower,) + print(f"[*] Extracting cookies from browser: {browser}") + + else: + print(f"[*] No authentication configured (public videos only)") + + return opts + + +def getId(url: str, + browser: Optional[str] = None, + cookies_file: Optional[str] = None) -> str: + """Extract YouTube video ID from URL with authentication support. + + Args: + url: YouTube video URL to extract ID from. + browser: Browser name for cookie extraction. + cookies_file: Path to cookies.txt file. + + Returns: + YouTube video ID as string. + + Raises: + ValueError: If URL is invalid or video ID cannot be extracted. + RuntimeError: If yt-dlp fails to extract information. + + Note: + This function validates the URL and extracts metadata + without downloading the actual content. + """ + if not url or not isinstance(url, str): + raise ValueError("URL must be a non-empty string") + + if not any(domain in url.lower() for domain in ['youtube.com', 'youtu.be']): + raise ValueError(f"Invalid YouTube URL: {url}") + + try: + print(f"[*] Extracting video ID from: {url[:50]}...") + + opts = _getOpts(browser=browser, cookies_file=cookies_file) + + with yt_dlp.YoutubeDL(opts) as ydl: + try: + info = ydl.extract_info(url, download=False) + video_id = info.get('id') + + if not video_id: + raise RuntimeError("No video ID found in extracted information") + + title = info.get('title', 'Unknown') + duration = info.get('duration', 0) + uploader = info.get('uploader', 'Unknown') + + print(f"[+] Video ID extracted: {video_id}") + print(f" Title: {title[:50]}{'...' if len(title) > 50 else ''}") + print(f" Duration: {duration}s ({_format_minutes_seconds(duration)})") + print(f" Uploader: {uploader}") + + return video_id + + except yt_dlp.DownloadError as e: + if "Sign in to confirm" in str(e) or "private video" in str(e).lower(): + raise ValueError(f"Authentication required for this video. Please use --browser or --cookies. 
Original error: {e}") + else: + raise RuntimeError(f"yt-dlp extraction failed: {e}") + + except Exception as e: + if isinstance(e, (ValueError, RuntimeError)): + raise + raise RuntimeError(f"Failed to extract video ID: {e}") from e + + +def downloadVideo(url: str, + browser: Optional[str] = None, + cookies_file: Optional[str] = None) -> Path: + """Download the best quality video with audio from YouTube. + + Args: + url: YouTube video URL to download. + browser: Browser name for cookie extraction. + cookies_file: Path to cookies.txt file. + + Returns: + Path to the downloaded video file. + + Raises: + ValueError: If URL is invalid or authentication is required. + RuntimeError: If download fails or file is corrupted. + + Note: + This function downloads both video and audio in a single file. + If the video already exists in cache, it returns the existing file. + """ + try: + video_id = getId(url, browser=browser, cookies_file=cookies_file) + except Exception as e: + raise ValueError(f"Failed to validate video URL: {e}") from e + + out_path = CACHE_DIR / f"{video_id}.mp4" + + if out_path.exists(): + file_size = out_path.stat().st_size + if file_size > 1024 * 1024: + print(f"[*] Video already cached: {out_path}") + return out_path + else: + print(f"[!] 
WARNING: Cached video seems too small ({file_size} bytes), re-downloading") + out_path.unlink() + + try: + print(f"[*] Downloading video: {video_id}") + + opts = _getOpts(browser=browser, cookies_file=cookies_file) + opts.update({ + 'format': ( + 'bestvideo[ext=mp4][vcodec^=avc]+bestaudio[ext=m4a]/' + 'best[ext=mp4]/' + 'best' + ), + 'outtmpl': str(out_path), + 'merge_output_format': 'mp4', + 'postprocessors': [], + }) + + with yt_dlp.YoutubeDL(opts) as ydl: + ydl.download([url]) + + if not out_path.exists(): + raise RuntimeError(f"Video file not created after download: {out_path}") + + file_size = out_path.stat().st_size + if file_size < 1024 * 1024: + raise RuntimeError(f"Downloaded video file is too small: {file_size} bytes") + + print(f"[+] Video downloaded successfully:") + print(f" File: {out_path}") + print(f" Size: {file_size / (1024*1024):.1f} MB") + + return out_path + + except yt_dlp.DownloadError as e: + error_msg = str(e).lower() + if "sign in to confirm" in error_msg or "private video" in error_msg: + raise ValueError( + f"Authentication required for this video. Please try:\n" + f"1. Close all browser windows and use --browser\n" + f"2. Export fresh cookies.txt and use --cookies\n" + f"3. Check if video is public/accessible\n" + f"Original error: {e}" + ) + else: + raise RuntimeError(f"Video download failed: {e}") + + except Exception as e: + if out_path.exists(): + out_path.unlink() + raise RuntimeError(f"Video download failed: {e}") from e + + +def downloadAudio(url: str, + browser: Optional[str] = None, + cookies_file: Optional[str] = None) -> Path: + """Download audio-only from YouTube for transcription processing. + + Args: + url: YouTube video URL to extract audio from. + browser: Browser name for cookie extraction. + cookies_file: Path to cookies.txt file. + + Returns: + Path to the downloaded WAV audio file. + + Raises: + ValueError: If URL is invalid or authentication is required. + RuntimeError: If audio download or conversion fails. 
+ + Note: + The output is always in WAV format at the project's sample rate + for consistency with the transcription pipeline. + """ + try: + video_id = getId(url, browser=browser, cookies_file=cookies_file) + except Exception as e: + raise ValueError(f"Failed to validate video URL: {e}") from e + + temp_path = CACHE_DIR / f"{video_id}" + final_path = CACHE_DIR / f"{video_id}.wav" + + if final_path.exists(): + file_size = final_path.stat().st_size + if file_size > 1024 * 100: + print(f"[*] Audio already cached: {final_path}") + return final_path + else: + print(f"[!] WARNING: Cached audio seems too small ({file_size} bytes), re-downloading") + final_path.unlink() + + try: + print(f"[*] Downloading audio: {video_id}") + + opts = _getOpts(browser=browser, cookies_file=cookies_file) + opts.update({ + 'format': 'bestaudio/best', + 'outtmpl': str(temp_path), + 'postprocessors': [{ + 'key': 'FFmpegExtractAudio', + 'preferredcodec': 'wav', + 'preferredquality': '192', + }], + }) + + with yt_dlp.YoutubeDL(opts) as ydl: + ydl.download([url]) + + if not final_path.exists(): + temp_files = list(CACHE_DIR.glob(f"{video_id}.*")) + if temp_files: + print(f"[!] WARNING: Expected {final_path} but found {temp_files[0]}") + final_path = temp_files[0] + else: + raise RuntimeError(f"Audio file not created after download: {final_path}") + + file_size = final_path.stat().st_size + if file_size < 1024 * 100: + raise RuntimeError(f"Downloaded audio file is too small: {file_size} bytes") + + print(f"[+] Audio downloaded successfully:") + print(f" File: {final_path}") + print(f" Size: {file_size / (1024*1024):.1f} MB") + + try: + from src.media import _get_duration + duration = _get_duration(final_path) + if duration > 0: + print(f" Duration: {duration:.1f}s ({_format_minutes_seconds(duration)})") + else: + print(f"[!] WARNING: Could not determine audio duration") + except Exception as e: + print(f"[!] 
WARNING: Audio validation failed: {e}") + + return final_path + + except yt_dlp.DownloadError as e: + error_msg = str(e).lower() + if "sign in to confirm" in error_msg or "private video" in error_msg: + raise ValueError( + f"Authentication required for this video. Please try:\n" + f"1. Close all browser windows and use --browser\n" + f"2. Export fresh cookies.txt and use --cookies\n" + f"3. Check if video is public/accessible\n" + f"Original error: {e}" + ) + else: + raise RuntimeError(f"Audio download failed: {e}") + + except Exception as e: + for path in [temp_path, final_path]: + if path.exists(): + path.unlink() + raise RuntimeError(f"Audio download failed: {e}") from e diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..292f3b6 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,11 @@ +"""Pytest configuration for local imports.""" + +from __future__ import annotations + +import sys +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_main_cli.py b/tests/test_main_cli.py new file mode 100644 index 0000000..35f2639 --- /dev/null +++ b/tests/test_main_cli.py @@ -0,0 +1,61 @@ +"""Tests for CLI parser and translation config wiring.""" + +from __future__ import annotations + +from main import _build_translation_config, build_parser + + +def test_parser_accepts_lmstudio_flags(): + parser = build_parser() + + args = parser.parse_args( + [ + "https://youtube.com/watch?v=demo", + "--translation-backend", + "lmstudio", + "--lmstudio-base-url", + "http://localhost:1234/v1", + "--lmstudio-model", + "gemma-custom", + ] + ) + + assert args.translation_backend == "lmstudio" + assert args.lmstudio_base_url == "http://localhost:1234/v1" + assert args.lmstudio_model == "gemma-custom" + + +def test_translation_config_prefers_cli_over_env(monkeypatch): + monkeypatch.setenv("LM_STUDIO_BASE_URL", "http://env-host:1234/v1") + 
monkeypatch.setenv("LM_STUDIO_MODEL", "env-model")
+
+    parser = build_parser()
+    args = parser.parse_args(
+        [
+            "https://youtube.com/watch?v=demo",
+            "--lmstudio-base-url",
+            "http://cli-host:1234/v1",
+            "--lmstudio-model",
+            "cli-model",
+        ]
+    )
+
+    config = _build_translation_config(args)
+
+    # CLI values, not the env values set above, must land in the config.
+    assert config.base_url == "http://cli-host:1234/v1"
+    assert config.model == "cli-model"
+
+
+def test_translation_config_uses_env_defaults(monkeypatch):
+    # With no CLI overrides, the config falls back to LM_STUDIO_* env vars.
+    monkeypatch.setenv("LM_STUDIO_BASE_URL", "http://env-host:1234/v1")
+    monkeypatch.setenv("LM_STUDIO_MODEL", "env-model")
+    monkeypatch.setenv("LM_STUDIO_API_KEY", "env-key")
+
+    parser = build_parser()
+    args = parser.parse_args(["https://youtube.com/watch?v=demo"])
+
+    config = _build_translation_config(args)
+
+    assert config.base_url == "http://env-host:1234/v1"
+    assert config.model == "env-model"
+    assert config.api_key == "env-key"
diff --git a/tests/test_translation.py b/tests/test_translation.py
new file mode 100644
index 0000000..b067615
--- /dev/null
+++ b/tests/test_translation.py
@@ -0,0 +1,136 @@
+"""Tests for the LM Studio translation layer."""
+
+from __future__ import annotations
+
+import httpx
+import pytest
+
+from src.core_utils import TranslationError
+from src.translation import LMStudioTranslator, TranslationConfig
+
+
+def _mock_client(handler):
+    # Wrap a handler in httpx.MockTransport so no real network I/O happens.
+    return httpx.Client(transport=httpx.MockTransport(handler))
+
+
+def test_translation_config_normalizes_base_url():
+    # from_env appends the /v1 suffix when the base URL omits it and derives
+    # the chat-completions endpoint and default model from it.
+    config = TranslationConfig.from_env(base_url="http://127.0.0.1:1234")
+
+    assert config.base_url == "http://127.0.0.1:1234/v1"
+    assert config.chat_completions_url == "http://127.0.0.1:1234/v1/chat/completions"
+    assert config.model == "gemma-3-4b-it"
+
+
+def test_build_payload_includes_model_and_prompt():
+    # The client handler is never invoked here — build_payload is pure.
+    translator = LMStudioTranslator(TranslationConfig(), client=_mock_client(lambda request: None))
+
+    payload = translator.build_payload("Hello world", "en", "es")
+
+    assert payload["model"] == "gemma-3-4b-it"
+    assert 
payload["messages"][0]["role"] == "system"
+    assert "Translate the user-provided text from en to es." in payload["messages"][0]["content"]
+    assert payload["messages"][1]["content"] == "Hello world"
+
+
+def test_translate_segments_preserves_order_and_blank_segments():
+    # Route a canned translation by inspecting the outgoing request body, so
+    # each segment can be matched back to its reply.
+    def handler(request: httpx.Request) -> httpx.Response:
+        text = request.read().decode("utf-8")
+        if "first" in text:
+            content = "primero"
+        elif "third" in text:
+            content = "tercero"
+        else:
+            content = "desconocido"
+        return httpx.Response(200, json={"choices": [{"message": {"content": content}}]})
+
+    translator = LMStudioTranslator(TranslationConfig(), client=_mock_client(handler))
+
+    translated = translator.translate_segments(["first", "", "third"], target_language="es", source_language="en")
+
+    # Blank segments pass through untouched and ordering is preserved.
+    assert translated == ["primero", "", "tercero"]
+
+
+def test_retry_on_transient_http_error_then_succeeds():
+    attempts = {"count": 0}
+
+    # First attempt returns 503; the translator must retry and succeed.
+    def handler(request: httpx.Request) -> httpx.Response:
+        attempts["count"] += 1
+        if attempts["count"] == 1:
+            return httpx.Response(503, json={"error": {"message": "busy"}})
+        return httpx.Response(200, json={"choices": [{"message": {"content": "hola"}}]})
+
+    # sleeper is stubbed out so the retry back-off does not slow the test.
+    translator = LMStudioTranslator(
+        TranslationConfig(max_retries=2),
+        client=_mock_client(handler),
+        sleeper=lambda _: None,
+    )
+
+    translated = translator.translate_text("hello", target_language="es", source_language="en")
+
+    assert translated == "hola"
+    assert attempts["count"] == 2
+
+
+def test_parse_response_content_rejects_empty_content():
+    # Whitespace-only content must be rejected, not returned as a translation.
+    with pytest.raises(TranslationError, match="empty translation"):
+        LMStudioTranslator.parse_response_content({"choices": [{"message": {"content": " "}}]})
+
+
+def test_translate_text_raises_on_malformed_response():
+    # An empty choices list must surface as TranslationError, not IndexError.
+    def handler(request: httpx.Request) -> httpx.Response:
+        return httpx.Response(200, json={"choices": []})
+
+    translator = LMStudioTranslator(TranslationConfig(), client=_mock_client(handler))
+
+    with pytest.raises(TranslationError, match="did 
not contain a chat completion message"):
+        translator.translate_text("hello", target_language="es", source_language="en")
+
+
+def test_translate_text_falls_back_to_user_only_prompt_for_template_error():
+    # Some chat templates reject a leading system message; on that 400 the
+    # translator must retry with a user-only prompt.
+    attempts = {"count": 0}
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        attempts["count"] += 1
+        body = request.read().decode("utf-8")
+        if attempts["count"] == 1:
+            return httpx.Response(
+                400,
+                text='{"error":"Error rendering prompt with jinja template: \\"Conversations must start with a user prompt.\\""}',
+            )
+        # The retried request must carry only a user-role message.
+        assert '"role":"user"' in body
+        return httpx.Response(200, json={"choices": [{"message": {"content": "hola"}}]})
+
+    translator = LMStudioTranslator(TranslationConfig(), client=_mock_client(handler))
+
+    translated = translator.translate_text("hello", target_language="es", source_language="en")
+
+    assert translated == "hola"
+    assert attempts["count"] == 2
+
+
+def test_translate_text_falls_back_to_structured_prompt_for_custom_template():
+    # A template that rejects both the system prompt and the plain user prompt
+    # should push the translator to a third, structured-content attempt.
+    attempts = {"count": 0}
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        attempts["count"] += 1
+        body = request.read().decode("utf-8")
+        if attempts["count"] == 1:
+            return httpx.Response(
+                400,
+                text='{"error":"Error rendering prompt with jinja template: \\"Conversations must start with a user prompt.\\""}',
+            )
+        if attempts["count"] == 2:
+            return httpx.Response(
+                400,
+                text='{"error":"Error rendering prompt with jinja template: \\"User role must provide `content` as an iterable with exactly one item. 
That item must be a mapping(type:\'text\' | \'image\', source_lang_code:string, target_lang_code:string, text:string | none, image:string | none).\\""}',
+            )
+        # Third attempt must carry the structured language fields the
+        # template error message demanded.
+        assert '"source_lang_code":"en"' in body
+        assert '"target_lang_code":"es"' in body
+        return httpx.Response(200, json={"choices": [{"message": {"content": "hola"}}]})
+
+    translator = LMStudioTranslator(TranslationConfig(), client=_mock_client(handler))
+
+    translated = translator.translate_text("hello", target_language="es", source_language="en")
+
+    assert translated == "hola"
+    assert attempts["count"] == 3