Skip to content

Commit 95a407b

Browse files
committed
Add test for cache locality
1 parent 420b584 commit 95a407b

File tree

5 files changed

+332
-0
lines changed

5 files changed

+332
-0
lines changed

tests/integration/test_s3_cache_locality/__init__.py

Whitespace-only changes.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
<!-- Cluster topologies for the s3Cluster cache-locality test.
     Apparent naming convention (confirm against the test): cluster_<digits>
     lists replicas clickhouse<digit> in the order the digits appear, so
     cluster_34512 is the same five nodes as cluster_12345 in a different
     order, cluster_1234 drops the last node, etc.  clickhouse0 is
     deliberately absent from every cluster (it acts as initiator only). -->
<clickhouse>
    <remote_servers>

        <!-- All five nodes, "natural" order. -->
        <cluster_12345>
            <shard>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse5</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_12345>

        <!-- Same as cluster_12345 without the last node (clickhouse5). -->
        <cluster_1234>
            <shard>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_1234>

        <!-- Same as cluster_12345 without the first node (clickhouse1). -->
        <cluster_2345>
            <shard>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse5</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_2345>

        <!-- All five nodes, rotated order (3,4,5,1,2). -->
        <cluster_34512>
            <shard>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse5</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_34512>

        <!-- Four nodes (no clickhouse1), shuffled order (4,5,2,3). -->
        <cluster_4523>
            <shard>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse5</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_4523>

    </remote_servers>

    <!-- Filesystem cache the test warms and drops; referenced by name
         ('raw_s3_cache') from the queries in test.py. -->
    <filesystem_caches>
        <raw_s3_cache>
            <path>/var/lib/clickhouse/raw_s3_cache</path>
            <max_size>10Gi</max_size>
        </raw_s3_cache>
    </filesystem_caches>
</clickhouse>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
<clickhouse>
    <named_collections>
        <!-- Connection bundle for the test MinIO bucket (endpoint,
             credentials, format) usable as s3(test_s3, ...). -->
        <test_s3>
            <url>http://minio1:9001/root/data/*</url>
            <access_key_id>minio</access_key_id>
            <secret_access_key>minio123</secret_access_key>
            <!-- Fixed: original had a stray '>' after the closing tag
                 ("</format>>"), leaving junk character data in the config. -->
            <format>CSV</format>
        </test_s3>
    </named_collections>
</clickhouse>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
<clickhouse>
    <users>
        <!-- Default user with an empty password for the test environment.
             named_collection_control=1 presumably lets this user manage the
             named collections from named_collections.xml — confirm against
             ClickHouse access-control docs. -->
        <default>
            <password></password>
            <profile>default</profile>
            <named_collection_control>1</named_collection_control>
        </default>
    </users>
</clickhouse>
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import csv
2+
import logging
3+
import os
4+
import shutil
5+
import uuid
6+
7+
import pytest
8+
9+
from helpers.cluster import ClickHouseCluster
10+
11+
# Route test INFO logs to stderr so they are visible in pytest output.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())

# Directory containing this test file; generated CSV fixtures live under it.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
15+
16+
17+
def create_buckets_s3(cluster):
    """Generate 1000 one-row CSV files (schema: a String, b UInt64) under
    SCRIPT_DIR/data/generated/ and upload them to the cluster's MinIO bucket.

    Every file holds a slightly different row ("str_<n>", n) so that
    per-file reads are distinguishable in S3 access counters.
    """
    minio = cluster.minio_client

    # Loop-invariant hoisted: create the output directory once, not on
    # every one of the 1000 iterations.
    os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)

    s3_data = []
    for file_number in range(1000):
        file_name = f"data/generated/file_{file_number}.csv"
        s3_data.append(file_name)
        with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
            # a String, b UInt64 — make all files a bit different.
            writer = csv.writer(f)
            writer.writerows([["str_" + str(file_number), file_number]])

    for file in s3_data:
        minio.fput_object(
            bucket_name=cluster.minio_bucket,
            object_name=file,
            file_path=os.path.join(SCRIPT_DIR, file),
        )

    # Debug aid: list everything that actually landed in the bucket.
    for obj in minio.list_objects(cluster.minio_bucket, recursive=True):
        print(obj.object_name)
