Skip to content

Commit 15148db

Browse files
Enmkianton-ru
authored andcommitted
Merge pull request #797 from Altinity/feature/antalya-25.3/rendezvous_hashing
25.3 Antalya port of #709, #760 - Rendezvous hashing
1 parent 809953b commit 15148db

12 files changed

+78
-19
lines changed

src/Storages/IStorageCluster.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t
117117
if (extension)
118118
return;
119119

120-
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
120+
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
121121
}
122122

123123
/// The code executes on initiator
@@ -196,7 +196,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
196196
if (current_settings[Setting::max_parallel_replicas] > 1)
197197
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
198198

199-
createExtension(nullptr, max_replicas_to_use);
199+
createExtension(nullptr);
200200

201201
for (const auto & shard_info : cluster->getShardsInfo())
202202
{

src/Storages/IStorageCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ class IStorageCluster : public IStorage
3535

3636
ClusterPtr getCluster(ContextPtr context) const;
3737
/// Query is needed for pruning by virtual columns (_file, _path)
38-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
38+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
39+
const ActionsDAG::Node * predicate,
40+
const ContextPtr & context,
41+
ClusterPtr cluster) const = 0;
3942

4043
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4144

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include <Storages/extractTableFunctionFromSelectQuery.h>
1919
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
2020

21-
2221
namespace DB
2322
{
2423
namespace Setting
@@ -214,13 +213,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
214213
}
215214

216215
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
217-
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
216+
const ActionsDAG::Node * predicate,
217+
const ContextPtr & local_context,
218+
ClusterPtr cluster) const
218219
{
219220
auto iterator = StorageObjectStorageSource::createFileIterator(
220221
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
221222
local_context, predicate, {}, virtual_columns, hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
222223

223-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
224+
std::vector<std::string> ids_of_hosts;
225+
for (const auto & shard : cluster->getShardsInfo())
226+
{
227+
if (shard.per_replica_pools.empty())
228+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
229+
for (const auto & replica : shard.per_replica_pools)
230+
{
231+
if (!replica)
232+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
233+
ids_of_hosts.push_back(replica->getAddress());
234+
}
235+
}
236+
237+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
224238

225239
auto callback = std::make_shared<TaskIterator>(
226240
[task_distributor](size_t number_of_current_replica) mutable -> String

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::string getName() const override;
2626

2727
RemoteQueryExecutor::Extension getTaskIteratorExtension(
28-
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
28+
const ActionsDAG::Node * predicate,
29+
const ContextPtr & context,
30+
ClusterPtr cluster) const override;
2931

3032
String getPathSample(ContextPtr context);
3133

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ namespace ErrorCodes
1313

1414
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1515
std::shared_ptr<IObjectIterator> iterator_,
16-
size_t number_of_replicas_)
16+
std::vector<std::string> ids_of_nodes_)
1717
: iterator(std::move(iterator_))
18-
, connection_to_files(number_of_replicas_)
18+
, connection_to_files(ids_of_nodes_.size())
19+
, ids_of_nodes(ids_of_nodes_)
1920
, iterator_exhausted(false)
2021
{
2122
}
@@ -45,13 +46,39 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
4546

4647
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
4748
{
48-
return ConsistentHashing(sipHash64(file_path), connection_to_files.size());
49+
size_t nodes_count = ids_of_nodes.size();
50+
51+
/// Trivial case
52+
if (nodes_count < 2)
53+
return 0;
54+
55+
/// Rendezvous hashing
56+
size_t best_id = 0;
57+
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
58+
for (size_t id = 1; id < nodes_count; ++id)
59+
{
60+
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
61+
if (weight > best_weight)
62+
{
63+
best_weight = weight;
64+
best_id = id;
65+
}
66+
}
67+
return best_id;
4968
}
5069

5170
std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
5271
{
5372
std::lock_guard lock(mutex);
5473

74+
if (connection_to_files.size() <= number_of_current_replica)
75+
throw Exception(
76+
ErrorCodes::LOGICAL_ERROR,
77+
"Replica number {} is out of range. Expected range: [0, {})",
78+
number_of_current_replica,
79+
connection_to_files.size()
80+
);
81+
5582
auto & files = connection_to_files[number_of_current_replica];
5683

5784
while (!files.empty())

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Common/Logger.h>
55
#include <Interpreters/Cluster.h>
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
7+
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
78
#include <unordered_set>
89
#include <vector>
910
#include <mutex>
@@ -17,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor
1718
public:
1819
StorageObjectStorageStableTaskDistributor(
1920
std::shared_ptr<IObjectIterator> iterator_,
20-
size_t number_of_replicas_);
21+
std::vector<std::string> ids_of_nodes_);
2122

2223
std::optional<String> getNextTask(size_t number_of_current_replica);
2324

@@ -32,6 +33,8 @@ class StorageObjectStorageStableTaskDistributor
3233
std::vector<std::vector<String>> connection_to_files;
3334
std::unordered_set<String> unprocessed_files;
3435

36+
std::vector<std::string> ids_of_nodes;
37+
3538
std::mutex mutex;
3639
bool iterator_exhausted = false;
3740

src/Storages/StorageDistributed.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,8 +1317,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
13171317
const auto cluster = getCluster();
13181318

13191319
/// Select query is needed for pruining on virtual columns
1320-
auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
1321-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
1320+
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
13221321

13231322
/// Here we take addresses from destination cluster and assume source table exists on these nodes
13241323
size_t replica_index = 0;

src/Storages/StorageFileCluster.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
100100
);
101101
}
102102

103-
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const
103+
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
104+
const ActionsDAG::Node * predicate,
105+
const ContextPtr & context,
106+
ClusterPtr) const
104107
{
105108
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
106109
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });

src/Storages/StorageFileCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster
2727
const ConstraintsDescription & constraints_);
2828

2929
std::string getName() const override { return "FileCluster"; }
30-
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
30+
RemoteQueryExecutor::Extension getTaskIteratorExtension(
31+
const ActionsDAG::Node * predicate,
32+
const ContextPtr & context,
33+
ClusterPtr) const override;
3134

3235
private:
3336
void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6086,8 +6086,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
60866086
ContextMutablePtr query_context = Context::createCopy(local_context);
60876087
query_context->increaseDistributedDepth();
60886088

6089-
auto number_of_replicas = static_cast<UInt64>(src_cluster->getShardsAddresses().size());
6090-
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas);
6089+
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
60916090

60926091
size_t replica_index = 0;
60936092
for (const auto & replicas : src_cluster->getShardsAddresses())

0 commit comments

Comments
 (0)