-
Notifications
You must be signed in to change notification settings - Fork 13
feat: Use same client_key
for Actor
created request_queue
and improve its metadata estimation
#552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Seems to be some problem on the platform?
Platform acknowledged it is a bug
As of now, the In develop branch: https://github.com/apify/apify-worker/pull/1470 |
@pytest.fixture(autouse=True) | ||
def set_token(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> None: | ||
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this now? How did it work before without it?
async def test_request_queue_enhanced_metadata( | ||
request_queue_force_cloud: RequestQueue, | ||
apify_client_async: ApifyClientAsync, | ||
) -> None: | ||
"""Test metadata tracking. | ||
|
||
Multiple clients scenarios are not guaranteed to give correct results without delay. But at least multiple clients, | ||
single producer, should be reliable on the producer side.""" | ||
|
||
for i in range(1, 10): | ||
await request_queue_force_cloud.add_request(Request.from_url(f'http://example.com/{i}')) | ||
# Reliable information as the API response is enhanced with local metadata estimation. | ||
assert (await request_queue_force_cloud.get_metadata()).total_request_count == i | ||
|
||
# Accessed with client created explicitly with `client_key=None` should appear as distinct client | ||
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None) | ||
await api_client.list_head() | ||
|
||
# The presence of another non-producing client should not affect the metadata | ||
for i in range(10, 20): | ||
await request_queue_force_cloud.add_request(Request.from_url(f'http://example.com/{i}')) | ||
# Reliable information as the API response is enhanced with local metadata estimation. | ||
assert (await request_queue_force_cloud.get_metadata()).total_request_count == i | ||
|
||
|
||
async def test_request_queue_metadata_another_client( | ||
request_queue_force_cloud: RequestQueue, | ||
apify_client_async: ApifyClientAsync, | ||
) -> None: | ||
"""Test metadata tracking. The delayed metadata should be reliable even when changed by another client.""" | ||
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None) | ||
await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'})) | ||
|
||
# Wait to be sure that the API has updated the global metadata | ||
await asyncio.sleep(10) | ||
|
||
assert (await request_queue_force_cloud.get_metadata()).total_request_count == 1 | ||
|
||
|
||
async def test_request_queue_had_multiple_clients_local( | ||
request_queue_force_cloud: RequestQueue, | ||
apify_client_async: ApifyClientAsync, | ||
) -> None: | ||
"""Test that `RequestQueue` correctly detects multiple clients. | ||
|
||
Clients created with different `client_key` should appear as distinct clients.""" | ||
await request_queue_force_cloud.fetch_next_request() | ||
|
||
# Accessed with client created explicitly with `client_key=None` should appear as distinct client | ||
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None) | ||
await api_client.list_head() | ||
|
||
# Check that it is correctly in the RequestQueueClient metadata | ||
assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is True | ||
|
||
# Check that it is correctly in the API | ||
api_response = await api_client.get() | ||
assert api_response | ||
assert api_response['hadMultipleClients'] is True | ||
|
||
|
||
async def test_request_queue_not_had_multiple_clients_local( | ||
request_queue_force_cloud: RequestQueue, apify_client_async: ApifyClientAsync | ||
) -> None: | ||
"""Test that same `RequestQueue` created from Actor does not act as multiple clients.""" | ||
|
||
# Two calls to API to create situation where different `client_key` can set `had_multiple_clients` to True | ||
await request_queue_force_cloud.fetch_next_request() | ||
await request_queue_force_cloud.fetch_next_request() | ||
|
||
# Check that it is correctly in the RequestQueueClient metadata | ||
assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is False | ||
|
||
# Check that it is correctly in the API | ||
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id) | ||
api_response = await api_client.get() | ||
assert api_response | ||
assert api_response['hadMultipleClients'] is False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be here? We have test_actor_request_queue.py
and test_request_queue.py
btw.
async def _get_metadata(self) -> RequestQueueMetadata: | ||
"""Try to get cached metadata first. If multiple clients, fuse with global metadata.""" | ||
if self._metadata.had_multiple_clients: | ||
return await self.get_metadata() | ||
# Get local estimation (will not include changes done bo another client) | ||
return self._metadata | ||
|
||
@override | ||
async def get_metadata(self) -> RequestQueueMetadata: | ||
total_count = self._initial_total_count + self._assumed_total_count | ||
handled_count = self._initial_handled_count + self._assumed_handled_count | ||
pending_count = total_count - handled_count | ||
|
||
"""Get metadata about the request queue.""" | ||
response = await self._api_client.get() | ||
if response is None: | ||
raise ValueError('Failed to fetch request queue metadata from the API.') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be better explained why we have _get_metadata
and get_metadata
now.
Description
RequestQueue
from Actor on the platform or withforce_cloud=True
theclient_key
should be set torun_id
. This ensures:RequestQueue
instance that is usingApifyRequestQueueClient
will be done with the sameclient_key
and thus in metadatahad_multiple_clients=False
Actor.open_request_queue()
on the platform share the sameclient_key
and thus in metadatahad_multiple_clients=False
client_key
is set torun_id
, it remains the same for resurrected or migrated run, and thus in metadatahad_multiple_clients=False
had_multiple_clients
allows better estimation ofRequestQueue
metadata.had_multiple_clients=False
, it is possible to trust local estimation of the metadata.had_multiple_clients=True
, local estimation is no longer valid, but still can, in some cases, improve estimation by being ahead of the delayed API update of the metadata. Therefore API-based metadata are fused with local metadata estimation to produce as good estimation as we can.ApifyRequestQueueClient
init changed to properly initialize from full metadata - to enable more reliable metadata after migration/resurrection or when using existingRequestQueue
_list_head
, if there is a call to API, use the availablehad_multiple_clients
to update local estimation of this value. This is a cheap way of knowing if there is another client or not without the need to make a new API call._list_head
is called frequently enough to make the local estimation ofhad_multiple_clients
decently good.Issues
client_key
for all ApifyRequestQueue client calls #536Testing