Skip to content

Commit 10ce52e

Browse files
committed
Replace channel properties with receiver functions
This allows us to control what happens when a new receiver is created. In this case we want to send all relevant running state events to each new receiver. It won't hurt us that other receivers might get them as well as it would just repeat the states they already know of. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent c4a8200 commit 10ce52e

File tree

4 files changed

+119
-65
lines changed

4 files changed

+119
-65
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66

77
## Upgrading
88

9-
* The method `Dispatch.running(type: str)` was replaced with the property `Dispatch.started: bool`.
10-
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1500
9+
* Two properties have been replaced by methods that require a type as parameter.
10+
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`.
11+
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`.
1112

1213
## New Features
1314

1415
<!-- Here goes the main new features and examples or instructions on how to use them -->
1516

1617
## Bug Fixes
1718

18-
* Fixed a crash when reading a Dispatch with frequency YEARLY.
19+
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/dispatch/_dispatcher.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import abc
77
from typing import Protocol, TypeVar
88

9-
from frequenz.channels import Broadcast, Receiver
9+
from frequenz.channels import Receiver
1010
from frequenz.client.dispatch import Client
1111

1212
from ._dispatch import Dispatch
@@ -197,16 +197,10 @@ def __init__(
197197
server_url: The URL of the dispatch service.
198198
key: The key to access the service.
199199
"""
200-
self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
201-
self._lifecycle_events_channel = Broadcast[DispatchEvent](
202-
name="lifecycle_events"
203-
)
204200
self._client = Client(server_url=server_url, key=key)
205201
self._actor = DispatchingActor(
206202
microgrid_id,
207203
self._client,
208-
self._lifecycle_events_channel.new_sender(),
209-
self._running_state_channel.new_sender(),
210204
)
211205

212206
async def start(self) -> None:
@@ -218,18 +212,20 @@ def client(self) -> Client:
218212
"""Return the client."""
219213
return self._client
220214

221-
@property
222-
def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
223-
"""Return new, updated or deleted dispatches receiver fetcher.
215+
# pylint: disable=redefined-builtin
216+
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
217+
"""Return new, updated or deleted dispatches receiver.
218+
219+
Args:
220+
type: The type of the dispatch to listen for.
224221
225222
Returns:
226223
A new receiver for new dispatches.
227224
"""
228-
return self._lifecycle_events_channel
225+
return self._actor.new_lifecycle_events_receiver(type)
229226

230-
@property
231-
def running_status_change(self) -> ReceiverFetcher[Dispatch]:
232-
"""Return running status change receiver fetcher.
227+
async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
228+
"""Return running state event receiver.
233229
234230
This receiver will receive a message whenever the current running
235231
status of a dispatch changes.
@@ -242,7 +238,7 @@ def running_status_change(self) -> ReceiverFetcher[Dispatch]:
242238
then a message will be sent.
243239
244240
In other words: Any change that is expected to make an actor start, stop
245-
or reconfigure itself with new parameters causes a message to be
241+
or adjust itself according to new dispatch options causes a message to be
246242
sent.
247243
248244
A non-exhaustive list of possible changes that will cause a message to be sent:
@@ -255,7 +251,10 @@ def running_status_change(self) -> ReceiverFetcher[Dispatch]:
255251
- The payload changed
256252
- The dispatch was deleted
257253
254+
Args:
255+
type: The type of the dispatch to listen for.
256+
258257
Returns:
259258
A new receiver for dispatches whose running status changed.
260259
"""
261-
return self._running_state_channel
260+
return await self._actor.new_running_state_event_receiver(type)

src/frequenz/dispatch/actor.py

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from heapq import heappop, heappush
1010

1111
import grpc.aio
12-
from frequenz.channels import Sender, select, selected_from
12+
from frequenz.channels import Broadcast, Receiver, select, selected_from
1313
from frequenz.channels.timer import SkipMissedAndResync, Timer
1414
from frequenz.client.dispatch import Client
1515
from frequenz.client.dispatch.types import Event
@@ -22,6 +22,7 @@
2222
"""The logger for this module."""
2323

2424

25+
# pylint: disable=too-many-instance-attributes
2526
class DispatchingActor(Actor):
2627
"""Dispatch actor.
2728
@@ -50,24 +51,28 @@ def __init__(
5051
self,
5152
microgrid_id: int,
5253
client: Client,
53-
lifecycle_updates_sender: Sender[DispatchEvent],
54-
running_state_change_sender: Sender[Dispatch],
5554
) -> None:
5655
"""Initialize the actor.
5756
5857
Args:
5958
microgrid_id: The microgrid ID to handle dispatches for.
6059
client: The client to use for fetching dispatches.
61-
lifecycle_updates_sender: A sender for dispatch lifecycle events.
62-
running_state_change_sender: A sender for dispatch running state changes.
6360
"""
6461
super().__init__(name="dispatch")
6562

6663
self._client = client
6764
self._dispatches: dict[int, Dispatch] = {}
6865
self._microgrid_id = microgrid_id
69-
self._lifecycle_updates_sender = lifecycle_updates_sender
70-
self._running_state_change_sender = running_state_change_sender
66+
67+
self._lifecycle_events_channel = Broadcast[DispatchEvent](
68+
name="lifecycle_events"
69+
)
70+
self._lifecycle_events_tx = self._lifecycle_events_channel.new_sender()
71+
self._running_state_status_channel = Broadcast[Dispatch](
72+
name="running_state_status"
73+
)
74+
75+
self._running_state_status_tx = self._running_state_status_channel.new_sender()
7176
self._next_event_timer = Timer(
7277
timedelta(seconds=100), SkipMissedAndResync(), auto_start=False
7378
)
@@ -84,6 +89,47 @@ def __init__(
8489
always at index 0.
8590
"""
8691

92+
# pylint: disable=redefined-builtin
93+
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
94+
"""Create a new receiver for lifecycle events.
95+
96+
Args:
97+
type: The type of events to receive.
98+
99+
Returns:
100+
A new receiver for lifecycle events.
101+
"""
102+
return self._lifecycle_events_channel.new_receiver().filter(
103+
lambda event: event.dispatch.type == type
104+
)
105+
106+
async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
107+
"""Create a new receiver for running state events.
108+
109+
Args:
110+
type: The type of events to receive.
111+
112+
Returns:
113+
A new receiver for running state status.
114+
"""
115+
# Find all matching dispatches based on the type and collect them
116+
dispatches = [
117+
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
118+
]
119+
120+
# Create receiver with enough capacity to hold all matching dispatches
121+
receiver = self._running_state_status_channel.new_receiver(
122+
limit=max(1, len(dispatches))
123+
).filter(lambda dispatch: dispatch.type == type)
124+
125+
# Send all matching dispatches to the receiver
126+
for dispatch in dispatches:
127+
await self._send_running_state_change(dispatch)
128+
129+
return receiver
130+
131+
# pylint: enable=redefined-builtin
132+
87133
async def _run(self) -> None:
88134
"""Run the actor."""
89135
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)
@@ -111,24 +157,18 @@ async def _run(self) -> None:
111157
case Event.CREATED:
112158
self._dispatches[dispatch.id] = dispatch
113159
await self._update_dispatch_schedule_and_notify(dispatch, None)
114-
await self._lifecycle_updates_sender.send(
115-
Created(dispatch=dispatch)
116-
)
160+
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
117161
case Event.UPDATED:
118162
await self._update_dispatch_schedule_and_notify(
119163
dispatch, self._dispatches[dispatch.id]
120164
)
121165
self._dispatches[dispatch.id] = dispatch
122-
await self._lifecycle_updates_sender.send(
123-
Updated(dispatch=dispatch)
124-
)
166+
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
125167
case Event.DELETED:
126168
self._dispatches.pop(dispatch.id)
127169
await self._update_dispatch_schedule_and_notify(None, dispatch)
128170

129-
await self._lifecycle_updates_sender.send(
130-
Deleted(dispatch=dispatch)
131-
)
171+
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
132172

133173
async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
134174
"""Execute a scheduled event.
@@ -170,17 +210,13 @@ async def _fetch(self) -> None:
170210
if not old_dispatch:
171211
_logger.info("New dispatch: %s", dispatch)
172212
await self._update_dispatch_schedule_and_notify(dispatch, None)
173-
await self._lifecycle_updates_sender.send(
174-
Created(dispatch=dispatch)
175-
)
213+
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
176214
elif dispatch.update_time != old_dispatch.update_time:
177215
_logger.info("Updated dispatch: %s", dispatch)
178216
await self._update_dispatch_schedule_and_notify(
179217
dispatch, old_dispatch
180218
)
181-
await self._lifecycle_updates_sender.send(
182-
Updated(dispatch=dispatch)
183-
)
219+
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
184220

185221
except grpc.aio.AioRpcError as error:
186222
_logger.error("Error fetching dispatches: %s", error)
@@ -189,13 +225,13 @@ async def _fetch(self) -> None:
189225

190226
for dispatch in old_dispatches.values():
191227
_logger.info("Deleted dispatch: %s", dispatch)
192-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
228+
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
193229
await self._update_dispatch_schedule_and_notify(None, dispatch)
194230

195231
# Set deleted only here as it influences the result of dispatch.started
196232
# which is used in above in _running_state_change
197233
dispatch._set_deleted() # pylint: disable=protected-access
198-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
234+
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
199235

200236
async def _update_dispatch_schedule_and_notify(
201237
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
@@ -359,4 +395,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None:
359395
Args:
360396
dispatch: The dispatch that changed.
361397
"""
362-
await self._running_state_change_sender.send(dispatch)
398+
await self._running_state_status_tx.send(dispatch)

0 commit comments

Comments
 (0)