Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/crawlee/request_loaders/_request_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from logging import getLogger
from typing import Annotated

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from typing_extensions import override

from crawlee._request import Request
Expand Down Expand Up @@ -106,10 +106,14 @@ async def _get_state(self) -> RequestListState:
if self._persist_request_data:
async with self._requests_lock:
if not await self._requests_data.has_persisted_state():
self._requests_data.current_value.requests = [
request if isinstance(request, Request) else Request.from_url(request)
async for request in self._requests
]
self._requests_data.current_value.requests = []
async for processing_request in self._requests:
try:
request = self._transform_request(processing_request)
except ValidationError:
logger.warning(f'Invalid request encountered in the request list: {processing_request}')
continue
self._requests_data.current_value.requests.append(request)
await self._requests_data.persist_state()

self._requests = self._iterate_in_threadpool(
Expand Down Expand Up @@ -202,11 +206,18 @@ async def _ensure_next_request(self) -> None:
self._next = (self._next[0], to_enqueue[0])

async def _dequeue_requests(self, count: int) -> AsyncGenerator[Request | None]:
    """Yield up to `count` requests from the underlying request source.

    Entries that fail request validation are logged and skipped without
    consuming one of the `count` slots, so invalid input does not reduce
    the number of requests delivered. Once the source is exhausted, `None`
    is yielded for every remaining slot so the caller always receives
    exactly `count` items.
    """
    while count > 0:
        try:
            # Keep the try body minimal: only the source read can signal
            # exhaustion via StopAsyncIteration.
            processing_request = await self._requests.__anext__()
        except StopAsyncIteration:
            # Source exhausted - pad the remaining slots with None.
            yield None
        else:
            try:
                request = self._transform_request(processing_request)
            except ValidationError:
                # Skip malformed entries without decrementing the counter,
                # so they do not eat into the requested batch size.
                logger.warning(f'Invalid request encountered in the request list: {processing_request}')
                continue
            yield request
        count -= 1

async def _iterate_in_threadpool(self, iterable: Iterable[str | Request]) -> AsyncIterator[str | Request]:
"""Inspired by a function of the same name from encode/starlette."""
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/request_loaders/test_request_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,31 @@ async def test_persist_requests_key_only_persists_once() -> None:
fetched_request = await request_list_2.fetch_next_request()
assert fetched_request is not None
assert fetched_request.url == 'https://once2.placeholder.com' # From original data


async def test_handle_invalid_url() -> None:
    """Invalid URLs in the input list are skipped instead of raising."""
    request_list = RequestList(['invalid-url.com', 'https://valid.placeholder.com'])

    # The first entry is not a valid URL; fetching must silently skip it
    # and hand back the first valid request instead of crashing.
    fetched = await request_list.fetch_next_request()
    assert fetched is not None
    assert fetched.url == 'https://valid.placeholder.com'
    await request_list.mark_request_as_handled(fetched)


async def test_handle_invalid_url_with_persistence() -> None:
    """Test that invalid URLs are handled gracefully even when persistence is enabled."""
    persist_key = 'test_invalid_url_persistence'
    request_list = RequestList(['invalid-url.com', 'https://valid.placeholder.com'], persist_requests_key=persist_key)

    # First request is invalid, should be skipped without crashing
    request = await request_list.fetch_next_request()
    assert request is not None
    assert request.url == 'https://valid.placeholder.com'
    await request_list.mark_request_as_handled(request)

    # Check that the valid URL was persisted and the invalid one was not
    kvs = await KeyValueStore.open()
    persisted_data = await kvs.get_value(persist_key)
    assert persisted_data is not None
    # Actually verify the comment's claim instead of stopping at a
    # not-None check: the serialized state must contain the valid URL
    # and must not contain the invalid entry. str() keeps the check
    # agnostic of the exact persisted schema.
    serialized = str(persisted_data)
    assert 'https://valid.placeholder.com' in serialized
    assert 'invalid-url.com' not in serialized
Loading