2 changes: 1 addition & 1 deletion src/Storages/IStorageCluster.cpp
```diff
@@ -69,7 +69,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
     if (extension)
         return;

-    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
+    extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
 }

 /// The code executes on initiator
```
1 change: 1 addition & 0 deletions src/Storages/IStorageCluster.h
```diff
@@ -43,6 +43,7 @@ class IStorageCluster : public IStorage
     /// Query is needed for pruning by virtual columns (_file, _path)
     virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const = 0;
```
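Widening this pure-virtual signature forces every `IStorageCluster` subclass to accept the new parameter, even backends that cannot use it (the PR keeps those as unnamed `/* filter_actions_dag */` parameters). A minimal sketch of the pattern, using toy stand-ins rather than the real ClickHouse types:

```cpp
#include <iostream>
#include <optional>
#include <string>

/// Toy stand-ins for DB::ActionsDAG and RemoteQueryExecutor::Extension;
/// illustrative only, not ClickHouse code.
struct ActionsDAG { std::string repr; };
struct Extension { std::string description; };

struct IStorageClusterLike
{
    virtual ~IStorageClusterLike() = default;
    virtual Extension getTaskIteratorExtension(
        const std::optional<ActionsDAG> & filter_actions_dag) const = 0;
};

/// A backend that can prune (the object-storage/Iceberg path) consumes the DAG...
struct ObjectStorageLike : IStorageClusterLike
{
    Extension getTaskIteratorExtension(
        const std::optional<ActionsDAG> & filter_actions_dag) const override
    {
        if (filter_actions_dag)
            return {"iterator pruned by: " + filter_actions_dag->repr};
        return {"iterator over all files"};
    }
};

/// ...while the file/URL backends simply ignore the extra argument.
struct FileClusterLike : IStorageClusterLike
{
    Extension getTaskIteratorExtension(
        const std::optional<ActionsDAG> & /* filter_actions_dag */) const override
    {
        return {"iterator over the static file list"};
    }
};

int main()
{
    ObjectStorageLike object_storage;
    FileClusterLike file_cluster;
    std::cout << object_storage.getTaskIteratorExtension(ActionsDAG{"c = 42"}).description << '\n';
    std::cout << file_cluster.getTaskIteratorExtension({}).description << '\n';
}
```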
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
```diff
@@ -339,12 +339,13 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(

 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & filter_actions_dag,
     const ContextPtr & local_context,
     ClusterPtr cluster) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
-        local_context, predicate, {}, getVirtualsList(), nullptr, local_context->getFileProgressCallback());
+        local_context, predicate, filter_actions_dag, getVirtualsList(), nullptr, local_context->getFileProgressCallback());

     std::vector<std::string> ids_of_hosts;
     for (const auto & shard : cluster->getShardsInfo())
```
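Before this change, the cluster read path passed an empty placeholder (`{}`) where `createFileIterator` expects the filter DAG, so only the single `predicate` node was available; now the initiator hands the whole DAG to the iterator. A rough, self-contained sketch of why that matters for task generation — the `ActionsDAG` stand-in and the substring matching are invented for illustration (real pruning evaluates the DAG against partition values and min/max statistics):

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <vector>

/// Invented stand-in: pretend the filter "DAG" reduces to a required path substring.
struct ActionsDAG { std::string required_substring; };

/// With the filter available on the initiator, files no worker would read
/// can be dropped instead of each becoming a remote task.
std::vector<std::string> pruneFiles(
    std::vector<std::string> files,
    const std::optional<ActionsDAG> & filter_actions_dag)
{
    if (!filter_actions_dag)
        return files;  /// no filter: every file becomes a task
    std::erase_if(files, [&](const std::string & path)
    {
        return path.find(filter_actions_dag->required_substring) == std::string::npos;
    });
    return files;
}

int main()
{
    for (const auto & path : pruneFiles(
             {"p=1/a.parquet", "p=2/b.parquet"}, ActionsDAG{"p=1"}))
        std::cout << path << '\n';  /// only p=1/a.parquet survives
}
```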
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.h
```diff
@@ -31,6 +31,7 @@ class StorageObjectStorageCluster : public IStorageCluster

     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const override;
```
2 changes: 1 addition & 1 deletion src/Storages/StorageDistributed.cpp
```diff
@@ -1160,7 +1160,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     const auto cluster = getCluster();

     /// Select query is needed for pruning on virtual columns
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, filter, local_context, cluster);

     /// Here we take addresses from destination cluster and assume source table exists on these nodes
     size_t replica_index = 0;
```
1 change: 1 addition & 0 deletions src/Storages/StorageFileCluster.cpp
```diff
@@ -76,6 +76,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto

 RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
```
1 change: 1 addition & 0 deletions src/Storages/StorageFileCluster.h
```diff
@@ -29,6 +29,7 @@ class StorageFileCluster : public IStorageCluster
     std::string getName() const override { return "FileCluster"; }
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
```
2 changes: 1 addition & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
```diff
@@ -6007,7 +6007,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();

-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, {}, local_context, src_cluster);

     size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
```
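Write paths with no select filter, like this one, just pass `{}`: because the new parameter is a `const std::optional<ActionsDAG> &`, the braces materialize an empty optional and no pruning is attempted. A tiny sketch of that binding, again with a toy `ActionsDAG`:

```cpp
#include <iostream>
#include <optional>

struct ActionsDAG {};  /// toy stand-in for DB::ActionsDAG

void getTaskIteratorExtension(const std::optional<ActionsDAG> & filter_actions_dag)
{
    std::cout << (filter_actions_dag ? "filter DAG present: can prune\n"
                                     : "empty optional: iterate everything\n");
}

int main()
{
    getTaskIteratorExtension({});            /// as in StorageReplicatedMergeTree above
    getTaskIteratorExtension(ActionsDAG{});  /// as on the SELECT read path
}
```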
1 change: 1 addition & 0 deletions src/Storages/StorageURLCluster.cpp
```diff
@@ -95,6 +95,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS

 RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
```
1 change: 1 addition & 0 deletions src/Storages/StorageURLCluster.h
```diff
@@ -32,6 +32,7 @@ class StorageURLCluster : public IStorageCluster
     std::string getName() const override { return "URLCluster"; }
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
```
18 changes: 12 additions & 6 deletions tests/integration/test_storage_iceberg/test.py
```diff
@@ -581,7 +581,7 @@ def test_types(started_cluster, format_version, storage_type):
     [
         ["a", "Nullable(Int32)"],
         ["b", "Nullable(String)"],
-        ["c", "Nullable(Date32)"],
+        ["c", "Nullable(Date)"],
         ["d", "Array(Nullable(String))"],
         ["e", "Nullable(Bool)"],
     ]
@@ -604,7 +604,7 @@ def test_types(started_cluster, format_version, storage_type):
     [
         ["a", "Nullable(Int32)"],
         ["b", "Nullable(String)"],
-        ["c", "Nullable(Date32)"],
+        ["c", "Nullable(Date)"],
         ["d", "Array(Nullable(String))"],
         ["e", "Nullable(Bool)"],
     ]
@@ -2076,7 +2076,10 @@ def test_filesystem_cache(started_cluster, storage_type):


 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_partition_pruning(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str()
@@ -2124,7 +2127,7 @@ def execute_spark_query(query: str):
     )

     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )

     def check_validity_and_get_prunned_files(select_expression):
@@ -3020,7 +3023,10 @@ def test_explicit_metadata_file(started_cluster, storage_type):


 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_minmax_pruning_with_null(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
@@ -3091,7 +3097,7 @@ def execute_spark_query(query: str):
     )

     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )

     def check_validity_and_get_prunned_files(select_expression):
```