Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,8 +746,12 @@ void RemoteQueryExecutor::processReadTaskRequest()
if (!extension || !extension->task_iterator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");

if (!extension->replica_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized");

ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
auto response = (*extension->task_iterator)();

auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica);
connections->sendReadTaskResponse(response);
}

Expand Down
2 changes: 1 addition & 1 deletion src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext;
class ParallelReplicasReadingCoordinator;

/// This is the same type as StorageS3Source::IteratorWrapper
using TaskIterator = std::function<String()>;
using TaskIterator = std::function<String(size_t)>;

/// This class allows one to launch queries on remote replicas of one shard and get results
class RemoteQueryExecutor
Expand Down
72 changes: 48 additions & 24 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace Setting
extern const SettingsBool async_query_sending_for_remote;
extern const SettingsBool async_socket_for_remote;
extern const SettingsBool skip_unavailable_shards;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
}

namespace ErrorCodes
Expand Down Expand Up @@ -67,7 +68,17 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
if (extension)
return;

extension = storage->getTaskIteratorExtension(predicate, context);
std::vector<std::string> ids_of_hosts;
for (const auto & shard : cluster->getShardsInfo())
{
if (shard.per_replica_pools.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
if (!shard.per_replica_pools[0])
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress());
}

extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts);
}

/// The code executes on initiator
Expand Down Expand Up @@ -155,38 +166,51 @@ SinkToStoragePtr IStorageCluster::write(

void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createExtension(nullptr);

const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

Pipes pipes;
auto new_context = updateSettings(context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);

size_t replica_index = 0;

createExtension(nullptr);

for (const auto & shard_info : cluster->getShardsInfo())
{
auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
getOutputHeader(),
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
extension);

remote_query_executor->setLogger(log);
pipes.emplace_back(std::make_shared<RemoteSource>(
remote_query_executor,
add_agg_info,
current_settings[Setting::async_socket_for_remote],
current_settings[Setting::async_query_sending_for_remote]));
}
/// We're taking all replicas as shards,
/// so each shard will have only one address to connect to.
auto try_results = shard_info.pool->getMany(
timeouts,
current_settings,
PoolMode::GET_ONE,
{},
/*skip_unavailable_endpoints=*/true);

if (try_results.empty())
continue;

IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };

auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_results.front()},
queryToString(query_to_send),
getOutputHeader(),
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});

remote_query_executor->setLogger(log);
pipes.emplace_back(std::make_shared<RemoteSource>(
remote_query_executor,
add_agg_info,
current_settings[Setting::async_socket_for_remote],
current_settings[Setting::async_query_sending_for_remote]));
}

auto pipe = Pipe::unitePipes(std::move(pipes));
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ class IStorageCluster : public IStorage

ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>> ids_of_hosts = std::nullopt) const = 0;

QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

Expand Down
20 changes: 12 additions & 8 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>


namespace DB
{
Expand Down Expand Up @@ -281,19 +283,21 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
}

RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
const ActionsDAG::Node * predicate,
const ContextPtr & local_context,
std::optional<std::vector<std::string>> ids_of_replicas) const
{
auto iterator = StorageObjectStorageSource::createFileIterator(
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback());

auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{
auto object_info = iterator->next(0);
if (object_info)
return object_info->getPath();
return "";
});
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_replicas);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String {
return task_distributor->getNextTask(number_of_current_replica).value_or("");
});

return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}

Expand Down
4 changes: 3 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class StorageObjectStorageCluster : public IStorageCluster
std::string getName() const override;

RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
const ActionsDAG::Node * predicate,
const ContextPtr & context,
std::optional<std::vector<std::string>> ids_of_replicas) const override;

String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#include "StorageObjectStorageStableTaskDistributor.h"
#include <Common/SipHash.h>
#include <consistent_hashing.h>
#include <optional>

namespace DB
{

/// Creates a distributor that hands out the objects produced by `iterator_` to replicas.
///
/// `ids_of_nodes_` is the list of node identifiers; a node's position in the list is the
/// replica number used by the other methods of this class. One pending-file queue is
/// allocated per listed node, or a single queue when no list is supplied.
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
    std::shared_ptr<IObjectIterator> iterator_,
    std::optional<std::vector<std::string>> ids_of_nodes_)
    : iterator(std::move(iterator_))
    , connection_to_files(ids_of_nodes_ ? ids_of_nodes_->size() : 1)
    /// Deliberately copied rather than moved: `connection_to_files` is sized from the same
    /// parameter, and the members' initialisation order is fixed by the header, not by this list.
    , ids_of_nodes(ids_of_nodes_)
    , iterator_exhausted(false)
{
}

