11 changes: 6 additions & 5 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -38,7 +38,7 @@ use crate::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
 use arrow::datatypes::SchemaRef;
 use arrow_schema::SortOptions;
-use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
+use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -1081,13 +1081,14 @@ impl GroupedHashAggregateStream {
 
     /// Transforms input batch to intermediate aggregate state, without grouping it
     fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let group_values = evaluate_group_by(&self.group_by, &batch)?;
+        let mut group_values = evaluate_group_by(&self.group_by, &batch)?;
         let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
        let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
 
-        let mut output = group_values.first().cloned().ok_or_else(|| {
-            internal_datafusion_err!("group_values expected to have at least one element")
-        })?;
+        if group_values.len() != 1 {
+            return internal_err!("group_values expected to have single element");
+        }
+        let mut output = group_values.swap_remove(0);
 
         let iter = self
             .accumulators
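The diff swaps `group_values.first().cloned()` (which clones the first element and only errors when the vector is empty) for an exact-length check followed by `Vec::swap_remove(0)`, which enforces the single-element invariant and moves the element out without cloning. A minimal, self-contained sketch of that pattern is below; it is not code from the PR, and the function names and the `Vec<Vec<i32>>` stand-in for the group-values type are illustrative only.

```rust
// Sketch only: contrasts cloning the first element with moving it out via
// swap_remove. The element type here stands in for whatever the real
// group-values container holds.
fn take_first_by_clone(values: &[Vec<i32>]) -> Option<Vec<i32>> {
    // Old-style approach: clone the first element, leaving `values` intact.
    // Only fails when the container is empty.
    values.first().cloned()
}

fn take_first_by_move(values: &mut Vec<Vec<i32>>) -> Vec<i32> {
    // New-style approach: require exactly one element, then take ownership
    // of it without a clone. swap_remove(0) is O(1).
    assert_eq!(values.len(), 1, "expected a single element");
    values.swap_remove(0)
}

fn main() {
    let mut values = vec![vec![1, 2, 3]];
    let cloned = take_first_by_clone(&values).unwrap(); // copies the element
    let moved = take_first_by_move(&mut values);        // moves it out, no copy
    assert_eq!(cloned, moved);
    assert!(values.is_empty());
}
```

In the PR itself the error path uses `internal_err!` instead of a panic, so a malformed plan produces a `DataFusionError::Internal` rather than aborting the process; the assertion above is only a stand-in for that check.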