Skip to content
Merged
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- Calls to `microgrid.*_pool` methods now accept an optional `in_shifting_group` parameter. Power requests sent to `*_pool` instances that have the `in_shifting_group` flag set, will get resolved separately, and their target power will be added to the target power calculated from regular actors, if any, which would, in effect, shift the zero for the regular actors by the target power from the shifting group.

## Bug Fixes

Expand Down
21 changes: 21 additions & 0 deletions src/frequenz/sdk/actor/_power_managing/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class ReportRequest:
priority: int
"""The priority of the actor ."""

in_shifting_group: bool
"""Whether the proposal gets sent to the shifting group of the power manager."""

def get_channel_name(self) -> str:
"""Get the channel name for the report request.

Expand Down Expand Up @@ -216,6 +219,9 @@ class Proposal:
request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0)
"""The maximum amount of time to wait for the request to be fulfilled."""

in_shifting_group: bool
"""Whether the proposal gets sent to the shifting group of the power manager."""

def __lt__(self, other: Proposal) -> bool:
"""Compare two proposals by their priority.

Expand Down Expand Up @@ -293,6 +299,21 @@ def calculate_target_power(
didn't change.
"""

@abc.abstractmethod
def get_target_power(
self,
component_ids: frozenset[int],
) -> Power | None:
"""Get the target power for the given components.

Args:
component_ids: The component IDs to get the target power for.

Returns:
The target power for the given components, or `None` if there is no target
power.
"""

# The arguments for this method are tightly coupled to the `Matryoshka` algorithm.
# It can be loosened up when more algorithms are added.
@abc.abstractmethod
Expand Down
15 changes: 15 additions & 0 deletions src/frequenz/sdk/actor/_power_managing/_matryoshka.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ def _validate_component_ids(
)
return True

def get_target_power(
self,
component_ids: frozenset[int],
) -> Power | None:
"""Get the target power for the given components.

Args:
component_ids: The component IDs to get the target power for.

Returns:
The target power for the given components, or `None` if there is no target
power.
"""
return self._target_power.get(component_ids)

@override
def calculate_target_power(
self,
Expand Down
177 changes: 160 additions & 17 deletions src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
from typing_extensions import override

from ...timeseries._base_types import SystemBounds
from ...timeseries import Power
from ...timeseries._base_types import Bounds, SystemBounds
from .._actor import Actor
from .._channel_registry import ChannelRegistry
from ._base_classes import Algorithm, BaseAlgorithm, Proposal, ReportRequest, _Report
Expand All @@ -28,7 +29,7 @@
from .. import power_distributing


class PowerManagingActor(Actor):
class PowerManagingActor(Actor): # pylint: disable=too-many-instance-attributes
"""The power manager."""

def __init__( # pylint: disable=too-many-arguments
Expand Down Expand Up @@ -84,10 +85,18 @@ def __init__( # pylint: disable=too-many-arguments

self._system_bounds: dict[frozenset[int], SystemBounds] = {}
self._bound_tracker_tasks: dict[frozenset[int], asyncio.Task[None]] = {}
self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {}
self._non_shifting_subscriptions: dict[
frozenset[int], dict[int, Sender[_Report]]
] = {}
self._shifting_subscriptions: dict[
frozenset[int], dict[int, Sender[_Report]]
] = {}
self._distribution_results: dict[frozenset[int], power_distributing.Result] = {}

self._algorithm: BaseAlgorithm = Matryoshka(
self._non_shifting_group: BaseAlgorithm = Matryoshka(
max_proposal_age=timedelta(seconds=60.0)
)
self._shifting_group: BaseAlgorithm = Matryoshka(
Comment on lines -87 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not going to insist a lot on this because I don't want to keep focusing on bikeshedding, but I though it out there just in case you agree it could bring some extra clarity. Back to my previous suggestion of grouping the groups, what about having something like:

class PoolGroup:
    algorithm: BaseAlgorithm
    subscriptions: dict[...]
    # Again, I'm not sure if there are common operations with
    # these algorithm and subscriptions that could be moved here too.

class PowerManagingActor(Actor):
    def __init__(self, ...):
        self._default_group = PoolGroup(...)
        self._shifting_group = PoolGroup(...)

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we end up adding more than two groups, I think it makes sense to refactor at that time. For now, I'm not sure it is worth the effort, the code is not really that complicated the way it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created an issue just so it doesn't get lost, but I agree is OK as it is for now:

max_proposal_age=timedelta(seconds=60.0)
)

Expand All @@ -104,14 +113,29 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
if bounds is None:
_logger.warning("PowerManagingActor: No bounds for %s", component_ids)
return
for priority, sender in self._subscriptions.get(component_ids, {}).items():
status = self._algorithm.get_status(
for priority, sender in self._shifting_subscriptions.get(
component_ids, {}
).items():
status = self._shifting_group.get_status(
component_ids,
priority,
bounds,
self._distribution_results.get(component_ids),
)
await sender.send(status)
for priority, sender in self._non_shifting_subscriptions.get(
component_ids, {}
).items():
status = self._non_shifting_group.get_status(
component_ids,
priority,
self._calculate_shifted_bounds(
bounds,
self._shifting_group.get_target_power(component_ids),
),
self._distribution_results.get(component_ids),
)
await sender.send(status)

async def _bounds_tracker(
self,
Expand All @@ -130,7 +154,7 @@ async def _bounds_tracker(
await self._send_updated_target_power(component_ids, None)
await self._send_reports(component_ids)

def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None:
def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None:
"""Add a bounds tracker.

Args:
Expand Down Expand Up @@ -184,6 +208,116 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None:
self._bounds_tracker(component_ids, bounds_receiver)
)

