diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 259e53d5b1fb..8f66771aad6b 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -69,7 +69,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
     if (extension)
         return;
 
-    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
+    extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
 }
 
 /// The code executes on initiator
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 7332cab5ce9f..1fbea4f4d96a 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -43,6 +43,7 @@ class IStorageCluster : public IStorage
 
     /// Query is needed for pruning by virtual columns (_file, _path)
     virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const = 0;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 11f8fe0ec4bb..404a89b5a3c0 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -339,12 +339,13 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
 
 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & filter_actions_dag,
     const ContextPtr & local_context,
     ClusterPtr cluster) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
-        local_context, predicate, {}, getVirtualsList(), nullptr, local_context->getFileProgressCallback());
+        local_context, predicate, filter_actions_dag, getVirtualsList(), nullptr, local_context->getFileProgressCallback());
 
     std::vector<String> ids_of_hosts;
     for (const auto & shard : cluster->getShardsInfo())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index ef216ac5c450..7b7031a7c728 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -31,6 +31,7 @@ class StorageObjectStorageCluster : public IStorageCluster
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr cluster) const override;
 
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 3da44b877997..9d9e1c5310ee 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -1160,7 +1160,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     const auto cluster = getCluster();
 
     /// Select query is needed for pruining on virtual columns
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, filter, local_context, cluster);
 
     /// Here we take addresses from destination cluster and assume source table exists on these nodes
     size_t replica_index = 0;
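The core of the change is visible in the call sites above: `getTaskIteratorExtension` now receives the whole rewritten filter as `const std::optional<ActionsDAG> &` in addition to the single predicate node, and callers that have no filter pass an empty optional (`{}`), as the `StorageReplicatedMergeTree` call site further down does. A minimal standalone sketch of that calling convention; the stub type and free function below are illustrative only, not ClickHouse code:

    #include <iostream>
    #include <optional>
    #include <string>

    // Stand-in for DB::ActionsDAG, for illustration only.
    struct ActionsDAGStub
    {
        std::string description;
    };

    // Mirrors the shape of the new parameter: an engaged optional carries
    // the filter used for pruning, an empty one preserves the old behaviour.
    void getTaskIteratorExtension(const std::optional<ActionsDAGStub> & filter_actions_dag)
    {
        if (filter_actions_dag)
            std::cout << "prune with: " << filter_actions_dag->description << '\n';
        else
            std::cout << "no filter, list every file\n";
    }

    int main()
    {
        getTaskIteratorExtension({});                                   // no filter available
        getTaskIteratorExtension(ActionsDAGStub{"partition_key = 42"}); // filter pushed down
    }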
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp
index 146fc15d6389..9946d4188907 100644
--- a/src/Storages/StorageFileCluster.cpp
+++ b/src/Storages/StorageFileCluster.cpp
@@ -76,6 +76,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
 
 RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h
index 49d39a24ceba..93627fdc31d6 100644
--- a/src/Storages/StorageFileCluster.h
+++ b/src/Storages/StorageFileCluster.h
@@ -29,6 +29,7 @@ class StorageFileCluster : public IStorageCluster
     std::string getName() const override { return "FileCluster"; }
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index f700db3093b8..21fa041d47ea 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -6007,7 +6007,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();
 
-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, {}, local_context, src_cluster);
 
     size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp
index eec1db3b6966..f9eab56ba58d 100644
--- a/src/Storages/StorageURLCluster.cpp
+++ b/src/Storages/StorageURLCluster.cpp
@@ -95,6 +95,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
 
 RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
     const ActionsDAG::Node * predicate,
+    const std::optional<ActionsDAG> & /* filter_actions_dag */,
     const ContextPtr & context,
     ClusterPtr) const
 {
diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h
index 9bfbaffe30f8..884ca16e7b12 100644
--- a/src/Storages/StorageURLCluster.h
+++ b/src/Storages/StorageURLCluster.h
@@ -32,6 +32,7 @@ class StorageURLCluster : public IStorageCluster
     std::string getName() const override { return "URLCluster"; }
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
+        const std::optional<ActionsDAG> & filter_actions_dag,
         const ContextPtr & context,
         ClusterPtr) const override;
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 5218396a4fd6..70df0bc90827 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -581,7 +581,7 @@ def test_types(started_cluster, format_version, storage_type):
         [
             ["a", "Nullable(Int32)"],
             ["b", "Nullable(String)"],
-            ["c", "Nullable(Date32)"],
+            ["c", "Nullable(Date)"],
             ["d", "Array(Nullable(String))"],
             ["e", "Nullable(Bool)"],
         ]
@@ -604,7 +604,7 @@ def test_types(started_cluster, format_version, storage_type):
         [
             ["a", "Nullable(Int32)"],
             ["b", "Nullable(String)"],
-            ["c", "Nullable(Date32)"],
+            ["c", "Nullable(Date)"],
             ["d", "Array(Nullable(String))"],
             ["e", "Nullable(Bool)"],
         ]
@@ -2076,7 +2076,10 @@ def test_filesystem_cache(started_cluster, storage_type):
 
 
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_partition_pruning(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str()
@@ -2124,7 +2127,7 @@ def execute_spark_query(query: str):
     )
 
     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )
 
     def check_validity_and_get_prunned_files(select_expression):
@@ -3020,7 +3023,10 @@ def test_explicit_metadata_file(started_cluster, storage_type):
 
 
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_minmax_pruning_with_null(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
@@ -3091,7 +3097,7 @@ def execute_spark_query(query: str):
     )
 
     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
     )
 
     def check_validity_and_get_prunned_files(select_expression):
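Taken together, the initiator can now drop pruned files before tasks are handed to worker hosts, with the surviving work dealt out round-robin; this is the path the new `run_on_cluster` matrix above exercises. A self-contained sketch of that initiator-side flow, with stub types in place of the real iterator and cluster machinery (none of these names come from the ClickHouse codebase):

    #include <functional>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for a compiled filter; the real code carries a DB::ActionsDAG.
    using FilterStub = std::function<bool(int /* partition value */)>;

    // Keep only the files whose partition value passes the optional filter;
    // an empty optional keeps everything, matching the {} call sites above.
    std::vector<std::string> pruneFiles(
        const std::vector<std::pair<std::string, int>> & files,
        const std::optional<FilterStub> & filter)
    {
        std::vector<std::string> kept;
        for (const auto & [file, partition_value] : files)
            if (!filter || (*filter)(partition_value))
                kept.push_back(file);
        return kept;
    }

    int main()
    {
        std::vector<std::pair<std::string, int>> files
            = {{"part-0.parquet", 1}, {"part-1.parquet", 2}, {"part-2.parquet", 3}};

        // A filter "partition value >= 2" leaves two tasks, which are then
        // dealt out to the hosts round-robin, as the task iterator does.
        auto tasks = pruneFiles(files, FilterStub([](int v) { return v >= 2; }));
        std::vector<std::string> hosts = {"node1", "node2"};
        for (size_t i = 0; i < tasks.size(); ++i)
            std::cout << hosts[i % hosts.size()] << " <- " << tasks[i] << '\n';
    }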