Skip to content

Commit 9ee80fd

Browse files
committed
Send STOPPED message when duration is reached
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 573cc0f commit 9ee80fd

File tree

4 files changed

+26
-3
lines changed

4 files changed

+26
-3
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* `Dispatcher.running_state_change` now also sends a message when the duration specified in the dispatch has passed. If no duration is specified, no STOPPED message will be sent.
1010

1111
## New Features
1212

src/frequenz/dispatch/_dispatch.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ def running(self, type_: str) -> RunningState:
117117
if not self.active or self.deleted:
118118
return RunningState.STOPPED
119119

120+
# A dispatch without duration is always running
121+
if self.duration is None:
122+
return RunningState.RUNNING
123+
120124
now = datetime.now(tz=timezone.utc)
121125
if until := self._until(now):
122126
return RunningState.RUNNING if now < until else RunningState.STOPPED

src/frequenz/dispatch/actor.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,20 @@ def next_run_info() -> tuple[datetime, datetime] | None:
183183
_logger.info("Dispatch %s scheduled for %s", dispatch.id, next_time)
184184
await asyncio.sleep((next_time - now).total_seconds())
185185

186-
_logger.info("Dispatch ready: %s", dispatch)
186+
_logger.info("Dispatch %s executing...", dispatch)
187187
await self._running_state_change_sender.send(dispatch)
188188

189-
_logger.info("Dispatch finished: %s", dispatch)
189+
# Wait for the duration of the dispatch if set
190+
if dispatch.duration:
191+
_logger.info(
192+
"Dispatch %s running for %s", dispatch.id, dispatch.duration
193+
)
194+
await asyncio.sleep(dispatch.duration.total_seconds())
195+
196+
_logger.info("Dispatch %s runtime duration reached", dispatch.id)
197+
await self._running_state_change_sender.send(dispatch)
198+
199+
_logger.info("Dispatch completed: %s", dispatch)
190200
self._scheduled.pop(dispatch.id)
191201

192202
def _running_state_change(

tests/test_frequenz_dispatch.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ async def test_dispatch_schedule(
236236
fake_time.shift(next_run - _now() - timedelta(seconds=1))
237237
await asyncio.sleep(1)
238238

239+
# Expect notification of the dispatch being ready to run
239240
ready_dispatch = await actor_env.ready_dispatches.receive()
240241

241242
assert ready_dispatch == dispatch
243+
244+
# Shift time to the end of the dispatch
245+
fake_time.shift(dispatch.duration + timedelta(seconds=1))
246+
await asyncio.sleep(1)
247+
248+
# Expect notification to stop the dispatch
249+
done_dispatch = await actor_env.ready_dispatches.receive()
250+
assert done_dispatch == dispatch

0 commit comments

Comments
 (0)