diff --git a/README.md b/README.md index 870cbfe..2e9ac8a 100644 --- a/README.md +++ b/README.md @@ -39,12 +39,9 @@ async def run(): actor = MagicMock() # replace with your actor - changed_running_status_rx = dispatcher.running_status_change.new_receiver() + changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE") async for dispatch in changed_running_status_rx: - if dispatch.type != "MY_TYPE": - continue - if dispatch.started: print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") if actor.is_running: diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index a9fb33e..86ebc24 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,8 +6,9 @@ ## Upgrading -* The method `Dispatch.running(type: str)` was replaced with the property `Dispatch.started: bool`. -* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1500 +* Two properties have been replaced by methods that require a type as parameter. + * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`. + * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`. ## New Features @@ -15,4 +16,4 @@ ## Bug Fixes -* Fixed a crash when reading a Dispatch with frequency YEARLY. + diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 0dee73a..b2d25d4 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -16,7 +16,7 @@ """ from ._dispatch import Dispatch -from ._dispatcher import Dispatcher, ReceiverFetcher +from ._dispatcher import Dispatcher from ._event import Created, Deleted, DispatchEvent, Updated from ._managing_actor import DispatchManagingActor, DispatchUpdate @@ -25,7 +25,6 @@ "Deleted", "DispatchEvent", "Dispatcher", - "ReceiverFetcher", "Updated", "Dispatch", "DispatchManagingActor", diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/_bg_service.py similarity index 78% rename from src/frequenz/dispatch/actor.py rename to src/frequenz/dispatch/_bg_service.py index 7bd6e49..5ee058d 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -1,19 +1,20 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""The dispatch actor.""" +"""The dispatch background service.""" +import asyncio import logging from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from heapq import heappop, heappush import grpc.aio -from frequenz.channels import Sender, select, selected_from +from frequenz.channels import Broadcast, Receiver, select, selected_from from frequenz.channels.timer import SkipMissedAndResync, Timer from frequenz.client.dispatch import Client from frequenz.client.dispatch.types import Event -from frequenz.sdk.actor import Actor +from frequenz.sdk.actor import BackgroundService from ._dispatch import Dispatch from ._event import Created, Deleted, DispatchEvent, Updated @@ -22,13 +23,12 @@ """The logger for this module.""" -class DispatchingActor(Actor): - """Dispatch actor. +# pylint: disable=too-many-instance-attributes +class DispatchScheduler(BackgroundService): + """Dispatch background service. - This actor is responsible for handling dispatches for a microgrid. - - This means staying in sync with the API and scheduling - dispatches as necessary. + This service is responsible for managing dispatches and scheduling them + based on their start and stop times. """ @dataclass(order=True) @@ -50,24 +50,28 @@ def __init__( self, microgrid_id: int, client: Client, - lifecycle_updates_sender: Sender[DispatchEvent], - running_state_change_sender: Sender[Dispatch], ) -> None: - """Initialize the actor. + """Initialize the background service. Args: microgrid_id: The microgrid ID to handle dispatches for. client: The client to use for fetching dispatches. - lifecycle_updates_sender: A sender for dispatch lifecycle events. - running_state_change_sender: A sender for dispatch running state changes. """ super().__init__(name="dispatch") self._client = client self._dispatches: dict[int, Dispatch] = {} self._microgrid_id = microgrid_id - self._lifecycle_updates_sender = lifecycle_updates_sender - self._running_state_change_sender = running_state_change_sender + + self._lifecycle_events_channel = Broadcast[DispatchEvent]( + name="lifecycle_events" + ) + self._lifecycle_events_tx = self._lifecycle_events_channel.new_sender() + self._running_state_status_channel = Broadcast[Dispatch]( + name="running_state_status" + ) + + self._running_state_status_tx = self._running_state_status_channel.new_sender() self._next_event_timer = Timer( timedelta(seconds=100), SkipMissedAndResync(), auto_start=False ) @@ -76,7 +80,7 @@ def __init__( Interval is chosen arbitrarily, as it will be reset on the first event. """ - self._scheduled_events: list["DispatchingActor.QueueItem"] = [] + self._scheduled_events: list["DispatchScheduler.QueueItem"] = [] """The scheduled events, sorted by time. Each event is a tuple of the scheduled time and the dispatch. @@ -84,9 +88,57 @@ def __init__( always at index 0. """ + # pylint: disable=redefined-builtin + def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]: + """Create a new receiver for lifecycle events. + + Args: + type: The type of events to receive. + + Returns: + A new receiver for lifecycle events. + """ + return self._lifecycle_events_channel.new_receiver().filter( + lambda event: event.dispatch.type == type + ) + + async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]: + """Create a new receiver for running state events. + + Args: + type: The type of events to receive. + + Returns: + A new receiver for running state status. + """ + # Find all matching dispatches based on the type and collect them + dispatches = [ + dispatch for dispatch in self._dispatches.values() if dispatch.type == type + ] + + # Create receiver with enough capacity to hold all matching dispatches + receiver = self._running_state_status_channel.new_receiver( + limit=max(1, len(dispatches)) + ).filter(lambda dispatch: dispatch.type == type) + + # Send all matching dispatches to the receiver + for dispatch in dispatches: + await self._send_running_state_change(dispatch) + + return receiver + + # pylint: enable=redefined-builtin + + def start(self) -> None: + """Start the background service.""" + self._tasks.add(asyncio.create_task(self._run())) + async def _run(self) -> None: - """Run the actor.""" - _logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id) + """Run the background service.""" + _logger.info( + "Starting dispatching background service for microgrid %s", + self._microgrid_id, + ) # Initial fetch await self._fetch() @@ -111,24 +163,18 @@ async def _run(self) -> None: case Event.CREATED: self._dispatches[dispatch.id] = dispatch await self._update_dispatch_schedule_and_notify(dispatch, None) - await self._lifecycle_updates_sender.send( - Created(dispatch=dispatch) - ) + await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) case Event.UPDATED: await self._update_dispatch_schedule_and_notify( dispatch, self._dispatches[dispatch.id] ) self._dispatches[dispatch.id] = dispatch - await self._lifecycle_updates_sender.send( - Updated(dispatch=dispatch) - ) + await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) case Event.DELETED: self._dispatches.pop(dispatch.id) await self._update_dispatch_schedule_and_notify(None, dispatch) - await self._lifecycle_updates_sender.send( - Deleted(dispatch=dispatch) - ) + await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: """Execute a scheduled event. @@ -170,17 +216,13 @@ async def _fetch(self) -> None: if not old_dispatch: _logger.info("New dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify(dispatch, None) - await self._lifecycle_updates_sender.send( - Created(dispatch=dispatch) - ) + await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) elif dispatch.update_time != old_dispatch.update_time: _logger.info("Updated dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify( dispatch, old_dispatch ) - await self._lifecycle_updates_sender.send( - Updated(dispatch=dispatch) - ) + await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) except grpc.aio.AioRpcError as error: _logger.error("Error fetching dispatches: %s", error) @@ -189,13 +231,13 @@ async def _fetch(self) -> None: for dispatch in old_dispatches.values(): _logger.info("Deleted dispatch: %s", dispatch) - await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) + await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) await self._update_dispatch_schedule_and_notify(None, dispatch) # Set deleted only here as it influences the result of dispatch.started # which is used in above in _running_state_change dispatch._set_deleted() # pylint: disable=protected-access - await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) + await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) async def _update_dispatch_schedule_and_notify( self, dispatch: Dispatch | None, old_dispatch: Dispatch | None @@ -359,4 +401,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None: Args: dispatch: The dispatch that changed. """ - await self._running_state_change_sender.send(dispatch) + await self._running_state_status_tx.send(dispatch) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index c0c8a60..1c7baa1 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -3,36 +3,13 @@ """A highlevel interface for the dispatch API.""" -import abc -from typing import Protocol, TypeVar -from frequenz.channels import Broadcast, Receiver +from frequenz.channels import Receiver from frequenz.client.dispatch import Client +from ._bg_service import DispatchScheduler from ._dispatch import Dispatch from ._event import DispatchEvent -from .actor import DispatchingActor - -ReceivedT_co = TypeVar("ReceivedT_co", covariant=True) -"""The type being received.""" - - -class ReceiverFetcher(Protocol[ReceivedT_co]): - """An interface that just exposes a `new_receiver` method.""" - - @abc.abstractmethod - def new_receiver( - self, *, name: str | None = None, limit: int = 50 - ) -> Receiver[ReceivedT_co]: - """Get a receiver from the channel. - - Args: - name: A name to identify the receiver in the logs. - limit: The maximum size of the receiver. - - Returns: - A receiver instance. - """ class Dispatcher: @@ -72,12 +49,9 @@ async def run(): actor = MagicMock() # replace with your actor - changed_running_status = dispatcher.running_status_change.new_receiver() + changed_running_status = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE") async for dispatch in changed_running_status: - if dispatch.type != "YOUR_DISPATCH_TYPE": - continue - if dispatch.started: print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") if actor.is_running: @@ -120,7 +94,7 @@ async def run(): ) await dispatcher.start() # this will start the actor - events_receiver = dispatcher.lifecycle_events.new_receiver() + events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE") async for event in events_receiver: match event: @@ -197,39 +171,38 @@ def __init__( server_url: The URL of the dispatch service. key: The key to access the service. """ - self._running_state_channel = Broadcast[Dispatch](name="running_state_change") - self._lifecycle_events_channel = Broadcast[DispatchEvent]( - name="lifecycle_events" - ) self._client = Client(server_url=server_url, key=key) - self._actor = DispatchingActor( + self._bg_service = DispatchScheduler( microgrid_id, self._client, - self._lifecycle_events_channel.new_sender(), - self._running_state_channel.new_sender(), ) async def start(self) -> None: - """Start the actor.""" - self._actor.start() + """Start the local dispatch service.""" + self._bg_service.start() @property def client(self) -> Client: """Return the client.""" return self._client - @property - def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]: - """Return new, updated or deleted dispatches receiver fetcher. + def new_lifecycle_events_receiver( + self, dispatch_type: str + ) -> Receiver[DispatchEvent]: + """Return new, updated or deleted dispatches receiver. + + Args: + dispatch_type: The type of the dispatch to listen for. Returns: A new receiver for new dispatches. """ - return self._lifecycle_events_channel + return self._bg_service.new_lifecycle_events_receiver(dispatch_type) - @property - def running_status_change(self) -> ReceiverFetcher[Dispatch]: - """Return running status change receiver fetcher. + async def new_running_state_event_receiver( + self, dispatch_type: str + ) -> Receiver[Dispatch]: + """Return running state event receiver. This receiver will receive a message whenever the current running status of a dispatch changes. @@ -242,7 +215,7 @@ def running_status_change(self) -> ReceiverFetcher[Dispatch]: then a message will be sent. In other words: Any change that is expected to make an actor start, stop - or reconfigure itself with new parameters causes a message to be + or adjust itself according to new dispatch options causes a message to be sent. A non-exhaustive list of possible changes that will cause a message to be sent: @@ -255,7 +228,10 @@ def running_status_change(self) -> ReceiverFetcher[Dispatch]: - The payload changed - The dispatch was deleted + Args: + dispatch_type: The type of the dispatch to listen for. + Returns: A new receiver for dispatches whose running status changed. """ - return self._running_state_channel + return await self._bg_service.new_running_state_event_receiver(dispatch_type) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 1406121..dc87b64 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -11,7 +11,7 @@ import async_solipsism import time_machine -from frequenz.channels import Broadcast, Receiver +from frequenz.channels import Receiver from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient, to_create_params from frequenz.client.dispatch.test.generator import DispatchGenerator @@ -19,7 +19,7 @@ from pytest import fixture from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated -from frequenz.dispatch.actor import DispatchingActor +from frequenz.dispatch._bg_service import DispatchScheduler @fixture @@ -45,12 +45,12 @@ def _now() -> datetime: @dataclass(frozen=True) -class ActorTestEnv: - """Test environment for the actor.""" +class TestEnv: + """Test environment for the service.""" - actor: DispatchingActor + service: DispatchScheduler """The actor under test.""" - updated_dispatches: Receiver[DispatchEvent] + lifecycle_events: Receiver[DispatchEvent] """The receiver for updated dispatches.""" running_state_change: Receiver[Dispatch] """The receiver for ready dispatches.""" @@ -61,31 +61,29 @@ class ActorTestEnv: @fixture -async def actor_env() -> AsyncIterator[ActorTestEnv]: +async def test_env() -> AsyncIterator[TestEnv]: """Return an actor test environment.""" - lifecycle_updates_dispatches = Broadcast[DispatchEvent](name="lifecycle_updates") - running_state_change_dispatches = Broadcast[Dispatch](name="running_state_change") microgrid_id = randint(1, 100) client = FakeClient() - actor = DispatchingActor( + service = DispatchScheduler( microgrid_id=microgrid_id, - lifecycle_updates_sender=lifecycle_updates_dispatches.new_sender(), - running_state_change_sender=running_state_change_dispatches.new_sender(), client=client, ) - actor.start() + service.start() try: - yield ActorTestEnv( - actor=actor, - updated_dispatches=lifecycle_updates_dispatches.new_receiver(), - running_state_change=running_state_change_dispatches.new_receiver(), + yield TestEnv( + service=service, + lifecycle_events=service.new_lifecycle_events_receiver("TEST_TYPE"), + running_state_change=await service.new_running_state_event_receiver( + "TEST_TYPE" + ), client=client, microgrid_id=microgrid_id, ) finally: - await actor.stop() + await service.stop() @fixture @@ -95,13 +93,13 @@ def generator() -> DispatchGenerator: async def test_new_dispatch_created( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, ) -> None: """Test that a new dispatch is created.""" sample = generator.generate_dispatch() - await _test_new_dispatch_created(actor_env, sample) + await _test_new_dispatch_created(test_env, sample) def update_dispatch(sample: BaseDispatch, dispatch: BaseDispatch) -> BaseDispatch: @@ -123,21 +121,22 @@ def update_dispatch(sample: BaseDispatch, dispatch: BaseDispatch) -> BaseDispatc async def _test_new_dispatch_created( - actor_env: ActorTestEnv, + test_env: TestEnv, sample: BaseDispatch, ) -> Dispatch: """Test that a new dispatch is created. Args: - actor_env: The actor environment + test_env: The actor environment sample: The sample dispatch to create Returns: The sample dispatch that was created """ - await actor_env.client.create(**to_create_params(actor_env.microgrid_id, sample)) + sample = replace(sample, type="TEST_TYPE") + await test_env.client.create(**to_create_params(test_env.microgrid_id, sample)) - dispatch_event = await actor_env.updated_dispatches.receive() + dispatch_event = await test_env.lifecycle_events.receive() match dispatch_event: case Deleted(dispatch) | Updated(dispatch): @@ -150,7 +149,7 @@ async def _test_new_dispatch_created( async def test_existing_dispatch_updated( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -164,11 +163,11 @@ async def test_existing_dispatch_updated( fake_time.shift(timedelta(seconds=1)) - sample = await _test_new_dispatch_created(actor_env, sample) + sample = await _test_new_dispatch_created(test_env, sample) fake_time.shift(timedelta(seconds=1)) - updated = await actor_env.client.update( - microgrid_id=actor_env.microgrid_id, + updated = await test_env.client.update( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id, new_fields={ "active": True, @@ -177,7 +176,7 @@ async def test_existing_dispatch_updated( ) fake_time.shift(timedelta(seconds=1)) - dispatch_event = await actor_env.updated_dispatches.receive() + dispatch_event = await test_env.lifecycle_events.receive() match dispatch_event: case Created(dispatch) | Deleted(dispatch): assert False, f"Expected an updated event, got {dispatch_event}" @@ -188,20 +187,20 @@ async def test_existing_dispatch_updated( async def test_existing_dispatch_deleted( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: """Test that an existing dispatch is deleted.""" - sample = await _test_new_dispatch_created(actor_env, generator.generate_dispatch()) + sample = await _test_new_dispatch_created(test_env, generator.generate_dispatch()) - await actor_env.client.delete( - microgrid_id=actor_env.microgrid_id, dispatch_id=sample.id + await test_env.client.delete( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id ) fake_time.shift(timedelta(seconds=10)) await asyncio.sleep(10) - dispatch_event = await actor_env.updated_dispatches.receive() + dispatch_event = await test_env.lifecycle_events.receive() match dispatch_event: case Created(dispatch) | Updated(dispatch): assert False, "Expected a deleted event" @@ -211,7 +210,7 @@ async def test_existing_dispatch_deleted( async def test_dispatch_inf_duration_deleted( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -219,30 +218,34 @@ async def test_dispatch_inf_duration_deleted( # Generate a dispatch with infinite duration (duration=None) sample = generator.generate_dispatch() sample = replace( - sample, active=True, duration=None, start_time=_now() + timedelta(seconds=5) + sample, + active=True, + duration=None, + start_time=_now() + timedelta(seconds=5), + type="TEST_TYPE", ) # Create the dispatch - sample = await _test_new_dispatch_created(actor_env, sample) + sample = await _test_new_dispatch_created(test_env, sample) # Advance time to when the dispatch should start fake_time.shift(timedelta(seconds=40)) await asyncio.sleep(40) # Expect notification of the dispatch being ready to run - ready_dispatch = await actor_env.running_state_change.receive() + ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch.started # Now delete the dispatch - await actor_env.client.delete( - microgrid_id=actor_env.microgrid_id, dispatch_id=sample.id + await test_env.client.delete( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id ) fake_time.shift(timedelta(seconds=10)) await asyncio.sleep(1) # Expect notification to stop the dispatch - done_dispatch = await actor_env.running_state_change.receive() + done_dispatch = await test_env.running_state_change.receive() assert done_dispatch.started is False async def test_dispatch_inf_duration_updated_stopped_started( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -250,44 +253,48 @@ async def test_dispatch_inf_duration_updated_stopped_started( # Generate a dispatch with infinite duration (duration=None) sample = generator.generate_dispatch() sample = replace( - sample, active=True, duration=None, start_time=_now() + timedelta(seconds=5) + sample, + active=True, + duration=None, + start_time=_now() + timedelta(seconds=5), + type="TEST_TYPE", ) # Create the dispatch - sample = await _test_new_dispatch_created(actor_env, sample) + sample = await _test_new_dispatch_created(test_env, sample) # Advance time to when the dispatch should start fake_time.shift(timedelta(seconds=40)) await asyncio.sleep(40) # Expect notification of the dispatch being ready to run - ready_dispatch = await actor_env.running_state_change.receive() + ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch.started # Now update the dispatch to set active=False (stop it) - await actor_env.client.update( - microgrid_id=actor_env.microgrid_id, + await test_env.client.update( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id, new_fields={"active": False}, ) fake_time.shift(timedelta(seconds=10)) await asyncio.sleep(1) # Expect notification to stop the dispatch - stopped_dispatch = await actor_env.running_state_change.receive() + stopped_dispatch = await test_env.running_state_change.receive() assert stopped_dispatch.started is False # Now update the dispatch to set active=True (start it again) - await actor_env.client.update( - microgrid_id=actor_env.microgrid_id, + await test_env.client.update( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id, new_fields={"active": True}, ) fake_time.shift(timedelta(seconds=10)) await asyncio.sleep(1) # Expect notification of the dispatch being ready to run again - started_dispatch = await actor_env.running_state_change.receive() + started_dispatch = await test_env.running_state_change.receive() assert started_dispatch.started async def test_dispatch_inf_duration_updated_to_finite_and_stops( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -299,21 +306,25 @@ async def test_dispatch_inf_duration_updated_to_finite_and_stops( # Generate a dispatch with infinite duration (duration=None) sample = generator.generate_dispatch() sample = replace( - sample, active=True, duration=None, start_time=_now() + timedelta(seconds=5) + sample, + active=True, + duration=None, + start_time=_now() + timedelta(seconds=5), + type="TEST_TYPE", ) # Create the dispatch - sample = await _test_new_dispatch_created(actor_env, sample) + sample = await _test_new_dispatch_created(test_env, sample) # Advance time to when the dispatch should start fake_time.shift(timedelta(seconds=10)) await asyncio.sleep(1) # Expect notification of the dispatch being ready to run - ready_dispatch = await actor_env.running_state_change.receive() + ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch.started # Update the dispatch to set duration to a finite duration that has already passed # The dispatch has been running for 5 seconds; set duration to 5 seconds - await actor_env.client.update( - microgrid_id=actor_env.microgrid_id, + await test_env.client.update( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id, new_fields={"duration": timedelta(seconds=5)}, ) @@ -321,21 +332,24 @@ async def test_dispatch_inf_duration_updated_to_finite_and_stops( fake_time.shift(timedelta(seconds=1)) await asyncio.sleep(1) # Expect notification to stop the dispatch because the duration has passed - stopped_dispatch = await actor_env.running_state_change.receive() + stopped_dispatch = await test_env.running_state_change.receive() assert stopped_dispatch.started is False async def test_dispatch_schedule( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: """Test that a random dispatch is scheduled correctly.""" sample = replace( - generator.generate_dispatch(), active=True, duration=timedelta(seconds=10) + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=10), + type="TEST_TYPE", ) - await actor_env.client.create(**to_create_params(actor_env.microgrid_id, sample)) - dispatch = Dispatch(actor_env.client.dispatches(actor_env.microgrid_id)[0]) + await test_env.client.create(**to_create_params(test_env.microgrid_id, sample)) + dispatch = Dispatch(test_env.client.dispatches(test_env.microgrid_id)[0]) next_run = dispatch.next_run_after(_now()) assert next_run is not None @@ -344,7 +358,7 @@ async def test_dispatch_schedule( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run - ready_dispatch = await actor_env.running_state_change.receive() + ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch == dispatch @@ -354,12 +368,12 @@ async def test_dispatch_schedule( await asyncio.sleep(1) # Expect notification to stop the dispatch - done_dispatch = await actor_env.running_state_change.receive() + done_dispatch = await test_env.running_state_change.receive() assert done_dispatch == dispatch async def test_dispatch_inf_duration_updated_to_finite_and_continues( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -371,21 +385,25 @@ async def test_dispatch_inf_duration_updated_to_finite_and_continues( # Generate a dispatch with infinite duration (duration=None) sample = generator.generate_dispatch() sample = replace( - sample, active=True, duration=None, start_time=_now() + timedelta(seconds=5) + sample, + active=True, + duration=None, + start_time=_now() + timedelta(seconds=5), + type="TEST_TYPE", ) # Create the dispatch - sample = await _test_new_dispatch_created(actor_env, sample) + sample = await _test_new_dispatch_created(test_env, sample) # Advance time to when the dispatch should start fake_time.shift(timedelta(seconds=10)) await asyncio.sleep(1) # Expect notification of the dispatch being ready to run - ready_dispatch = await actor_env.running_state_change.receive() + ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch.started # Update the dispatch to set duration to a finite duration that hasn't passed yet # The dispatch has been running for 5 seconds; set duration to 100 seconds - await actor_env.client.update( - microgrid_id=actor_env.microgrid_id, + await test_env.client.update( + microgrid_id=test_env.microgrid_id, dispatch_id=sample.id, new_fields={"duration": timedelta(seconds=100)}, ) @@ -397,12 +415,12 @@ async def test_dispatch_inf_duration_updated_to_finite_and_continues( fake_time.shift(timedelta(seconds=94)) await asyncio.sleep(1) # Expect notification to stop the dispatch because the duration has now passed - stopped_dispatch = await actor_env.running_state_change.receive() + stopped_dispatch = await test_env.running_state_change.receive() assert stopped_dispatch.started is False async def test_dispatch_new_but_finished( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -415,12 +433,12 @@ async def test_dispatch_new_but_finished( duration=timedelta(seconds=5), start_time=_now() - timedelta(seconds=50), recurrence=RecurrenceRule(), - type="I_SHOULD_NEVER_RUN", + type="TEST_TYPE", ) # Create an old dispatch - actor_env.client.set_dispatches(actor_env.microgrid_id, [finished_dispatch]) - await actor_env.actor.stop() - actor_env.actor.start() + test_env.client.set_dispatches(test_env.microgrid_id, [finished_dispatch]) + await test_env.service.stop() + test_env.service.start() # Create another dispatch the normal way new_dispatch = generator.generate_dispatch() @@ -430,20 +448,20 @@ async def test_dispatch_new_but_finished( duration=timedelta(seconds=10), start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), - type="NEW_BETTER_DISPATCH", + type="TEST_TYPE", ) # Consume one lifecycle_updates event - await actor_env.updated_dispatches.receive() - new_dispatch = await _test_new_dispatch_created(actor_env, new_dispatch) + await test_env.lifecycle_events.receive() + new_dispatch = await _test_new_dispatch_created(test_env, new_dispatch) # Advance time to when the new dispatch should still not start fake_time.shift(timedelta(seconds=100)) - assert await actor_env.running_state_change.receive() == new_dispatch + assert await test_env.running_state_change.receive() == new_dispatch async def test_notification_on_actor_start( - actor_env: ActorTestEnv, + test_env: TestEnv, generator: DispatchGenerator, fake_time: time_machine.Coordinates, ) -> None: @@ -456,7 +474,7 @@ async def test_notification_on_actor_start( duration=timedelta(seconds=10), start_time=_now() - timedelta(seconds=5), recurrence=RecurrenceRule(), - type="I_SHOULD_RUN", + type="TEST_TYPE", ) # Generate a dispatch that is not running stopped_dispatch = generator.generate_dispatch() @@ -466,19 +484,19 @@ async def test_notification_on_actor_start( duration=timedelta(seconds=5), start_time=_now() - timedelta(seconds=5), recurrence=RecurrenceRule(), - type="I_SHOULD_NOT_RUN", + type="TEST_TYPE", ) - await actor_env.actor.stop() + await test_env.service.stop() # Create the dispatches - actor_env.client.set_dispatches( - actor_env.microgrid_id, [running_dispatch, stopped_dispatch] + test_env.client.set_dispatches( + test_env.microgrid_id, [running_dispatch, stopped_dispatch] ) - actor_env.actor.start() + test_env.service.start() fake_time.shift(timedelta(seconds=1)) await asyncio.sleep(1) # Expect notification of the running dispatch being ready to run - ready_dispatch = await actor_env.running_state_change.receive() + ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch.started diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 027b057..7722b63 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -18,7 +18,7 @@ from pytest import fixture from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate -from frequenz.dispatch.actor import DispatchingActor +from frequenz.dispatch._bg_service import DispatchScheduler @fixture @@ -77,6 +77,8 @@ async def test_env() -> AsyncIterator[TestEnv]: updates_sender=updates_channel.new_sender(), ) + # pylint: disable=protected-access + runner_actor._restart_limit = 0 runner_actor.start() yield TestEnv( @@ -140,14 +142,16 @@ def test_heapq_dispatch_compare(test_env: TestEnv) -> None: until_time = now + timedelta(minutes=5) # Create the heap - scheduled_events: list[DispatchingActor.QueueItem] = [] + scheduled_events: list[DispatchScheduler.QueueItem] = [] # Push two events with the same 'until' time onto the heap heapq.heappush( - scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch1)) + scheduled_events, + DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1)), ) heapq.heappush( - scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch2)) + scheduled_events, + DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2)), )