diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d8b82fdb9..8ddfa402d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,7 +12,7 @@ ## New Features - +- Calls to `microgrid.*_pool` methods now accept an optional `in_shifting_group` parameter. Power requests sent to `*_pool` instances that have the `in_shifting_group` flag set, will get resolved separately, and their target power will be added to the target power calculated from regular actors, if any, which would, in effect, shift the zero for the regular actors by the target power from the shifting group. ## Bug Fixes diff --git a/src/frequenz/sdk/actor/_power_managing/_base_classes.py b/src/frequenz/sdk/actor/_power_managing/_base_classes.py index 28377123f..b51929da0 100644 --- a/src/frequenz/sdk/actor/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/actor/_power_managing/_base_classes.py @@ -33,6 +33,9 @@ class ReportRequest: priority: int """The priority of the actor .""" + in_shifting_group: bool + """Whether the proposal gets sent to the shifting group of the power manager.""" + def get_channel_name(self) -> str: """Get the channel name for the report request. @@ -216,6 +219,9 @@ class Proposal: request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0) """The maximum amount of time to wait for the request to be fulfilled.""" + in_shifting_group: bool + """Whether the proposal gets sent to the shifting group of the power manager.""" + def __lt__(self, other: Proposal) -> bool: """Compare two proposals by their priority. @@ -293,6 +299,21 @@ def calculate_target_power( didn't change. """ + @abc.abstractmethod + def get_target_power( + self, + component_ids: frozenset[int], + ) -> Power | None: + """Get the target power for the given components. + + Args: + component_ids: The component IDs to get the target power for. + + Returns: + The target power for the given components, or `None` if there is no target + power. + """ + # The arguments for this method are tightly coupled to the `Matryoshka` algorithm. # It can be loosened up when more algorithms are added. @abc.abstractmethod diff --git a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py index 276c88b92..ceafe8946 100644 --- a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py +++ b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py @@ -152,6 +152,21 @@ def _validate_component_ids( ) return True + def get_target_power( + self, + component_ids: frozenset[int], + ) -> Power | None: + """Get the target power for the given components. + + Args: + component_ids: The component IDs to get the target power for. + + Returns: + The target power for the given components, or `None` if there is no target + power. + """ + return self._target_power.get(component_ids) + @override def calculate_target_power( self, diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 5117220b2..895360dec 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -16,7 +16,8 @@ from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType from typing_extensions import override -from ...timeseries._base_types import SystemBounds +from ...timeseries import Power +from ...timeseries._base_types import Bounds, SystemBounds from .._actor import Actor from .._channel_registry import ChannelRegistry from ._base_classes import Algorithm, BaseAlgorithm, Proposal, ReportRequest, _Report @@ -28,7 +29,7 @@ from .. import power_distributing -class PowerManagingActor(Actor): +class PowerManagingActor(Actor): # pylint: disable=too-many-instance-attributes """The power manager.""" def __init__( # pylint: disable=too-many-arguments @@ -84,10 +85,18 @@ def __init__( # pylint: disable=too-many-arguments self._system_bounds: dict[frozenset[int], SystemBounds] = {} self._bound_tracker_tasks: dict[frozenset[int], asyncio.Task[None]] = {} - self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {} + self._non_shifting_subscriptions: dict[ + frozenset[int], dict[int, Sender[_Report]] + ] = {} + self._shifting_subscriptions: dict[ + frozenset[int], dict[int, Sender[_Report]] + ] = {} self._distribution_results: dict[frozenset[int], power_distributing.Result] = {} - self._algorithm: BaseAlgorithm = Matryoshka( + self._non_shifting_group: BaseAlgorithm = Matryoshka( + max_proposal_age=timedelta(seconds=60.0) + ) + self._shifting_group: BaseAlgorithm = Matryoshka( max_proposal_age=timedelta(seconds=60.0) ) @@ -104,14 +113,29 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None: if bounds is None: _logger.warning("PowerManagingActor: No bounds for %s", component_ids) return - for priority, sender in self._subscriptions.get(component_ids, {}).items(): - status = self._algorithm.get_status( + for priority, sender in self._shifting_subscriptions.get( + component_ids, {} + ).items(): + status = self._shifting_group.get_status( component_ids, priority, bounds, self._distribution_results.get(component_ids), ) await sender.send(status) + for priority, sender in self._non_shifting_subscriptions.get( + component_ids, {} + ).items(): + status = self._non_shifting_group.get_status( + component_ids, + priority, + self._calculate_shifted_bounds( + bounds, + self._shifting_group.get_target_power(component_ids), + ), + self._distribution_results.get(component_ids), + ) + await sender.send(status) async def _bounds_tracker( self, @@ -130,7 +154,7 @@ async def _bounds_tracker( await self._send_updated_target_power(component_ids, None) await self._send_reports(component_ids) - def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None: + def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None: """Add a bounds tracker. Args: @@ -184,6 +208,116 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None: self._bounds_tracker(component_ids, bounds_receiver) ) + def _calculate_shifted_bounds( + self, bounds: SystemBounds, target_power: Power | None + ) -> SystemBounds: + """Calculate the shifted bounds corresponding to shifting group's target power. + + Any value regular actors choose within these bounds can be shifted by the + shifting power and still remain within the actual system bounds. + + | system bounds | shifting | shifted | + | | target power | bounds | + |---------------+--------------+------------| + | -100 to 100 | 70 | -170 to 30 | + | -100 to 100 | -50 | -50 to 150 | + + Args: + bounds: The bounds to calculate the remaining bounds from. + target_power: The target power to apply. + + Returns: + The remaining bounds. + """ + if target_power is None: + return bounds + + inclusion_bounds: Bounds[Power] | None = None + if bounds.inclusion_bounds is not None: + inclusion_bounds = Bounds( + bounds.inclusion_bounds.lower - target_power, + bounds.inclusion_bounds.upper - target_power, + ) + return SystemBounds( + timestamp=bounds.timestamp, + inclusion_bounds=inclusion_bounds, + exclusion_bounds=bounds.exclusion_bounds, + ) + + def _calculate_target_power( + self, + component_ids: frozenset[int], + proposal: Proposal | None, + must_send: bool = False, + ) -> Power | None: + """Calculate the target power for a set of components. + + This is the power from the non-shifting group, shifted by the power from the + shifting group. + + Args: + component_ids: The component IDs for which to calculate the target power. + proposal: The proposal to calculate the target power for. + must_send: If `True`, a new request will be sent to the PowerDistributor, + even if there's no change in power. + + Returns: + The target power. + """ + tgt_power_shift: Power | None = None + tgt_power_no_shift: Power | None = None + if proposal is not None: + if proposal.in_shifting_group: + tgt_power_shift = self._shifting_group.calculate_target_power( + component_ids, + proposal, + self._system_bounds[component_ids], + must_send, + ) + tgt_power_no_shift = self._non_shifting_group.calculate_target_power( + component_ids, + None, + self._calculate_shifted_bounds( + self._system_bounds[component_ids], tgt_power_shift + ), + must_send, + ) + else: + tgt_power_no_shift = self._non_shifting_group.calculate_target_power( + component_ids, + proposal, + self._system_bounds[component_ids], + must_send, + ) + tgt_power_shift = self._shifting_group.calculate_target_power( + component_ids, + None, + self._calculate_shifted_bounds( + self._system_bounds[component_ids], tgt_power_no_shift + ), + must_send, + ) + else: + tgt_power_no_shift = self._non_shifting_group.calculate_target_power( + component_ids, + None, + self._system_bounds[component_ids], + must_send, + ) + tgt_power_shift = self._shifting_group.calculate_target_power( + component_ids, + None, + self._calculate_shifted_bounds( + self._system_bounds[component_ids], tgt_power_no_shift + ), + must_send, + ) + if tgt_power_shift is not None and tgt_power_no_shift is not None: + return tgt_power_shift + tgt_power_no_shift + if tgt_power_shift is not None: + return tgt_power_shift + return tgt_power_no_shift + async def _send_updated_target_power( self, component_ids: frozenset[int], @@ -192,10 +326,9 @@ async def _send_updated_target_power( ) -> None: from .. import power_distributing # pylint: disable=import-outside-toplevel - target_power = self._algorithm.calculate_target_power( + target_power = self._calculate_target_power( component_ids, proposal, - self._system_bounds[component_ids], must_send, ) request_timeout = ( @@ -225,7 +358,7 @@ async def _run(self) -> None: if selected_from(selected, self._proposals_receiver): proposal = selected.message if proposal.component_ids not in self._bound_tracker_tasks: - self._add_bounds_tracker(proposal.component_ids) + self._add_system_bounds_tracker(proposal.component_ids) # TODO: must_send=True forces a new request to # pylint: disable=fixme # be sent to the PowerDistributor, even if there's no change in power. @@ -245,22 +378,29 @@ async def _run(self) -> None: sub = selected.message component_ids = sub.component_ids priority = sub.priority + in_shifting_group = sub.in_shifting_group + + subs_set = ( + self._shifting_subscriptions + if in_shifting_group + else self._non_shifting_subscriptions + ) - if component_ids not in self._subscriptions: - self._subscriptions[component_ids] = { + if component_ids not in subs_set: + subs_set[component_ids] = { priority: self._channel_registry.get_or_create( _Report, sub.get_channel_name() ).new_sender() } - elif priority not in self._subscriptions[component_ids]: - self._subscriptions[component_ids][priority] = ( + elif priority not in subs_set[component_ids]: + subs_set[component_ids][priority] = ( self._channel_registry.get_or_create( _Report, sub.get_channel_name() ).new_sender() ) - if sub.component_ids not in self._bound_tracker_tasks: - self._add_bounds_tracker(sub.component_ids) + if component_ids not in self._bound_tracker_tasks: + self._add_system_bounds_tracker(component_ids) elif selected_from(selected, self._power_distributing_results_receiver): from .. import ( # pylint: disable=import-outside-toplevel @@ -287,4 +427,7 @@ async def _run(self) -> None: await self._send_reports(frozenset(result.request.component_ids)) elif selected_from(selected, drop_old_proposals_timer): - self._algorithm.drop_old_proposals(asyncio.get_event_loop().time()) + self._non_shifting_group.drop_old_proposals( + asyncio.get_event_loop().time() + ) + self._shifting_group.drop_old_proposals(asyncio.get_event_loop().time()) diff --git a/src/frequenz/sdk/microgrid/__init__.py b/src/frequenz/sdk/microgrid/__init__.py index 34b26064a..c23c476fc 100644 --- a/src/frequenz/sdk/microgrid/__init__.py +++ b/src/frequenz/sdk/microgrid/__init__.py @@ -137,6 +137,91 @@ [`propose_power`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.propose_power], which accepts values in the {{glossary("psc", "Passive Sign Convention")}} and supports only charging. + +# Component pools + +The SDK provides a unified interface for interacting with sets of Batteries, EV +chargers and PV arrays, through their corresponding `Pool`s. + +* [Battery pool][frequenz.sdk.microgrid.battery_pool] +* [EV charger pool][frequenz.sdk.microgrid.ev_charger_pool] +* [PV pool][frequenz.sdk.microgrid.pv_pool] + +All of them provide support for streaming aggregated data and for setting the +power values of the components. + +## Streaming component data + +All pools have a `power` property, which is a +[`FormulaEngine`][frequenz.sdk.timeseries.formula_engine.FormulaEngine] that can + +- provide a stream of resampled power values, which correspond to the sum of the +power measured from all the components in the pool together. + +- be composed with other power streams to for composite formulas. + +In addition, the battery pool has some additional properties that can be used as +streams for metrics specific to batteries: +[`soc`][frequenz.sdk.timeseries.battery_pool.BatteryPool.soc], +[`capacity`][frequenz.sdk.timeseries.battery_pool.BatteryPool.capacity] and +[`temperature`][frequenz.sdk.timeseries.battery_pool.BatteryPool.temperature]. + +## Setting power + +All pools provide a `propose_power` method for setting power for the pool. This +would then be distributed to the individual components in the pool, using an +algorithm that's suitable for the category of the components. For example, when +controlling batteries, power could be distributed based on the `SoC` of the +individual batteries, to keep the batteries in balance. + +### Resolving conflicting power proposals + +When there are multiple actors trying to control the same set of batteries, a +target power is calculated based on the priorities of the actors making the +requests. Actors need to specify their priorities as parameters when creating +the `*Pool` instances using the constructors mentioned above. + +The algorithm used for resolving power conflicts based on actor priority can be +found in the documentation for any of the +[`propose_power`][frequenz.sdk.timeseries.battery_pool.BatteryPool.propose_power] +methods. + +### Shifting the target power by an offset + +There are cases where the target power needs to be shifted by a certain amount, for +example, to make adjustments to the operating point. This can be done by designating +some actors to be part of the `shifting_group`. + +When creating a `*Pool` instance using the above-mentioned constructors, an optional +`in_shifting_group` parameter can be passed to specify that this actor is special, and +the target power of the regular actors will be shifted by the target power of all +shifting actors together. + +In a location with 2 regular actors and 1 shifting actor, here's how things +would play out: + +1. When only non-shifting actors have made proposals, the power bounds available + from the batteries are available to them exactly. + + | actor priority | in shifting group? | proposed power/bounds | available bounds | + |----------------|--------------------|-----------------------|------------------| + | 3 | No | 1000, -4000..2500 | -3000..3000 | + | 2 | No | 2500 | -3000..2500 | + | 1 | Yes | None | -3000..3000 | + + Power actually distributed to the batteries: 2500W + +2. When the shifting actor has made proposals, the bounds available to the + regular actors gets shifted, and the final power that actually gets + distributed to the batteries is also shifted. + + | actor priority | in shifting group? | proposed power/bounds | available bounds | + |----------------|--------------------|-----------------------|------------------| + | 3 | No | 1000, -4000..2500 | -2000..4000 | + | 2 | No | 2500 | -2000..2500 | + | 1 | Yes | -1000 | -3000..3000 | + + Power actually distributed to the batteries: 1500W """ # noqa: D205, D400 from ..actor import ResamplerConfig diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index 1077e805c..1b86f70fe 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -199,6 +199,7 @@ def ev_charger_pool( priority: int, component_ids: abc.Set[int] | None = None, name: str | None = None, + in_shifting_group: bool = False, ) -> EVChargerPool: """Return the corresponding EVChargerPool instance for the given ids. @@ -211,6 +212,8 @@ def ev_charger_pool( EVChargerPool. name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. Returns: An EVChargerPool instance. @@ -264,6 +267,7 @@ def ev_charger_pool( pool_ref_store=self._ev_charger_pool_reference_stores[ref_store_key], name=name, priority=priority, + in_shifting_group=in_shifting_group, ) def pv_pool( @@ -272,6 +276,7 @@ def pv_pool( priority: int, component_ids: abc.Set[int] | None = None, name: str | None = None, + in_shifting_group: bool = False, ) -> PVPool: """Return a new `PVPool` instance for the given ids. @@ -284,6 +289,8 @@ def pv_pool( `PVPool`. name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. Returns: A `PVPool` instance. @@ -334,6 +341,7 @@ def pv_pool( pool_ref_store=self._pv_pool_reference_stores[ref_store_key], name=name, priority=priority, + in_shifting_group=in_shifting_group, ) def battery_pool( @@ -342,6 +350,7 @@ def battery_pool( priority: int, component_ids: abc.Set[int] | None = None, name: str | None = None, + in_shifting_group: bool = False, ) -> BatteryPool: """Return a new `BatteryPool` instance for the given ids. @@ -354,6 +363,8 @@ def battery_pool( `BatteryPool`. name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. Returns: A `BatteryPool` instance. @@ -409,6 +420,7 @@ def battery_pool( pool_ref_store=self._battery_pool_reference_stores[ref_store_key], name=name, priority=priority, + in_shifting_group=in_shifting_group, ) def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]: @@ -519,6 +531,7 @@ def ev_charger_pool( priority: int, component_ids: abc.Set[int] | None = None, name: str | None = None, + in_shifting_group: bool = False, ) -> EVChargerPool: """Return a new `EVChargerPool` instance for the given parameters. @@ -544,12 +557,17 @@ def ev_charger_pool( component graph are used. name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. Returns: An `EVChargerPool` instance. """ return _get().ev_charger_pool( - priority=priority, component_ids=component_ids, name=name + priority=priority, + component_ids=component_ids, + name=name, + in_shifting_group=in_shifting_group, ) @@ -558,6 +576,7 @@ def battery_pool( priority: int, component_ids: abc.Set[int] | None = None, name: str | None = None, + in_shifting_group: bool = False, ) -> BatteryPool: """Return a new `BatteryPool` instance for the given parameters. @@ -583,12 +602,17 @@ def battery_pool( graph are used. name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. Returns: A `BatteryPool` instance. """ return _get().battery_pool( - priority=priority, component_ids=component_ids, name=name + priority=priority, + component_ids=component_ids, + name=name, + in_shifting_group=in_shifting_group, ) @@ -597,6 +621,7 @@ def pv_pool( priority: int, component_ids: abc.Set[int] | None = None, name: str | None = None, + in_shifting_group: bool = False, ) -> PVPool: """Return a new `PVPool` instance for the given parameters. @@ -622,11 +647,18 @@ def pv_pool( graph are used. name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. Returns: A `PVPool` instance. """ - return _get().pv_pool(priority=priority, component_ids=component_ids, name=name) + return _get().pv_pool( + priority=priority, + component_ids=component_ids, + name=name, + in_shifting_group=in_shifting_group, + ) def grid() -> Grid: diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 869b8323f..b7a83fce7 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -59,6 +59,7 @@ def __init__( pool_ref_store: BatteryPoolReferenceStore, name: str | None, priority: int, + in_shifting_group: bool, ): """Create a BatteryPool instance. @@ -72,11 +73,14 @@ def __init__( name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. priority: The priority of the actor using this wrapper. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. """ self._pool_ref_store = pool_ref_store unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._in_shifting_group = in_shifting_group async def propose_power( self, @@ -130,6 +134,7 @@ async def propose_power( priority=self._priority, creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, + in_shifting_group=self._in_shifting_group, ) ) @@ -176,6 +181,7 @@ async def propose_charge( priority=self._priority, creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, + in_shifting_group=self._in_shifting_group, ) ) @@ -224,6 +230,7 @@ async def propose_discharge( priority=self._priority, creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, + in_shifting_group=self._in_shifting_group, ) ) @@ -382,6 +389,7 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store._batteries, + in_shifting_group=self._in_shifting_group, ) self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = ( asyncio.create_task( diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 7ac29e792..2eefed030 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -45,6 +45,7 @@ def __init__( # pylint: disable=too-many-arguments pool_ref_store: EVChargerPoolReferenceStore, name: str | None, priority: int, + in_shifting_group: bool, ) -> None: """Create an `EVChargerPool` instance. @@ -58,11 +59,14 @@ def __init__( # pylint: disable=too-many-arguments name: An optional name used to identify this instance of the pool or a corresponding actor in the logs. priority: The priority of the actor using this wrapper. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. """ self._pool_ref_store = pool_ref_store unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._in_shifting_group = in_shifting_group async def propose_power( self, @@ -128,6 +132,7 @@ async def propose_power( priority=self._priority, creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, + in_shifting_group=self._in_shifting_group, ) ) @@ -210,6 +215,7 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + in_shifting_group=self._in_shifting_group, ) self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( asyncio.create_task( diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index 0093d1008..a09996b66 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -38,6 +38,7 @@ def __init__( # pylint: disable=too-many-arguments pool_ref_store: PVPoolReferenceStore, name: str | None, priority: int, + in_shifting_group: bool, ) -> None: """Initialize the instance. @@ -50,11 +51,14 @@ def __init__( # pylint: disable=too-many-arguments pool_ref_store: The reference store for the PV pool. name: The name of the PV pool. priority: The priority of the PV pool. + in_shifting_group: Whether the power requests get sent to the shifting group + in the PowerManager or not. """ self._pool_ref_store = pool_ref_store unique_id = uuid.uuid4() self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" self._priority = priority + self._in_shifting_group = in_shifting_group async def propose_power( self, @@ -117,6 +121,7 @@ async def propose_power( priority=self._priority, creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, + in_shifting_group=self._in_shifting_group, ) ) @@ -171,6 +176,7 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + in_shifting_group=self._in_shifting_group, ) self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( asyncio.create_task( diff --git a/tests/actor/_power_managing/test_matryoshka.py b/tests/actor/_power_managing/test_matryoshka.py index 0664e444b..595b3f81d 100644 --- a/tests/actor/_power_managing/test_matryoshka.py +++ b/tests/actor/_power_managing/test_matryoshka.py @@ -53,6 +53,7 @@ def tgt_power( # pylint: disable=too-many-arguments if creation_time is not None else asyncio.get_event_loop().time() ), + in_shifting_group=False, ), self._system_bounds, must_send, diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index 00feab33a..7896711c3 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -489,3 +489,171 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: result, power_distributing.Success ), ) + + async def test_case_5( # pylint: disable=too-many-statements + self, + mocks: Mocks, + mocker: MockerFixture, + ) -> None: + """Test case 5. + + - four battery pools with same batteries, but different priorities. + - two battery pools are in the shifting group, two are not. + - all batteries are working. + """ + set_power = typing.cast( + AsyncMock, microgrid.connection_manager.get().api_client.set_power + ) + + await self._patch_battery_pool_status(mocks, mocker) + await self._init_data_for_batteries(mocks) + await self._init_data_for_inverters(mocks) + + battery_pool_4 = microgrid.battery_pool(priority=4, in_shifting_group=True) + bounds_4_rx = battery_pool_4.power_status.new_receiver() + battery_pool_3 = microgrid.battery_pool(priority=3, in_shifting_group=True) + bounds_3_rx = battery_pool_3.power_status.new_receiver() + battery_pool_2 = microgrid.battery_pool(priority=2) + bounds_2_rx = battery_pool_2.power_status.new_receiver() + battery_pool_1 = microgrid.battery_pool(priority=1) + bounds_1_rx = battery_pool_1.power_status.new_receiver() + + self._assert_report( + await bounds_4_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_3_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_2_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_1_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + + # The target power of non-shifting battery pools should only be visible to other + # non-shifting battery pools, and vice-versa. + await battery_pool_2.propose_power( + Power.from_watts(200.0), + bounds=timeseries.Bounds( + Power.from_watts(-1000.0), Power.from_watts(1500.0) + ), + ) + self._assert_report( + await bounds_4_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_3_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_2_rx.receive(), power=200.0, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_1_rx.receive(), power=200.0, lower=-1000.0, upper=1500.0 + ) + + assert set_power.call_count == 4 + assert sorted(set_power.call_args_list) == [ + mocker.call(inv_id, 50.0) for inv_id in mocks.microgrid.battery_inverter_ids + ] + set_power.reset_mock() + + # Set a power to the second non-shifting battery pool. This should also have + # no effect on the shifting battery pools. + await battery_pool_1.propose_power( + Power.from_watts(720.0), + ) + self._assert_report( + await bounds_4_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_3_rx.receive(), power=None, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_2_rx.receive(), power=720.0, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_1_rx.receive(), power=720.0, lower=-1000.0, upper=1500.0 + ) + + for _ in range(5): + await bounds_1_rx.receive() + await bounds_2_rx.receive() + await bounds_3_rx.receive() + bounds = await bounds_4_rx.receive() + if bounds.distribution_result is None or not isinstance( + bounds.distribution_result, power_distributing.Success + ): + continue + if bounds.distribution_result.succeeded_power == Power.from_watts(720.0): + break + + assert set_power.call_count == 4 + assert sorted(set_power.call_args_list) == [ + mocker.call(inv_id, 720.0 / 4) + for inv_id in mocks.microgrid.battery_inverter_ids + ] + set_power.reset_mock() + + # Setting power to a shifting battery pool should shift the bounds seen by the + # non-shifting battery pools. It would also shift the final target power sent + # in the batteries. + await battery_pool_3.propose_power( + Power.from_watts(-1000.0), + ) + + self._assert_report( + await bounds_4_rx.receive(), power=-1000.0, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_3_rx.receive(), power=-1000.0, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_2_rx.receive(), power=720.0, lower=-3000.0, upper=5000.0 + ) + self._assert_report( + await bounds_1_rx.receive(), power=720.0, lower=-1000.0, upper=1500.0 + ) + + for _ in range(5): + await bounds_1_rx.receive() + await bounds_2_rx.receive() + await bounds_3_rx.receive() + bounds = await bounds_4_rx.receive() + if bounds.distribution_result is None or not isinstance( + bounds.distribution_result, power_distributing.Success + ): + continue + if bounds.distribution_result.succeeded_power == Power.from_watts(-280.0): + break + + assert set_power.call_count == 4 + assert sorted(set_power.call_args_list) == [ + mocker.call(inv_id, -280.0 / 4) + for inv_id in mocks.microgrid.battery_inverter_ids + ] + set_power.reset_mock() + + # Creating a new non-shifting battery pool that's higher priority than the + # shifting battery pools should still be shifted by the target power of the + # shifting battery pools. + battery_pool_5 = microgrid.battery_pool(priority=5) + bounds_5_rx = battery_pool_5.power_status.new_receiver() + + await battery_pool_5.propose_power(None) + + self._assert_report( + await bounds_5_rx.receive(), power=720.0, lower=-3000.0, upper=5000.0 + ) + self._assert_report( + await bounds_4_rx.receive(), power=-1000.0, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_3_rx.receive(), power=-1000.0, lower=-4000.0, upper=4000.0 + ) + self._assert_report( + await bounds_2_rx.receive(), power=720.0, lower=-3000.0, upper=5000.0 + ) + self._assert_report( + await bounds_1_rx.receive(), power=720.0, lower=-1000.0, upper=1500.0 + )