Skip to content
Merged
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6869,6 +6869,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
)", 0) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
)", EXPERIMENTAL) \
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
Force specified kind of Exchange operators between distributed query stages.

Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
});
Expand Down
36 changes: 36 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include <Common/Exception.h>
#include <Common/ObjectStorageKeyGenerator.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>


namespace DB
{
Expand Down Expand Up @@ -97,4 +101,36 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
return write_settings;
}

RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
Poco::JSON::Parser parser;
try
{
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
if (!json)
return;

successfully_parsed = true;

if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
}
catch (const Poco::JSON::JSONException &)
{ /// Not a JSON
return;
}
}

std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
{
Poco::JSON::Object json;
if (retry_after_us.has_value())
json.set("retry_after_us", retry_after_us.value());

std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

}
32 changes: 28 additions & 4 deletions src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,37 @@ struct ObjectMetadata

struct RelativePathWithMetadata
Copy link
Collaborator

@zvonand zvonand Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some changes or explanations would be very suitable here. It is not obvious why a RelativePathWithMetadata object stores something called "command", what is task response, why it is here, and, most curious, why sometimes RelativePathWithMetadata does not store the relative path itself.

For me (a relatively new person in this code section) it took a while to understand what is going on here and why it looks strange

{
/// A command carried inside a task string in place of an object path.
///
/// A task string that is a JSON object is treated as a command; any other string is not.
/// The only field currently understood is "retry_after_us": a delay, in microseconds,
/// after which the receiver should ask for a task again.
class CommandInTaskResponse
{
public:
    /// Creates an empty command: not parsed, no retry delay.
    CommandInTaskResponse() = default;

    /// Tries to parse `task` as a JSON command; see is_parsed() for the outcome.
    explicit CommandInTaskResponse(const std::string & task);

    /// Serializes the command to its JSON text form.
    std::string to_string() const;

    /// True when the string passed to the constructor was recognized as a JSON command.
    bool is_parsed() const { return successfully_parsed; }

    /// Retry delay in microseconds, or nullopt when none is set.
    std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }

    /// Sets the retry delay in microseconds.
    void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }

private:
    bool successfully_parsed = false;
    std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
};

String relative_path;
std::optional<ObjectMetadata> metadata;
CommandInTaskResponse command;

RelativePathWithMetadata() = default;

explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: relative_path(std::move(relative_path_))
, metadata(std::move(metadata_))
{}
/// Builds the object from a task string, which is either a plain relative path
/// or a JSON-encoded command (see CommandInTaskResponse).
/// When the string parses as a command, `relative_path` is left empty and the
/// command is available through `command`; otherwise the string is stored as the path.
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
: metadata(std::move(metadata_))
, command(task_string)
{
/// Only a string that is not a command is a real path.
if (!command.is_parsed())
relative_path = task_string;
}

virtual ~RelativePathWithMetadata() = default;

Expand All @@ -120,6 +142,8 @@ struct RelativePathWithMetadata
virtual bool isArchive() const { return false; }
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }

/// Returns the command parsed from the task string; check is_parsed() to see whether one is present.
const CommandInTaskResponse & getCommand() const { return command; }
};

struct ObjectKeyWithMetadata
Expand Down
12 changes: 4 additions & 8 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,15 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);

auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);

createExtension(predicate, max_replicas_to_use);
createExtension(predicate);
}

void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
{
if (extension)
return;

extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
}

/// The code executes on initiator
Expand Down Expand Up @@ -178,7 +174,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
if (current_settings[Setting::max_parallel_replicas] > 1)
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);

createExtension(nullptr, max_replicas_to_use);
createExtension(nullptr);

for (const auto & shard_info : cluster->getShardsInfo())
{
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ class IStorageCluster : public IStorage
ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }

/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context,
ClusterPtr cluster) const = 0;

QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

Expand Down Expand Up @@ -130,7 +133,7 @@ class ReadFromCluster : public SourceStepWithFilter

std::optional<RemoteQueryExecutor::Extension> extension;

void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas);
void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
};

Expand Down
36 changes: 34 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ namespace DB
namespace Setting
{
extern const SettingsBool use_hive_partitioning;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
extern const SettingsString object_storage_cluster;
}

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_SETTING_VALUE;
}

String StorageObjectStorageCluster::getPathSample(ContextPtr context)
Expand Down Expand Up @@ -437,13 +439,43 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
}

RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
const ActionsDAG::Node * predicate,
const ContextPtr & local_context,
ClusterPtr cluster) const
{
auto iterator = StorageObjectStorageSource::createFileIterator(
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
local_context, predicate, {}, getVirtualsList(), hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
std::vector<std::string> ids_of_hosts;
for (const auto & shard : cluster->getShardsInfo())
{
if (shard.per_replica_pools.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
for (const auto & replica : shard.per_replica_pools)
{
if (!replica)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
ids_of_hosts.push_back(replica->getAddress());
}
}

uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];

/// Check value to avoid negative result after conversion in microseconds.
/// Poco::Timestamp::TimeDiff is signed int 64.
static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
"Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
lock_object_storage_task_distribution_ms,
lock_object_storage_task_distribution_ms_max
);

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
iterator,
ids_of_hosts,
lock_object_storage_task_distribution_ms);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class StorageObjectStorageCluster : public IStorageCluster
std::string getName() const override;

RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
const ActionsDAG::Node * predicate,
const ContextPtr & context,
ClusterPtr cluster) const override;

String getPathSample(ContextPtr context);

Expand Down
25 changes: 23 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/VirtualColumnUtils.h>
Expand Down Expand Up @@ -442,11 +443,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
ObjectInfoPtr object_info;
auto query_settings = configuration->getQuerySettings(context_);

bool not_a_path = false;

do
{
not_a_path = false;
object_info = file_iterator->next(processor);

if (!object_info || object_info->getPath().empty())
if (!object_info)
return {};

if (object_info->getCommand().is_parsed())
{
auto retry_after_us = object_info->getCommand().get_retry_after_us();
if (retry_after_us.has_value())
{
not_a_path = true;
/// TODO: Make asynchronous waiting without sleep in thread
/// Now this sleep is on executor node in worker thread
/// Does not block query initiator
sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
continue;
}
}

if (object_info->getPath().empty())
return {};

if (!object_info->metadata)
Expand All @@ -465,7 +486,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
object_info->metadata = object_storage->getObjectMetadata(path);
}
}
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));

QueryPipelineBuilder builder;
std::shared_ptr<ISource> source;
Expand Down
Loading
Loading