
Commit 21f3cbd

Merge pull request #709 from Altinity/feature/rendezvous-hashing-filesystem-cache
Rendezvous hashing filesystem cache
2 parents 493f4c7 + e4b51d1 commit 21f3cbd

19 files changed: +660 −49 lines

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 5 additions & 1 deletion
@@ -746,8 +746,12 @@ void RemoteQueryExecutor::processReadTaskRequest()
     if (!extension || !extension->task_iterator)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");

+    if (!extension->replica_info)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized");
+
     ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
-    auto response = (*extension->task_iterator)();
+
+    auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica);
     connections->sendReadTaskResponse(response);
 }

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext;
 class ParallelReplicasReadingCoordinator;

 /// This is the same type as StorageS3Source::IteratorWrapper
-using TaskIterator = std::function<String()>;
+using TaskIterator = std::function<String(size_t)>;

 /// This class allows one to launch queries on remote replicas of one shard and get results
 class RemoteQueryExecutor
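
For illustration only: a minimal, self-contained sketch (not part of the commit; paths and names are invented) of what the new signature means in practice. The callback now receives the index of the replica that is asking for work and returns an empty string once nothing is left for that replica, which is how the executor learns the task stream is exhausted.

#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for the ClickHouse aliases used above.
using String = std::string;
using TaskIterator = std::function<String(size_t)>;

int main()
{
    // Files pre-assigned per replica (made-up paths).
    std::vector<std::vector<String>> files_per_replica =
    {
        {"data/part-0.parquet"},
        {"data/part-1.parquet", "data/part-2.parquet"},
    };

    TaskIterator next_task = [files_per_replica](size_t replica) mutable -> String
    {
        auto & files = files_per_replica[replica];
        if (files.empty())
            return "";                  // empty string signals "no more tasks"
        String path = files.back();
        files.pop_back();
        return path;
    };

    next_task(1);   // "data/part-2.parquet"
    next_task(0);   // "data/part-0.parquet"
    next_task(0);   // "" — replica 0 is done
}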

src/Storages/IStorageCluster.cpp

Lines changed: 48 additions & 24 deletions
@@ -34,6 +34,7 @@ namespace Setting
     extern const SettingsBool async_query_sending_for_remote;
     extern const SettingsBool async_socket_for_remote;
     extern const SettingsBool skip_unavailable_shards;
+    extern const SettingsNonZeroUInt64 max_parallel_replicas;
 }

 namespace ErrorCodes

@@ -67,7 +68,17 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
     if (extension)
         return;

-    extension = storage->getTaskIteratorExtension(predicate, context);
+    std::vector<std::string> ids_of_hosts;
+    for (const auto & shard : cluster->getShardsInfo())
+    {
+        if (shard.per_replica_pools.empty())
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
+        if (!shard.per_replica_pools[0])
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
+        ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress());
+    }
+
+    extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts);
 }

 /// The code executes on initiator

@@ -155,38 +166,51 @@ SinkToStoragePtr IStorageCluster::write(

 void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
-    createExtension(nullptr);
-
     const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
     const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

     Pipes pipes;
     auto new_context = updateSettings(context->getSettingsRef());
     const auto & current_settings = new_context->getSettingsRef();
     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
+
+    size_t replica_index = 0;
+
+    createExtension(nullptr);
+
     for (const auto & shard_info : cluster->getShardsInfo())
     {
-        auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY);
-        for (auto & try_result : try_results)
-        {
-            auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
-                std::vector<IConnectionPool::Entry>{try_result},
-                queryToString(query_to_send),
-                getOutputHeader(),
-                new_context,
-                /*throttler=*/nullptr,
-                scalars,
-                Tables(),
-                processed_stage,
-                extension);
-
-            remote_query_executor->setLogger(log);
-            pipes.emplace_back(std::make_shared<RemoteSource>(
-                remote_query_executor,
-                add_agg_info,
-                current_settings[Setting::async_socket_for_remote],
-                current_settings[Setting::async_query_sending_for_remote]));
-        }
+        /// We're taking all replicas as shards,
+        /// so each shard will have only one address to connect to.
+        auto try_results = shard_info.pool->getMany(
+            timeouts,
+            current_settings,
+            PoolMode::GET_ONE,
+            {},
+            /*skip_unavailable_endpoints=*/true);
+
+        if (try_results.empty())
+            continue;
+
+        IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
+
+        auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
+            std::vector<IConnectionPool::Entry>{try_results.front()},
+            queryToString(query_to_send),
+            getOutputHeader(),
+            new_context,
+            /*throttler=*/nullptr,
+            scalars,
+            Tables(),
+            processed_stage,
+            RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
+
+        remote_query_executor->setLogger(log);
+        pipes.emplace_back(std::make_shared<RemoteSource>(
+            remote_query_executor,
+            add_agg_info,
+            current_settings[Setting::async_socket_for_remote],
+            current_settings[Setting::async_query_sending_for_remote]));
     }

     auto pipe = Pipe::unitePipes(std::move(pipes));

src/Storages/IStorageCluster.h

Lines changed: 4 additions & 1 deletion
@@ -40,7 +40,10 @@ class IStorageCluster : public IStorage

     ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
     /// Query is needed for pruning by virtual columns (_file, _path)
-    virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
+    virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        std::optional<std::vector<std::string>> ids_of_hosts = std::nullopt) const = 0;

     QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 12 additions & 8 deletions
@@ -17,6 +17,8 @@
 #include <Storages/ObjectStorage/Utils.h>
 #include <Storages/ObjectStorage/StorageObjectStorageSource.h>
 #include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
+#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
+

 namespace DB
 {

@@ -281,19 +283,21 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
 }

 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
-    const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & local_context,
+    std::optional<std::vector<std::string>> ids_of_replicas) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
         local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback());

-    auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
-    {
-        auto object_info = iterator->next(0);
-        if (object_info)
-            return object_info->getPath();
-        return "";
-    });
+    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_replicas);
+
+    auto callback = std::make_shared<TaskIterator>(
+        [task_distributor](size_t number_of_current_replica) mutable -> String {
+            return task_distributor->getNextTask(number_of_current_replica).value_or("");
+        });
+
     return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
 }

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 3 additions & 1 deletion
@@ -30,7 +30,9 @@ class StorageObjectStorageCluster : public IStorageCluster
     std::string getName() const override;

     RemoteQueryExecutor::Extension getTaskIteratorExtension(
-        const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        std::optional<std::vector<std::string>> ids_of_replicas) const override;

     String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 178 additions & 0 deletions
@@ -0,0 +1,178 @@
+#include "StorageObjectStorageStableTaskDistributor.h"
+#include <Common/SipHash.h>
+#include <consistent_hashing.h>
+#include <optional>
+
+namespace DB
+{
+
+StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
+    std::shared_ptr<IObjectIterator> iterator_,
+    std::optional<std::vector<std::string>> ids_of_nodes_)
+    : iterator(std::move(iterator_))
+    , connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1)
+    , ids_of_nodes(ids_of_nodes_)
+    , iterator_exhausted(false)
+{
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
+{
+    LOG_TRACE(
+        log,
+        "Received a new connection from replica {} looking for a file",
+        number_of_current_replica
+    );
+
+    // 1. Check pre-queued files first
+    if (auto file = getPreQueuedFile(number_of_current_replica))
+        return file;
+
+    // 2. Try to find a matching file from the iterator
+    if (auto file = getMatchingFileFromIterator(number_of_current_replica))
+        return file;
+
+    // 3. Process unprocessed files if iterator is exhausted
+    return getAnyUnprocessedFile(number_of_current_replica);
+}
+
+size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
+{
+    if (!ids_of_nodes.has_value())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer.");
+
+    const auto & ids_of_nodes_value = ids_of_nodes.value();
+    size_t nodes_count = ids_of_nodes_value.size();
+
+    /// Trivial case
+    if (nodes_count < 2)
+        return 0;
+
+    /// Rendezvous hashing
+    size_t best_id = 0;
+    UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path);
+    for (size_t id = 1; id < nodes_count; ++id)
+    {
+        UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path);
+        if (weight > best_weight)
+        {
+            best_weight = weight;
+            best_id = id;
+        }
+    }
+    return best_id;
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
+{
+    std::lock_guard lock(mutex);
+
+    auto & files = connection_to_files[number_of_current_replica];
+
+    while (!files.empty())
+    {
+        String next_file = files.back();
+        files.pop_back();
+
+        auto it = unprocessed_files.find(next_file);
+        if (it == unprocessed_files.end())
+            continue;
+
+        unprocessed_files.erase(it);
+
+        LOG_TRACE(
+            log,
+            "Assigning pre-queued file {} to replica {}",
+            next_file,
+            number_of_current_replica
+        );
+
+        return next_file;
+    }
+
+    return std::nullopt;
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica)
+{
+    {
+        std::lock_guard lock(mutex);
+        if (iterator_exhausted)
+            return std::nullopt;
+    }
+
+    while (true)
+    {
+        ObjectInfoPtr object_info;
+
+        {
+            std::lock_guard lock(mutex);
+            object_info = iterator->next(0);
+
+            if (!object_info)
+            {
+                iterator_exhausted = true;
+                break;
+            }
+        }
+
+        String file_path;
+
+        auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
+        if (archive_object_info)
+        {
+            file_path = archive_object_info->getPathToArchive();
+        }
+        else
+        {
+            file_path = object_info->getPath();
+        }
+
+        size_t file_replica_idx = getReplicaForFile(file_path);
+        if (file_replica_idx == number_of_current_replica)
+        {
+            LOG_TRACE(
+                log,
+                "Found file {} for replica {}",
+                file_path,
+                number_of_current_replica
+            );
+
+            return file_path;
+        }
+
+        // Queue file for its assigned replica
+        {
+            std::lock_guard lock(mutex);
+            unprocessed_files.insert(file_path);
+            connection_to_files[file_replica_idx].push_back(file_path);
+        }
+    }
+
+    return std::nullopt;
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
+{
+    std::lock_guard lock(mutex);
+
+    if (!unprocessed_files.empty())
+    {
+        auto it = unprocessed_files.begin();
+        String next_file = *it;
+        unprocessed_files.erase(it);
+
+        LOG_TRACE(
+            log,
+            "Iterator exhausted. Assigning unprocessed file {} to replica {}",
+            next_file,
+            number_of_current_replica
+        );
+
+        return next_file;
+    }
+
+    return std::nullopt;
+}
+
+}
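
The heart of the new file is getReplicaForFile: rendezvous (highest-random-weight) hashing picks, for each file path, the node whose id concatenated with the path hashes highest. Every replica computes the same owner independently, and adding or removing a node only reassigns the files that node owned, which keeps the other replicas' filesystem caches warm. A compact stand-alone sketch of the idea (hostnames are invented and std::hash stands in for ClickHouse's sipHash64, so concrete assignments will differ):

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Rendezvous (highest-random-weight) hashing: the owner of a file is the node
// whose "node_id + file_path" hashes highest. std::hash stands in for sipHash64.
size_t replicaForFile(const std::vector<std::string> & nodes, const std::string & path)
{
    size_t best_id = 0;
    size_t best_weight = std::hash<std::string>{}(nodes[0] + path);
    for (size_t id = 1; id < nodes.size(); ++id)
    {
        size_t weight = std::hash<std::string>{}(nodes[id] + path);
        if (weight > best_weight)
        {
            best_weight = weight;
            best_id = id;
        }
    }
    return best_id;
}

int main()
{
    std::vector<std::string> nodes = {"host-a:9000", "host-b:9000", "host-c:9000"};
    std::vector<std::string> files = {"part-0.parquet", "part-1.parquet", "part-2.parquet", "part-3.parquet"};

    for (const auto & f : files)
        std::cout << f << " -> " << nodes[replicaForFile(nodes, f)] << '\n';

    // Drop host-c: only files that host-c owned get a new owner; files owned by
    // host-a and host-b keep their assignment, so their filesystem caches stay warm.
    nodes.pop_back();
    for (const auto & f : files)
        std::cout << f << " -> " << nodes[replicaForFile(nodes, f)] << '\n';
}

This is also why the class is called a "stable" task distributor: the file-to-replica mapping depends only on the set of node addresses and the file paths, not on the order in which replicas request tasks.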
