19 changes: 12 additions & 7 deletions redis_benchmarks_specification/__runner__/args.py
@@ -1,17 +1,17 @@
import argparse

from redis_benchmarks_specification.__common__.env import (
SPECS_PATH_TEST_SUITES,
ALLOWED_PROFILERS,
DATASINK_RTS_AUTH,
DATASINK_RTS_HOST,
DATASINK_RTS_PORT,
DATASINK_RTS_AUTH,
DATASINK_RTS_USER,
DATASINK_RTS_PUSH,
DATASINK_RTS_USER,
MACHINE_NAME,
PROFILERS_ENABLED,
PROFILERS,
PROFILERS_DEFAULT,
ALLOWED_PROFILERS,
PROFILERS_ENABLED,
SPECS_PATH_TEST_SUITES,
)


@@ -80,8 +80,8 @@ def create_client_runner_args(project_name):
default=PROFILERS_ENABLED,
action="store_true",
help="Enable Identifying On-CPU and Off-CPU Time using perf/ebpf/vtune tooling. "
+ "By default the chosen profilers are {}".format(PROFILERS_DEFAULT)
+ "Full list of profilers: {}".format(ALLOWED_PROFILERS)
+ f"By default the chosen profilers are {PROFILERS_DEFAULT}"
+ f"Full list of profilers: {ALLOWED_PROFILERS}"
+ "Only available on x86 Linux platform and kernel version >= 4.9",
)

@@ -136,4 +136,9 @@ def create_client_runner_args(project_name):
default="",
help="Use specified CA certs bundle for TLS",
)
parser.add_argument(
"--resp",
default="2",
help="Set up RESP protocol version",
)
return parser
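The new --resp option above defaults to "2" and is later handed to the benchmark tooling as resp_version. A minimal sketch of the flag in isolation, assuming a plain argparse parser (the parser name and sample values below are illustrative, not part of this PR):

import argparse

# Minimal, self-contained sketch of the new flag (illustrative only).
parser = argparse.ArgumentParser("redis-benchmarks-spec-client-runner")
parser.add_argument(
    "--resp",
    default="2",
    help="Set up RESP protocol version",
)

args = parser.parse_args([])               # no flag given -> defaults to "2"
assert args.resp == "2"

args = parser.parse_args(["--resp", "3"])  # request RESP3
assert args.resp == "3"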
97 changes: 49 additions & 48 deletions redis_benchmarks_specification/__runner__/runner.py
@@ -2,42 +2,38 @@
import json
import logging
import os
import shutil
import sys
import tempfile
import traceback
from pathlib import Path
import shutil

import docker
import redis
from docker.models.containers import Container
from pytablewriter import CsvTableWriter, MarkdownTableWriter
from redisbench_admin.profilers.profilers_local import (
check_compatible_system_and_kernel_and_prepare_profile,
profilers_start_if_required,
local_profilers_platform_checks,
profilers_start_if_required,
profilers_stop_if_required,
)
import docker
import redis
from docker.models.containers import Container
from pytablewriter import MarkdownTableWriter
from pytablewriter import CsvTableWriter

from redisbench_admin.run.common import (
get_start_time_vars,
prepare_benchmark_parameters,
execute_init_commands,
get_start_time_vars,
merge_default_and_config_metrics,
prepare_benchmark_parameters,
)
from redisbench_admin.run.metrics import extract_results_table
from redisbench_admin.run.redistimeseries import timeseries_test_sucess_flow
from redisbench_admin.run.run import calculate_client_tool_duration_and_check
from redisbench_admin.utils.benchmark_config import (
get_final_benchmark_config,
)
from redisbench_admin.utils.benchmark_config import get_final_benchmark_config
from redisbench_admin.utils.local import get_local_run_full_filename
from redisbench_admin.utils.results import post_process_benchmark_results

from redis_benchmarks_specification.__common__.env import (
LOG_FORMAT,
LOG_DATEFMT,
LOG_FORMAT,
LOG_LEVEL,
REDIS_HEALTH_CHECK_INTERVAL,
REDIS_SOCKET_TIMEOUT,
@@ -49,8 +45,8 @@
)
from redis_benchmarks_specification.__common__.runner import extract_testsuites
from redis_benchmarks_specification.__common__.spec import (
extract_client_cpu_limit,
extract_client_container_image,
extract_client_cpu_limit,
extract_client_tool,
)
from redis_benchmarks_specification.__runner__.args import create_client_runner_args
@@ -59,7 +55,7 @@
def main():
_, _, project_version = populate_with_poetry_data()
project_name_suffix = "redis-benchmarks-spec-client-runner"
project_name = "{} (solely client)".format(project_name_suffix)
project_name = f"{project_name_suffix} (solely client)"
parser = create_client_runner_args(
get_version_string(project_name, project_version)
)
@@ -70,7 +66,7 @@ def main():

def run_client_runner_logic(args, project_name, project_name_suffix, project_version):
if args.logname is not None:
print("Writting log to {}".format(args.logname))
print(f"Writting log to {args.logname}")
logging.basicConfig(
filename=args.logname,
filemode="a",
@@ -114,14 +110,15 @@ def run_client_runner_logic(args, project_name, project_name_suffix, project_ver
args.datasink_redistimeseries_port,
)
)
logging.error("Error message {}".format(e.__str__()))
logging.error(f"Error message {e.__str__()}")
exit(1)
running_platform = args.platform_name
tls_enabled = args.tls
tls_skip_verify = args.tls_skip_verify
tls_cert = args.cert
tls_key = args.key
tls_cacert = args.cacert
resp_version = args.resp
client_aggregated_results_folder = args.client_aggregated_results_folder
preserve_temporary_client_dirs = args.preserve_temporary_client_dirs
docker_client = docker.from_env()
@@ -158,6 +155,7 @@ def run_client_runner_logic(args, project_name, project_name_suffix, project_ver
tls_cacert,
client_aggregated_results_folder,
preserve_temporary_client_dirs,
resp_version,
)


@@ -173,13 +171,14 @@ def prepare_memtier_benchmark_parameters(
tls_cert=None,
tls_key=None,
tls_cacert=None,
resp_version=None,
):
benchmark_command = [
full_benchmark_path,
"--port",
"{}".format(port),
f"{port}",
"--server",
"{}".format(server),
f"{server}",
"--json-out-file",
local_benchmark_output_filename,
]
@@ -194,6 +193,14 @@
if tls_skip_verify:
benchmark_command.append("--tls-skip-verify")

if resp_version:
tool = clientconfig["tool"]
if tool == "memtier_benchmark":
benchmark_command.extend(["--resp", resp_version])
elif tool == "redis-benchmark":
if resp_version == "3":
benchmark_command.append("-3")

if oss_cluster_api_enabled is True:
benchmark_command.append("--cluster-mode")
benchmark_command_str = " ".join(benchmark_command)
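The RESP handling above routes the protocol choice differently per tool: memtier_benchmark is passed --resp with the requested version verbatim, while redis-benchmark only gains the -3 switch when RESP3 is requested. A standalone sketch of that branch, assuming a clientconfig dict with a "tool" key shaped like the test-suite specs (the helper name is hypothetical, not part of this PR):

# Hypothetical helper mirroring the RESP branch above (illustrative only).
def append_resp_args(benchmark_command, clientconfig, resp_version):
    if resp_version:
        tool = clientconfig["tool"]
        if tool == "memtier_benchmark":
            # memtier_benchmark gets the protocol version passed through as-is
            benchmark_command.extend(["--resp", resp_version])
        elif tool == "redis-benchmark" and resp_version == "3":
            # redis-benchmark only exposes a RESP3 switch; RESP2 needs no flag
            benchmark_command.append("-3")
    return benchmark_command

# Example: memtier_benchmark asked to speak RESP3
cmd = append_resp_args(
    ["memtier_benchmark", "--port", "6379", "--server", "localhost"],
    {"tool": "memtier_benchmark"},
    "3",
)
# cmd == ["memtier_benchmark", "--port", "6379", "--server", "localhost", "--resp", "3"]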
@@ -222,6 +229,7 @@ def process_self_contained_coordinator_stream(
tls_cacert=None,
client_aggregated_results_folder="",
preserve_temporary_client_dirs=False,
resp_version=None,
):
overall_result = True
results_matrix = []
@@ -245,6 +253,7 @@

for topology_spec_name in benchmark_config["redis-topologies"]:
test_result = False
benchmark_tool_global = ""
try:
current_cpu_pos = args.cpuset_start_pos
temporary_dir_client = tempfile.mkdtemp(dir=home)
@@ -280,7 +289,7 @@ def process_self_contained_coordinator_stream(
redis_pids.append(first_redis_pid)

setup_name = "oss-standalone"
github_actor = "{}-{}".format(tf_triggering_env, running_platform)
github_actor = f"{tf_triggering_env}-{running_platform}"
dso = "redis-server"
profilers_artifacts_matrix = []

@@ -344,17 +353,19 @@ def process_self_contained_coordinator_stream(
test_tls_cert,
test_tls_key,
test_tls_cacert,
resp_version,
)

execute_init_commands(
benchmark_config, r, dbconfig_keyname="dbconfig"
)

benchmark_tool = extract_client_tool(benchmark_config)
benchmark_tool_global = benchmark_tool
# backwards compatible
if benchmark_tool is None:
benchmark_tool = "redis-benchmark"
full_benchmark_path = "/usr/local/bin/{}".format(benchmark_tool)
full_benchmark_path = f"/usr/local/bin/{benchmark_tool}"

# setup the benchmark
(
@@ -404,6 +415,7 @@ def process_self_contained_coordinator_stream(
test_tls_cert,
test_tls_key,
test_tls_cacert,
resp_version,
)

client_container_image = extract_client_container_image(
@@ -491,9 +503,7 @@ def process_self_contained_coordinator_stream(
full_result_path = "{}/{}".format(
temporary_dir_client, local_benchmark_output_filename
)
logging.info(
"Reading results json from {}".format(full_result_path)
)
logging.info(f"Reading results json from {full_result_path}")

with open(
full_result_path,
@@ -518,9 +528,7 @@

dataset_load_duration_seconds = 0

logging.info(
"Using datapoint_time_ms: {}".format(datapoint_time_ms)
)
logging.info(f"Using datapoint_time_ms: {datapoint_time_ms}")

timeseries_test_sucess_flow(
datasink_push_results_redistimeseries,
@@ -587,17 +595,15 @@ def process_self_contained_coordinator_stream(

if preserve_temporary_client_dirs is True:
logging.info(
"Preserving temporary client dir {}".format(
temporary_dir_client
)
f"Preserving temporary client dir {temporary_dir_client}"
)
else:
if "redis-benchmark" in benchmark_tool:
if "redis-benchmark" in benchmark_tool_global:
os.remove(full_result_path)
logging.info("Removing temporary JSON file")
shutil.rmtree(temporary_dir_client, ignore_errors=True)
logging.info(
"Removing temporary client dir {}".format(temporary_dir_client)
f"Removing temporary client dir {temporary_dir_client}"
)

table_name = "Results for entire test-suite"
@@ -615,13 +621,8 @@

if client_aggregated_results_folder != "":
os.makedirs(client_aggregated_results_folder, exist_ok=True)
dest_fpath = "{}/{}".format(
client_aggregated_results_folder,
"aggregate-results.csv",
)
logging.info(
"Storing an aggregated results CSV into {}".format(full_result_path)
)
dest_fpath = f"{client_aggregated_results_folder}/aggregate-results.csv"
logging.info(f"Storing an aggregated results CSV into {full_result_path}")

csv_writer = CsvTableWriter(
table_name=table_name,
Expand All @@ -633,12 +634,10 @@ def process_self_contained_coordinator_stream(

def cp_to_workdir(benchmark_tool_workdir, srcfile):
head, filename = os.path.split(srcfile)
dstfile = "{}/{}".format(benchmark_tool_workdir, filename)
dstfile = f"{benchmark_tool_workdir}/{filename}"
shutil.copyfile(srcfile, dstfile)
logging.info(
"Copying to workdir the following file {}. Final workdir file {}".format(
srcfile, dstfile
)
f"Copying to workdir the following file {srcfile}. Final workdir file {dstfile}"
)
return dstfile, filename

@@ -657,14 +656,14 @@ def print_results_table_stdout(
default_metrics,
None,
)
table_name = "Results for {} test-case on {} topology".format(test_name, setup_name)
table_name = f"Results for {test_name} test-case on {setup_name} topology"
results_matrix_headers = [
"Metric JSON Path",
"Metric Value",
]
results_matrix = extract_results_table(metrics, results_dict)

results_matrix = [[x[0], "{:.3f}".format(x[3])] for x in results_matrix]
results_matrix = [[x[0], f"{x[3]:.3f}"] for x in results_matrix]
writer = MarkdownTableWriter(
table_name=table_name,
headers=results_matrix_headers,
@@ -684,7 +683,7 @@ def prepare_overall_total_test_results(
)
current_test_results_matrix = extract_results_table(metrics, results_dict)
current_test_results_matrix = [
[test_name, x[0], "{:.3f}".format(x[3])] for x in current_test_results_matrix
[test_name, x[0], f"{x[3]:.3f}"] for x in current_test_results_matrix
]
overall_results_matrix.extend(current_test_results_matrix)

@@ -704,6 +703,7 @@ def data_prepopulation_step(
tls_cert=None,
tls_key=None,
tls_cacert=None,
resp_version=None,
):
# setup the benchmark
(
@@ -721,7 +721,7 @@
benchmark_config["dbconfig"], "preload_tool"
)
preload_tool = extract_client_tool(benchmark_config["dbconfig"], "preload_tool")
full_benchmark_path = "/usr/local/bin/{}".format(preload_tool)
full_benchmark_path = f"/usr/local/bin/{preload_tool}"
client_mnt_point = "/mnt/client/"
if "memtier_benchmark" in preload_tool:
(_, preload_command_str,) = prepare_memtier_benchmark_parameters(
@@ -736,6 +736,7 @@
tls_cert,
tls_key,
tls_cacert,
resp_version,
)

logging.info(