diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index dbf348b09cd5..11d0b9fad6db 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -25,7 +25,6 @@ mod delimited_stream; mod file_stream; mod json; mod parquet; -mod row_filter; pub(crate) use self::csv::plan_to_csv; pub use self::csv::CsvExec; diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 24159a337643..87c3c220d186 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -18,7 +18,6 @@ //! Execution plan for reading Parquet files use fmt::Debug; -use std::collections::{HashSet, VecDeque}; use std::fmt; use std::fs; use std::ops::Range; @@ -33,7 +32,6 @@ use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; -use crate::physical_plan::file_format::row_filter::build_row_filter; use crate::physical_plan::file_format::FileMeta; use crate::{ error::{DataFusionError, Result}, @@ -42,13 +40,12 @@ use crate::{ physical_plan::{ expressions::PhysicalSortExpr, file_format::{FileScanConfig, SchemaAdapter}, - metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, scalar::ScalarValue, }; -use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array}; use arrow::datatypes::DataType; use arrow::{ array::ArrayRef, @@ -57,27 +54,29 @@ use arrow::{ }; use bytes::Bytes; use datafusion_common::Column; -use datafusion_expr::utils::expr_to_columns; use datafusion_expr::Expr; -use datafusion_optimizer::utils::split_conjunction; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use log::{debug, error}; +use log::debug; use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelector}; +use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{ConvertedType, LogicalType}; use parquet::errors::ParquetError; -use parquet::file::page_index::index::Index; use parquet::file::{ metadata::{ParquetMetaData, RowGroupMetaData}, properties::WriterProperties, statistics::Statistics as ParquetStatistics, }; -use parquet::format::PageLocation; use parquet::schema::types::ColumnDescriptor; +mod metrics; +mod page_filter; +mod row_filter; + +pub use metrics::ParquetFileMetrics; + /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { @@ -228,61 +227,6 @@ impl ParquetExec { } } -/// Stores metrics about the parquet execution for a particular parquet file. -/// -/// This component is a subject to **change** in near future and is exposed for low level integrations -/// through [ParquetFileReaderFactory]. 
-#[derive(Debug, Clone)]
-pub struct ParquetFileMetrics {
-    /// Number of times the predicate could not be evaluated
-    pub predicate_evaluation_errors: metrics::Count,
-    /// Number of row groups pruned using
-    pub row_groups_pruned: metrics::Count,
-    /// Total number of bytes scanned
-    pub bytes_scanned: metrics::Count,
-    /// Total rows filtered out by predicates pushed into parquet scan
-    pub pushdown_rows_filtered: metrics::Count,
-    /// Total time spent evaluating pushdown filters
-    pub pushdown_eval_time: metrics::Time,
-}
-
-impl ParquetFileMetrics {
-    /// Create new metrics
-    pub fn new(
-        partition: usize,
-        filename: &str,
-        metrics: &ExecutionPlanMetricsSet,
-    ) -> Self {
-        let predicate_evaluation_errors = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("predicate_evaluation_errors", partition);
-
-        let row_groups_pruned = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("row_groups_pruned", partition);
-
-        let bytes_scanned = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("bytes_scanned", partition);
-
-        let pushdown_rows_filtered = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("pushdown_rows_filtered", partition);
-
-        let pushdown_eval_time = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .subset_time("pushdown_eval_time", partition);
-
-        Self {
-            predicate_evaluation_errors,
-            row_groups_pruned,
-            bytes_scanned,
-            pushdown_rows_filtered,
-            pushdown_eval_time,
-        }
-    }
-}
-
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -463,11 +407,12 @@ impl FileOpener for ParquetOpener {
                 adapted_projections.iter().cloned(),
             );
 
+            // Filter pushdown: evaluate predicates during scan
             if let Some(predicate) = pushdown_filters
                 .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
                 .flatten()
            {
-                let row_filter = build_row_filter(
+                let row_filter = row_filter::build_row_filter(
                     predicate.clone(),
                     builder.schema().as_ref(),
                     table_schema.as_ref(),
@@ -491,77 +436,33 @@ impl FileOpener for ParquetOpener {
                 };
             };
 
+            // Row group pruning: attempt to skip entire row groups
+            // using metadata on the row groups
             let file_metadata = builder.metadata();
-            let groups = file_metadata.row_groups();
             let row_groups = prune_row_groups(
-                groups,
+                file_metadata.row_groups(),
                 file_range,
                 pruning_predicate.clone(),
                 &file_metrics,
             );
 
-            if enable_page_index {
-                let page_index_predicates = extract_page_index_push_down_predicates(
-                    &pruning_predicate,
-                    builder.schema().clone(),
-                )?;
-                if !page_index_predicates.is_empty() {
-                    let file_offset_indexes = file_metadata.offset_indexes();
-                    let file_page_indexes = file_metadata.page_indexes();
-                    if let (Some(file_offset_indexes), Some(file_page_indexes)) =
-                        (file_offset_indexes, file_page_indexes)
-                    {
-                        let mut row_selections =
-                            VecDeque::with_capacity(page_index_predicates.len());
-                        for predicate in page_index_predicates {
-                            // `extract_page_index_push_down_predicates` only return predicate with one col.
-                            let col_id = *predicate
-                                .need_input_columns_ids()
-                                .iter()
-                                .next()
-                                .unwrap();
-                            let mut selectors = Vec::with_capacity(row_groups.len());
-                            for r in &row_groups {
-                                let rg_offset_indexes = file_offset_indexes.get(*r);
-                                let rg_page_indexes = file_page_indexes.get(*r);
-                                if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
-                                    (rg_page_indexes, rg_offset_indexes)
-                                {
-                                    selectors.extend(
-                                        prune_pages_in_one_row_group(
-                                            &groups[*r],
-                                            &predicate,
-                                            rg_offset_indexes.get(col_id),
-                                            rg_page_indexes.get(col_id),
-                                            &file_metrics,
-                                        )
-                                        .map_err(|e| {
-                                            ArrowError::ParquetError(format!(
-                                                "Fail in prune_pages_in_one_row_group: {}",
-                                                e
-                                            ))
-                                        }),
-                                    );
-                                } else {
-                                    // fallback select all rows
-                                    let all_selected = vec![RowSelector::select(
-                                        groups[*r].num_rows() as usize,
-                                    )];
-                                    selectors.push(all_selected);
-                                }
-                            }
-                            debug!(
-                                "Use filter and page index create RowSelection {:?} from predicate:{:?}",
-                                &selectors, predicate
-                            );
-                            row_selections.push_back(
-                                selectors.into_iter().flatten().collect::<Vec<_>>(),
-                            );
-                        }
-                        let final_selection = combine_multi_col_selection(row_selections);
-                        builder = builder.with_row_selection(final_selection.into());
-                    }
-                }
+            // Page index pruning: if all data on individual pages can
+            // be ruled out using page metadata, rows from other columns
+            // within that range can be skipped as well
+            if let Some(row_selection) = enable_page_index
+                .then(|| {
+                    page_filter::build_page_filter(
+                        pruning_predicate.as_ref(),
+                        builder.schema().clone(),
+                        &row_groups,
+                        file_metadata.as_ref(),
+                        &file_metrics,
+                    )
+                })
+                .transpose()?
+                .flatten()
+            {
+                builder = builder.with_row_selection(row_selection);
             }
 
             let stream = builder
@@ -585,170 +486,6 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-/// For example:
-/// ```text
-/// ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
-/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
-/// ┃ │ ┌──────────────┐ │ ┌──────────────┐ │ ┃
-/// ┃ │ │              │ │ │              │ │ ┃
-/// ┃ │ │              │ │ │     Page     │ │
-///   │ │              │ │ │      3       │ ┃
-/// ┃ │ │              │ │ │   min: "A"   │ │ ┃
-/// ┃ │ │              │ │ │   max: "C"   │ ┃
-/// ┃ │ │     Page     │ │ │ first_row: 0 │ │
-///   │ │      1       │ │ │              │ ┃
-/// ┃ │ │   min: 10    │ │ └──────────────┘ │ ┃
-/// ┃ │ │   max: 20    │ │ ┌──────────────┐ ┃
-/// ┃ │ │ first_row: 0 │ │ │              │ │
-///   │ │              │ │ │     Page     │ ┃
-/// ┃ │ │              │ │ │      4       │ │ ┃
-/// ┃ │ │              │ │ │   min: "D"   │ ┃
-/// ┃ │ │              │ │ │   max: "G"   │ │
-///   │ │              │ │ │first_row: 100│ ┃
-/// ┃ │ └──────────────┘ │ │              │ ┃
-/// ┃ │ ┌──────────────┐ │ │              │ ┃
-/// ┃ │ │              │ │ └──────────────┘ │
-///   │ │     Page     │ │ ┌──────────────┐ ┃
-/// ┃ │ │      2       │ │ │              │ ┃
-/// ┃ │ │   min: 30    │ │ │     Page     │ ┃
-/// ┃ │ │   max: 40    │ │ │      5       │ │
-///   │ │first_row: 200│ │ │   min: "H"   │ ┃
-/// ┃ │ │              │ │ │   max: "Z"   │ │ ┃
-/// ┃ │ │              │ │ │first_row: 250│ ┃
-/// ┃ │ └──────────────┘ │ │              │ │
-///   │                  │ └──────────────┘ ┃
-/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
-/// ┃     ColumnChunk         ColumnChunk     ┃
-/// ┃          A                   B
-///  ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
-///
-/// Total rows: 300
-/// ```
-///
-/// Given the predicate 'A > 35 AND B = "F"':
-/// using `extract_page_index_push_down_predicates` get two single column predicate:
-/// Using 'A > 35': could get `RowSelector1: [ Skip(0~199), Read(200~299)]`
-/// Using B = "F": could get `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]`
-///
-/// As the Final selection is the intersection of each columns `RowSelectors:
-/// final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
-fn combine_multi_col_selection(
-    row_selections: VecDeque<Vec<RowSelector>>,
-) -> Vec<RowSelector> {
-    row_selections
-        .into_iter()
-        .reduce(intersect_row_selection)
-        .unwrap()
-}
-
-///
 combine two `RowSelection` return the intersection
-/// For example:
-/// self:     NNYYYYNNY
-/// other:    NYNNNNNNY
-///
-/// returned: NNNNNNNNY
-/// set `need_combine` true will combine result: Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2)
-///
-/// Move to arrow-rs: https://github.com/apache/arrow-rs/issues/3003
-pub(crate) fn intersect_row_selection(
-    left: Vec<RowSelector>,
-    right: Vec<RowSelector>,
-) -> Vec<RowSelector> {
-    let mut res = vec![];
-    let mut l_iter = left.into_iter().peekable();
-    let mut r_iter = right.into_iter().peekable();
-
-    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
-        if a.row_count == 0 {
-            l_iter.next().unwrap();
-            continue;
-        }
-        if b.row_count == 0 {
-            r_iter.next().unwrap();
-            continue;
-        }
-        match (a.skip, b.skip) {
-            // Keep both ranges
-            (false, false) => {
-                if a.row_count < b.row_count {
-                    res.push(RowSelector::select(a.row_count));
-                    b.row_count -= a.row_count;
-                    l_iter.next().unwrap();
-                } else {
-                    res.push(RowSelector::select(b.row_count));
-                    a.row_count -= b.row_count;
-                    r_iter.next().unwrap();
-                }
-            }
-            // skip at least one
-            _ => {
-                if a.row_count < b.row_count {
-                    res.push(RowSelector::skip(a.row_count));
-                    b.row_count -= a.row_count;
-                    l_iter.next().unwrap();
-                } else {
-                    res.push(RowSelector::skip(b.row_count));
-                    a.row_count -= b.row_count;
-                    r_iter.next().unwrap();
-                }
-            }
-        }
-    }
-    if l_iter.peek().is_some() {
-        res.extend(l_iter);
-    }
-    if r_iter.peek().is_some() {
-        res.extend(r_iter);
-    }
-    // combine the adjacent same operators and last zero row count
-    // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released~
-
-    let mut pre = res[0];
-    let mut after_combine = vec![];
-    for selector in res.iter_mut().skip(1) {
-        if selector.skip == pre.skip {
-            pre.row_count += selector.row_count;
-        } else {
-            after_combine.push(pre);
-            pre = *selector;
-        }
-    }
-    if pre.row_count != 0 {
-        after_combine.push(pre);
-    }
-    after_combine
-}
-
-// Extract single col pruningPredicate from input predicate for evaluating page Index.
-fn extract_page_index_push_down_predicates(
-    predicate: &Option<PruningPredicate>,
-    schema: SchemaRef,
-) -> Result<Vec<PruningPredicate>> {
-    let mut one_col_predicates = vec![];
-    if let Some(predicate) = predicate {
-        let expr = predicate.logical_expr();
-        // todo try use CNF rewrite when ready
-        let predicates = split_conjunction(expr);
-        let mut one_col_expr = vec![];
-        predicates
-            .into_iter()
-            .try_for_each::<_, Result<()>>(|predicate| {
-                let mut columns: HashSet<Column> = HashSet::new();
-                expr_to_columns(predicate, &mut columns)?;
-                if columns.len() == 1 {
-                    one_col_expr.push(predicate);
-                }
-                Ok(())
-            })?;
-        one_col_predicates = one_col_expr
-            .into_iter()
-            .map(|e| PruningPredicate::try_new(e.clone(), schema.clone()))
-            .collect::<Result<Vec<_>>>()
-            .unwrap_or_default();
-    }
-    Ok(one_col_predicates)
-}
-
 /// Factory of parquet file readers.
 ///
 /// Provides means to implement custom data access interface.
@@ -906,13 +643,6 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
-/// Wraps one col page_index in one rowGroup statistics in a way
-/// that implements [`PruningStatistics`]
-struct PagesPruningStatistics<'a> {
-    col_page_indexes: &'a Index,
-    col_offset_indexes: &'a Vec<PageLocation>,
-}
-
 // TODO: consolidate code with arrow-rs
 // Convert the bytes array to i128.
 // The endian of the input bytes array must be big-endian.
@@ -1045,49 +775,6 @@ macro_rules! get_null_count_values {
     }};
 }
 
-// Extract the min or max value calling `func` from page idex
-macro_rules!
 get_min_max_values_for_page_index {
-    ($self:expr, $func:ident) => {{
-        match $self.col_page_indexes {
-            Index::NONE => None,
-            Index::INT32(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Int32Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
-            }
-            Index::INT64(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Int64Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
-            }
-            Index::FLOAT(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Float32Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
-            }
-            Index::DOUBLE(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Float64Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
-            }
-            Index::BOOLEAN(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(BooleanArray::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
-            }
-            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
-                //Todo support these type
-                None
-            }
-        }
-    }};
-}
-
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
 fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
@@ -1124,45 +811,6 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 }
 
-impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
-    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
-        get_min_max_values_for_page_index!(self, min)
-    }
-
-    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
-        get_min_max_values_for_page_index!(self, max)
-    }
-
-    fn num_containers(&self) -> usize {
-        self.col_offset_indexes.len()
-    }
-
-    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
-        match self.col_page_indexes {
-            Index::NONE => None,
-            Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
-                index.indexes.iter().map(|x| x.null_count),
-            ))),
-            Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
-                index.indexes.iter().map(|x| x.null_count),
-            ))),
-            Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
-                index.indexes.iter().map(|x| x.null_count),
-            ))),
-            Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
-                index.indexes.iter().map(|x| x.null_count),
-            ))),
-            Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
-                index.indexes.iter().map(|x| x.null_count),
-            ))),
-            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
-                // Todo support these types
-                None
-            }
-        }
-    }
-}
-
 fn prune_row_groups(
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
     pruning_predicate: Option<PruningPredicate>,
     metrics: &ParquetFileMetrics,
 ) -> Vec<usize> {
@@ -1206,82 +854,6 @@ fn prune_row_groups(
     filtered
 }
 
-fn prune_pages_in_one_row_group(
-    group: &RowGroupMetaData,
-    predicate: &PruningPredicate,
-    col_offset_indexes: Option<&Vec<PageLocation>>,
-    col_page_indexes: Option<&Index>,
-    metrics: &ParquetFileMetrics,
-) -> Result<Vec<RowSelector>> {
-    let num_rows = group.num_rows() as usize;
-    if let (Some(col_offset_indexes), Some(col_page_indexes)) =
-        (col_offset_indexes, col_page_indexes)
-    {
-        let pruning_stats = PagesPruningStatistics {
-            col_page_indexes,
-            col_offset_indexes,
-        };
-
-        match predicate.prune(&pruning_stats) {
-            Ok(values) => {
-                let mut vec = Vec::with_capacity(values.len());
-                let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
-                assert_eq!(row_vec.len(), values.len());
-                let mut sum_row = *row_vec.first().unwrap();
-                let mut selected = *values.first().unwrap();
-
-                for (i, &f) in values.iter().skip(1).enumerate() {
-                    if f == selected {
-                        sum_row += *row_vec.get(i).unwrap();
-                    } else {
-                        let selector = if selected {
-                            RowSelector::select(sum_row)
-                        } else {
-                            RowSelector::skip(sum_row)
-                        };
-                        vec.push(selector);
-                        sum_row = *row_vec.get(i).unwrap();
-                        selected = f;
-                    }
-                }
-
-                let selector = if selected {
-                    RowSelector::select(sum_row)
-                } else {
-                    RowSelector::skip(sum_row)
-                };
-                vec.push(selector);
-                return Ok(vec);
-            }
-            // stats filter array could not be built
-            // return a result which will not filter out any pages
-            Err(e) => {
-                error!("Error evaluating page index predicate values {}", e);
-                metrics.predicate_evaluation_errors.add(1);
-                return Ok(vec![RowSelector::select(group.num_rows() as usize)]);
-            }
-        }
-    }
-    Err(DataFusionError::ParquetError(ParquetError::General(
-        "Got some error in prune_pages_in_one_row_group, plz try open the debuglog mode"
-            .to_string(),
-    )))
-}
-
-fn create_row_count_in_each_page(
-    location: &Vec<PageLocation>,
-    num_rows: usize,
-) -> Vec<usize> {
-    let mut vec = Vec::with_capacity(location.len());
-    location.windows(2).for_each(|x| {
-        let start = x[0].first_row_index as usize;
-        let end = x[1].first_row_index as usize;
-        vec.push(end - start);
-    });
-    vec.push(num_rows - location.last().unwrap().first_row_index as usize);
-    vec
-}
-
 /// Executes a query and writes the results to a partitioned Parquet file.
 pub async fn plan_to_parquet(
     state: &SessionState,
@@ -2193,65 +1765,6 @@ mod tests {
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
     }
 
-    #[test]
-    fn test_combine_row_selection() {
-        // a size equal b size
-        let a = vec![
-            RowSelector::select(5),
-            RowSelector::skip(4),
-            RowSelector::select(1),
-        ];
-        let b = vec![
-            RowSelector::select(8),
-            RowSelector::skip(1),
-            RowSelector::select(1),
-        ];
-
-        let res = intersect_row_selection(a, b);
-        assert_eq!(
-            res,
-            vec![
-                RowSelector::select(5),
-                RowSelector::skip(4),
-                RowSelector::select(1)
-            ],
-        );
-
-        // a size larger than b size
-        let a = vec![
-            RowSelector::select(3),
-            RowSelector::skip(33),
-            RowSelector::select(3),
-            RowSelector::skip(33),
-        ];
-        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
-        let res = intersect_row_selection(a, b);
-        assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]);
-
-        // a size less than b size
-        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
-        let b = vec![
-            RowSelector::select(2),
-            RowSelector::skip(2),
-            RowSelector::select(2),
-            RowSelector::skip(2),
-            RowSelector::select(2),
-        ];
-        let res = intersect_row_selection(a, b);
-        assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]);
-
-        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
-        let b = vec![
-            RowSelector::select(2),
-            RowSelector::skip(2),
-            RowSelector::select(2),
-            RowSelector::skip(2),
-            RowSelector::select(2),
-        ];
-        let res = intersect_row_selection(a, b);
-        assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8),]);
-    }
-
     #[test]
     fn row_group_pruning_predicate_simple_expr() {
         use datafusion_expr::{col, lit};
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
new file mode 100644
index 000000000000..58e340e62417
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_plan::metrics::{
+    Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
+};
+
+/// Stores metrics about the parquet execution for a particular parquet file.
+///
+/// This component is subject to **change** in the near future and is exposed for low level integrations
+/// through [ParquetFileReaderFactory].
+#[derive(Debug, Clone)]
+pub struct ParquetFileMetrics {
+    /// Number of times the predicate could not be evaluated
+    pub predicate_evaluation_errors: Count,
+    /// Number of row groups pruned using the pruning predicate
+    pub row_groups_pruned: Count,
+    /// Total number of bytes scanned
+    pub bytes_scanned: Count,
+    /// Total rows filtered out by predicates pushed into parquet scan
+    pub pushdown_rows_filtered: Count,
+    /// Total time spent evaluating pushdown filters
+    pub pushdown_eval_time: Time,
+}
+
+impl ParquetFileMetrics {
+    /// Create new metrics
+    pub fn new(
+        partition: usize,
+        filename: &str,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Self {
+        let predicate_evaluation_errors = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("predicate_evaluation_errors", partition);
+
+        let row_groups_pruned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("row_groups_pruned", partition);
+
+        let bytes_scanned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("bytes_scanned", partition);
+
+        let pushdown_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("pushdown_rows_filtered", partition);
+
+        let pushdown_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("pushdown_eval_time", partition);
+
+        Self {
+            predicate_evaluation_errors,
+            row_groups_pruned,
+            bytes_scanned,
+            pushdown_rows_filtered,
+            pushdown_eval_time,
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
new file mode 100644
index 000000000000..37002af87608
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//!
 Contains code to filter entire pages
+
+use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
+use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use datafusion_common::{Column, DataFusionError, Result};
+use datafusion_expr::utils::expr_to_columns;
+use datafusion_optimizer::utils::split_conjunction;
+use log::{debug, error};
+use parquet::{
+    arrow::arrow_reader::{RowSelection, RowSelector},
+    errors::ParquetError,
+    file::{
+        metadata::{ParquetMetaData, RowGroupMetaData},
+        page_index::index::Index,
+    },
+    format::PageLocation,
+};
+use std::collections::{HashSet, VecDeque};
+use std::sync::Arc;
+
+use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
+use super::metrics::ParquetFileMetrics;
+
+/// Create a RowSelection that may rule out ranges of rows based on
+/// parquet page level statistics, if any.
+///
+/// For example, given a row group with two columns (chunks) for `A`
+/// and `B` with the following page level statistics:
+///
+/// ```text
+/// ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
+/// ┃ │ ┌──────────────┐ │ ┌──────────────┐ │ ┃
+/// ┃ │ │              │ │ │              │ │ ┃
+/// ┃ │ │              │ │ │     Page     │ │
+///   │ │              │ │ │      3       │ ┃
+/// ┃ │ │              │ │ │   min: "A"   │ │ ┃
+/// ┃ │ │              │ │ │   max: "C"   │ ┃
+/// ┃ │ │     Page     │ │ │ first_row: 0 │ │
+///   │ │      1       │ │ │              │ ┃
+/// ┃ │ │   min: 10    │ │ └──────────────┘ │ ┃
+/// ┃ │ │   max: 20    │ │ ┌──────────────┐ ┃
+/// ┃ │ │ first_row: 0 │ │ │              │ │
+///   │ │              │ │ │     Page     │ ┃
+/// ┃ │ │              │ │ │      4       │ │ ┃
+/// ┃ │ │              │ │ │   min: "D"   │ ┃
+/// ┃ │ │              │ │ │   max: "G"   │ │
+///   │ │              │ │ │first_row: 100│ ┃
+/// ┃ │ └──────────────┘ │ │              │ ┃
+/// ┃ │ ┌──────────────┐ │ │              │ ┃
+/// ┃ │ │              │ │ └──────────────┘ │
+///   │ │     Page     │ │ ┌──────────────┐ ┃
+/// ┃ │ │      2       │ │ │              │ ┃
+/// ┃ │ │   min: 30    │ │ │     Page     │ ┃
+/// ┃ │ │   max: 40    │ │ │      5       │ │
+///   │ │first_row: 200│ │ │   min: "H"   │ ┃
+/// ┃ │ │              │ │ │   max: "Z"   │ │ ┃
+/// ┃ │ │              │ │ │first_row: 250│ ┃
+/// ┃ │ └──────────────┘ │ │              │ │
+///   │                  │ └──────────────┘ ┃
+/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
+/// ┃     ColumnChunk         ColumnChunk     ┃
+/// ┃          A                   B
+///  ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
+///
+/// Total rows: 300
+///
+/// ```
+///
+/// Given the predicate `A > 35 AND B = 'F'`:
+///
+/// Using `A > 35`: can rule out all of the values in Page 1 (rows 0 -> 199)
+///
+/// Using `B = 'F'`: can rule out all values in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299)
+///
+/// So we can entirely skip rows 0->199 and 250->299 as we know they
+/// cannot contain rows that match the predicate.
+pub(crate) fn build_page_filter(
+    pruning_predicate: Option<&PruningPredicate>,
+    schema: SchemaRef,
+    row_groups: &[usize],
+    file_metadata: &ParquetMetaData,
+    file_metrics: &ParquetFileMetrics,
+) -> Result<Option<RowSelection>> {
+    let page_index_predicates =
+        extract_page_index_push_down_predicates(pruning_predicate, schema)?;
+
+    if page_index_predicates.is_empty() {
+        return Ok(None);
+    }
+
+    let groups = file_metadata.row_groups();
+
+    let file_offset_indexes = file_metadata.offset_indexes();
+    let file_page_indexes = file_metadata.page_indexes();
+    if let (Some(file_offset_indexes), Some(file_page_indexes)) =
+        (file_offset_indexes, file_page_indexes)
+    {
+        let mut row_selections = VecDeque::with_capacity(page_index_predicates.len());
+        for predicate in page_index_predicates {
+            // `extract_page_index_push_down_predicates` only returns predicates with one column.
+            let col_id = *predicate.need_input_columns_ids().iter().next().unwrap();
+            let mut selectors = Vec::with_capacity(row_groups.len());
+            for r in row_groups.iter() {
+                let rg_offset_indexes = file_offset_indexes.get(*r);
+                let rg_page_indexes = file_page_indexes.get(*r);
+                if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
+                    (rg_page_indexes, rg_offset_indexes)
+                {
+                    selectors.extend(
+                        prune_pages_in_one_row_group(
+                            &groups[*r],
+                            &predicate,
+                            rg_offset_indexes.get(col_id),
+                            rg_page_indexes.get(col_id),
+                            file_metrics,
+                        )
+                        .map_err(|e| {
+                            ArrowError::ParquetError(format!(
+                                "Fail in prune_pages_in_one_row_group: {}",
+                                e
+                            ))
+                        }),
+                    );
+                } else {
+                    // fallback select all rows
+                    let all_selected =
+                        vec![RowSelector::select(groups[*r].num_rows() as usize)];
+                    selectors.push(all_selected);
+                }
+            }
+            debug!(
+                "Use filter and page index create RowSelection {:?} from predicate:{:?}",
+                &selectors, predicate
+            );
+            row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
+        }
+        let final_selection = combine_multi_col_selection(row_selections);
+        Ok(Some(final_selection.into()))
+    } else {
+        Ok(None)
+    }
+}
+/// Intersects the [`RowSelector`]s
+///
+/// For example, given:
+/// * `RowSelector1: [ Skip(0~199), Read(200~299)]`
+/// * `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]`
+///
+/// The final selection is the intersection of these `RowSelector`s:
+/// * `final_selection: [ Skip(0~199), Read(200~249), Skip(250~299)]`
+fn combine_multi_col_selection(
+    row_selections: VecDeque<Vec<RowSelector>>,
+) -> Vec<RowSelector> {
+    row_selections
+        .into_iter()
+        .reduce(intersect_row_selection)
+        .unwrap()
+}
+
+/// Combines two `RowSelection`s, returning the intersection
+/// For example:
+/// self:     NNYYYYNNY
+/// other:    NYNNNNNNY
+///
+/// returned: NNNNNNNNY
+/// Adjacent selectors of the same type are combined: Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2)
+///
+/// Move to arrow-rs: https://github.com/apache/arrow-rs/issues/3003
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            // Keep both ranges
+            (false, false) => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            // skip at least one
+            _ => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::skip(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::skip(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+        }
+    }
+    if l_iter.peek().is_some() {
+        res.extend(l_iter);
+    }
+    if r_iter.peek().is_some() {
+        res.extend(r_iter);
+    }
+    // combine the adjacent same operators and last zero row count
+    // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released~
+
+    let mut pre = res[0];
+    let mut after_combine = vec![];
+    for selector in res.iter_mut().skip(1) {
+        if selector.skip == pre.skip {
+            pre.row_count += selector.row_count;
+        } else {
+            after_combine.push(pre);
+            pre = *selector;
+        }
+    }
+    if pre.row_count != 0 {
+        after_combine.push(pre);
+ } + after_combine +} + +// Extract single col pruningPredicate from input predicate for evaluating page Index. +fn extract_page_index_push_down_predicates( + predicate: Option<&PruningPredicate>, + schema: SchemaRef, +) -> Result> { + let mut one_col_predicates = vec![]; + if let Some(predicate) = predicate { + let expr = predicate.logical_expr(); + // todo try use CNF rewrite when ready + let predicates = split_conjunction(expr); + let mut one_col_expr = vec![]; + predicates + .into_iter() + .try_for_each::<_, Result<()>>(|predicate| { + let mut columns: HashSet = HashSet::new(); + expr_to_columns(predicate, &mut columns)?; + if columns.len() == 1 { + one_col_expr.push(predicate); + } + Ok(()) + })?; + one_col_predicates = one_col_expr + .into_iter() + .map(|e| PruningPredicate::try_new(e.clone(), schema.clone())) + .collect::>>() + .unwrap_or_default(); + } + Ok(one_col_predicates) +} + +fn prune_pages_in_one_row_group( + group: &RowGroupMetaData, + predicate: &PruningPredicate, + col_offset_indexes: Option<&Vec>, + col_page_indexes: Option<&Index>, + metrics: &ParquetFileMetrics, +) -> Result> { + let num_rows = group.num_rows() as usize; + if let (Some(col_offset_indexes), Some(col_page_indexes)) = + (col_offset_indexes, col_page_indexes) + { + let pruning_stats = PagesPruningStatistics { + col_page_indexes, + col_offset_indexes, + }; + + match predicate.prune(&pruning_stats) { + Ok(values) => { + let mut vec = Vec::with_capacity(values.len()); + let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); + assert_eq!(row_vec.len(), values.len()); + let mut sum_row = *row_vec.first().unwrap(); + let mut selected = *values.first().unwrap(); + + for (i, &f) in values.iter().skip(1).enumerate() { + if f == selected { + sum_row += *row_vec.get(i).unwrap(); + } else { + let selector = if selected { + RowSelector::select(sum_row) + } else { + RowSelector::skip(sum_row) + }; + vec.push(selector); + sum_row = *row_vec.get(i).unwrap(); + selected = f; + } + } + + let selector = if selected { + RowSelector::select(sum_row) + } else { + RowSelector::skip(sum_row) + }; + vec.push(selector); + return Ok(vec); + } + // stats filter array could not be built + // return a result which will not filter out any pages + Err(e) => { + error!("Error evaluating page index predicate values {}", e); + metrics.predicate_evaluation_errors.add(1); + return Ok(vec![RowSelector::select(group.num_rows() as usize)]); + } + } + } + Err(DataFusionError::ParquetError(ParquetError::General( + "Got some error in prune_pages_in_one_row_group, plz try open the debuglog mode" + .to_string(), + ))) +} + +fn create_row_count_in_each_page( + location: &Vec, + num_rows: usize, +) -> Vec { + let mut vec = Vec::with_capacity(location.len()); + location.windows(2).for_each(|x| { + let start = x[0].first_row_index as usize; + let end = x[1].first_row_index as usize; + vec.push(end - start); + }); + vec.push(num_rows - location.last().unwrap().first_row_index as usize); + vec +} + +/// Wraps one col page_index in one rowGroup statistics in a way +/// that implements [`PruningStatistics`] +struct PagesPruningStatistics<'a> { + col_page_indexes: &'a Index, + col_offset_indexes: &'a Vec, +} + +// Extract the min or max value calling `func` from page idex +macro_rules! 
 get_min_max_values_for_page_index {
+    ($self:expr, $func:ident) => {{
+        match $self.col_page_indexes {
+            Index::NONE => None,
+            Index::INT32(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Int32Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::INT64(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Int64Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::FLOAT(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Float32Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::DOUBLE(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Float64Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::BOOLEAN(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(BooleanArray::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                // TODO: support these types
+                None
+            }
+        }
+    }};
+}
+
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, min)
+    }
+
+    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.col_offset_indexes.len()
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        match self.col_page_indexes {
+            Index::NONE => None,
+            Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                // TODO: support these types
+                None
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_combine_row_selection() {
+        // a size equal b size
+        let a = vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(1),
+        ];
+        let b = vec![
+            RowSelector::select(8),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+        ];
+
+        let res = intersect_row_selection(a, b);
+        assert_eq!(
+            res,
+            vec![
+                RowSelector::select(5),
+                RowSelector::skip(4),
+                RowSelector::select(1)
+            ],
+        );
+
+        // a size larger than b size
+        let a = vec![
+            RowSelector::select(3),
+            RowSelector::skip(33),
+            RowSelector::select(3),
+            RowSelector::skip(33),
+        ];
+        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
+        let res = intersect_row_selection(a, b);
+        assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]);
+
+        // a size less than b size
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = intersect_row_selection(a, b);
+        assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]);
+
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = 
intersect_row_selection(a, b); + assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8),]); + } +} diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/row_filter.rs rename to datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs index 470dc9fbf4df..876891813c46 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs @@ -405,7 +405,6 @@ pub fn build_row_filter( #[cfg(test)] mod test { use super::*; - use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder; use arrow::datatypes::Field; use datafusion_expr::{cast, col, lit}; use parquet::arrow::parquet_to_arrow_schema;
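
Reviewer note: to make the `RowSelection` intersection performed by `combine_multi_col_selection` / `intersect_row_selection` above concrete, here is a minimal standalone sketch. It models a selection as one boolean per row (`true` = select) and replays the example from the `build_page_filter` doc comment; the helper name `to_bools` is hypothetical and not part of the DataFusion or parquet-rs APIs.

```rust
// Expand (select?, row_count) runs into one boolean per row,
// mirroring what a Vec<RowSelector> describes.
fn to_bools(selectors: &[(bool, usize)]) -> Vec<bool> {
    selectors
        .iter()
        .flat_map(|&(select, n)| std::iter::repeat(select).take(n))
        .collect()
}

fn main() {
    // From the doc example (300 rows total):
    // `A > 35`  keeps rows 200..300 -> [Skip(200), Select(100)]
    // `B = 'F'` keeps rows 100..250 -> [Skip(100), Select(150), Skip(50)]
    let a = to_bools(&[(false, 200), (true, 100)]);
    let b = to_bools(&[(false, 100), (true, 150), (false, 50)]);

    // Intersection: a row survives only if every predicate keeps it.
    let both: Vec<bool> = a.iter().zip(&b).map(|(x, y)| *x && *y).collect();

    // Expected final selection: Skip(200), Select(50), Skip(50).
    assert_eq!(both.iter().filter(|&&s| s).count(), 50);
    assert!(!both[199] && both[200] && both[249] && !both[250]);
    println!("rows 200..250 survive both predicates");
}
```

The real implementation never materializes per-row booleans; it walks the two run-length encoded selector lists in lockstep, emitting `select` runs only where both inputs select (the `(false, false)` match arm) and `skip` runs otherwise.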
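Reviewer note: the arithmetic in `create_row_count_in_each_page` is also easy to check in isolation. Each `PageLocation` records the first row of its page, so page sizes are the deltas between consecutive `first_row_index` values, with the last page running to the end of the row group. A standalone sketch under the same caveat (plain integers instead of parquet's `PageLocation` type):

```rust
// Per-page row counts derived from the first-row offsets of each page.
fn row_counts(first_row_index: &[i64], num_rows: usize) -> Vec<usize> {
    let mut counts: Vec<usize> = first_row_index
        .windows(2)
        .map(|w| (w[1] - w[0]) as usize)
        .collect();
    // The final page spans from its first row to the end of the row group.
    counts.push(num_rows - *first_row_index.last().unwrap() as usize);
    counts
}

fn main() {
    // Column B from the doc diagram: pages start at rows 0, 100 and 250
    // in a 300 row group, so they hold 100, 150 and 50 rows.
    assert_eq!(row_counts(&[0, 100, 250], 300), vec![100, 150, 50]);
}
```

These per-page counts are what `prune_pages_in_one_row_group` zips with the per-page true/false prune results to build the run-length `RowSelector` list.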