diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 953a4d81..f731f433 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -13,6 +13,7 @@ from typing_extensions import override from apify_client import ApifyClientAsync +from crawlee._utils.crypto import crypto_random_object_id from crawlee.storage_clients._base import RequestQueueClient from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata @@ -65,10 +66,7 @@ def __init__( self, *, api_client: RequestQueueClientAsync, - id: str, - name: str | None, - total_request_count: int, - handled_request_count: int, + metadata: RequestQueueMetadata, ) -> None: """Initialize a new instance. @@ -77,11 +75,8 @@ def __init__( self._api_client = api_client """The Apify request queue client for API operations.""" - self._id = id - """The ID of the request queue.""" - - self._name = name - """The name of the request queue.""" + self._metadata = metadata + """Additional data related to the RequestQueue.""" self._queue_head = deque[str]() """A deque to store request unique keys in the queue head.""" @@ -95,40 +90,43 @@ def __init__( self._should_check_for_forefront_requests = False """Whether to check for forefront requests in the next list_head call.""" - self._had_multiple_clients = False - """Whether the request queue has been accessed by multiple clients.""" - - self._initial_total_count = total_request_count - """The initial total request count (from the API) when the queue was opened.""" - - self._initial_handled_count = handled_request_count - """The initial handled request count (from the API) when the queue was opened.""" - - self._assumed_total_count = 0 - """The number of requests we assume are in the queue (tracked manually for this instance).""" - - self._assumed_handled_count = 0 - """The number of 
requests we assume have been handled (tracked manually for this instance).""" - self._fetch_lock = asyncio.Lock() """Fetch lock to minimize race conditions when communicating with API.""" + async def _get_metadata_estimate(self) -> RequestQueueMetadata: + """Try to get cached metadata first. If multiple clients, fuse with global metadata. + + This method is used internally to avoid an unnecessary API call unless needed (multiple clients). + Local estimation of metadata is without delay, unlike metadata from API. In a situation where there is only one + client, it is the better choice. + """ + if self._metadata.had_multiple_clients: + return await self.get_metadata() + # Get local estimation (will not include changes done by another client) + return self._metadata + @override async def get_metadata(self) -> RequestQueueMetadata: - total_count = self._initial_total_count + self._assumed_total_count - handled_count = self._initial_handled_count + self._assumed_handled_count - pending_count = total_count - handled_count + """Get metadata about the request queue. + Returns: + Metadata from the API, merged with local estimation, because in some cases, the data from the API can + be delayed. + """ + response = await self._api_client.get() + if response is None: + raise ValueError('Failed to fetch request queue metadata from the API.') + # Enhance API response by local estimations (API can be delayed a few seconds, while the local estimation is not.) 
return RequestQueueMetadata( - id=self._id, - name=self._name, - total_request_count=total_count, - handled_request_count=handled_count, - pending_request_count=pending_count, - created_at=datetime.now(timezone.utc), - modified_at=datetime.now(timezone.utc), - accessed_at=datetime.now(timezone.utc), - had_multiple_clients=self._had_multiple_clients, + id=response['id'], + name=response['name'], + total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count), + handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count), + pending_request_count=response['pendingRequestCount'], + created_at=min(response['createdAt'], self._metadata.created_at), + modified_at=max(response['modifiedAt'], self._metadata.modified_at), + accessed_at=max(response['accessedAt'], self._metadata.accessed_at), + had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients, ) @classmethod @@ -187,27 +185,34 @@ async def open( ) apify_rqs_client = apify_client_async.request_queues() - # If both id and name are provided, raise an error. - if id and name: - raise ValueError('Only one of "id" or "name" can be specified, not both.') - - # If id is provided, get the storage by ID. - if id and name is None: - apify_rq_client = apify_client_async.request_queue(request_queue_id=id) + match (id, name): + case (None, None): + # If both id and name are None, try to get the default storage ID from environment variables. + # The default storage ID environment variable is set by the Apify platform. It also contains + # a new storage ID after Actor's reboot or migration. + id = configuration.default_request_queue_id + case (None, name): + # If only name is provided, get or create the storage by name. + id = RequestQueueMetadata.model_validate( + await apify_rqs_client.get_or_create(name=name), + ).id + case (_, None): + # If only id is provided, use it. 
+ pass + case (_, _): + # If both id and name are provided, raise an error. + raise ValueError('Only one of "id" or "name" can be specified, not both.') + if id is None: + raise RuntimeError('Unreachable code') - # If name is provided, get or create the storage by name. - if name and id is None: - id = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(name=name), - ).id - apify_rq_client = apify_client_async.request_queue(request_queue_id=id) + # Use suitable client_key to make `hadMultipleClients` response of Apify API useful. + # It should persist across migrated or resurrected Actor runs on the Apify platform. + _api_max_client_key_length = 32 + client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[ + :_api_max_client_key_length + ] - # If both id and name are None, try to get the default storage ID from environment variables. - # The default storage ID environment variable is set by the Apify platform. It also contains - # a new storage ID after Actor's reboot or migration. - if id is None and name is None: - id = configuration.default_request_queue_id - apify_rq_client = apify_client_async.request_queue(request_queue_id=id) + apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key) # Fetch its metadata. metadata = await apify_rq_client.get() @@ -217,27 +222,18 @@ async def open( id = RequestQueueMetadata.model_validate( await apify_rqs_client.get_or_create(), ).id - apify_rq_client = apify_client_async.request_queue(request_queue_id=id) + apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key) # Verify that the storage exists by fetching its metadata again. 
metadata = await apify_rq_client.get() if metadata is None: raise ValueError(f'Opening request queue with id={id} and name={name} failed.') - metadata_model = RequestQueueMetadata.model_validate( - await apify_rqs_client.get_or_create(), - ) - - # Ensure we have a valid ID. - if id is None: - raise ValueError('Request queue ID cannot be None.') + metadata_model = RequestQueueMetadata.model_validate(metadata) return cls( api_client=apify_rq_client, - id=id, - name=name, - total_request_count=metadata_model.total_request_count, - handled_request_count=metadata_model.handled_request_count, + metadata=metadata_model, ) @override @@ -341,7 +337,7 @@ async def add_batch_of_requests( if not processed_request.was_already_present and not processed_request.was_already_handled: new_request_count += 1 - self._assumed_total_count += new_request_count + self._metadata.total_request_count += new_request_count return api_response @@ -439,7 +435,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | # Update assumed handled count if this wasn't already handled if not processed_request.was_already_handled: - self._assumed_handled_count += 1 + self._metadata.handled_request_count += 1 # Update the cache with the handled request cache_key = request.unique_key @@ -487,7 +483,7 @@ async def reclaim_request( # If the request was previously handled, decrement our handled count since # we're putting it back for processing. 
if request.was_already_handled and not processed_request.was_already_handled: - self._assumed_handled_count -= 1 + self._metadata.handled_request_count -= 1 # Update the cache cache_key = request.unique_key @@ -645,7 +641,7 @@ async def _list_head( if cached_request and cached_request.hydrated: items.append(cached_request.hydrated) - metadata = await self.get_metadata() + metadata = await self._get_metadata_estimate() return RequestQueueHead( limit=limit, @@ -672,6 +668,8 @@ async def _list_head( # Update the queue head cache self._queue_has_locked_requests = response.get('queueHasLockedRequests', False) + # Check if there is another client working with the RequestQueue + self._metadata.had_multiple_clients = response.get('hadMultipleClients', False) for request_data in response.get('items', []): request = Request.model_validate(request_data) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index cbea845d..b5323272 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -5,5 +5,13 @@ def generate_unique_resource_name(label: str) -> str: """Generates a unique resource name, which will contain the given label.""" + name_template = 'python-sdk-tests-{}-generated-{}' + template_length = len(name_template.format('', '')) + api_name_limit = 63 + generated_random_id_length = 8 + label_length_limit = api_name_limit - template_length - generated_random_id_length + label = label.replace('_', '-') - return f'python-sdk-tests-{label}-generated-{crypto_random_object_id(8)}' + assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' + + return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 66ced9c3..9c230acd 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -18,13 +18,15 @@ import apify._actor from ._utils import generate_unique_resource_name 
+from apify import Actor from apify._models import ActorRun if TYPE_CHECKING: - from collections.abc import Awaitable, Callable, Coroutine, Iterator, Mapping + from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping from decimal import Decimal from apify_client.clients.resource_clients import ActorClientAsync + from crawlee.storages import RequestQueue _TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN' _API_URL_ENV_VAR = 'APIFY_INTEGRATION_TESTS_API_URL' @@ -104,6 +106,18 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync: return ApifyClientAsync(apify_token, api_url=api_url) +@pytest.fixture +async def request_queue_force_cloud(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> AsyncGenerator[RequestQueue]: + """Create an instance of the Apify request queue on the platform and drop it when the test is finished.""" + request_queue_name = generate_unique_resource_name('request_queue') + monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token) + + async with Actor: + rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True) + yield rq + await rq.drop() + + @pytest.fixture(scope='session') def sdk_wheel_path(tmp_path_factory: pytest.TempPathFactory, testrun_uid: str) -> Path: """Build the package wheel if it hasn't been built yet, and return the path to the wheel.""" diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py index b9abc2a1..80a8b628 100644 --- a/tests/integration/test_actor_request_queue.py +++ b/tests/integration/test_actor_request_queue.py @@ -11,6 +11,7 @@ from ._utils import generate_unique_resource_name from apify import Actor, Request +from apify._models import ActorRun if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -100,18 +101,17 @@ async def test_force_cloud( async def test_request_queue_is_finished( apify_named_rq: RequestQueue, ) -> None: - request_queue = await Actor.open_request_queue(name=apify_named_rq.name, 
force_cloud=True) - await request_queue.add_request(Request.from_url('http://example.com')) - assert not await request_queue.is_finished() + await apify_named_rq.add_request(Request.from_url('http://example.com')) + assert not await apify_named_rq.is_finished() - request = await request_queue.fetch_next_request() + request = await apify_named_rq.fetch_next_request() assert request is not None - assert not await request_queue.is_finished(), ( + assert not await apify_named_rq.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) - await request_queue.mark_request_as_handled(request) - assert await request_queue.is_finished() + await apify_named_rq.mark_request_as_handled(request) + assert await apify_named_rq.is_finished() async def test_request_queue_deduplication( @@ -318,3 +318,83 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic Actor.log.info(stats_after) assert (stats_after['writeCount'] - stats_before['writeCount']) == 1 + + +async def test_request_queue_had_multiple_clients_platform( + make_actor: MakeActorFunction, + run_actor: RunActorFunction, +) -> None: + """Test that `RequestQueue` clients created with different `client_key` appear as distinct clients.""" + + async def main() -> None: + from apify_client import ApifyClientAsync + + async with Actor: + rq_1 = await Actor.open_request_queue() + await rq_1.fetch_next_request() + + # Accessed with client created explicitly with `client_key=None` should appear as distinct client + api_client = ApifyClientAsync(token=Actor.configuration.token).request_queue( + request_queue_id=rq_1.id, client_key=None + ) + await api_client.list_head() + + assert (await rq_1.get_metadata()).had_multiple_clients is True + + actor = await make_actor(label='rq-had-multiple-clients', main_func=main) + run_result = await run_actor(actor) + + assert run_result.status == 'SUCCEEDED' + + +async def test_request_queue_not_had_multiple_clients_platform( + 
make_actor: MakeActorFunction, + run_actor: RunActorFunction, +) -> None: + """Test that same `RequestQueue` created from Actor does not act as multiple clients.""" + + async def main() -> None: + async with Actor: + rq_1 = await Actor.open_request_queue() + # Two calls to API to create situation where unset `client_key` can cause `had_multiple_clients` to True + await rq_1.fetch_next_request() + await rq_1.fetch_next_request() + + assert (await rq_1.get_metadata()).had_multiple_clients is False + + actor = await make_actor(label='rq-not-had-multiple-clients', main_func=main) + run_result = await run_actor(actor) + + assert run_result.status == 'SUCCEEDED' + + +async def test_request_queue_not_had_multiple_clients_platform_resurrection( + make_actor: MakeActorFunction, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test `RequestQueue` created from Actor does not act as multiple clients even after resurrection.""" + + async def main() -> None: + async with Actor: + rq_1 = await Actor.open_request_queue() + assert (await rq_1.get_metadata()).had_multiple_clients is False, 'Not accessed yet, should be False' + + await rq_1.fetch_next_request() + + assert (await rq_1.get_metadata()).had_multiple_clients is False, ( + 'Accessed with the same client, should be False' + ) + + actor = await make_actor(label='rq-clients-resurrection', main_func=main) + run_result = await run_actor(actor) + assert run_result.status == 'SUCCEEDED' + + # Resurrect the run, the RequestQueue should still use same client key and thus not have multiple clients. 
+ run_client = apify_client_async.run(run_id=run_result.id) + # Redirect logs even from the resurrected run + streamed_log = await run_client.get_streamed_log(from_start=False) + await run_client.resurrect() + async with streamed_log: + run_result = ActorRun.model_validate(await run_client.wait_for_finish(wait_secs=600)) + assert run_result.status == 'SUCCEEDED' diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 1db730a7..ed913b89 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -1,12 +1,18 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING import pytest +from crawlee import Request + from apify import Actor if TYPE_CHECKING: + from apify_client import ApifyClientAsync + from crawlee.storages import RequestQueue + from .conftest import MakeActorFunction, RunActorFunction @@ -1192,3 +1198,83 @@ async def consumer() -> int: actor = await make_actor(label='rq-performance-pattern-test', main_func=main) run_result = await run_actor(actor) assert run_result.status == 'SUCCEEDED' + + +async def test_request_queue_enhanced_metadata( + request_queue_force_cloud: RequestQueue, + apify_client_async: ApifyClientAsync, +) -> None: + """Test metadata tracking. + + Multiple clients scenarios are not guaranteed to give correct results without delay. But at least multiple clients, + single producer, should be reliable on the producer side.""" + + for i in range(1, 10): + await request_queue_force_cloud.add_request(Request.from_url(f'http://example.com/{i}')) + # Reliable information as the API response is enhanced with local metadata estimation. 
+ assert (await request_queue_force_cloud.get_metadata()).total_request_count == i + + # Accessed with client created explicitly with `client_key=None` should appear as distinct client + api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None) + await api_client.list_head() + + # The presence of another non-producing client should not affect the metadata + for i in range(10, 20): + await request_queue_force_cloud.add_request(Request.from_url(f'http://example.com/{i}')) + # Reliable information as the API response is enhanced with local metadata estimation. + assert (await request_queue_force_cloud.get_metadata()).total_request_count == i + + +async def test_request_queue_metadata_another_client( + request_queue_force_cloud: RequestQueue, + apify_client_async: ApifyClientAsync, +) -> None: + """Test metadata tracking. The delayed metadata should be reliable even when changed by another client.""" + api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None) + await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'})) + + # Wait to be sure that the API has updated the global metadata + await asyncio.sleep(10) + + assert (await request_queue_force_cloud.get_metadata()).total_request_count == 1 + + +async def test_request_queue_had_multiple_clients( + request_queue_force_cloud: RequestQueue, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that `RequestQueue` correctly detects multiple clients. 
+ + Clients created with different `client_key` should appear as distinct clients.""" + await request_queue_force_cloud.fetch_next_request() + + # Accessed with client created explicitly with `client_key=None` should appear as distinct client + api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None) + await api_client.list_head() + + # Check that it is correctly in the RequestQueueClient metadata + assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is True + + # Check that it is correctly in the API + api_response = await api_client.get() + assert api_response + assert api_response['hadMultipleClients'] is True + + +async def test_request_queue_not_had_multiple_clients( + request_queue_force_cloud: RequestQueue, apify_client_async: ApifyClientAsync +) -> None: + """Test that same `RequestQueue` created from Actor does not act as multiple clients.""" + + # Two calls to API to create situation where different `client_key` can set `had_multiple_clients` to True + await request_queue_force_cloud.fetch_next_request() + await request_queue_force_cloud.fetch_next_request() + + # Check that it is correctly in the RequestQueueClient metadata + assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is False + + # Check that it is correctly in the API + api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id) + api_response = await api_client.get() + assert api_response + assert api_response['hadMultipleClients'] is False diff --git a/uv.lock b/uv.lock index 08f0229d..c9a882eb 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" [[package]] diff --git a/website/package-lock.json b/website/package-lock.json index 2420fdbc..230848fa 100644 --- a/website/package-lock.json +++ b/website/package-lock.json @@ -6,7 +6,7 @@ "": { "name": "apify-sdk-python", "dependencies": { - 
"@apify/docs-theme": "^1.0.185", + "@apify/docs-theme": "^1.0.203", "@apify/docusaurus-plugin-typedoc-api": "^4.4.6", "@docusaurus/core": "^3.8.1", "@docusaurus/faster": "^3.8.1", diff --git a/website/package.json b/website/package.json index 76e37047..3df0bf09 100644 --- a/website/package.json +++ b/website/package.json @@ -21,7 +21,7 @@ "lint:code:fix": "eslint . --fix" }, "dependencies": { - "@apify/docs-theme": "^1.0.185", + "@apify/docs-theme": "^1.0.203", "@apify/docusaurus-plugin-typedoc-api": "^4.4.6", "@docusaurus/core": "^3.8.1", "@docusaurus/faster": "^3.8.1",