import csv
import logging
import os
import shutil
import uuid

import pytest

from helpers.cluster import ClickHouseCluster

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def create_buckets_s3(cluster):
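    """Generate 1000 small CSV files and upload them to the MinIO bucket.

    These are the objects the s3Cluster queries below read; having many
    small files gives the scheduler something to distribute across replicas.
    """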
    minio = cluster.minio_client

    os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)

    s3_data = []
    for file_number in range(1000):
        file_name = f"data/generated/file_{file_number}.csv"
        s3_data.append(file_name)
        # newline="" is the csv-module recommended way to open output files
        with open(
            os.path.join(SCRIPT_DIR, file_name), "w", newline="", encoding="utf-8"
        ) as f:
            # Columns: a String, b UInt64; vary the row so every file differs
            writer = csv.writer(f)
            writer.writerows([[f"str_{file_number}", file_number]])

    for file in s3_data:
        minio.fput_object(
            bucket_name=cluster.minio_bucket,
            object_name=file,
            file_path=os.path.join(SCRIPT_DIR, file),
        )

    for obj in minio.list_objects(cluster.minio_bucket, recursive=True):
        logging.info("Object in bucket: %s", obj.object_name)


@pytest.fixture(scope="module")
def started_cluster():
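    """Start six ClickHouse instances together with MinIO and ZooKeeper."""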
    cluster = ClickHouseCluster(__file__)
    try:
        # clickhouse0 is not a member of any of the cluster_XXX clusters;
        # it only initiates the queries
        for i in range(6):
            cluster.add_instance(
                f"clickhouse{i}",
                main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
                user_configs=["configs/users.xml"],
                macros={"replica": f"clickhouse{i}"},
                with_minio=True,
                with_zookeeper=True,
            )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        create_buckets_s3(cluster)

        yield cluster
    finally:
        # ignore_errors: the directory may not exist if data generation failed early
        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
        cluster.shutdown()


def check_s3_gets(
    cluster,
    node,
    expected_result,
    cluster_first,
    cluster_second,
    enable_filesystem_cache,
):
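    """Run the same s3Cluster query against two cluster definitions.

    Returns the number of S3GetObject requests each run produced, summed
    over all replicas via system.query_log.
    """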
    for host in cluster.instances.values():
        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'")

    query_id_first = str(uuid.uuid4())
    result_first = node.query(
        f"""
        SELECT count(*)
        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
        WHERE b=42
        SETTINGS
            enable_filesystem_cache={enable_filesystem_cache},
            filesystem_cache_name='raw_s3_cache'
        """,
        query_id=query_id_first,
    )
    assert result_first == expected_result

    query_id_second = str(uuid.uuid4())
    result_second = node.query(
        f"""
        SELECT count(*)
        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
        WHERE b=42
        SETTINGS
            enable_filesystem_cache={enable_filesystem_cache},
            filesystem_cache_name='raw_s3_cache'
        """,
        query_id=query_id_second,
    )
    assert result_second == expected_result

    # Make sure the query_log entries are flushed on every replica
    node.query("SYSTEM FLUSH LOGS")
    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}")
    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}")

    s3_get_first = node.query(
        f"""
        SELECT sum(ProfileEvents['S3GetObject'])
        FROM clusterAllReplicas('{cluster_first}', system.query_log)
        WHERE type='QueryFinish'
          AND initial_query_id='{query_id_first}'
        """
    )
    s3_get_second = node.query(
        f"""
        SELECT sum(ProfileEvents['S3GetObject'])
        FROM clusterAllReplicas('{cluster_second}', system.query_log)
        WHERE type='QueryFinish'
          AND initial_query_id='{query_id_second}'
        """
    )

    return int(s3_get_first), int(s3_get_second)


def check_s3_gets_repeat(
    cluster,
    node,
    expected_result,
    cluster_first,
    cluster_second,
    enable_filesystem_cache,
):
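    """Repeat check_s3_gets several times and return the summed GET counts,
    averaging out the randomness of task distribution."""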
    iterations = 10
    s3_get_first_sum = 0
    s3_get_second_sum = 0
    for _ in range(iterations):
        s3_get_first, s3_get_second = check_s3_gets(
            cluster,
            node,
            expected_result,
            cluster_first,
            cluster_second,
            enable_filesystem_cache,
        )
        s3_get_first_sum += s3_get_first
        s3_get_second_sum += s3_get_second
    return s3_get_first_sum, s3_get_second_sum


def test_cache_locality(started_cluster):
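    """Check that s3Cluster filesystem-cache locality survives cluster changes.

    The same query runs against pairs of cluster definitions, and the S3 GET
    count of the second run is compared with the first: the fewer GETs, the
    more reads were served from the replicas' local caches.
    """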
    node = started_cluster.instances["clickhouse0"]

    expected_result = node.query(
        """
        SELECT count(*)
        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
        WHERE b=42
        """
    )

    # The distribution algorithm is not 100% deterministic, so allow 10% dispersion
    dispersion = 0.1

    # No cache: both runs should issue the same number of S3 GETs
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_12345", "cluster_12345", 0
    )
    assert s3_get_second == s3_get_first

    # With cache: the second run should be served almost entirely from cache
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_12345", "cluster_12345", 1
    )
    assert s3_get_second <= s3_get_first * dispersion

    # Same nodes listed in a different order: locality should be preserved
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_12345", "cluster_34512", 1
    )
    assert s3_get_second <= s3_get_first * dispersion

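    # The 0.2 allowance below presumably reflects that dropping one replica
    # out of five invalidates the ~1/5 of cached objects that lived on it.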
    # Last node removed from the cluster
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_12345", "cluster_1234", 1
    )
    assert s3_get_second <= s3_get_first * (0.2 + dispersion)

    # First node removed from the cluster
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_12345", "cluster_2345", 1
    )
    assert s3_get_second <= s3_get_first * (0.2 + dispersion)

    # First node removed, remaining nodes in a different order
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_12345", "cluster_4523", 1
    )
    assert s3_get_second <= s3_get_first * (0.2 + dispersion)

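    # Going from four replicas to five presumably remaps ~1/5 of the objects
    # to the new replica, hence the slightly looser 0.25 allowance.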
    # New node added, nodes listed in a different order
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_4523", "cluster_12345", 1
    )
    assert s3_get_second <= s3_get_first * (0.25 + dispersion)

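    # 0.4375 is presumably 7/16: replacing one node out of four invalidates
    # 1/4 of the cached objects, and ~1/4 of the remaining 3/4 are remapped
    # to the new node (1/4 + 3/16 = 7/16).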
    # One node removed, one node added, nodes listed in a different order
    s3_get_first, s3_get_second = check_s3_gets_repeat(
        started_cluster, node, expected_result, "cluster_1234", "cluster_4523", 1
    )
    assert s3_get_second <= s3_get_first * (0.4375 + dispersion)