diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 84796e0..c04290f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,6 +13,7 @@ * The dispatcher offers two new parameters to control the client's call and stream timeout: - `call_timeout`: The maximum time to wait for a response from the client. - `stream_timeout`: The maximum time to wait before restarting a stream. +* While the dispatch stream restarts we refresh our dispatch cache as well, to ensure we didn't miss any updates. ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 80f81d5..144591d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,9 +38,11 @@ dependencies = [ # Make sure to update the version for cross-referencing also in the # mkdocs.yml file when changing the version here (look for the config key # plugins.mkdocstrings.handlers.python.import) - "frequenz-sdk >= 1.0.0-rc1900, < 1.0.0-rc2100", + "frequenz-sdk >= 1.0.0-rc2002, < 1.0.0-rc2100", "frequenz-channels >= 1.6.1, < 2.0.0", - "frequenz-client-dispatch >= 0.10.2, < 0.11.0", + "frequenz-client-dispatch >= 0.11.0, < 0.12.0", + "frequenz-client-common >= 0.3.2, < 0.4.0", + "frequenz-client-base >= 0.11.0, < 0.12.0", ] dynamic = ["version"] diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index b260b4b..0be2606 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -13,6 +13,9 @@ from frequenz.channels import Broadcast, Receiver, Sender, select from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId +from frequenz.client.dispatch.types import DispatchId +from frequenz.client.dispatch.types import TargetComponents as ClientTargetComponents +from frequenz.core.id import BaseId from frequenz.sdk.actor import Actor, BackgroundService from ._dispatch import Dispatch @@ -26,6 +29,18 @@ """ +class DispatchActorId(BaseId, str_prefix="DA"): + """ID for a dispatch actor.""" + + def __init__(self, dispatch_id: DispatchId | int) -> None: + """Initialize the DispatchActorId. + + Args: + dispatch_id: The ID of the dispatch this actor is associated with. + """ + super().__init__(int(dispatch_id)) + + @dataclass(frozen=True, kw_only=True) class DispatchInfo: """Event emitted when the dispatch changes.""" @@ -161,7 +176,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor] ], running_status_receiver: Receiver[Dispatch], - dispatch_identity: Callable[[Dispatch], int] | None = None, + dispatch_identity: Callable[[Dispatch], DispatchActorId] | None = None, retry_interval: timedelta = timedelta(seconds=60), ) -> None: """Initialize the dispatch handler. @@ -175,36 +190,25 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen retry_interval: How long to wait until trying to start failed actors again. """ super().__init__() - self._dispatch_identity: Callable[[Dispatch], int] = ( - dispatch_identity if dispatch_identity else lambda d: d.id + self._dispatch_identity: Callable[[Dispatch], DispatchActorId] = ( + dispatch_identity if dispatch_identity else lambda d: DispatchActorId(d.id) ) self._dispatch_rx = running_status_receiver self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift()) self._actor_factory = actor_factory - self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {} - self._failed_dispatches: dict[int, Dispatch] = {} + self._actors: dict[DispatchActorId, ActorDispatcher.ActorAndChannel] = {} + self._failed_dispatches: dict[DispatchActorId, Dispatch] = {} """Failed dispatches that will be retried later.""" def start(self) -> None: """Start the background service.""" self._tasks.add(asyncio.create_task(self._run())) - def _get_target_components_from_dispatch( - self, dispatch: Dispatch - ) -> TargetComponents: - if all(isinstance(comp, int) for comp in dispatch.target): - # We've confirmed all elements are integers, so we can cast. - int_components = cast(list[int], dispatch.target) - return [ComponentId(cid) for cid in int_components] - # If not all are ints, then it must be a list of ComponentCategory - # based on the definition of ClientTargetComponents. - return cast(list[ComponentCategory], dispatch.target) - async def _start_actor(self, dispatch: Dispatch) -> None: """Start the actor the given dispatch refers to.""" dispatch_update = DispatchInfo( - components=self._get_target_components_from_dispatch(dispatch), + components=_convert_target_components(dispatch.target), dry_run=dispatch.dry_run, options=dispatch.payload, _src=dispatch, @@ -301,3 +305,13 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: await self._start_actor(dispatch) else: await self._stop_actor(dispatch, "Dispatch stopped") + + +def _convert_target_components(target: ClientTargetComponents) -> TargetComponents: + if all(isinstance(comp, int) for comp in target): + # We've confirmed all elements are integers, so we can cast. + int_components = cast(list[int], target) + return [ComponentId(cid) for cid in int_components] + # If not all are ints, then it must be a list of ComponentCategory + # based on the definition of ClientTargetComponents. + return cast(list[ComponentCategory], target) diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 5b5b00c..d1bb4e2 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -18,10 +18,18 @@ import grpc.aio from frequenz.channels import Broadcast, Receiver, select, selected_from from frequenz.channels.timer import SkipMissedAndResync, Timer +from frequenz.client.base.streaming import ( + StreamFatalError, + StreamRetrying, + StreamStarted, +) +from frequenz.client.common.microgrid import MicrogridId from frequenz.client.dispatch import DispatchApiClient -from frequenz.client.dispatch.types import Event +from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent +from frequenz.client.dispatch.types import DispatchId, Event from frequenz.sdk.actor import BackgroundService +from ._actor_dispatcher import DispatchActorId from ._dispatch import Dispatch from ._event import Created, Deleted, DispatchEvent, Updated @@ -33,11 +41,13 @@ class MergeStrategy(ABC): """Base class for strategies to merge running intervals.""" @abstractmethod - def identity(self, dispatch: Dispatch) -> int: + def identity(self, dispatch: Dispatch) -> DispatchActorId: """Identity function for the merge criteria.""" @abstractmethod - def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: + def filter( + self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch + ) -> bool: """Filter dispatches based on the strategy. Args: @@ -75,7 +85,7 @@ class QueueItem: to consider the start event when deciding whether to execute the stop event. """ - dispatch_id: int + dispatch_id: DispatchId dispatch: Dispatch = field(compare=False) def __init__( @@ -90,7 +100,7 @@ def __init__( # pylint: disable=too-many-arguments def __init__( self, - microgrid_id: int, + microgrid_id: MicrogridId, client: DispatchApiClient, ) -> None: """Initialize the background service. @@ -102,7 +112,7 @@ def __init__( super().__init__(name="dispatch") self._client = client - self._dispatches: dict[int, Dispatch] = {} + self._dispatches: dict[DispatchId, Dispatch] = {} self._microgrid_id = microgrid_id self._lifecycle_events_channel = Broadcast[DispatchEvent]( @@ -230,8 +240,21 @@ async def _run(self) -> None: ) as next_event_timer: # Initial fetch await self._fetch(next_event_timer) - stream = self._client.stream(microgrid_id=self._microgrid_id) + # pylint: disable-next=protected-access + streamer = self._client._get_stream(microgrid_id=self._microgrid_id) + stream = streamer.new_receiver(include_events=True) + + # We track stream start events linked to retries to avoid re-fetching + # dispatches that were already retrieved during an initial stream start. + # The initial fetch gets all dispatches, and the StreamStarted event + # isn't always reliable due to parallel receiver creation and stream + # task initiation. + # This way we get a deterministic behavior where we only fetch + # dispatches once initially and then only when the stream is restarted. + is_retry_attempt = False + + # Streaming updates async for selected in select(next_event_timer, stream): if selected_from(selected, next_event_timer): if not self._scheduled_events: @@ -240,36 +263,54 @@ async def _run(self) -> None: heappop(self._scheduled_events).dispatch, next_event_timer ) elif selected_from(selected, stream): - _logger.debug("Received dispatch event: %s", selected.message) - dispatch = Dispatch(selected.message.dispatch) - match selected.message.event: - case Event.CREATED: - self._dispatches[dispatch.id] = dispatch - await self._update_dispatch_schedule_and_notify( - dispatch, None, next_event_timer - ) - await self._lifecycle_events_tx.send( - Created(dispatch=dispatch) - ) - case Event.UPDATED: - await self._update_dispatch_schedule_and_notify( - dispatch, - self._dispatches[dispatch.id], - next_event_timer, - ) - self._dispatches[dispatch.id] = dispatch - await self._lifecycle_events_tx.send( - Updated(dispatch=dispatch) - ) - case Event.DELETED: - self._dispatches.pop(dispatch.id) - await self._update_dispatch_schedule_and_notify( - None, dispatch, next_event_timer - ) - - await self._lifecycle_events_tx.send( - Deleted(dispatch=dispatch) + match selected.message: + case ApiDispatchEvent(): + _logger.debug( + "Received dispatch event: %s", selected.message ) + dispatch = Dispatch(selected.message.dispatch) + match selected.message.event: + case Event.CREATED: + self._dispatches[dispatch.id] = dispatch + await self._update_dispatch_schedule_and_notify( + dispatch, None, next_event_timer + ) + await self._lifecycle_events_tx.send( + Created(dispatch=dispatch) + ) + case Event.UPDATED: + await self._update_dispatch_schedule_and_notify( + dispatch, + self._dispatches[dispatch.id], + next_event_timer, + ) + self._dispatches[dispatch.id] = dispatch + await self._lifecycle_events_tx.send( + Updated(dispatch=dispatch) + ) + case Event.DELETED: + self._dispatches.pop(dispatch.id) + await self._update_dispatch_schedule_and_notify( + None, dispatch, next_event_timer + ) + + await self._lifecycle_events_tx.send( + Deleted(dispatch=dispatch) + ) + + case StreamRetrying(): + is_retry_attempt = True + + case StreamStarted(): + if is_retry_attempt: + _logger.info( + "Dispatch stream restarted, getting dispatches" + ) + await self._fetch(next_event_timer) + is_retry_attempt = False + + case StreamFatalError(): + pass async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None: """Execute a scheduled event. diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index d4eca69..5180631 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -12,11 +12,12 @@ from typing import Awaitable, Callable, Self from frequenz.channels import Receiver +from frequenz.client.common.microgrid import MicrogridId from frequenz.client.dispatch import DispatchApiClient from frequenz.sdk.actor import Actor, BackgroundService from typing_extensions import override -from ._actor_dispatcher import ActorDispatcher, DispatchInfo +from ._actor_dispatcher import ActorDispatcher, DispatchActorId, DispatchInfo from ._bg_service import DispatchScheduler, MergeStrategy from ._dispatch import Dispatch from ._event import DispatchEvent @@ -202,7 +203,7 @@ async def run(): def __init__( self, *, - microgrid_id: int, + microgrid_id: MicrogridId, server_url: str, key: str, call_timeout: timedelta = timedelta(seconds=60), @@ -328,8 +329,8 @@ async def start_managing( self._empty_event.clear() - def id_identity(dispatch: Dispatch) -> int: - return dispatch.id + def id_identity(dispatch: Dispatch) -> DispatchActorId: + return DispatchActorId(dispatch.id) dispatcher = ActorDispatcher( actor_factory=actor_factory, diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index e222b0a..0a95598 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -5,23 +5,34 @@ import logging from collections.abc import Mapping +from sys import maxsize +from typing import Any +from frequenz.client.dispatch.types import DispatchId from typing_extensions import override +from ._actor_dispatcher import DispatchActorId from ._bg_service import MergeStrategy from ._dispatch import Dispatch +def _hash_positive(args: Any) -> int: + """Make a positive hash.""" + return hash(args) + maxsize + 1 + + class MergeByType(MergeStrategy): """Merge running intervals based on the dispatch type.""" @override - def identity(self, dispatch: Dispatch) -> int: + def identity(self, dispatch: Dispatch) -> DispatchActorId: """Identity function for the merge criteria.""" - return hash(dispatch.type) + return DispatchActorId(_hash_positive(dispatch.type)) @override - def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: + def filter( + self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch + ) -> bool: """Filter dispatches based on the merge strategy. Keeps start events. @@ -53,6 +64,6 @@ class MergeByTypeTarget(MergeByType): """Merge running intervals based on the dispatch type and target.""" @override - def identity(self, dispatch: Dispatch) -> int: + def identity(self, dispatch: Dispatch) -> DispatchActorId: """Identity function for the merge criteria.""" - return hash((dispatch.type, tuple(dispatch.target))) + return DispatchActorId(_hash_positive((dispatch.type, tuple(dispatch.target)))) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index c96b584..c63049a 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -13,10 +13,12 @@ import pytest import time_machine from frequenz.channels import Receiver +from frequenz.client.common.microgrid import MicrogridId from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient, to_create_params from frequenz.client.dispatch.test.generator import DispatchGenerator from frequenz.client.dispatch.types import Dispatch as BaseDispatch +from frequenz.client.dispatch.types import TargetIds from pytest import fixture from frequenz.dispatch import ( @@ -66,14 +68,14 @@ class _TestEnv: """The receiver for ready dispatches.""" client: FakeClient """The fake client for the actor.""" - microgrid_id: int + microgrid_id: MicrogridId """The microgrid id.""" @fixture async def test_env() -> AsyncIterator[_TestEnv]: """Return an actor test environment.""" - microgrid_id = randint(1, 100) + microgrid_id = MicrogridId(randint(1, 100)) client = FakeClient() service = DispatchScheduler( @@ -156,6 +158,7 @@ async def _test_new_dispatch_created( received = Dispatch(update_dispatch(sample, dispatch)) assert dispatch == received + await asyncio.sleep(1) return dispatch @@ -473,11 +476,10 @@ async def test_dispatch_new_but_finished( ) fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) # Process the lifecycle event caused by the old dispatch at startup await test_env.lifecycle_events.receive() - await asyncio.sleep(1) - # Create another dispatch the normal way new_dispatch = generator.generate_dispatch() new_dispatch = replace( @@ -549,7 +551,7 @@ async def test_multiple_dispatches_merge_running_intervals( merge_strategy: MergeStrategy, ) -> None: """Test that multiple dispatches are merged into a single running interval.""" - microgrid_id = randint(1, 100) + microgrid_id = MicrogridId(randint(1, 100)) client = FakeClient() service = DispatchScheduler( microgrid_id=microgrid_id, @@ -566,7 +568,9 @@ async def test_multiple_dispatches_merge_running_intervals( generator.generate_dispatch(), active=True, duration=timedelta(seconds=30), - target=[1, 2] if isinstance(merge_strategy, MergeByType) else [3, 4], + target=TargetIds( + *[1, 2] if isinstance(merge_strategy, MergeByType) else [3, 4] + ), start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -575,7 +579,7 @@ async def test_multiple_dispatches_merge_running_intervals( generator.generate_dispatch(), active=True, duration=timedelta(seconds=10), - target=[3, 4], + target=TargetIds(3, 4), start_time=_now() + timedelta(seconds=10), # starts after dispatch1 recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -629,7 +633,7 @@ async def test_multiple_dispatches_sequential_intervals_merge( Even if dispatches don't overlap but are consecutive, merge_running_intervals=TPYE should treat them as continuous if any event tries to stop. """ - microgrid_id = randint(1, 100) + microgrid_id = MicrogridId(randint(1, 100)) client = FakeClient() service = DispatchScheduler(microgrid_id=microgrid_id, client=client) service.start() @@ -643,7 +647,9 @@ async def test_multiple_dispatches_sequential_intervals_merge( active=True, duration=timedelta(seconds=5), # If merging by type, we want to test having different targets in dispatch 1 and 2 - target=[3, 4] if isinstance(merge_strategy, MergeByType) else [1, 2], + target=TargetIds( + *[3, 4] if isinstance(merge_strategy, MergeByType) else [1, 2] + ), start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -653,7 +659,7 @@ async def test_multiple_dispatches_sequential_intervals_merge( generator.generate_dispatch(), active=True, duration=timedelta(seconds=5), - target=[1, 2], + target=TargetIds(1, 2), start_time=dispatch1.start_time + dispatch1.duration, recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -693,7 +699,7 @@ async def test_at_least_one_running_filter( merge_strategy: MergeStrategy, ) -> None: """Test scenarios directly tied to the _at_least_one_running logic.""" - microgrid_id = randint(1, 100) + microgrid_id = MicrogridId(randint(1, 100)) client = FakeClient() service = DispatchScheduler(microgrid_id=microgrid_id, client=client) service.start() @@ -708,7 +714,9 @@ async def test_at_least_one_running_filter( generator.generate_dispatch(), active=True, duration=timedelta(seconds=10), - target=[1, 2] if isinstance(merge_strategy, MergeByType) else [3, 4], + target=TargetIds( + *[1, 2] if isinstance(merge_strategy, MergeByType) else [3, 4] + ), start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -739,7 +747,7 @@ async def test_at_least_one_running_filter( generator.generate_dispatch(), active=False, duration=timedelta(seconds=10), - target=[3, 4], + target=TargetIds(3, 4), start_time=_now() + timedelta(seconds=50), recurrence=RecurrenceRule(), type="TEST_TYPE", diff --git a/tests/test_managing_actor.py b/tests/test_managing_actor.py index baca83c..024d81c 100644 --- a/tests/test_managing_actor.py +++ b/tests/test_managing_actor.py @@ -14,11 +14,13 @@ import pytest import time_machine from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.dispatch import recurrence from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient from frequenz.client.dispatch.test.generator import DispatchGenerator +from frequenz.client.dispatch.types import DispatchId, TargetIds from frequenz.sdk.actor import Actor from pytest import fixture @@ -31,6 +33,10 @@ MergeByTypeTarget, MergeStrategy, ) +from frequenz.dispatch._actor_dispatcher import ( + DispatchActorId, + _convert_target_components, +) from frequenz.dispatch._bg_service import DispatchScheduler @@ -101,8 +107,11 @@ class _TestEnv: running_status_sender: Sender[Dispatch] generator: DispatchGenerator = DispatchGenerator() - def actor(self, identity: int) -> MockActor: + def actor(self, identity: DispatchActorId | int) -> MockActor: """Return the actor.""" + if isinstance(identity, int): + identity = DispatchActorId(identity) + # pylint: disable=protected-access assert identity in self.actors_service._actors return cast(MockActor, self.actors_service._actors[identity].actor) @@ -111,7 +120,7 @@ def actor(self, identity: int) -> MockActor: def is_running(self, identity: int) -> bool: """Return whether the actor is running.""" # pylint: disable-next=protected-access - if identity not in self.actors_service._actors: + if DispatchActorId(identity) not in self.actors_service._actors: return False return self.actor(identity).is_running @@ -125,7 +134,7 @@ async def test_env() -> AsyncIterator[_TestEnv]: actors_service = ActorDispatcher( actor_factory=MockActor.create, running_status_receiver=channel.new_receiver(), - dispatch_identity=lambda dispatch: dispatch.id, + dispatch_identity=lambda dispatch: DispatchActorId(dispatch.id), ) actors_service.start() @@ -149,11 +158,11 @@ async def test_simple_start_stop( dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, - id=1, + id=DispatchId(1), active=True, dry_run=False, duration=duration, - target=[1, 10, 15], + target=TargetIds(1, 10, 15), start_time=now, payload={"test": True}, type="UNIT_TEST", @@ -199,7 +208,7 @@ async def test_start_failed( dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, - id=1, + id=DispatchId(1), active=True, dry_run=False, duration=duration, @@ -282,7 +291,7 @@ async def test_dry_run(test_env: _TestEnv, fake_time: time_machine.Coordinates) dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, - id=1, + id=DispatchId(1), dry_run=True, active=True, start_time=_now(), @@ -301,7 +310,7 @@ async def test_dry_run(test_env: _TestEnv, fake_time: time_machine.Coordinates) event = test_env.actor(1).initial_dispatch assert event.dry_run is dispatch.dry_run - assert event.components == dispatch.target + assert event.components == _convert_target_components(dispatch.target) assert event.options == dispatch.payload assert test_env.actor(1).is_running is True @@ -320,8 +329,8 @@ async def test_manage_abstraction( strategy: MergeStrategy | None, ) -> None: """Test Dispatcher.start_managing sets up correctly.""" - identity: Callable[[Dispatch], int] = ( - strategy.identity if strategy else lambda dispatch: dispatch.id + identity: Callable[[Dispatch], DispatchActorId] = ( + strategy.identity if strategy else lambda dispatch: DispatchActorId(dispatch.id) ) class MyFakeClient(FakeClient): @@ -341,7 +350,7 @@ def __init__( assert stream_timeout super().__init__() - mid = 1 + mid = MicrogridId(1) # Patch `Client` class in Dispatcher with MyFakeClient with patch("frequenz.dispatch._dispatcher.DispatchApiClient", MyFakeClient): @@ -416,7 +425,7 @@ async def test_actor_dispatcher_update_isolation( # Create first dispatch dispatch1_spec = replace( test_env.generator.generate_dispatch(), - id=101, # Unique ID + id=DispatchId(101), # Unique ID type=dispatch_type, active=True, dry_run=False, @@ -430,7 +439,7 @@ async def test_actor_dispatcher_update_isolation( # Create second dispatch of the same type, different ID dispatch2_spec = replace( test_env.generator.generate_dispatch(), - id=102, # Unique ID + id=DispatchId(102), # Unique ID type=dispatch_type, # Same type active=True, dry_run=False, @@ -451,7 +460,7 @@ async def test_actor_dispatcher_update_isolation( actor1 = test_env.actor(101) assert actor1 is not None # pylint: disable-next=protected-access - assert actor1.initial_dispatch._src.id == 101 + assert actor1.initial_dispatch._src.id == DispatchId(101) assert actor1.initial_dispatch.options == {"instance": 1} assert not test_env.is_running(102), "Actor 2 should not be running yet" @@ -466,7 +475,7 @@ async def test_actor_dispatcher_update_isolation( actor2 = test_env.actor(102) assert actor2 is not None # pylint: disable-next=protected-access - assert actor2.initial_dispatch._src.id == 102 + assert actor2.initial_dispatch._src.id == DispatchId(102) assert actor2.initial_dispatch.options == {"instance": 2} # Now, send an update to stop dispatch 1