diff --git a/packages/markitdown/src/markitdown/converters/_youtube_converter.py b/packages/markitdown/src/markitdown/converters/_youtube_converter.py index c96e8f4f6..dba4c6ad0 100644 --- a/packages/markitdown/src/markitdown/converters/_youtube_converter.py +++ b/packages/markitdown/src/markitdown/converters/_youtube_converter.py @@ -1,7 +1,8 @@ import json -import time import re import bs4 +import requests +from http.cookiejar import MozillaCookieJar from typing import Any, BinaryIO, Dict, List, Union from urllib.parse import parse_qs, urlparse, unquote @@ -145,7 +146,15 @@ def convert( webpage_text += f"\n### Description\n{description}\n" if IS_YOUTUBE_TRANSCRIPT_CAPABLE: - ytt_api = YouTubeTranscriptApi() + cookie_path = kwargs.get("youtube_cookie_path", None) + http_client = None + if cookie_path: + _jar = MozillaCookieJar(cookie_path) + # ignore_discard/expires: browser-exported cookies are often session-scoped + _jar.load(ignore_discard=True, ignore_expires=True) + http_client = requests.Session() + http_client.cookies = _jar # type: ignore + ytt_api = YouTubeTranscriptApi(http_client=http_client) transcript_text = "" parsed_url = urlparse(stream_info.url) # type: ignore params = parse_qs(parsed_url.query) # type: ignore @@ -160,18 +169,14 @@ def convert( youtube_transcript_languages = kwargs.get( "youtube_transcript_languages", languages ) - # Retry the transcript fetching operation - transcript = self._retry_operation( - lambda: ytt_api.fetch( - video_id, languages=youtube_transcript_languages - ), - retries=3, # Retry 3 times - delay=2, # 2 seconds delay between retries + # Use list→find_transcript→fetch (reliable in v1.2.x) + t = transcript_list.find_transcript( + youtube_transcript_languages ) - - if transcript: + fetched = t.fetch() + if fetched: transcript_text = " ".join( - [part.text for part in transcript] + [part.text for part in fetched] ) # type: ignore except Exception as e: # No transcript available @@ -222,17 +227,3 @@ def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json t if result := self._findKey(v, key): return result return None - - def _retry_operation(self, operation, retries=3, delay=2): - """Retries the operation if it fails.""" - attempt = 0 - while attempt < retries: - try: - return operation() # Attempt the operation - except Exception as e: - print(f"Attempt {attempt + 1} failed: {e}") - if attempt < retries - 1: - time.sleep(delay) # Wait before retrying - attempt += 1 - # If all attempts fail, raise the last exception - raise Exception(f"Operation failed after {retries} attempts.") diff --git a/packages/markitdown/tests/test_module_misc.py b/packages/markitdown/tests/test_module_misc.py index 4d62e4919..5949833a5 100644 --- a/packages/markitdown/tests/test_module_misc.py +++ b/packages/markitdown/tests/test_module_misc.py @@ -4,9 +4,12 @@ import re import shutil import pytest -from unittest.mock import MagicMock +import tempfile +import requests +from unittest.mock import MagicMock, patch from markitdown._uri_utils import parse_data_uri, file_uri_to_path +from markitdown.converters._youtube_converter import YouTubeConverter from markitdown import ( MarkItDown, @@ -532,6 +535,85 @@ def test_markitdown_llm() -> None: validate_strings(result, PPTX_TEST_STRINGS) +# Minimal YouTube-shaped HTML for unit tests (no network required) +_YT_FAKE_HTML = b"""
+