Skip to content

fix: Use same client_key for Actor created request_queue and improve its metadata estimation #552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 69 additions & 71 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing_extensions import override

from apify_client import ApifyClientAsync
from crawlee._utils.crypto import crypto_random_object_id
from crawlee.storage_clients._base import RequestQueueClient
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

Expand Down Expand Up @@ -65,10 +66,7 @@ def __init__(
self,
*,
api_client: RequestQueueClientAsync,
id: str,
name: str | None,
total_request_count: int,
handled_request_count: int,
metadata: RequestQueueMetadata,
) -> None:
"""Initialize a new instance.

Expand All @@ -77,11 +75,8 @@ def __init__(
self._api_client = api_client
"""The Apify request queue client for API operations."""

self._id = id
"""The ID of the request queue."""

self._name = name
"""The name of the request queue."""
self._metadata = metadata
"""Additional data related to the RequestQueue."""

self._queue_head = deque[str]()
"""A deque to store request unique keys in the queue head."""
Expand All @@ -95,40 +90,43 @@ def __init__(
self._should_check_for_forefront_requests = False
"""Whether to check for forefront requests in the next list_head call."""

self._had_multiple_clients = False
"""Whether the request queue has been accessed by multiple clients."""

self._initial_total_count = total_request_count
"""The initial total request count (from the API) when the queue was opened."""

self._initial_handled_count = handled_request_count
"""The initial handled request count (from the API) when the queue was opened."""

self._assumed_total_count = 0
"""The number of requests we assume are in the queue (tracked manually for this instance)."""

self._assumed_handled_count = 0
"""The number of requests we assume have been handled (tracked manually for this instance)."""

self._fetch_lock = asyncio.Lock()
"""Fetch lock to minimize race conditions when communicating with API."""

async def _get_metadata_estimate(self) -> RequestQueueMetadata:
    """Return the locally cached metadata, unless multiple clients are involved.

    Used internally to avoid an unnecessary API call when this client is the only one
    using the queue. The local estimation is updated without delay, unlike metadata
    from the API, so with a single client it is the more accurate choice. When another
    client has accessed the queue, fall back to `get_metadata`, which merges the API
    response with the local estimation.
    """
    if self._metadata.had_multiple_clients:
        return await self.get_metadata()
    # Get local estimation (will not include changes done by another client).
    return self._metadata

@override
async def get_metadata(self) -> RequestQueueMetadata:
total_count = self._initial_total_count + self._assumed_total_count
handled_count = self._initial_handled_count + self._assumed_handled_count
pending_count = total_count - handled_count
"""Get metadata about the request queue.

