|
5 | 5 |
|
6 | 6 | import asyncio |
7 | 7 | from datetime import datetime, timedelta, timezone |
8 | | -from typing import Sequence, Tuple |
| 8 | +from typing import Iterator, Sequence, Tuple |
9 | 9 |
|
| 10 | +import async_solipsism |
10 | 11 | import numpy as np |
| 12 | +import pytest |
| 13 | +import time_machine |
11 | 14 | from frequenz.channels import Broadcast, Sender |
12 | 15 |
|
13 | 16 | from frequenz.sdk.timeseries import Sample |
14 | 17 | from frequenz.sdk.timeseries._moving_window import MovingWindow |
| 18 | +from frequenz.sdk.timeseries._resampling import ResamplerConfig |
| 19 | + |
| 20 | + |
| 21 | +# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file. |
| 22 | +@pytest.fixture() |
| 23 | +def event_loop() -> Iterator[async_solipsism.EventLoop]: |
| 24 | + """Replace the loop with one that doesn't interact with the outside world.""" |
| 25 | + loop = async_solipsism.EventLoop() |
| 26 | + yield loop |
| 27 | + loop.close() |
| 28 | + |
| 29 | + |
| 30 | +@pytest.fixture |
| 31 | +def fake_time() -> Iterator[time_machine.Coordinates]: |
| 32 | + """Replace real time with a time machine that doesn't automatically tick.""" |
| 33 | + with time_machine.travel(0, tick=False) as traveller: |
| 34 | + yield traveller |
15 | 35 |
|
16 | 36 |
|
17 | 37 | async def push_lm_data(sender: Sender[Sample], test_seq: Sequence[float]) -> None: |
@@ -75,3 +95,36 @@ async def test_access_window_by_ts_slice() -> None: |
75 | 95 | time_start = datetime(2023, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=3) |
76 | 96 | time_end = time_start + timedelta(seconds=2) |
77 | 97 | assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore |
| 98 | + |
| 99 | + |
| 100 | +# pylint: disable=redefined-outer-name |
| 101 | +async def test_resampling_window(fake_time: time_machine.Coordinates) -> None: |
| 102 | + """Test resampling in MovingWindow.""" |
| 103 | + channel = Broadcast[Sample]("net_power") |
| 104 | + sender = channel.new_sender() |
| 105 | + |
| 106 | + window_size = timedelta(seconds=16) |
| 107 | + input_sampling = timedelta(seconds=1) |
| 108 | + output_sampling = timedelta(seconds=2) |
| 109 | + resampler_config = ResamplerConfig( |
| 110 | + resampling_period_s=output_sampling.total_seconds() |
| 111 | + ) |
| 112 | + |
| 113 | + window = MovingWindow( |
| 114 | + size=window_size, |
| 115 | + resampled_data_recv=channel.new_receiver(), |
| 116 | + input_sampling_period=input_sampling, |
| 117 | + resampler_config=resampler_config, |
| 118 | + ) |
| 119 | + |
| 120 | + stream_values = [4.0, 8.0, 2.0, 6.0, 5.0] * 100 |
| 121 | + for value in stream_values: |
| 122 | + timestamp = datetime.now(tz=timezone.utc) |
| 123 | + sample = Sample(timestamp, float(value)) |
| 124 | + await sender.send(sample) |
| 125 | + await asyncio.sleep(0.1) |
| 126 | + fake_time.shift(0.1) |
| 127 | + |
| 128 | + assert len(window) == window_size / max(input_sampling, output_sampling) |
| 129 | + for value in window: # type: ignore |
| 130 | + assert 4.9 < value < 5.1 |
0 commit comments