diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index f965a63f4e10..259e53d5b1fb 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -69,17 +69,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) if (extension) return; - std::vector ids_of_hosts; - for (const auto & shard : cluster->getShardsInfo()) - { - if (shard.per_replica_pools.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); - if (!shard.per_replica_pools[0]) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); - ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress()); - } - - extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts); + extension = storage->getTaskIteratorExtension(predicate, context, cluster); } /// The code executes on initiator diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index f26381fa7699..7332cab5ce9f 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -44,7 +44,7 @@ class IStorageCluster : public IStorage virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_hosts = std::nullopt) const = 0; + ClusterPtr cluster) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 0e67d3c2f80e..6d44c348aae0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -340,13 +340,26 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & local_context, - std::optional> ids_of_replicas) const + ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, {}, getVirtualsList(), nullptr, local_context->getFileProgressCallback()); - auto task_distributor = std::make_shared(iterator, ids_of_replicas); + std::vector ids_of_hosts; + for (const auto & shard : cluster->getShardsInfo()) + { + if (shard.per_replica_pools.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); + for (const auto & replica : shard.per_replica_pools) + { + if (!replica) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); + ids_of_hosts.push_back(replica->getAddress()); + } + } + + auto task_distributor = std::make_shared(iterator, ids_of_hosts); auto callback = std::make_shared( [task_distributor](size_t number_of_current_replica) mutable -> String { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index f4f4d565d355..a8d6bd6f8629 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -32,7 +32,7 @@ class StorageObjectStorageCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_replicas) const override; + ClusterPtr cluster) const override; String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index d2127a7f45c4..d9ca7b344637 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -8,9 +8,9 @@ namespace DB StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - std::optional> ids_of_nodes_) + std::vector ids_of_nodes_) : iterator(std::move(iterator_)) - , connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1) + , connection_to_files(ids_of_nodes_.size()) , ids_of_nodes(ids_of_nodes_) , iterator_exhausted(false) { @@ -38,11 +38,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getNextTask(siz size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) { - if (!ids_of_nodes.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer."); - - const auto & ids_of_nodes_value = ids_of_nodes.value(); - size_t nodes_count = ids_of_nodes_value.size(); + size_t nodes_count = ids_of_nodes.size(); /// Trivial case if (nodes_count < 2) @@ -50,10 +46,10 @@ size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String /// Rendezvous hashing size_t best_id = 0; - UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path); + UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path); for (size_t id = 1; id < nodes_count; ++id) { - UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path); + UInt64 weight = sipHash64(ids_of_nodes[id] + file_path); if (weight > best_weight) { best_weight = weight; @@ -67,6 +63,14 @@ std::optional StorageObjectStorageStableTaskDistributor::getPreQueuedFil { std::lock_guard lock(mutex); + if (connection_to_files.size() <= number_of_current_replica) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} is out of range. Expected range: [0, {})", + number_of_current_replica, + connection_to_files.size() + ); + auto & files = connection_to_files[number_of_current_replica]; while (!files.empty()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index a87884885a45..678ff4372f5f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -18,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor public: StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - std::optional> ids_of_nodes_); + std::vector ids_of_nodes_); std::optional getNextTask(size_t number_of_current_replica); @@ -33,7 +33,7 @@ class StorageObjectStorageStableTaskDistributor std::vector> connection_to_files; std::unordered_set unprocessed_files; - std::optional> ids_of_nodes; + std::vector ids_of_nodes; std::mutex mutex; bool iterator_exhausted = false; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index e3b7d6e9d3b9..3da44b877997 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1160,7 +1160,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto cluster = getCluster(); /// Select query is needed for pruining on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); /// Here we take addresses from destination cluster and assume source table exists on these nodes size_t replica_index = 0; diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index c4a2ca91ba23..146fc15d6389 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -77,7 +77,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional>) const + ClusterPtr) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), context); auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 3329223739ae..49d39a24ceba 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -30,7 +30,7 @@ class StorageFileCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_nodes) const override; + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d4897f2532fc..f700db3093b8 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6007,7 +6007,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 56e5cd3b26a1..eec1db3b6966 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -96,7 +96,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional>) const + ClusterPtr) const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index d09d2a36bd7e..9bfbaffe30f8 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -33,7 +33,7 @@ class StorageURLCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context, - std::optional> ids_of_replicas) const override; + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;