README.md (5 changes: 1 addition & 4 deletions)

@@ -39,12 +39,9 @@ async def run():
 
     actor = MagicMock()  # replace with your actor
 
-    changed_running_status_rx = dispatcher.running_status_change.new_receiver()
+    changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")
 
     async for dispatch in changed_running_status_rx:
-        if dispatch.type != "MY_TYPE":
-            continue
-
         if dispatch.started:
             print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
             if actor.is_running:
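Read as a whole, the updated README example looks roughly like the sketch below. This is for orientation only: the `run()` scaffolding and the construction of `dispatcher` are collapsed in the diff above, so passing the dispatcher in as a parameter is an assumption made here.

```python
# Sketch of the updated README example. The surrounding setup is collapsed
# in the diff above, so taking `dispatcher` as a parameter is an assumption
# made for illustration.
from unittest.mock import MagicMock

from frequenz.dispatch import Dispatcher


async def run(dispatcher: Dispatcher) -> None:
    actor = MagicMock()  # replace with your actor

    # The type filter now lives in the receiver factory, so the manual
    # `dispatch.type != "MY_TYPE"` check inside the loop is gone.
    changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")

    async for dispatch in changed_running_status_rx:
        if dispatch.started:
            print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
            if actor.is_running:
                ...  # the rest of the handler is collapsed in the diff above
```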
RELEASE_NOTES.md (7 changes: 4 additions & 3 deletions)

@@ -6,13 +6,14 @@

 ## Upgrading
 
 * The method `Dispatch.running(type: str)` was replaced with the property `Dispatch.started: bool`.
 * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1500.
+* Two properties have been replaced by methods that require a type as a parameter (see the migration sketch after this list):
+  * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`.
+  * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`.
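For downstream code the migration is mechanical. A minimal sketch, assuming an already-constructed `Dispatcher` named `dispatcher`; the receiver types and the consumer loop are unchanged:

```python
# Before: unfiltered receivers; consumers filtered by dispatch type by hand.
lifecycle_rx = dispatcher.lifecycle_events.new_receiver()
running_rx = dispatcher.running_status_change.new_receiver()

# After: the type is passed to the new factory methods instead.
lifecycle_rx = dispatcher.new_lifecycle_events_receiver("MY_TYPE")
running_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")
```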

 ## New Features
 
 <!-- Here goes the main new features and examples or instructions on how to use them -->
 
 ## Bug Fixes
 
+* Fixed a crash when reading a Dispatch with frequency YEARLY.
 <!-- Here goes notable bug fixes that are worth a special mention or explanation -->
src/frequenz/dispatch/__init__.py (3 changes: 1 addition & 2 deletions)

@@ -16,7 +16,7 @@
"""

from ._dispatch import Dispatch
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._dispatcher import Dispatcher
from ._event import Created, Deleted, DispatchEvent, Updated
from ._managing_actor import DispatchManagingActor, DispatchUpdate

@@ -25,7 +25,6 @@
     "Deleted",
     "DispatchEvent",
     "Dispatcher",
-    "ReceiverFetcher",
     "Updated",
     "Dispatch",
     "DispatchManagingActor",
@@ -1,19 +1,20 @@
 # License: MIT
 # Copyright © 2024 Frequenz Energy-as-a-Service GmbH
 
-"""The dispatch actor."""
+"""The dispatch background service."""
 
+import asyncio
 import logging
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta, timezone
 from heapq import heappop, heappush
 
 import grpc.aio
-from frequenz.channels import Sender, select, selected_from
+from frequenz.channels import Broadcast, Receiver, select, selected_from
 from frequenz.channels.timer import SkipMissedAndResync, Timer
 from frequenz.client.dispatch import Client
 from frequenz.client.dispatch.types import Event
-from frequenz.sdk.actor import Actor
+from frequenz.sdk.actor import BackgroundService
 
 from ._dispatch import Dispatch
 from ._event import Created, Deleted, DispatchEvent, Updated
@@ -22,13 +23,12 @@
 """The logger for this module."""
 
 
-class DispatchingActor(Actor):
-    """Dispatch actor.
+# pylint: disable=too-many-instance-attributes
+class DispatchScheduler(BackgroundService):
+    """Dispatch background service.
 
-    This actor is responsible for handling dispatches for a microgrid.
-
-    This means staying in sync with the API and scheduling
-    dispatches as necessary.
+    This service is responsible for managing dispatches and scheduling them
+    based on their start and stop times.
     """
 
     @dataclass(order=True)
@@ -50,24 +50,28 @@ def __init__(
         self,
         microgrid_id: int,
         client: Client,
-        lifecycle_updates_sender: Sender[DispatchEvent],
-        running_state_change_sender: Sender[Dispatch],
     ) -> None:
-        """Initialize the actor.
+        """Initialize the background service.
 
         Args:
             microgrid_id: The microgrid ID to handle dispatches for.
            client: The client to use for fetching dispatches.
-            lifecycle_updates_sender: A sender for dispatch lifecycle events.
-            running_state_change_sender: A sender for dispatch running state changes.
         """
         super().__init__(name="dispatch")
 
         self._client = client
         self._dispatches: dict[int, Dispatch] = {}
         self._microgrid_id = microgrid_id
-        self._lifecycle_updates_sender = lifecycle_updates_sender
-        self._running_state_change_sender = running_state_change_sender
 
+        self._lifecycle_events_channel = Broadcast[DispatchEvent](
+            name="lifecycle_events"
+        )
+        self._lifecycle_events_tx = self._lifecycle_events_channel.new_sender()
+        self._running_state_status_channel = Broadcast[Dispatch](
+            name="running_state_status"
+        )
+
+        self._running_state_status_tx = self._running_state_status_channel.new_sender()
         self._next_event_timer = Timer(
             timedelta(seconds=100), SkipMissedAndResync(), auto_start=False
         )
@@ -76,17 +80,65 @@ def __init__(
 
         Interval is chosen arbitrarily, as it will be reset on the first event.
         """
 
-        self._scheduled_events: list["DispatchingActor.QueueItem"] = []
+        self._scheduled_events: list["DispatchScheduler.QueueItem"] = []
         """The scheduled events, sorted by time.
 
         Each event is a tuple of the scheduled time and the dispatch.
         heapq is used to keep the list sorted by time, so the next event is
        always at index 0.
         """

+    # pylint: disable=redefined-builtin
+    def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
+        """Create a new receiver for lifecycle events.
+
+        Args:
+            type: The type of events to receive.
+
+        Returns:
+            A new receiver for lifecycle events.
+        """
+        return self._lifecycle_events_channel.new_receiver().filter(
+            lambda event: event.dispatch.type == type
+        )

+    async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
+        """Create a new receiver for running state events.
+
+        Args:
+            type: The type of events to receive.
+
+        Returns:
+            A new receiver for running state status.
+        """
+        # Find all matching dispatches based on the type and collect them
+        dispatches = [
+            dispatch for dispatch in self._dispatches.values() if dispatch.type == type
+        ]
+
+        # Create receiver with enough capacity to hold all matching dispatches
+        receiver = self._running_state_status_channel.new_receiver(
+            limit=max(1, len(dispatches))
+        ).filter(lambda dispatch: dispatch.type == type)
+
+        # Send all matching dispatches to the receiver
+        for dispatch in dispatches:
+            await self._send_running_state_change(dispatch)
+
+        return receiver

+    # pylint: enable=redefined-builtin
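A hypothetical consumer of these two factories, assuming a constructed and started `DispatchScheduler` and an arbitrary type string:

```python
# Hypothetical consumer; `scheduler` is assumed to be a constructed and
# started DispatchScheduler, and "EV_CHARGING" an arbitrary dispatch type.
async def watch_running_state(scheduler: "DispatchScheduler") -> None:
    # This factory is async because it first replays the current state of
    # all known matching dispatches into the channel.
    running_rx = await scheduler.new_running_state_event_receiver("EV_CHARGING")

    async for dispatch in running_rx:
        print(f"Dispatch {dispatch.id} started: {dispatch.started}")
```

The replay also explains the `limit=max(1, len(dispatches))` above: the new receiver must be able to buffer the full current state before the caller starts consuming it.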

+    def start(self) -> None:
+        """Start the background service."""
+        self._tasks.add(asyncio.create_task(self._run()))

     async def _run(self) -> None:
-        """Run the actor."""
-        _logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)
+        """Run the background service."""
+        _logger.info(
+            "Starting dispatching background service for microgrid %s",
+            self._microgrid_id,
+        )
 
         # Initial fetch
         await self._fetch()
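The `start()`/`_run()` pair above follows the usual `BackgroundService` contract: `start()` is synchronous and only registers tasks in `self._tasks`, which the base class owns for cancellation and cleanup. A minimal sketch of that pattern, assuming only this behavior of the base class:

```python
# Minimal sketch of the BackgroundService pattern, assuming only that
# subclasses implement a synchronous start() and register their tasks in
# self._tasks so the base class can cancel and await them on stop().
import asyncio

from frequenz.sdk.actor import BackgroundService


class TickerService(BackgroundService):
    """Hypothetical service printing a tick every second."""

    def __init__(self) -> None:
        super().__init__(name="ticker")

    def start(self) -> None:
        self._tasks.add(asyncio.create_task(self._run()))

    async def _run(self) -> None:
        while True:
            print("tick")
            await asyncio.sleep(1.0)
```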
@@ -111,24 +163,18 @@ async def _run(self) -> None:
                 case Event.CREATED:
                     self._dispatches[dispatch.id] = dispatch
                     await self._update_dispatch_schedule_and_notify(dispatch, None)
-                    await self._lifecycle_updates_sender.send(
-                        Created(dispatch=dispatch)
-                    )
+                    await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
                 case Event.UPDATED:
                     await self._update_dispatch_schedule_and_notify(
                         dispatch, self._dispatches[dispatch.id]
                     )
                     self._dispatches[dispatch.id] = dispatch
-                    await self._lifecycle_updates_sender.send(
-                        Updated(dispatch=dispatch)
-                    )
+                    await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
                 case Event.DELETED:
                     self._dispatches.pop(dispatch.id)
                     await self._update_dispatch_schedule_and_notify(None, dispatch)
 
-                    await self._lifecycle_updates_sender.send(
-                        Deleted(dispatch=dispatch)
-                    )
+                    await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
 
     async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
         """Execute a scheduled event.
@@ -170,17 +216,13 @@ async def _fetch(self) -> None:
                 if not old_dispatch:
                     _logger.info("New dispatch: %s", dispatch)
                     await self._update_dispatch_schedule_and_notify(dispatch, None)
-                    await self._lifecycle_updates_sender.send(
-                        Created(dispatch=dispatch)
-                    )
+                    await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
                 elif dispatch.update_time != old_dispatch.update_time:
                     _logger.info("Updated dispatch: %s", dispatch)
                     await self._update_dispatch_schedule_and_notify(
                         dispatch, old_dispatch
                     )
-                    await self._lifecycle_updates_sender.send(
-                        Updated(dispatch=dispatch)
-                    )
+                    await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
 
         except grpc.aio.AioRpcError as error:
             _logger.error("Error fetching dispatches: %s", error)
@@ -189,13 +231,13 @@ async def _fetch(self) -> None:
 
         for dispatch in old_dispatches.values():
             _logger.info("Deleted dispatch: %s", dispatch)
-            await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
+            await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
             await self._update_dispatch_schedule_and_notify(None, dispatch)
 
             # Set deleted only here, as it influences the result of dispatch.started,
             # which is used above in _send_running_state_change
             dispatch._set_deleted()  # pylint: disable=protected-access
-            await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
+            await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
@@ -359,4 +401,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None:
         Args:
             dispatch: The dispatch that changed.
         """
-        await self._running_state_change_sender.send(dispatch)
+        await self._running_state_status_tx.send(dispatch)