Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 17 additions & 26 deletions packages/markitdown/src/markitdown/converters/_youtube_converter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
import time
import re
import bs4
import requests
from http.cookiejar import MozillaCookieJar
from typing import Any, BinaryIO, Dict, List, Union
from urllib.parse import parse_qs, urlparse, unquote

Expand Down Expand Up @@ -145,7 +146,15 @@ def convert(
webpage_text += f"\n### Description\n{description}\n"

if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
ytt_api = YouTubeTranscriptApi()
cookie_path = kwargs.get("youtube_cookie_path", None)
http_client = None
if cookie_path:
_jar = MozillaCookieJar(cookie_path)
# ignore_discard/expires: browser-exported cookies are often session-scoped
_jar.load(ignore_discard=True, ignore_expires=True)
http_client = requests.Session()
http_client.cookies = _jar # type: ignore
ytt_api = YouTubeTranscriptApi(http_client=http_client)
transcript_text = ""
parsed_url = urlparse(stream_info.url) # type: ignore
params = parse_qs(parsed_url.query) # type: ignore
Expand All @@ -160,18 +169,14 @@ def convert(
youtube_transcript_languages = kwargs.get(
"youtube_transcript_languages", languages
)
# Retry the transcript fetching operation
transcript = self._retry_operation(
lambda: ytt_api.fetch(
video_id, languages=youtube_transcript_languages
),
retries=3, # Retry 3 times
delay=2, # 2 seconds delay between retries
# Use list→find_transcript→fetch (reliable in v1.2.x)
t = transcript_list.find_transcript(
youtube_transcript_languages
)

if transcript:
fetched = t.fetch()
if fetched:
transcript_text = " ".join(
[part.text for part in transcript]
[part.text for part in fetched]
) # type: ignore
except Exception as e:
# No transcript available
Expand Down Expand Up @@ -222,17 +227,3 @@ def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json t
if result := self._findKey(v, key):
return result
return None

def _retry_operation(self, operation, retries=3, delay=2):
"""Retries the operation if it fails."""
attempt = 0
while attempt < retries:
try:
return operation() # Attempt the operation
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
time.sleep(delay) # Wait before retrying
attempt += 1
# If all attempts fail, raise the last exception
raise Exception(f"Operation failed after {retries} attempts.")
86 changes: 85 additions & 1 deletion packages/markitdown/tests/test_module_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import re
import shutil
import pytest
from unittest.mock import MagicMock
import tempfile
import requests
from unittest.mock import MagicMock, patch

from markitdown._uri_utils import parse_data_uri, file_uri_to_path
from markitdown.converters._youtube_converter import YouTubeConverter

from markitdown import (
MarkItDown,
Expand Down Expand Up @@ -532,6 +535,85 @@ def test_markitdown_llm() -> None:
validate_strings(result, PPTX_TEST_STRINGS)


# Minimal YouTube-shaped HTML for unit tests (no network required)
_YT_FAKE_HTML = b"""<html><head>
<title>Test Video</title>
<meta name="name" content="Test Video"/>
</head><body></body></html>"""


def test_youtube_cookie_path_builds_http_client() -> None:
    """Supplying ``youtube_cookie_path`` must cause YouTubeTranscriptApi to be
    constructed with an ``http_client`` that is a ``requests.Session``."""
    capability_flag = patch(
        "markitdown.converters._youtube_converter.IS_YOUTUBE_TRANSCRIPT_CAPABLE",
        True,
    )
    api_class = patch(
        "markitdown.converters._youtube_converter.YouTubeTranscriptApi"
    )
    with capability_flag, api_class as mock_cls:
        # Stub api.list(...).find_transcript(...) with a transcript whose
        # fetch() yields nothing; only the constructor call matters here.
        empty_transcript = MagicMock()
        empty_transcript.fetch.return_value = []
        listing = mock_cls.return_value.list.return_value
        listing.find_transcript.return_value = empty_transcript

        # Header-only cookie file; delete=False keeps it on disk after the
        # handle is closed so it can be passed by path.
        cookie_file = tempfile.NamedTemporaryFile(
            mode="w", suffix=".txt", delete=False
        )
        with cookie_file:
            cookie_file.write("# Netscape HTTP Cookie File\n")
        cookie_path = cookie_file.name

        try:
            html_stream = io.BytesIO(_YT_FAKE_HTML)
            info = StreamInfo(
                mimetype="text/html",
                url="https://www.youtube.com/watch?v=test123",
            )
            YouTubeConverter().convert(
                html_stream, info, youtube_cookie_path=cookie_path
            )

            # call_args[1] holds the keyword arguments of the constructor call.
            ctor_kwargs = mock_cls.call_args[1]
            assert "http_client" in ctor_kwargs, (
                "YouTubeTranscriptApi must receive http_client when cookie path is given"
            )
            assert isinstance(ctor_kwargs["http_client"], requests.Session)
        finally:
            os.unlink(cookie_path)


def test_youtube_transcript_uses_list_find_fetch() -> None:
    """The converter must obtain the transcript through
    api.list() → find_transcript() → fetch()."""
    capability_flag = patch(
        "markitdown.converters._youtube_converter.IS_YOUTUBE_TRANSCRIPT_CAPABLE",
        True,
    )
    api_class = patch(
        "markitdown.converters._youtube_converter.YouTubeTranscriptApi"
    )
    with capability_flag, api_class as mock_cls:
        api_instance = mock_cls.return_value

        # A transcript whose fetch() returns a single snippet of text.
        stub_transcript = MagicMock()
        stub_transcript.fetch.return_value = [MagicMock(text="hello world")]
        api_instance.list.return_value.find_transcript.return_value = (
            stub_transcript
        )

        html_stream = io.BytesIO(_YT_FAKE_HTML)
        info = StreamInfo(
            mimetype="text/html",
            url="https://www.youtube.com/watch?v=test123",
        )
        result = YouTubeConverter().convert(html_stream, info)

        # The snippet text reaches the markdown, and the list/fetch steps of
        # the chain were each invoked exactly once.
        assert "hello world" in result.markdown
        api_instance.list.assert_called_once_with("test123")
        stub_transcript.fetch.assert_called_once()


if __name__ == "__main__":
"""Runs this file's tests from the command line."""
for test in [
Expand All @@ -547,6 +629,8 @@ def test_markitdown_llm() -> None:
test_markitdown_exiftool,
test_markitdown_llm_parameters,
test_markitdown_llm,
test_youtube_cookie_path_builds_http_client,
test_youtube_transcript_uses_list_find_fetch,
]:
print(f"Running {test.__name__}...", end="")
test()
Expand Down