diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 99055f03bc77..8b1359fce4ad 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -723,8 +723,12 @@ void RemoteQueryExecutor::processReadTaskRequest() if (!extension || !extension->task_iterator) throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized"); + if (!extension->replica_info) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized"); + ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived); - auto response = (*extension->task_iterator)(); + + auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica); connections->sendReadTaskResponse(response); } diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 16f37ca75796..4a6d49c98375 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -30,7 +30,7 @@ class RemoteQueryExecutorReadContext; class ParallelReplicasReadingCoordinator; /// This is the same type as StorageS3Source::IteratorWrapper -using TaskIterator = std::function; +using TaskIterator = std::function; /// This class allows one to launch queries on remote replicas of one shard and get results class RemoteQueryExecutor diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 6b5e9f0e49ba..f2be48c8ae10 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -34,6 +34,7 @@ namespace Setting extern const SettingsBool async_query_sending_for_remote; extern const SettingsBool async_socket_for_remote; extern const SettingsBool skip_unavailable_shards; + extern const SettingsNonZeroUInt64 max_parallel_replicas; } namespace ErrorCodes @@ -59,15 +60,19 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes) if (filter_actions_dag) predicate = filter_actions_dag->getOutputs().at(0); - createExtension(predicate); + auto max_replicas_to_use = static_cast(cluster->getShardsInfo().size()); + if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1) + max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value); + + createExtension(predicate, max_replicas_to_use); } -void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) +void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas) { if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context); + extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas); } /// The code executes on initiator @@ -155,8 +160,6 @@ SinkToStoragePtr IStorageCluster::write( void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - createExtension(nullptr); - const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; @@ -164,29 +167,47 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const auto new_context = updateSettings(context->getSettingsRef()); const auto & current_settings = new_context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + + size_t replica_index = 0; + auto max_replicas_to_use = static_cast(cluster->getShardsInfo().size()); + if (current_settings[Setting::max_parallel_replicas] > 1) + max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value); + + createExtension(nullptr, max_replicas_to_use); + for (const auto & shard_info : cluster->getShardsInfo()) { - auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY); - for (auto & try_result : try_results) - { - auto remote_query_executor = std::make_shared( - std::vector{try_result}, - queryToString(query_to_send), - getOutputHeader(), - new_context, - /*throttler=*/nullptr, - scalars, - Tables(), - processed_stage, - extension); - - remote_query_executor->setLogger(log); - pipes.emplace_back(std::make_shared( - remote_query_executor, - add_agg_info, - current_settings[Setting::async_socket_for_remote], - current_settings[Setting::async_query_sending_for_remote])); - } + /// We're taking all replicas as shards, + /// so each shard will have only one address to connect to. + auto try_results = shard_info.pool->getMany( + timeouts, + current_settings, + PoolMode::GET_ONE, + {}, + /*skip_unavailable_endpoints=*/true); + + if (try_results.empty()) + continue; + + IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ }; + + auto remote_query_executor = std::make_shared( + std::vector{try_results.front()}, + queryToString(query_to_send), + getOutputHeader(), + new_context, + /*throttler=*/nullptr, + scalars, + Tables(), + processed_stage, + RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}); + + remote_query_executor->setLogger(log); + pipes.emplace_back(std::make_shared( + remote_query_executor, + add_agg_info, + current_settings[Setting::async_socket_for_remote], + current_settings[Setting::async_query_sending_for_remote])); } auto pipe = Pipe::unitePipes(std::move(pipes)); diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2992c3bc2497..b43220cda171 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -40,7 +40,7 @@ class IStorageCluster : public IStorage ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); } /// Query is needed for pruning by virtual columns (_file, _path) - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; + virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; @@ -127,7 +127,7 @@ class ReadFromCluster : public SourceStepWithFilter std::optional extension; - void createExtension(const ActionsDAG::Node * predicate); + void createExtension(const ActionsDAG::Node * predicate, const size_t number_of_replicas); ContextPtr updateSettings(const Settings & settings); }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 3a2bfe8a990b..5ce84223862d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -17,6 +17,8 @@ #include #include #include +#include + namespace DB { @@ -279,19 +281,19 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & local_context) const + const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback()); - auto callback = std::make_shared>([iterator]() mutable -> String - { - auto object_info = iterator->next(0); - if (object_info) - return object_info->getPath(); - return ""; - }); + auto task_distributor = std::make_shared(iterator, number_of_replicas); + + auto callback = std::make_shared( + [task_distributor](size_t number_of_current_replica) mutable -> String { + return task_distributor->getNextTask(number_of_current_replica).value_or(""); + }); + return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 492ea02c505a..b2981a2431eb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -30,7 +30,7 @@ class StorageObjectStorageCluster : public IStorageCluster std::string getName() const override; RemoteQueryExecutor::Extension getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp new file mode 100644 index 000000000000..55ad2202ea94 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -0,0 +1,155 @@ +#include "StorageObjectStorageStableTaskDistributor.h" +#include +#include +#include + +namespace DB +{ + +StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( + std::shared_ptr iterator_, + size_t number_of_replicas_) + : iterator(std::move(iterator_)) + , connection_to_files(number_of_replicas_) + , iterator_exhausted(false) +{ +} + +std::optional StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica) +{ + LOG_TRACE( + log, + "Received a new connection from replica {} looking for a file", + number_of_current_replica + ); + + // 1. Check pre-queued files first + if (auto file = getPreQueuedFile(number_of_current_replica)) + return file; + + // 2. Try to find a matching file from the iterator + if (auto file = getMatchingFileFromIterator(number_of_current_replica)) + return file; + + // 3. Process unprocessed files if iterator is exhausted + return getAnyUnprocessedFile(number_of_current_replica); +} + +size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) +{ + return ConsistentHashing(sipHash64(file_path), connection_to_files.size()); +} + +std::optional StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica) +{ + std::lock_guard lock(mutex); + + auto & files = connection_to_files[number_of_current_replica]; + + while (!files.empty()) + { + String next_file = files.back(); + files.pop_back(); + + auto it = unprocessed_files.find(next_file); + if (it == unprocessed_files.end()) + continue; + + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Assigning pre-queued file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + return std::nullopt; +} + +std::optional StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica) +{ + { + std::lock_guard lock(mutex); + if (iterator_exhausted) + return std::nullopt; + } + + while (true) + { + ObjectInfoPtr object_info; + + { + std::lock_guard lock(mutex); + object_info = iterator->next(0); + + if (!object_info) + { + iterator_exhausted = true; + break; + } + } + + String file_path; + + auto archive_object_info = std::dynamic_pointer_cast(object_info); + if (archive_object_info) + { + file_path = archive_object_info->getPathToArchive(); + } + else + { + file_path = object_info->getPath(); + } + + size_t file_replica_idx = getReplicaForFile(file_path); + if (file_replica_idx == number_of_current_replica) + { + LOG_TRACE( + log, + "Found file {} for replica {}", + file_path, + number_of_current_replica + ); + + return file_path; + } + + // Queue file for its assigned replica + { + std::lock_guard lock(mutex); + unprocessed_files.insert(file_path); + connection_to_files[file_replica_idx].push_back(file_path); + } + } + + return std::nullopt; +} + +std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) +{ + std::lock_guard lock(mutex); + + if (!unprocessed_files.empty()) + { + auto it = unprocessed_files.begin(); + String next_file = *it; + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + return std::nullopt; +} + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h new file mode 100644 index 000000000000..a2181c9c4103 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class StorageObjectStorageStableTaskDistributor +{ +public: + using IObjectIterator = StorageObjectStorageSource::IIterator; + using ObjectInfoPtr = StorageObjectStorage::ObjectInfoPtr; + + StorageObjectStorageStableTaskDistributor( + std::shared_ptr iterator_, + size_t number_of_replicas_); + + std::optional getNextTask(size_t number_of_current_replica); + +private: + size_t getReplicaForFile(const String & file_path); + std::optional getPreQueuedFile(size_t number_of_current_replica); + std::optional getMatchingFileFromIterator(size_t number_of_current_replica); + std::optional getAnyUnprocessedFile(size_t number_of_current_replica); + + std::shared_ptr iterator; + + std::vector> connection_to_files; + std::unordered_set unprocessed_files; + + std::mutex mutex; + bool iterator_exhausted = false; + + LoggerPtr log = getLogger("StorageClusterTaskDistributor"); +}; + +} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 7e0db58edb0d..917d8e98287e 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1126,7 +1126,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor predicate = filter->getOutputs().at(0); /// Select query is needed for pruining on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, 1 /* number_of_replicas */); auto dst_cluster = getCluster(); diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index c01738067c40..a5c54a060f3f 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -74,10 +74,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); } -RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const +RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), context); - auto callback = std::make_shared([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 9549f3a035c3..a19790219af4 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -27,7 +27,7 @@ class StorageFileCluster : public IStorageCluster const ConstraintsDescription & constraints_); std::string getName() const override { return "FileCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 0c90fa80cda3..5915e0c502f7 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5948,7 +5948,7 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con std::optional StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) { const auto & settings = local_context->getSettingsRef(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, 1 /* number_of_replicas */); /// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function auto src_cluster = src_storage_cluster->getCluster(local_context); diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 04fd5fb96750..9253f92ff1de 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -94,11 +94,11 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); } -RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const +RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t) const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); - auto callback = std::make_shared([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index 31bffa062104..98dd1d3ece12 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -30,7 +30,7 @@ class StorageURLCluster : public IStorageCluster const StorageURL::Configuration & configuration_); std::string getName() const override { return "URLCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;