Skip to content

Commit 672bc2d

Browse files
Enmk, ianton-ru
authored and committed
Merge pull request #797 from Altinity/feature/antalya-25.3/rendezvous_hashing
25.3 Antalya port of #709, #760 - Rendezvous hashing
1 parent efd42b3 commit 672bc2d

12 files changed

+78
-19
lines changed

src/Storages/IStorageCluster.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t
117117
if (extension)
118118
return;
119119

120-
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
120+
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
121121
}
122122

123123
/// The code executes on initiator
@@ -196,7 +196,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
196196
if (current_settings[Setting::max_parallel_replicas] > 1)
197197
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
198198

199-
createExtension(nullptr, max_replicas_to_use);
199+
createExtension(nullptr);
200200

201201
for (const auto & shard_info : cluster->getShardsInfo())
202202
{

src/Storages/IStorageCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ class IStorageCluster : public IStorage
3535

3636
ClusterPtr getCluster(ContextPtr context) const;
3737
/// Query is needed for pruning by virtual columns (_file, _path)
38-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
38+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
39+
const ActionsDAG::Node * predicate,
40+
const ContextPtr & context,
41+
ClusterPtr cluster) const = 0;
3942

4043
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4144

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#include <Storages/extractTableFunctionFromSelectQuery.h>
1717
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
1818

19-
2019
namespace DB
2120
{
2221
namespace Setting
@@ -177,13 +176,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
177176
}
178177

179178
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
180-
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
179+
const ActionsDAG::Node * predicate,
180+
const ContextPtr & local_context,
181+
ClusterPtr cluster) const
181182
{
182183
auto iterator = StorageObjectStorageSource::createFileIterator(
183184
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
184185
local_context, predicate, {}, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
185186

186-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
187+
std::vector<std::string> ids_of_hosts;
188+
for (const auto & shard : cluster->getShardsInfo())
189+
{
190+
if (shard.per_replica_pools.empty())
191+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
192+
for (const auto & replica : shard.per_replica_pools)
193+
{
194+
if (!replica)
195+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
196+
ids_of_hosts.push_back(replica->getAddress());
197+
}
198+
}
199+
200+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
187201

188202
auto callback = std::make_shared<TaskIterator>(
189203
[task_distributor](size_t number_of_current_replica) mutable -> String {

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ class StorageObjectStorageCluster : public IStorageCluster
2424
std::string getName() const override;
2525

2626
RemoteQueryExecutor::Extension getTaskIteratorExtension(
27-
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
27+
const ActionsDAG::Node * predicate,
28+
const ContextPtr & context,
29+
ClusterPtr cluster) const override;
2830

2931
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
3032

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ namespace DB
88

99
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1010
std::shared_ptr<IObjectIterator> iterator_,
11-
size_t number_of_replicas_)
11+
std::vector<std::string> ids_of_nodes_)
1212
: iterator(std::move(iterator_))
13-
, connection_to_files(number_of_replicas_)
13+
, connection_to_files(ids_of_nodes_.size())
14+
, ids_of_nodes(ids_of_nodes_)
1415
, iterator_exhausted(false)
1516
{
1617
}
@@ -37,13 +38,39 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
3738

3839
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
3940
{
40-
return ConsistentHashing(sipHash64(file_path), connection_to_files.size());
41+
size_t nodes_count = ids_of_nodes.size();
42+
43+
/// Trivial case
44+
if (nodes_count < 2)
45+
return 0;
46+
47+
/// Rendezvous hashing
48+
size_t best_id = 0;
49+
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
50+
for (size_t id = 1; id < nodes_count; ++id)
51+
{
52+
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
53+
if (weight > best_weight)
54+
{
55+
best_weight = weight;
56+
best_id = id;
57+
}
58+
}
59+
return best_id;
4160
}
4261

4362
std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
4463
{
4564
std::lock_guard lock(mutex);
4665

66+
if (connection_to_files.size() <= number_of_current_replica)
67+
throw Exception(
68+
ErrorCodes::LOGICAL_ERROR,
69+
"Replica number {} is out of range. Expected range: [0, {})",
70+
number_of_current_replica,
71+
connection_to_files.size()
72+
);
73+
4774
auto & files = connection_to_files[number_of_current_replica];
4875

4976
while (!files.empty())

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Common/Logger.h>
55
#include <Interpreters/Cluster.h>
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
7+
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
78
#include <unordered_set>
89
#include <vector>
910
#include <mutex>
@@ -17,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor
1718
public:
1819
StorageObjectStorageStableTaskDistributor(
1920
std::shared_ptr<IObjectIterator> iterator_,
20-
size_t number_of_replicas_);
21+
std::vector<std::string> ids_of_nodes_);
2122

2223
std::optional<String> getNextTask(size_t number_of_current_replica);
2324

@@ -32,6 +33,8 @@ class StorageObjectStorageStableTaskDistributor
3233
std::vector<std::vector<String>> connection_to_files;
3334
std::unordered_set<String> unprocessed_files;
3435

36+
std::vector<std::string> ids_of_nodes;
37+
3538
std::mutex mutex;
3639
bool iterator_exhausted = false;
3740

src/Storages/StorageDistributed.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,8 +1300,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
13001300
const auto cluster = getCluster();
13011301

13021302
/// Select query is needed for pruning on virtual columns
1303-
auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
1304-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
1303+
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
13051304

13061305
/// Here we take addresses from destination cluster and assume source table exists on these nodes
13071306
size_t replica_index = 0;

src/Storages/StorageFileCluster.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
7777
);
7878
}
7979

80-
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const
80+
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
81+
const ActionsDAG::Node * predicate,
82+
const ContextPtr & context,
83+
ClusterPtr) const
8184
{
8285
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
8386
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });

src/Storages/StorageFileCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster
2727
const ConstraintsDescription & constraints_);
2828

2929
std::string getName() const override { return "FileCluster"; }
30-
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
30+
RemoteQueryExecutor::Extension getTaskIteratorExtension(
31+
const ActionsDAG::Node * predicate,
32+
const ContextPtr & context,
33+
ClusterPtr) const override;
3134

3235
private:
3336
void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6089,8 +6089,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
60896089
ContextMutablePtr query_context = Context::createCopy(local_context);
60906090
query_context->increaseDistributedDepth();
60916091

6092-
auto number_of_replicas = static_cast<UInt64>(src_cluster->getShardsAddresses().size());
6093-
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas);
6092+
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
60946093

60956094
size_t replica_index = 0;
60966095
for (const auto & replicas : src_cluster->getShardsAddresses())

0 commit comments

Comments
 (0)