11# License: MIT
22# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33
4- """The dispatch actor ."""
4+ """The dispatch background service ."""
55
6+ import asyncio
67import logging
78from dataclasses import dataclass , field
89from datetime import datetime , timedelta , timezone
910from heapq import heappop , heappush
1011
1112import grpc .aio
12- from frequenz .channels import Sender , select , selected_from
13+ from frequenz .channels import Broadcast , Receiver , select , selected_from
1314from frequenz .channels .timer import SkipMissedAndResync , Timer
1415from frequenz .client .dispatch import Client
1516from frequenz .client .dispatch .types import Event
16- from frequenz .sdk .actor import Actor
17+ from frequenz .sdk .actor import BackgroundService
1718
1819from ._dispatch import Dispatch
1920from ._event import Created , Deleted , DispatchEvent , Updated
2223"""The logger for this module."""
2324
2425
25- class DispatchingActor (Actor ):
26- """Dispatch actor.
26+ # pylint: disable=too-many-instance-attributes
27+ class DispatchScheduler (BackgroundService ):
28+ """Dispatch background service.
2729
28- This actor is responsible for handling dispatches for a microgrid.
29-
30- This means staying in sync with the API and scheduling
31- dispatches as necessary.
30+ This service is responsible for managing dispatches and scheduling them
31+ based on their start and stop times.
3232 """
3333
3434 @dataclass (order = True )
@@ -50,24 +50,28 @@ def __init__(
5050 self ,
5151 microgrid_id : int ,
5252 client : Client ,
53- lifecycle_updates_sender : Sender [DispatchEvent ],
54- running_state_change_sender : Sender [Dispatch ],
5553 ) -> None :
56- """Initialize the actor .
54+ """Initialize the background service .
5755
5856 Args:
5957 microgrid_id: The microgrid ID to handle dispatches for.
6058 client: The client to use for fetching dispatches.
61- lifecycle_updates_sender: A sender for dispatch lifecycle events.
62- running_state_change_sender: A sender for dispatch running state changes.
6359 """
6460 super ().__init__ (name = "dispatch" )
6561
6662 self ._client = client
6763 self ._dispatches : dict [int , Dispatch ] = {}
6864 self ._microgrid_id = microgrid_id
69- self ._lifecycle_updates_sender = lifecycle_updates_sender
70- self ._running_state_change_sender = running_state_change_sender
65+
66+ self ._lifecycle_events_channel = Broadcast [DispatchEvent ](
67+ name = "lifecycle_events"
68+ )
69+ self ._lifecycle_events_tx = self ._lifecycle_events_channel .new_sender ()
70+ self ._running_state_status_channel = Broadcast [Dispatch ](
71+ name = "running_state_status"
72+ )
73+
74+ self ._running_state_status_tx = self ._running_state_status_channel .new_sender ()
7175 self ._next_event_timer = Timer (
7276 timedelta (seconds = 100 ), SkipMissedAndResync (), auto_start = False
7377 )
@@ -76,17 +80,65 @@ def __init__(
7680 Interval is chosen arbitrarily, as it will be reset on the first event.
7781 """
7882
79- self ._scheduled_events : list ["DispatchingActor .QueueItem" ] = []
83+ self ._scheduled_events : list ["DispatchScheduler .QueueItem" ] = []
8084 """The scheduled events, sorted by time.
8185
8286 Each event is a tuple of the scheduled time and the dispatch.
8387 heapq is used to keep the list sorted by time, so the next event is
8488 always at index 0.
8589 """
8690
91+ # pylint: disable=redefined-builtin
92+ def new_lifecycle_events_receiver (self , type : str ) -> Receiver [DispatchEvent ]:
93+ """Create a new receiver for lifecycle events.
94+
95+ Args:
96+ type: The type of events to receive.
97+
98+ Returns:
99+ A new receiver for lifecycle events.
100+ """
101+ return self ._lifecycle_events_channel .new_receiver ().filter (
102+ lambda event : event .dispatch .type == type
103+ )
104+
105+ async def new_running_state_event_receiver (self , type : str ) -> Receiver [Dispatch ]:
106+ """Create a new receiver for running state events.
107+
108+ Args:
109+ type: The type of events to receive.
110+
111+ Returns:
112+ A new receiver for running state status.
113+ """
114+ # Find all matching dispatches based on the type and collect them
115+ dispatches = [
116+ dispatch for dispatch in self ._dispatches .values () if dispatch .type == type
117+ ]
118+
119+ # Create receiver with enough capacity to hold all matching dispatches
120+ receiver = self ._running_state_status_channel .new_receiver (
121+ limit = max (1 , len (dispatches ))
122+ ).filter (lambda dispatch : dispatch .type == type )
123+
124+ # Send all matching dispatches to the receiver
125+ for dispatch in dispatches :
126+ await self ._send_running_state_change (dispatch )
127+
128+ return receiver
129+
130+ # pylint: enable=redefined-builtin
131+
132+ def start (self ) -> None :
133+ """Start the background service."""
134+ self ._tasks .add (asyncio .create_task (self ._run ()))
135+
87136 async def _run (self ) -> None :
88- """Run the actor."""
89- _logger .info ("Starting dispatch actor for microgrid %s" , self ._microgrid_id )
137+ """Run the background service."""
138+ _logger .info (
139+ "Starting dispatching background service for microgrid %s" ,
140+ self ._microgrid_id ,
141+ )
90142
91143 # Initial fetch
92144 await self ._fetch ()
@@ -111,24 +163,18 @@ async def _run(self) -> None:
111163 case Event .CREATED :
112164 self ._dispatches [dispatch .id ] = dispatch
113165 await self ._update_dispatch_schedule_and_notify (dispatch , None )
114- await self ._lifecycle_updates_sender .send (
115- Created (dispatch = dispatch )
116- )
166+ await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
117167 case Event .UPDATED :
118168 await self ._update_dispatch_schedule_and_notify (
119169 dispatch , self ._dispatches [dispatch .id ]
120170 )
121171 self ._dispatches [dispatch .id ] = dispatch
122- await self ._lifecycle_updates_sender .send (
123- Updated (dispatch = dispatch )
124- )
172+ await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
125173 case Event .DELETED :
126174 self ._dispatches .pop (dispatch .id )
127175 await self ._update_dispatch_schedule_and_notify (None , dispatch )
128176
129- await self ._lifecycle_updates_sender .send (
130- Deleted (dispatch = dispatch )
131- )
177+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
132178
133179 async def _execute_scheduled_event (self , dispatch : Dispatch ) -> None :
134180 """Execute a scheduled event.
@@ -170,17 +216,13 @@ async def _fetch(self) -> None:
170216 if not old_dispatch :
171217 _logger .info ("New dispatch: %s" , dispatch )
172218 await self ._update_dispatch_schedule_and_notify (dispatch , None )
173- await self ._lifecycle_updates_sender .send (
174- Created (dispatch = dispatch )
175- )
219+ await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
176220 elif dispatch .update_time != old_dispatch .update_time :
177221 _logger .info ("Updated dispatch: %s" , dispatch )
178222 await self ._update_dispatch_schedule_and_notify (
179223 dispatch , old_dispatch
180224 )
181- await self ._lifecycle_updates_sender .send (
182- Updated (dispatch = dispatch )
183- )
225+ await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
184226
185227 except grpc .aio .AioRpcError as error :
186228 _logger .error ("Error fetching dispatches: %s" , error )
@@ -189,13 +231,13 @@ async def _fetch(self) -> None:
189231
190232 for dispatch in old_dispatches .values ():
191233 _logger .info ("Deleted dispatch: %s" , dispatch )
192- await self ._lifecycle_updates_sender .send (Deleted (dispatch = dispatch ))
234+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
193235 await self ._update_dispatch_schedule_and_notify (None , dispatch )
194236
195237 # Set deleted only here as it influences the result of dispatch.started
196238 # which is used in above in _running_state_change
197239 dispatch ._set_deleted () # pylint: disable=protected-access
198- await self ._lifecycle_updates_sender .send (Deleted (dispatch = dispatch ))
240+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
199241
200242 async def _update_dispatch_schedule_and_notify (
201243 self , dispatch : Dispatch | None , old_dispatch : Dispatch | None
@@ -359,4 +401,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None:
359401 Args:
360402 dispatch: The dispatch that changed.
361403 """
362- await self ._running_state_change_sender .send (dispatch )
404+ await self ._running_state_status_tx .send (dispatch )
0 commit comments