Closed
Labels
enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
@korowa added "skip partial aggregation mode" in #11627, which helps with high cardinality aggregates by doing minimal work in the first phase of the aggregation. This mode is triggered dynamically, based on how effectively the first aggregation phase is reducing the cardinality of its input.
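For readers unfamiliar with the idea, here is a rough sketch of what such a dynamic trigger could look like. It is not the code from #11627: `SkipProbe`, its fields, and the threshold values are hypothetical names chosen for illustration only.

```rust
/// Hypothetical probe that decides whether the Partial phase is still
/// worth running. Names and thresholds are illustrative, not DataFusion's.
struct SkipProbe {
    /// Number of input rows to observe before making any decision.
    min_rows: usize,
    /// Skip partial aggregation when groups / rows exceeds this ratio.
    max_ratio: f64,
    input_rows: usize,
    num_groups: usize,
}

impl SkipProbe {
    fn new(min_rows: usize, max_ratio: f64) -> Self {
        Self { min_rows, max_ratio, input_rows: 0, num_groups: 0 }
    }

    /// Record one processed batch: its row count and the total number of
    /// distinct groups held by the hash table afterwards.
    fn update(&mut self, batch_rows: usize, total_groups: usize) {
        self.input_rows += batch_rows;
        self.num_groups = total_groups;
    }

    /// True once enough rows were seen and the group count is still close
    /// to the row count, i.e. pre-aggregation barely reduces the data.
    fn should_skip(&self) -> bool {
        self.input_rows >= self.min_rows
            && self.num_groups as f64 / self.input_rows as f64 > self.max_ratio
    }
}
```

With `SkipProbe::new(1_000, 0.8)`, seeing 1,000 rows that produce 950 groups would trigger the switch, while 1,000 rows that collapse into 10 groups would not.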
To use this new mode, the corresponding GroupsAccumulator needs to implement the convert_to_state method:
datafusion/datafusion/expr/src/groups_accumulator.rs
Lines 166 to 213 in c340b6a
/// Converts an input batch directly the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
    &self,
    _values: &[ArrayRef],
    _opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
    not_impl_err!("Input batch conversion to state not implemented")
}

/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn supports_convert_to_state(&self) -> bool {
    false
}
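To make the `COUNT` and `SUM` examples from the doc comment concrete, here is a minimal sketch on a simplified data model. It uses plain `Vec<Option<i64>>` columns and a `&[bool]` filter instead of Arrow's `ArrayRef` and `BooleanArray`, and the function names are hypothetical. How filtered-out rows are encoded (0 for `COUNT`, null for `SUM`) is an assumption of this sketch, chosen so that those rows contribute nothing in the next phase.

```rust
/// Simplified stand-in for a nullable Arrow array of 64-bit integers.
type Column = Vec<Option<i64>>;

/// True when row `i` passes the optional filter.
fn keep(opt_filter: Option<&[bool]>, i: usize) -> bool {
    opt_filter.map_or(true, |f| f[i])
}

/// COUNT state: 1 for each counted row, 0 for null or filtered-out rows.
fn count_convert_to_state(
    values: &[Option<i64>],
    opt_filter: Option<&[bool]>,
) -> Vec<Column> {
    let counts = values
        .iter()
        .enumerate()
        .map(|(i, v)| Some((v.is_some() && keep(opt_filter, i)) as i64))
        .collect();
    vec![counts]
}

/// SUM state: the input values themselves, with filtered-out rows nulled.
fn sum_convert_to_state(
    values: &[Option<i64>],
    opt_filter: Option<&[bool]>,
) -> Vec<Column> {
    let sums = values
        .iter()
        .enumerate()
        .map(|(i, v)| if keep(opt_filter, i) { *v } else { None })
        .collect();
    vec![sums]
}
```

Each function returns one column per state field, and every output column has exactly as many rows as the input, which is what "treating each input row as its own group" amounts to.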
Some aggregates implement the GroupsAccumulator interface directly, but by default they will use the GroupsAccumulatorAdapter along with the Accumulator trait.
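As an illustration of how a row-wise accumulator could still produce per-row state, the sketch below feeds each input row to a fresh accumulator and emits that accumulator's state. This is only one possible approach, not the existing adapter code: the `RowAccumulator` trait, the `Avg` type, and `convert_to_state_rowwise` are all hypothetical and use plain `Vec`s rather than Arrow arrays.

```rust
/// Hypothetical row-at-a-time accumulator, loosely modelled on the idea of
/// an `Accumulator`; this is not DataFusion's trait.
trait RowAccumulator: Default {
    fn update(&mut self, value: Option<i64>);
    /// Intermediate state, one entry per state column.
    fn state(&self) -> Vec<Option<i64>>;
}

/// AVG-like accumulator with two state columns: sum and non-null count.
#[derive(Default)]
struct Avg {
    sum: i64,
    count: i64,
}

impl RowAccumulator for Avg {
    fn update(&mut self, value: Option<i64>) {
        if let Some(v) = value {
            self.sum += v;
            self.count += 1;
        }
    }

    fn state(&self) -> Vec<Option<i64>> {
        vec![Some(self.sum), Some(self.count)]
    }
}

/// Treat every input row as its own group: feed it to a fresh accumulator
/// and emit that accumulator's state. Returns one Vec per state column.
fn convert_to_state_rowwise<A: RowAccumulator>(
    values: &[Option<i64>],
    opt_filter: Option<&[bool]>,
) -> Vec<Vec<Option<i64>>> {
    let mut columns: Vec<Vec<Option<i64>>> = Vec::new();
    for (i, v) in values.iter().enumerate() {
        let mut acc = A::default();
        // Rows rejected by the filter leave the accumulator in its empty state.
        if opt_filter.map_or(true, |f| f[i]) {
            acc.update(*v);
        }
        for (c, s) in acc.state().into_iter().enumerate() {
            if columns.len() <= c {
                columns.push(Vec::with_capacity(values.len()));
            }
            columns[c].push(s);
        }
    }
    columns
}
```

Creating one accumulator per row is simple but likely slow, which is part of why a specialized `convert_to_state` per aggregate is attractive.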
Describe the solution you'd like
Implement convert_to_state for
Add tests in
# The main goal of these tests is to verify correctness of transforming
# input values to state by accumulators, supporting `convert_to_state`.
Describe alternatives you've considered
No response
Additional context
No response