From 1cc3d31fdafdfc11ce73a51a69717e28bb37142a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Nesveda?= Date: Wed, 18 Dec 2024 15:20:09 +0100 Subject: [PATCH 1/4] fix: prevent reboot loop, allow calling reboot from migrating handler, align reboot behavior with JS SDK --- src/apify/_actor.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 4f076a7a..57cc030b 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -4,6 +4,7 @@ import os import sys from datetime import timedelta +from itertools import chain from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast from lazy_object_proxy import Proxy @@ -13,7 +14,7 @@ from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value from crawlee import service_container -from crawlee.events._types import Event, EventPersistStateData +from crawlee.events._types import Event, EventMigratingData, EventPersistStateData from apify._configuration import Configuration from apify._consts import EVENT_LISTENERS_TIMEOUT @@ -48,6 +49,7 @@ class _ActorType: _apify_client: ApifyClientAsync _configuration: Configuration _is_exiting = False + _is_rebooting = False def __init__( self, @@ -839,12 +841,29 @@ async def reboot( self.log.error('Actor.reboot() is only supported when running on the Apify platform.') return + if self._is_rebooting: + self.log.debug('Actor is already rebooting, skipping the additional reboot call.') + return + + self._is_rebooting = True + if not custom_after_sleep: custom_after_sleep = self._configuration.metamorph_after_sleep - self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True)) + # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. 
+ # We can't just emit the events and wait for all listeners to finish, + # because this method might be called from an event listener itself, and we would deadlock. + persist_state_listeners = chain.from_iterable( + (self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 + ) + migrating_listeners = chain.from_iterable( + (self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 + ) - await self._event_manager.__aexit__(None, None, None) + await asyncio.gather( + *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], + *[listener(EventMigratingData()) for listener in migrating_listeners], + ) if not self._configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') From ad882f5efd50f0bb7ea7e5e05039a55966827669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Nesveda?= Date: Thu, 19 Dec 2024 10:45:40 +0100 Subject: [PATCH 2/4] chore: Improve docs --- docs/03-concepts/04-actor-events.mdx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/03-concepts/04-actor-events.mdx b/docs/03-concepts/04-actor-events.mdx index 575c37e3..91ad0695 100644 --- a/docs/03-concepts/04-actor-events.mdx +++ b/docs/03-concepts/04-actor-events.mdx @@ -40,6 +40,8 @@ During its runtime, the Actor receives Actor events sent by the Apify platform o {' '}to another worker server soon.

You can use it to persist the state of the Actor so that once it is executed again on the new server, it doesn't have to start over from the beginning. + Once you have persisted the state of your Actor, you can call Actor.reboot() + to reboot the Actor and trigger the migration immediately, to speed up the process. From 07e576e660dcfee3978aa17ff90b39a27ec97bed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Nesveda?= Date: Thu, 19 Dec 2024 17:00:05 +0100 Subject: [PATCH 3/4] Better comment --- src/apify/_actor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 57cc030b..0a952391 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -851,6 +851,9 @@ async def reboot( custom_after_sleep = self._configuration.metamorph_after_sleep # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. + # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot. + # MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot. + # Typically, crawlers are listening for the MIGRATING event to stop processing new requests. # We can't just emit the events and wait for all listeners to finish, # because this method might be called from an event listener itself, and we would deadlock. persist_state_listeners = chain.from_iterable( From 90df7fa1f935c6cd2010ae3b702b4686c3710c19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Nesveda?= Date: Thu, 19 Dec 2024 17:06:37 +0100 Subject: [PATCH 4/4] Use `more_itertools.flatten` --- poetry.lock | 4 ++-- pyproject.toml | 1 + src/apify/_actor.py | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 11af6b24..443ee6e7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. 
+# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "annotated-types" @@ -3537,4 +3537,4 @@ scrapy = ["scrapy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "008371392c5d2baf886b2529e3227434280d1d37122f0fee6a19e53451682fbb" +content-hash = "5f0773c951bd13de37603ebfe9d55788c09c9b50d5c1ed2370abbafe19242e76" diff --git a/pyproject.toml b/pyproject.toml index e5638dcd..1d562d32 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ cryptography = ">=42.0.0" # https://github.com/apify/apify-sdk-python/issues/348 httpx = "~0.27.0" lazy-object-proxy = ">=1.10.0" +more_itertools = ">=10.2.0" scrapy = { version = ">=2.11.0", optional = true } typing-extensions = ">=4.1.0" websockets = ">=10.0 <14.0.0" diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 0a952391..7c84e510 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -4,10 +4,10 @@ import os import sys from datetime import timedelta -from itertools import chain from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast from lazy_object_proxy import Proxy +from more_itertools import flatten from pydantic import AliasChoices from apify_client import ApifyClientAsync @@ -856,10 +856,10 @@ async def reboot( # Typically, crawlers are listening for the MIGRATING event to stop processing new requests. # We can't just emit the events and wait for all listeners to finish, # because this method might be called from an event listener itself, and we would deadlock. - persist_state_listeners = chain.from_iterable( + persist_state_listeners = flatten( (self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 ) - migrating_listeners = chain.from_iterable( + migrating_listeners = flatten( (self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 )