Returns:
Metadata from the API, merged with local estimation, because in some cases, the data from the API can
be delayed.
"""
response = await self._api_client.get()
if response is None:
raise ValueError('Failed to fetch request queue metadata from the API.')
# Enhance the API response with local estimations (the API can be delayed a few seconds, while the local estimation is not).
return RequestQueueMetadata(
id=self._id,
name=self._name,
total_request_count=total_count,
handled_request_count=handled_count,
pending_request_count=pending_count,
created_at=datetime.now(timezone.utc),
modified_at=datetime.now(timezone.utc),
accessed_at=datetime.now(timezone.utc),
had_multiple_clients=self._had_multiple_clients,
id=response['id'],
name=response['name'],
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count),
pending_request_count=response['pendingRequestCount'],
created_at=min(response['createdAt'], self._metadata.created_at),
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients,
)

@classmethod
Expand Down Expand Up @@ -187,27 +185,34 @@ async def open(
)
apify_rqs_client = apify_client_async.request_queues()

# If both id and name are provided, raise an error.
if id and name:
raise ValueError('Only one of "id" or "name" can be specified, not both.')

# If id is provided, get the storage by ID.
if id and name is None:
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
match (id, name):
case (None, None):
# If both id and name are None, try to get the default storage ID from environment variables.
# The default storage ID environment variable is set by the Apify platform. It also contains
# a new storage ID after Actor's reboot or migration.
id = configuration.default_request_queue_id
case (None, name):
# If only name is provided, get or create the storage by name.
id = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(name=name),
).id
case (_, None):
# If only id is provided, use it.
pass
case (_, _):
# If both id and name are provided, raise an error.
raise ValueError('Only one of "id" or "name" can be specified, not both.')
if id is None:
raise RuntimeError('Unreachable code')

# If name is provided, get or create the storage by name.
if name and id is None:
id = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(name=name),
).id
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
# Use a suitable client_key to make the `hadMultipleClients` response of the Apify API useful.
# It should persist across migrated or resurrected Actor runs on the Apify platform.
_api_max_client_key_length = 32
client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[
:_api_max_client_key_length
]

# If both id and name are None, try to get the default storage ID from environment variables.
# The default storage ID environment variable is set by the Apify platform. It also contains
# a new storage ID after Actor's reboot or migration.
if id is None and name is None:
id = configuration.default_request_queue_id
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)

# Fetch its metadata.
metadata = await apify_rq_client.get()
Expand All @@ -217,27 +222,18 @@ async def open(
id = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(),
).id
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)

# Verify that the storage exists by fetching its metadata again.
metadata = await apify_rq_client.get()
if metadata is None:
raise ValueError(f'Opening request queue with id={id} and name={name} failed.')

metadata_model = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(),
)

# Ensure we have a valid ID.
if id is None:
raise ValueError('Request queue ID cannot be None.')
metadata_model = RequestQueueMetadata.model_validate(metadata)

return cls(
api_client=apify_rq_client,
id=id,
name=name,
total_request_count=metadata_model.total_request_count,
handled_request_count=metadata_model.handled_request_count,
metadata=metadata_model,
)

@override
Expand Down Expand Up @@ -341,7 +337,7 @@ async def add_batch_of_requests(
if not processed_request.was_already_present and not processed_request.was_already_handled:
new_request_count += 1

self._assumed_total_count += new_request_count
self._metadata.total_request_count += new_request_count

return api_response

Expand Down Expand Up @@ -439,7 +435,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |

# Update assumed handled count if this wasn't already handled
if not processed_request.was_already_handled:
self._assumed_handled_count += 1
self._metadata.handled_request_count += 1

# Update the cache with the handled request
cache_key = request.unique_key
Expand Down Expand Up @@ -487,7 +483,7 @@ async def reclaim_request(
# If the request was previously handled, decrement our handled count since
# we're putting it back for processing.
if request.was_already_handled and not processed_request.was_already_handled:
self._assumed_handled_count -= 1
self._metadata.handled_request_count -= 1

# Update the cache
cache_key = request.unique_key
Expand Down Expand Up @@ -645,7 +641,7 @@ async def _list_head(
if cached_request and cached_request.hydrated:
items.append(cached_request.hydrated)

metadata = await self.get_metadata()
metadata = await self._get_metadata_estimate()

return RequestQueueHead(
limit=limit,
Expand All @@ -672,6 +668,8 @@ async def _list_head(

# Update the queue head cache
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
# Check if there is another client working with the RequestQueue
self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)

for request_data in response.get('items', []):
request = Request.model_validate(request_data)
Expand Down
10 changes: 9 additions & 1 deletion tests/integration/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,13 @@

def generate_unique_resource_name(label: str) -> str:
"""Generates a unique resource name, which will contain the given label."""
name_template = 'python-sdk-tests-{}-generated-{}'
template_length = len(name_template.format('', ''))
api_name_limit = 63
generated_random_id_length = 8
label_length_limit = api_name_limit - template_length - generated_random_id_length

label = label.replace('_', '-')
return f'python-sdk-tests-{label}-generated-{crypto_random_object_id(8)}'
assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}'

return name_template.format(label, crypto_random_object_id(generated_random_id_length))
16 changes: 15 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import apify._actor
from ._utils import generate_unique_resource_name
from apify import Actor
from apify._models import ActorRun

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator, Mapping
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping
from decimal import Decimal

from apify_client.clients.resource_clients import ActorClientAsync
from crawlee.storages import RequestQueue

_TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
_API_URL_ENV_VAR = 'APIFY_INTEGRATION_TESTS_API_URL'
Expand Down Expand Up @@ -104,6 +106,18 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync:
return ApifyClientAsync(apify_token, api_url=api_url)


@pytest.fixture
async def request_queue_force_cloud(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> AsyncGenerator[RequestQueue]:
    """Create an instance of the Apify request queue on the platform and drop it when the test is finished."""
    # Unique name to avoid collisions between concurrent test runs.
    request_queue_name = generate_unique_resource_name('request_queue')
    # Expose the Apify token to the Actor via the environment for the duration of the test.
    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)

    async with Actor:
        # force_cloud=True ensures the queue is created on the Apify platform rather than locally.
        rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
        yield rq
        # Clean up the platform resource once the test is done.
        await rq.drop()


@pytest.fixture(scope='session')
def sdk_wheel_path(tmp_path_factory: pytest.TempPathFactory, testrun_uid: str) -> Path:
"""Build the package wheel if it hasn't been built yet, and return the path to the wheel."""
Expand Down
Loading
Loading