Skip to content

Commit 94dac76

Browse files
authored
GroupedHashAggregateStream breaks spill batch (#8004)
... into smaller chunks to decrease memory required for merging.
1 parent 7788b90 commit 94dac76

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2155,7 +2155,7 @@ mod tests {
21552155
spill: bool,
21562156
) -> Result<()> {
21572157
let task_ctx = if spill {
2158-
new_spill_ctx(2, 2812)
2158+
new_spill_ctx(2, 2886)
21592159
} else {
21602160
Arc::new(TaskContext::default())
21612161
};

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
673673
let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
674674
let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
675675
// TODO: slice large `sorted` and write to multiple files in parallel
676-
writer.write(&sorted)?;
676+
let mut offset = 0;
677+
let total_rows = sorted.num_rows();
678+
679+
while offset < total_rows {
680+
let length = std::cmp::min(total_rows - offset, self.batch_size);
681+
let batch = sorted.slice(offset, length);
682+
offset += batch.num_rows();
683+
writer.write(&batch)?;
684+
}
685+
677686
writer.finish()?;
678687
self.spill_state.spills.push(spillfile);
679688
Ok(())

0 commit comments

Comments
 (0)