/// Returns the next file path for replica `number_of_current_replica`,
/// or std::nullopt when none of the three sources below yields one.
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
{
    LOG_TRACE(
        log,
        "Received a new connection from replica {} looking for a file",
        number_of_current_replica
    );

    /// First choice: a file that was already pulled from the iterator and queued for this replica.
    std::optional<String> task = getPreQueuedFile(number_of_current_replica);

    /// Second choice: advance the iterator until a file mapped to this replica appears.
    if (!task)
        task = getMatchingFileFromIterator(number_of_current_replica);

    /// Last resort: take any file that is still marked as unprocessed.
    if (!task)
        task = getAnyUnprocessedFile(number_of_current_replica);

    return task;
}

size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
{
if (!ids_of_nodes.has_value())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case would it not have a value? And why is empty != nullopt?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It must always have a value: ReadFromCluster::createExtension always calls it with ids_of_nodes.

About nullopt - it is the same:

if (var)
if (var.has_value())
if (var != std::nullopt)

First and second are synonyms - https://en.cppreference.com/w/cpp/utility/optional/operator_bool
Third - because comparison with nullopt calls the bool operator:

/usr/include/c++/11$ grep -B 5 -A 5 'operator==' optional
...
  // Comparisons with nullopt.
  template<typename _Tp>
    constexpr bool
    operator==(const optional<_Tp>& __lhs, nullopt_t) noexcept
    { return !__lhs; }
...
  template<typename _Tp>
    constexpr bool
    operator==(nullopt_t, const optional<_Tp>& __rhs) noexcept
    { return !__rhs; }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't explain my question correctly. I was asking why not just have a vector and check if it is empty instead of an optional of vector

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that instead of doing:

std::optional<std::vector<std::string>> ids_of_nodes;
...
    if (!ids_of_nodes.has_value())

you simply do:

std::vector<std::string> ids_of_nodes;
...
    if (ids_of_nodes.empty())

I mean, it is ok to keep it as optional, just thought it could be simpler

throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributer.");

const auto & ids_of_nodes_value = ids_of_nodes.value();
size_t nodes_count = ids_of_nodes_value.size();

/// Trivial case
if (nodes_count < 2)
return 0;

/// Rendezvous hashing
size_t best_id = 0;
UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path);
for (size_t id = 1; id < nodes_count; ++id)
{
UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path);
if (weight > best_weight)
{
best_weight = weight;
best_id = id;
}
}
return best_id;
}

std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
{
std::lock_guard lock(mutex);

auto & files = connection_to_files[number_of_current_replica];

while (!files.empty())
{
String next_file = files.back();
files.pop_back();

auto it = unprocessed_files.find(next_file);
if (it == unprocessed_files.end())
continue;

unprocessed_files.erase(it);

LOG_TRACE(
log,
"Assigning pre-queued file {} to replica {}",
next_file,
number_of_current_replica
);

return next_file;
}

return std::nullopt;
}

std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica)
{
{
std::lock_guard lock(mutex);
if (iterator_exhausted)
return std::nullopt;
}

while (true)
{
ObjectInfoPtr object_info;

{
std::lock_guard lock(mutex);
object_info = iterator->next(0);

if (!object_info)
{
iterator_exhausted = true;
break;
}
}

String file_path;

auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
if (archive_object_info)
{
file_path = archive_object_info->getPathToArchive();
}
else
{
file_path = object_info->getPath();
}

size_t file_replica_idx = getReplicaForFile(file_path);
if (file_replica_idx == number_of_current_replica)
{
LOG_TRACE(
log,
"Found file {} for replica {}",
file_path,
number_of_current_replica
);

return file_path;
}

// Queue file for its assigned replica
{
std::lock_guard lock(mutex);
unprocessed_files.insert(file_path);
connection_to_files[file_replica_idx].push_back(file_path);
}
}

return std::nullopt;
}

std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
{
std::lock_guard lock(mutex);

if (!unprocessed_files.empty())
{
auto it = unprocessed_files.begin();
String next_file = *it;
unprocessed_files.erase(it);

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
);

return next_file;
}

return std::nullopt;
}

}
Loading
Loading