99from heapq import heappop , heappush
1010
1111import grpc .aio
12- from frequenz .channels import Sender , select , selected_from
12+ from frequenz .channels import Broadcast , Receiver , select , selected_from
1313from frequenz .channels .timer import SkipMissedAndResync , Timer
1414from frequenz .client .dispatch import Client
1515from frequenz .client .dispatch .types import Event
@@ -50,24 +50,28 @@ def __init__(
5050 self ,
5151 microgrid_id : int ,
5252 client : Client ,
53- lifecycle_updates_sender : Sender [DispatchEvent ],
54- running_state_change_sender : Sender [Dispatch ],
5553 ) -> None :
5654 """Initialize the actor.
5755
5856 Args:
5957 microgrid_id: The microgrid ID to handle dispatches for.
6058 client: The client to use for fetching dispatches.
61- lifecycle_updates_sender: A sender for dispatch lifecycle events.
62- running_state_change_sender: A sender for dispatch running state changes.
6359 """
6460 super ().__init__ (name = "dispatch" )
6561
6662 self ._client = client
6763 self ._dispatches : dict [int , Dispatch ] = {}
6864 self ._microgrid_id = microgrid_id
69- self ._lifecycle_updates_sender = lifecycle_updates_sender
70- self ._running_state_change_sender = running_state_change_sender
65+
66+ self ._lifecycle_events_channel = Broadcast [DispatchEvent ](
67+ name = "lifecycle_events"
68+ )
69+ self ._lifecycle_events_tx = self ._lifecycle_events_channel .new_sender ()
70+ self ._running_state_status_channel = Broadcast [Dispatch ](
71+ name = "running_state_status"
72+ )
73+
74+ self ._running_state_status_tx = self ._running_state_status_channel .new_sender ()
7175 self ._next_event_timer = Timer (
7276 timedelta (seconds = 100 ), SkipMissedAndResync (), auto_start = False
7377 )
@@ -84,6 +88,44 @@ def __init__(
8488 always at index 0.
8589 """
8690
91+ def new_lifecycle_events_receiver (self , type : str ) -> Receiver [DispatchEvent ]:
92+ """Create a new receiver for lifecycle events.
93+
94+ Args:
95+ type: The type of events to receive.
96+
97+ Returns:
98+ A new receiver for lifecycle events.
99+ """
100+ return self ._lifecycle_events_channel .new_receiver ().filter (
101+ lambda event : event .dispatch .type == type
102+ )
103+
104+ async def new_running_state_event_receiver (self , type : str ) -> Receiver [Dispatch ]:
105+ """Create a new receiver for running state events.
106+
107+ Args:
108+ type: The type of events to receive.
109+
110+ Returns:
111+ A new receiver for running state status.
112+ """
113+ # Find all matching dispatches based on the type and collect them
114+ dispatches = [
115+ dispatch for dispatch in self ._dispatches .values () if dispatch .type == type
116+ ]
117+
118+ # Create receiver with enough capacity to hold all matching dispatches
119+ receiver = self ._running_state_status_channel .new_receiver (
120+ limit = max (1 , len (dispatches ))
121+ ).filter (lambda dispatch : dispatch .type == type )
122+
123+ # Send all matching dispatches to the receiver
124+ for dispatch in dispatches :
125+ await self ._send_running_state_change (dispatch )
126+
127+ return receiver
128+
87129 async def _run (self ) -> None :
88130 """Run the actor."""
89131 _logger .info ("Starting dispatch actor for microgrid %s" , self ._microgrid_id )
@@ -111,24 +153,18 @@ async def _run(self) -> None:
111153 case Event .CREATED :
112154 self ._dispatches [dispatch .id ] = dispatch
113155 await self ._update_dispatch_schedule_and_notify (dispatch , None )
114- await self ._lifecycle_updates_sender .send (
115- Created (dispatch = dispatch )
116- )
156+ await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
117157 case Event .UPDATED :
118158 await self ._update_dispatch_schedule_and_notify (
119159 dispatch , self ._dispatches [dispatch .id ]
120160 )
121161 self ._dispatches [dispatch .id ] = dispatch
122- await self ._lifecycle_updates_sender .send (
123- Updated (dispatch = dispatch )
124- )
162+ await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
125163 case Event .DELETED :
126164 self ._dispatches .pop (dispatch .id )
127165 await self ._update_dispatch_schedule_and_notify (None , dispatch )
128166
129- await self ._lifecycle_updates_sender .send (
130- Deleted (dispatch = dispatch )
131- )
167+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
132168
133169 async def _execute_scheduled_event (self , dispatch : Dispatch ) -> None :
134170 """Execute a scheduled event.
@@ -170,17 +206,13 @@ async def _fetch(self) -> None:
170206 if not old_dispatch :
171207 _logger .info ("New dispatch: %s" , dispatch )
172208 await self ._update_dispatch_schedule_and_notify (dispatch , None )
173- await self ._lifecycle_updates_sender .send (
174- Created (dispatch = dispatch )
175- )
209+ await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
176210 elif dispatch .update_time != old_dispatch .update_time :
177211 _logger .info ("Updated dispatch: %s" , dispatch )
178212 await self ._update_dispatch_schedule_and_notify (
179213 dispatch , old_dispatch
180214 )
181- await self ._lifecycle_updates_sender .send (
182- Updated (dispatch = dispatch )
183- )
215+ await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
184216
185217 except grpc .aio .AioRpcError as error :
186218 _logger .error ("Error fetching dispatches: %s" , error )
@@ -189,13 +221,13 @@ async def _fetch(self) -> None:
189221
190222 for dispatch in old_dispatches .values ():
191223 _logger .info ("Deleted dispatch: %s" , dispatch )
192- await self ._lifecycle_updates_sender .send (Deleted (dispatch = dispatch ))
224+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
193225 await self ._update_dispatch_schedule_and_notify (None , dispatch )
194226
195227 # Set deleted only here as it influences the result of dispatch.started
196228 # which is used in above in _running_state_change
197229 dispatch ._set_deleted () # pylint: disable=protected-access
198- await self ._lifecycle_updates_sender .send (Deleted (dispatch = dispatch ))
230+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
199231
200232 async def _update_dispatch_schedule_and_notify (
201233 self , dispatch : Dispatch | None , old_dispatch : Dispatch | None
@@ -359,4 +391,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None:
359391 Args:
360392 dispatch: The dispatch that changed.
361393 """
362- await self ._running_state_change_sender .send (dispatch )
394+ await self ._running_state_status_tx .send (dispatch )
0 commit comments