def _calculate_shifted_bounds(
self, bounds: SystemBounds, target_power: Power | None
) -> SystemBounds:
"""Calculate the shifted bounds corresponding to shifting group's target power.

Any value regular actors choose within these bounds can be shifted by the
shifting power and still remain within the actual system bounds.

| system bounds | shifting | shifted |
| | target power | bounds |
|---------------+--------------+------------|
| -100 to 100 | 70 | -170 to 30 |
| -100 to 100 | -50 | -50 to 150 |

Args:
bounds: The bounds to calculate the remaining bounds from.
target_power: The target power to apply.

Returns:
The remaining bounds.
"""
if target_power is None:
return bounds

inclusion_bounds: Bounds[Power] | None = None
if bounds.inclusion_bounds is not None:
inclusion_bounds = Bounds(
bounds.inclusion_bounds.lower - target_power,
bounds.inclusion_bounds.upper - target_power,
)
return SystemBounds(
timestamp=bounds.timestamp,
inclusion_bounds=inclusion_bounds,
exclusion_bounds=bounds.exclusion_bounds,
)

def _calculate_target_power(
self,
component_ids: frozenset[int],
proposal: Proposal | None,
must_send: bool = False,
) -> Power | None:
"""Calculate the target power for a set of components.

This is the power from the non-shifting group, shifted by the power from the
shifting group.

Args:
component_ids: The component IDs for which to calculate the target power.
proposal: The proposal to calculate the target power for.
must_send: If `True`, a new request will be sent to the PowerDistributor,
even if there's no change in power.

Returns:
The target power.
"""
tgt_power_shift: Power | None = None
tgt_power_no_shift: Power | None = None
if proposal is not None:
if proposal.in_shifting_group:
tgt_power_shift = self._shifting_group.calculate_target_power(
component_ids,
proposal,
self._system_bounds[component_ids],
must_send,
)
tgt_power_no_shift = self._non_shifting_group.calculate_target_power(
component_ids,
None,
self._calculate_shifted_bounds(
self._system_bounds[component_ids], tgt_power_shift
),
must_send,
)
else:
tgt_power_no_shift = self._non_shifting_group.calculate_target_power(
component_ids,
proposal,
self._system_bounds[component_ids],
must_send,
)
tgt_power_shift = self._shifting_group.calculate_target_power(
component_ids,
None,
self._calculate_shifted_bounds(
self._system_bounds[component_ids], tgt_power_no_shift
),
must_send,
)
else:
tgt_power_no_shift = self._non_shifting_group.calculate_target_power(
component_ids,
None,
self._system_bounds[component_ids],
must_send,
)
tgt_power_shift = self._shifting_group.calculate_target_power(
component_ids,
None,
self._calculate_shifted_bounds(
self._system_bounds[component_ids], tgt_power_no_shift
),
must_send,
)
if tgt_power_shift is not None and tgt_power_no_shift is not None:
return tgt_power_shift + tgt_power_no_shift
if tgt_power_shift is not None:
return tgt_power_shift
return tgt_power_no_shift

async def _send_updated_target_power(
self,
component_ids: frozenset[int],
Expand All @@ -192,10 +326,9 @@ async def _send_updated_target_power(
) -> None:
from .. import power_distributing # pylint: disable=import-outside-toplevel

target_power = self._algorithm.calculate_target_power(
target_power = self._calculate_target_power(
component_ids,
proposal,
self._system_bounds[component_ids],
must_send,
)
request_timeout = (
Expand Down Expand Up @@ -225,7 +358,7 @@ async def _run(self) -> None:
if selected_from(selected, self._proposals_receiver):
proposal = selected.message
if proposal.component_ids not in self._bound_tracker_tasks:
self._add_bounds_tracker(proposal.component_ids)
self._add_system_bounds_tracker(proposal.component_ids)

# TODO: must_send=True forces a new request to # pylint: disable=fixme
# be sent to the PowerDistributor, even if there's no change in power.
Expand All @@ -245,22 +378,29 @@ async def _run(self) -> None:
sub = selected.message
component_ids = sub.component_ids
priority = sub.priority
in_shifting_group = sub.in_shifting_group

subs_set = (
self._shifting_subscriptions
if in_shifting_group
else self._non_shifting_subscriptions
)

if component_ids not in self._subscriptions:
self._subscriptions[component_ids] = {
if component_ids not in subs_set:
subs_set[component_ids] = {
priority: self._channel_registry.get_or_create(
_Report, sub.get_channel_name()
).new_sender()
}
elif priority not in self._subscriptions[component_ids]:
self._subscriptions[component_ids][priority] = (
elif priority not in subs_set[component_ids]:
subs_set[component_ids][priority] = (
self._channel_registry.get_or_create(
_Report, sub.get_channel_name()
).new_sender()
)

if sub.component_ids not in self._bound_tracker_tasks:
self._add_bounds_tracker(sub.component_ids)
if component_ids not in self._bound_tracker_tasks:
self._add_system_bounds_tracker(component_ids)

elif selected_from(selected, self._power_distributing_results_receiver):
from .. import ( # pylint: disable=import-outside-toplevel
Expand All @@ -287,4 +427,7 @@ async def _run(self) -> None:
await self._send_reports(frozenset(result.request.component_ids))

elif selected_from(selected, drop_old_proposals_timer):
self._algorithm.drop_old_proposals(asyncio.get_event_loop().time())
self._non_shifting_group.drop_old_proposals(
asyncio.get_event_loop().time()
)
self._shifting_group.drop_old_proposals(asyncio.get_event_loop().time())
Loading