Skip to content

Commit 8ac2f18

Browse files
[dirty-commit] Add resampler to the MovingWindow
Signed-off-by: Daniel Zullo <[email protected]>
1 parent 0f58452 commit 8ac2f18

File tree

1 file changed

+47
-7
lines changed

1 file changed

+47
-7
lines changed

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
from typing import SupportsIndex, overload
1414

1515
import numpy as np
16-
from frequenz.channels import Receiver
16+
from frequenz.channels import Broadcast, Receiver, Sender
1717
from numpy.typing import ArrayLike
1818

1919
from .._internal.asyncio import cancel_and_await
2020
from . import Sample
21+
from ._resampling import Resampler, ResamplerConfig
2122
from ._ringbuffer import OrderedRingBuffer
2223

2324
log = logging.getLogger(__name__)
@@ -91,11 +92,12 @@ class MovingWindow:
9192
```
9293
"""
9394

94-
def __init__(
95+
def __init__( # pylint: disable=too-many-arguments
9596
self,
9697
size: timedelta,
9798
resampled_data_recv: Receiver[Sample],
9899
sampling_period: timedelta,
100+
resampling_period: timedelta | None = None,
99101
window_alignment: datetime = datetime(1, 1, 1),
100102
) -> None:
101103
"""
@@ -110,6 +112,7 @@ def __init__(
110112
resampled_data_recv: A receiver that delivers samples with a
111113
given sampling period.
112114
sampling_period: The sampling period.
115+
resampling_period: The resampling period.
113116
window_alignment: A datetime object that defines a point in time to which
114117
the window is aligned to modulo window size.
115118
(default is midnight 01.01.01)
@@ -125,20 +128,39 @@ def __init__(
125128
sampling_period <= size
126129
), "The sampling period should be equal to or lower than the window size."
127130

131+
sampling = sampling_period
132+
self._resampler: Resampler | None = None
133+
self._avg_sender: Sender[Sample] | None = None
134+
self._resampler_task: asyncio.Task[None] | None = None
135+
if resampling_period is not None:
136+
assert (
137+
resampling_period >= sampling_period
138+
), "Resampling must be equal to or higher than sampling, only down-sampling supported."
139+
140+
if resampling_period > sampling_period:
141+
self._resampler = Resampler(
142+
ResamplerConfig(
143+
resampling_period_s=resampling_period.total_seconds()
144+
)
145+
)
146+
sampling = resampling_period
147+
128148
# Sampling period might not fit perfectly into the window size.
129-
num_samples = math.ceil(size / sampling_period)
149+
num_samples = math.ceil(size.total_seconds() / sampling.total_seconds())
130150

131151
self._resampled_data_recv = resampled_data_recv
132152
self._buffer = OrderedRingBuffer(
133153
np.empty(shape=num_samples, dtype=float),
134-
sampling_period=sampling_period,
154+
sampling_period=sampling,
135155
time_index_alignment=window_alignment,
136156
)
137157

158+
if self._resampler:
159+
self._configure_resampler()
160+
138161
self._update_window_task: asyncio.Task[None] = asyncio.create_task(
139162
self._run_impl()
140163
)
141-
log.debug("Cancelling MovingWindow task: %s", __name__)
142164

143165
async def _run_impl(self) -> None:
144166
"""Awaits samples from the receiver and updates the underlying ringbuffer.
@@ -149,16 +171,34 @@ async def _run_impl(self) -> None:
149171
try:
150172
async for sample in self._resampled_data_recv:
151173
log.debug("Received new sample: %s", sample)
152-
self._buffer.update(sample)
174+
if self._resampler:
175+
await self._avg_sender.send(sample) # type: ignore
176+
else:
177+
self._buffer.update(sample)
178+
153179
except asyncio.CancelledError:
154180
log.info("MovingWindow task has been cancelled.")
155181
raise
156182

157183
log.error("Channel has been closed")
158184

159185
async def stop(self) -> None:
160-
"""Cancel the running task and stop the MovingWindow."""
186+
"""Cancel the running tasks and stop the MovingWindow."""
161187
await cancel_and_await(self._update_window_task)
188+
if self._resampler_task:
189+
await cancel_and_await(self._resampler_task)
190+
191+
def _configure_resampler(self) -> None:
192+
"""Configure the components needed to run the resampler."""
193+
assert isinstance(self._resampler, Resampler)
194+
195+
async def sink_buffer(sample: Sample) -> None:
196+
self._buffer.update(sample)
197+
198+
avg_channel = Broadcast[Sample]("average")
199+
self._avg_sender = avg_channel.new_sender()
200+
self._resampler.add_timeseries("avg", avg_channel.new_receiver(), sink_buffer)
201+
self._resampler_task = asyncio.create_task(self._resampler.resample())
162202

163203
def __len__(self) -> int:
164204
"""

0 commit comments

Comments
 (0)