From 8ecb32596e32ffee3a0bb21434ca5c5c6145ff90 Mon Sep 17 00:00:00 2001
From: Paulo Sousa
Date: Mon, 2 Jun 2025 09:31:20 +0100
Subject: [PATCH 1/3] Handle timestamp collision while pushing data to redistimeseries

- If a datapoint already exists in that millisecond, try to add it in the next millisecond
---
 .../__common__/timeseries.py | 62 +++++++++++--------
 1 file changed, 35 insertions(+), 27 deletions(-)

diff --git a/redis_benchmarks_specification/__common__/timeseries.py b/redis_benchmarks_specification/__common__/timeseries.py
index 5b14374..d6e79d9 100644
--- a/redis_benchmarks_specification/__common__/timeseries.py
+++ b/redis_benchmarks_specification/__common__/timeseries.py
@@ -495,39 +495,47 @@ def push_data_to_redistimeseries(rts, time_series_dict: dict, expire_msecs=0):
         )
     for timeseries_name, time_series in time_series_dict.items():
         exporter_create_ts(rts, time_series, timeseries_name)
-        for timestamp, value in time_series["data"].items():
-            try:
-                if timestamp is None:
-                    logging.warning("The provided timestamp is null. Using auto-ts")
-                    rts.ts().add(
-                        timeseries_name,
-                        value,
-                        duplicate_policy="last",
-                    )
-                else:
+        for orig_timestamp, value in time_series["data"].items():
+            if orig_timestamp is None:
+                logging.warning("The provided timestamp is null. Using auto-ts")
+                timestamp = "*"
+            else:
+                timestamp = orig_timestamp
+
+            try_to_insert = True
+            retry_count = 0
+            while try_to_insert and retry_count < 100:
+                # (try to) insert the datapoint at the given timestamp
+                try_to_insert = False
+
+                try:
                     rts.ts().add(
                         timeseries_name,
                         timestamp,
                         value,
-                        duplicate_policy="last",
+                        duplicate_policy="block",
                     )
-                datapoint_inserts += 1
-            except redis.exceptions.DataError:
-                logging.warning(
-                    "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
-                        timestamp, value, timeseries_name
+                    datapoint_inserts += 1
+                except redis.exceptions.DataError:
+                    logging.warning(
+                        "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
+                            timestamp, value, timeseries_name
+                        )
                     )
-                )
-                datapoint_errors += 1
-                pass
-            except redis.exceptions.ResponseError:
-                logging.warning(
-                    "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
-                        timestamp, value, timeseries_name
-                    )
-                )
-                datapoint_errors += 1
-                pass
+                    datapoint_errors += 1
+                except redis.exceptions.ResponseError as e:
+                    if "DUPLICATE_POLICY" in e.__str__():
+                        # duplicate timestamp: try to insert again, but in the next millisecond
+                        timestamp += 1
+                        try_to_insert = True
+                        retry_count += 1
+                    else:
+                        logging.warning(
+                            "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
+                                timestamp, value, timeseries_name
+                            )
+                        )
+                        datapoint_errors += 1
         if expire_msecs > 0:
             rts.pexpire(timeseries_name, expire_msecs)
         progress.update()

From 85a3ca3fa0c433b639b7ce90ec69a7daf77808a4 Mon Sep 17 00:00:00 2001
From: Paulo Sousa
Date: Mon, 2 Jun 2025 14:37:59 +0100
Subject: [PATCH 2/3] Stop container before removing it to prevent Docker exception

---
 .../__self_contained_coordinator__/self_contained_coordinator.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py b/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py
index dca71e5..bab0091 100644
--- a/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py
+++ b/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py
@@ -1296,6 +1296,7 @@ def process_self_contained_coordinator_stream(
                     stdout=True, stderr=True
                 )
            )
+            redis_container.stop()
             redis_container.remove()
         except docker.errors.NotFound:
             logging.info(
From 73313d4763e71334d37aa8f475937677216fec7e Mon Sep 17 00:00:00 2001
From: Paulo Sousa
Date: Mon, 2 Jun 2025 14:39:26 +0100
Subject: [PATCH 3/3] Add test for duplicated timestamps handling

---
 ...test_self_contained_coordinator_memtier.py | 127 ++++++++++++++++++
 1 file changed, 127 insertions(+)

diff --git a/utils/tests/test_self_contained_coordinator_memtier.py b/utils/tests/test_self_contained_coordinator_memtier.py
index f4e5b11..24dc6b3 100644
--- a/utils/tests/test_self_contained_coordinator_memtier.py
+++ b/utils/tests/test_self_contained_coordinator_memtier.py
@@ -5,6 +5,7 @@
 import yaml
 from pathlib import Path
 import logging
+import datetime
 
 from redisbench_admin.utils.benchmark_config import get_defaults
 from redisbench_admin.utils.remote import get_overall_dashboard_keynames
@@ -1283,3 +1284,129 @@ def test_self_contained_coordinator_blocking_read_valkey():
 
     except redis.exceptions.ConnectionError:
         pass
+
+
+def test_self_contained_coordinator_duplicated_ts():
+    try:
+        if run_coordinator_tests_dockerhub():
+            db_port = int(os.getenv("DATASINK_PORT", "6379"))
+            conn = redis.StrictRedis(port=db_port)
+            conn.ping()
+            conn.flushall()
+
+            id = "dockerhub"
+            redis_version = "7.4.0"
+            run_image = f"redis:{redis_version}"
+            build_arch = "amd64"
+            testDetails = {}
+            build_os = "test_build_os"
+
+            # generate 2 stream requests with the same timestamp
+            timestamp = int(datetime.datetime.now().timestamp())
+            for _ in range(0, 2):
+                build_stream_fields, result = generate_benchmark_stream_request(
+                    id,
+                    conn,
+                    run_image,
+                    build_arch,
+                    testDetails,
+                    build_os,
+                    git_timestamp_ms=timestamp,
+                    use_git_timestamp=True,
+                )
+                build_stream_fields["mnt_point"] = ""
+                if result is True:
+                    benchmark_stream_id = conn.xadd(
+                        STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields
+                    )
+                    logging.info(
+                        "successfully requested a new run {}. Stream id: {}".format(
+                            build_stream_fields, benchmark_stream_id
+                        )
+                    )
+
+            assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS)
+            assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) == 2
+
+            running_platform = "fco-ThinkPad-T490"
+
+            # process the 2 stream requests
+            for _ in range(0, 2):
+
+                build_runners_consumer_group_create(conn, running_platform, "0")
+                datasink_conn = redis.StrictRedis(port=db_port)
+                docker_client = docker.from_env()
+                home = str(Path.home())
+                stream_id = ">"
+                topologies_map = get_topologies(
+                    "./redis_benchmarks_specification/setups/topologies/topologies.yml"
+                )
+                # we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores
+                # and we need 1 core for DB and another for CLIENT
+                testsuite_spec_files = [
+                    "./utils/tests/test_data/test-suites/test-memtier-dockerhub.yml"
+                ]
+                defaults_filename = "./utils/tests/test_data/test-suites/defaults.yml"
+                (
+                    _,
+                    _,
+                    default_metrics,
+                    _,
+                    _,
+                    _,
+                ) = get_defaults(defaults_filename)
+
+                (
+                    result,
+                    stream_id,
+                    number_processed_streams,
+                    num_process_test_suites,
+                ) = self_contained_coordinator_blocking_read(
+                    conn,
+                    True,
+                    docker_client,
+                    home,
+                    stream_id,
+                    datasink_conn,
+                    testsuite_spec_files,
+                    topologies_map,
+                    running_platform,
+                    False,
+                    [],
+                    "",
+                    0,
+                    6399,
+                    1,
+                    False,
+                    5,
+                    default_metrics,
+                    "amd64",
+                    None,
+                    0,
+                    10000,
+                    "unstable",
+                    "",
+                    True,
+                    False,
+                )
+                assert result == True
+                assert number_processed_streams == 1
+                assert num_process_test_suites == 1
+
+            stat_key = f"ci.benchmarks.redislabs/by.version/ci/redis/redis/memtier_benchmark-1Mkeys-load-string-with-10B-values/dockerhub/{running_platform}/oss-standalone/{redis_version}/ALL_STATS.Totals.Ops/sec"
+            assert datasink_conn.exists(stat_key)
+            rts = datasink_conn.ts()
+
+            rts_info = rts.info(stat_key)
+
+            # we have two datapoints
+            assert rts_info.total_samples == 2
+
+            # the first was inserted at the original timestamp
+            assert rts_info.first_timestamp == timestamp
+
+            # the second clashed, so it was resolved by adding 1 ms to the timestamp
+            assert rts_info.last_timestamp == timestamp + 1
+
+    except redis.exceptions.ConnectionError:
+        pass
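
If running the full coordinator flow locally is impractical, the resulting series can also be spot-checked directly against the datasink with redis-py. A minimal sketch under the assumption that the series already holds the two samples written by the test; the port is illustrative and the key stands in for the stat_key used above, abbreviated here with "...":

    import redis

    conn = redis.Redis(port=6379)
    # Stand-in for the full stat_key from the test above, shortened for readability.
    key = "ci.benchmarks.redislabs/by.version/ci/redis/redis/.../ALL_STATS.Totals.Ops/sec"

    samples = conn.ts().range(key, "-", "+")  # list of (timestamp, value) pairs
    assert len(samples) == 2
    first_ts, second_ts = samples[0][0], samples[1][0]
    assert second_ts == first_ts + 1  # the colliding sample landed 1 ms later
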