47+
48+
49+
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped six-node cluster (clickhouse0..clickhouse5) with MinIO
    and ZooKeeper, with the test fixture files uploaded to S3.

    clickhouse0 is intentionally not a member of any cluster_XXX in
    configs/cluster.xml — the test uses it as a pure query initiator.
    """
    # Constructed OUTSIDE the try: if the constructor itself raises, the
    # finally block must not run (it would NameError on `cluster`).
    cluster = ClickHouseCluster(__file__)
    try:
        for i in range(6):
            cluster.add_instance(
                f"clickhouse{i}",
                main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
                user_configs=["configs/users.xml"],
                macros={"replica": f"clickhouse{i}"},
                with_minio=True,
                with_zookeeper=True,
            )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        create_buckets_s3(cluster)

        yield cluster
    finally:
        # ignore_errors: if startup failed before the fixtures were
        # generated, data/generated/ may not exist; an exception here would
        # mask the original failure and skip cluster.shutdown() below.
        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
        cluster.shutdown()
74+
75+
76+
def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
    """Run the same counting query over s3Cluster twice — first on
    cluster_first, then on cluster_second — and return the number of
    S3GetObject requests each run performed, summed over all replicas.

    The 'raw_s3_cache' filesystem cache is dropped on every instance up
    front, so the first run always starts cold.

    Returns:
        (int, int): S3GetObject totals for the first and second run.
    """
    # Start from a cold cache on every node.
    for host in list(cluster.instances.values()):
        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'")

    def run_count_query(cluster_name):
        # Execute the counting query on `cluster_name`, verify the result,
        # and return the query_id used to find it in system.query_log later.
        query_id = str(uuid.uuid4())
        result = node.query(
            f"""
            SELECT count(*)
            FROM s3Cluster('{cluster_name}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
            WHERE b=42
            SETTINGS
                enable_filesystem_cache={enable_filesystem_cache},
                filesystem_cache_name='raw_s3_cache'
            """,
            query_id=query_id,
        )
        assert result == expected_result
        return query_id

    def s3_get_count(cluster_name, query_id):
        # Sum S3GetObject over every replica that took part in the query.
        return int(
            node.query(
                f"""
                SELECT sum(ProfileEvents['S3GetObject'])
                FROM clusterAllReplicas('{cluster_name}', system.query_log)
                WHERE type='QueryFinish'
                  AND initial_query_id='{query_id}'
                """
            )
        )

    query_id_first = run_count_query(cluster_first)
    query_id_second = run_count_query(cluster_second)

    # Make sure query_log entries are flushed everywhere before reading them.
    node.query("SYSTEM FLUSH LOGS")
    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}")
    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}")

    return s3_get_count(cluster_first, query_id_first), s3_get_count(cluster_second, query_id_second)
129+
130+
131+
def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
    """Smooth out run-to-run noise: run check_s3_gets several times and
    return the element-wise sums of its (first, second) S3 GET counts."""
    runs = [
        check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache)
        for _ in range(10)
    ]
    first_total = sum(first for first, _ in runs)
    second_total = sum(second for _, second in runs)
    return first_total, second_total
141+
142+
143+
def test_cache_locality(started_cluster):
    """Checks that s3Cluster assigns the same files to the same replicas
    across runs and across differently-ordered/overlapping cluster
    definitions, so warmed filesystem-cache entries keep being hit."""
    # Initiator only: clickhouse0 is not a member of any cluster_XXX.
    node = started_cluster.instances["clickhouse0"]

    # Reference answer from the plain s3() table function (no cluster, no cache).
    expected_result = node.query(
        """
        SELECT count(*)
        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
        WHERE b=42
        """
    )

    # Algorithm does not give a 100% guarantee, so add 10% for dispersion.
    dispersion = 0.1

    # No cache: both runs must hit S3 the same number of times.
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0)
    assert s3_get_second == s3_get_first

    # With cache: the second (warm) run should do almost no S3 GETs.
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1)
    assert s3_get_second <= s3_get_first * dispersion

    # Different nodes order: same replica set reordered — locality must hold.
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1)
    assert s3_get_second <= s3_get_first * dispersion

    # No last node: clickhouse5 dropped — presumably only its ~1/5 share of
    # files must be re-fetched, hence the 0.2 bound (TODO confirm derivation).
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1)
    assert s3_get_second <= s3_get_first * (0.2 + dispersion)

    # No first node: same ~1/5 reassignment expectation as above.
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1)
    assert s3_get_second <= s3_get_first * (0.2 + dispersion)

    # No first node, different node order.
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1)
    assert s3_get_second <= s3_get_first * (0.2 + dispersion)

    # Add new node, different node order: warm on 4 nodes, query 5 —
    # presumably ~1/4 of assignments move to the new node (TODO confirm).
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1)
    assert s3_get_second <= s3_get_first * (0.25 + dispersion)

    # New node and old node swapped, different node order.  0.4375 looks like
    # a derived worst-case miss fraction for this reshuffle — confirm against
    # the replica-selection algorithm before tightening/loosening.
    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1)
    assert s3_get_second <= s3_get_first * (0.4375 + dispersion)

0 commit comments

Comments
 (0)