diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 4d9c1236137fb..9acce95a62773 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -306,11 +306,6 @@ impl DataBlock { self.meta.as_ref() } - #[inline] - pub fn meta(&self) -> Result> { - Ok(self.meta.clone()) - } - pub fn from_arrow_chunk>( arrow_chunk: &ArrowChunk, schema: &DataSchema, diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs b/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs index 2e2db195aae1f..ab6b806aff455 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_sink_merge.rs @@ -103,7 +103,7 @@ impl Processor for ExchangeMergeSink { let mut meta = vec![]; meta.write_scalar_own(data_block.num_rows() as u32)?; - bincode::serialize_into(&mut meta, &data_block.meta()?) + bincode::serialize_into(&mut meta, &data_block.get_meta()) .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?; let chunks = data_block.try_into()?; diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs b/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs index 0bad9a9a3e2a2..5df6221c1b4cc 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_sink_shuffle.rs @@ -124,7 +124,7 @@ impl Processor for ExchangePublisherSink { let mut meta = vec![]; meta.write_scalar_own(data_block.num_rows() as u32)?; - bincode::serialize_into(&mut meta, &data_block.meta()?) + bincode::serialize_into(&mut meta, &data_block.get_meta()) .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?; let chunks = data_block.try_into()?; diff --git a/src/query/service/src/api/rpc/exchange/exchange_transform.rs b/src/query/service/src/api/rpc/exchange/exchange_transform.rs index dc07e0d27e75a..62d53014f0239 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_transform.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_transform.rs @@ -232,7 +232,7 @@ impl Processor for ExchangeTransform { } else { let mut meta = vec![]; meta.write_scalar_own(data_block.num_rows() as u32)?; - bincode::serialize_into(&mut meta, &data_block.meta()?).map_err(|_| { + bincode::serialize_into(&mut meta, &data_block.get_meta()).map_err(|_| { ErrorCode::BadBytes("block meta serialize error when exchange") })?; diff --git a/src/query/service/src/api/rpc/flight_scatter_hash.rs b/src/query/service/src/api/rpc/flight_scatter_hash.rs index cba7c98c60eee..5cd2508bbf83f 100644 --- a/src/query/service/src/api/rpc/flight_scatter_hash.rs +++ b/src/query/service/src/api/rpc/flight_scatter_hash.rs @@ -124,10 +124,10 @@ impl FlightScatter for OneHashKeyFlightScatter { let indices = get_hash_values(&indices, num)?; let data_blocks = DataBlock::scatter(data_block, &indices, self.scatter_size)?; - let block_meta = data_block.meta()?; + let block_meta = data_block.get_meta(); let mut res = Vec::with_capacity(data_blocks.len()); for data_block in data_blocks { - res.push(data_block.add_meta(block_meta.clone())?); + res.push(data_block.add_meta(block_meta.cloned())?); } Ok(res) @@ -150,12 +150,12 @@ impl FlightScatter for HashFlightScatter { Ok(vec![0; num]) }?; - let block_meta = data_block.meta()?; + let block_meta = data_block.get_meta(); let data_blocks = DataBlock::scatter(data_block, &indices, self.scatter_size)?; let mut res = Vec::with_capacity(data_blocks.len()); for 
data_block in data_blocks { - res.push(data_block.add_meta(block_meta.clone())?); + res.push(data_block.add_meta(block_meta.cloned())?); } Ok(res) diff --git a/src/query/service/src/api/rpc/packets/packet_data_precommit.rs b/src/query/service/src/api/rpc/packets/packet_data_precommit.rs index d718e2252b5db..2f95b13894d6d 100644 --- a/src/query/service/src/api/rpc/packets/packet_data_precommit.rs +++ b/src/query/service/src/api/rpc/packets/packet_data_precommit.rs @@ -40,7 +40,7 @@ impl PrecommitBlock { pub fn write(self, bytes: &mut T) -> Result<()> { let data_block = self.0; - let serialized_meta = bincode::serialize(&data_block.meta()?).map_err_to_code( + let serialized_meta = bincode::serialize(&data_block.get_meta()).map_err_to_code( ErrorCode::BadBytes, || "precommit block serialize error when exchange", )?; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 392cafcf8b5b8..572eb8e792a68 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -323,16 +323,25 @@ impl PipelineBuilder { None, )?; + let pass_state_to_final = self.enable_memory_efficient_aggregator(¶ms); + self.main_pipeline.add_transform(|input, output| { TransformAggregator::try_create_partial( AggregatorTransformParams::try_create(input, output, ¶ms)?, self.ctx.clone(), + pass_state_to_final, ) })?; Ok(()) } + fn enable_memory_efficient_aggregator(&self, params: &Arc) -> bool { + self.ctx.get_cluster().is_empty() + && !params.group_columns.is_empty() + && self.main_pipeline.output_len() > 1 + } + fn build_aggregate_final(&mut self, aggregate: &AggregateFinal) -> Result<()> { self.build_pipeline(&aggregate.input)?; @@ -344,10 +353,7 @@ impl PipelineBuilder { aggregate.limit, )?; - if self.ctx.get_cluster().is_empty() - && !params.group_columns.is_empty() - && self.main_pipeline.output_len() > 1 - { + if self.enable_memory_efficient_aggregator(¶ms) { return efficiently_memory_final_aggregator(params, &mut self.main_pipeline); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs new file mode 100644 index 0000000000000..fe438a1428ce3 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_hashstate_info.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
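// --- Illustrative sketch (editor's addition, not part of this patch) --------
// The exchange and scatter hunks above drop the owned `DataBlock::meta()`
// accessor in favour of the borrowing `get_meta()`, so callers now hold an
// `Option<&BlockMetaInfoPtr>` and call `.cloned()` only where an owned copy is
// actually needed (see the `add_meta(block_meta.cloned())` changes). The same
// std-only pattern, with no databend types assumed:
fn cloned_instead_of_clone() {
    let meta: Option<String> = Some("block meta".to_string());
    // Borrow the meta instead of cloning the whole Option up front.
    let borrowed: Option<&String> = meta.as_ref();
    // Option<&T>::cloned() yields Option<T> at the single point a copy is needed.
    let owned: Option<String> = borrowed.cloned();
    assert_eq!(owned, meta);
}
// -----------------------------------------------------------------------------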
+ +use std::any::Any; + +use common_expression::BlockMetaInfo; +use common_expression::BlockMetaInfoPtr; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; + +use crate::pipelines::processors::transforms::group_by::ArenaHolder; + +#[derive(Debug)] +pub struct AggregateHashStateInfo { + pub bucket: usize, + // a subhashtable state + pub hash_state: Box, + pub state_holder: Option, +} + +impl AggregateHashStateInfo { + pub fn create( + bucket: usize, + hash_state: Box, + state_holder: Option, + ) -> BlockMetaInfoPtr { + Box::new(AggregateHashStateInfo { + bucket, + hash_state, + state_holder, + }) + } +} + +impl Serialize for AggregateHashStateInfo { + fn serialize(&self, _: S) -> Result + where S: Serializer { + unreachable!("AggregateHashStateInfo does not support exchanging between multiple nodes") + } +} + +impl<'de> Deserialize<'de> for AggregateHashStateInfo { + fn deserialize(_: D) -> Result + where D: Deserializer<'de> { + unreachable!("AggregateHashStateInfo does not support exchanging between multiple nodes") + } +} + +#[typetag::serde(name = "aggregate_hash_state_info")] +impl BlockMetaInfo for AggregateHashStateInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn clone_self(&self) -> Box { + unimplemented!("Unimplemented clone for AggregateHashStateInfo") + } + + fn equals(&self, _: &Box) -> bool { + unimplemented!("Unimplemented equals for AggregateHashStateInfo") + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs index 6c2f55cfc4676..557854f6755fa 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs @@ -32,8 +32,10 @@ use common_hashtable::HashtableLike; use tracing::info; use super::estimated_key_size; +use super::AggregateHashStateInfo; use crate::pipelines::processors::transforms::aggregator::aggregate_info::AggregateInfo; use crate::pipelines::processors::transforms::group_by::Area; +use crate::pipelines::processors::transforms::group_by::ArenaHolder; use crate::pipelines::processors::transforms::group_by::GroupColumnsBuilder; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; @@ -148,10 +150,11 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static method: Method, params: Arc, hash_table: Method::HashTable, + state_holders: Vec>, pub(crate) reach_limit: bool, - // used for deserialization only, so we can reuse it during the loop - temp_place: Option, + // used for deserialization only if has agg, so we can reuse it during the loop + temp_place: StateAddr, } impl BucketAggregator @@ -161,8 +164,8 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static let mut area = Area::create(); let hash_table = method.create_hash_table()?; let temp_place = match params.aggregate_functions.is_empty() { - true => None, - false => Some(params.alloc_layout(&mut area)), + true => StateAddr::new(0), + false => params.alloc_layout(&mut area), }; Ok(Self { @@ -171,15 +174,72 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static params, hash_table, reach_limit: false, + state_holders: Vec::with_capacity(16), temp_place, }) } + fn 
merge_partial_hashstates(&mut self, hashtable: &mut Method::HashTable) -> Result<()> { + if !HAS_AGG { + unsafe { + for key in hashtable.iter() { + let _ = self.hash_table.insert_and_entry(key.key()); + } + if let Some(limit) = self.params.limit { + if self.hash_table.len() >= limit { + return Ok(()); + } + } + } + } else { + let aggregate_functions = &self.params.aggregate_functions; + let offsets_aggregate_states = &self.params.offsets_aggregate_states; + + for entry in hashtable.iter() { + let key = entry.key(); + unsafe { + match self.hash_table.insert(key) { + Ok(e) => { + // just set new places and the arena will be keeped in partial state + e.write(*entry.get()); + } + Err(place) => { + // place already exists + // that means we should merge the aggregation + let place = StateAddr::new(*place); + let old_place = StateAddr::new(*entry.get()); + + for (idx, aggregate_function) in aggregate_functions.iter().enumerate() + { + let final_place = place.next(offsets_aggregate_states[idx]); + let state_place = old_place.next(offsets_aggregate_states[idx]); + aggregate_function.merge(final_place, state_place)?; + aggregate_function.drop_state(state_place); + } + } + } + } + } + } + hashtable.clear(); + Ok(()) + } + pub fn merge_blocks(&mut self, blocks: Vec) -> Result> { if blocks.is_empty() { return Ok(vec![]); } - for data_block in blocks { + + for mut data_block in blocks { + if let Some(mut meta) = data_block.take_meta() { + if let Some(info) = meta.as_mut_any().downcast_mut::() { + let hashtable = info.hash_state.downcast_mut::().unwrap(); + self.state_holders.push(info.state_holder.take()); + self.merge_partial_hashstates(hashtable)?; + continue; + } + } + let block = data_block.convert_to_full(); // 1.1 and 1.2. let aggregate_function_len = self.params.aggregate_functions.len(); @@ -224,17 +284,15 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static let aggregate_functions = &self.params.aggregate_functions; let offsets_aggregate_states = &self.params.offsets_aggregate_states; - if let Some(temp_place) = self.temp_place { - for (row, place) in places.iter() { - for (idx, aggregate_function) in aggregate_functions.iter().enumerate() { - let final_place = place.next(offsets_aggregate_states[idx]); - let state_place = temp_place.next(offsets_aggregate_states[idx]); - - let mut data = - unsafe { states_binary_columns[idx].index_unchecked(*row) }; - aggregate_function.deserialize(state_place, &mut data)?; - aggregate_function.merge(final_place, state_place)?; - } + + for (row, place) in places.iter() { + for (idx, aggregate_function) in aggregate_functions.iter().enumerate() { + let final_place = place.next(offsets_aggregate_states[idx]); + let state_place = self.temp_place.next(offsets_aggregate_states[idx]); + + let mut data = unsafe { states_binary_columns[idx].index_unchecked(*row) }; + aggregate_function.deserialize(state_place, &mut data)?; + aggregate_function.merge(final_place, state_place)?; } } } @@ -342,12 +400,8 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static places } -} -impl Drop for BucketAggregator -where Method: HashMethod + PolymorphicKeysHelper + Send + 'static -{ - fn drop(&mut self) { + fn drop_states(&mut self) { let aggregator_params = self.params.as_ref(); let aggregate_functions = &aggregator_params.aggregate_functions; let offsets_aggregate_states = &aggregator_params.offsets_aggregate_states; @@ -374,11 +428,20 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static } } - if let Some(temp_place) = self.temp_place { 
+ if HAS_AGG { for (state_offset, function) in state_offsets.iter().zip(functions.iter()) { - let place = temp_place.next(*state_offset); + let place = self.temp_place.next(*state_offset); unsafe { function.drop_state(place) } } } + self.state_holders.clear(); + } +} + +impl Drop for BucketAggregator +where Method: HashMethod + PolymorphicKeysHelper + Send + 'static +{ + fn drop(&mut self) { + self.drop_states(); } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs index 8b7653ff24ba6..dac525bc50a4e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_partial.rs @@ -28,6 +28,7 @@ use common_hashtable::HashtableLike; use super::estimated_key_size; use crate::pipelines::processors::transforms::group_by::Area; +use crate::pipelines::processors::transforms::group_by::ArenaHolder; use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; use crate::pipelines::processors::transforms::transform_aggregator::Aggregator; @@ -39,24 +40,36 @@ where Method: HashMethod + PolymorphicKeysHelper pub states_dropped: bool, pub area: Option, + pub area_holder: Option, pub method: Method, pub hash_table: Method::HashTable, pub params: Arc, pub generated: bool, + pub input_rows: usize, + pub pass_state_to_final: bool, + pub two_level_mode: bool, } impl + Send> PartialAggregator { - pub fn create(method: Method, params: Arc) -> Result { + pub fn create( + method: Method, + params: Arc, + pass_state_to_final: bool, + ) -> Result { let hash_table = method.create_hash_table()?; Ok(Self { params, method, hash_table, area: Some(Area::create()), + area_holder: None, states_dropped: false, generated: false, + input_rows: 0, + pass_state_to_final, + two_level_mode: false, }) } @@ -160,10 +173,16 @@ impl + S .collect::>() } + pub fn try_holder_state(&mut self) { + let area = self.area.take(); + if area.is_some() { + self.area_holder = Some(ArenaHolder::create(area)); + } + } + #[inline(always)] fn generate_data(&mut self) -> Result> { if self.generated || self.hash_table.len() == 0 { - self.drop_states(); return Ok(vec![]); } self.generated = true; @@ -183,6 +202,8 @@ impl + S let mut group_key_builder = self .method .keys_column_builder(state_groups_len, value_size); + + // TODO use batch for group_entity in self.hash_table.iter() { let place = Into::::into(*group_entity.get()); @@ -216,6 +237,7 @@ impl + S const NAME: &'static str = "GroupByPartialTransform"; fn consume(&mut self, block: DataBlock) -> Result<()> { + self.input_rows += block.num_rows(); let block = block.convert_to_full(); // 1.1 and 1.2. 
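// --- Illustrative sketch (editor's addition, not part of this patch) --------
// `merge_partial_hashstates` above follows an insert-or-merge scheme: a key that
// is new to the final table adopts the partial table's state pointer as-is (the
// backing arena stays alive via the arena holders), while a key that already
// exists merges the two aggregate states and drops the partial one. The same
// idea with std::collections::HashMap and plain counters, purely to show the
// shape of the loop:
use std::collections::HashMap;

fn merge_partial(final_table: &mut HashMap<u64, i64>, partial: HashMap<u64, i64>) {
    for (key, partial_state) in partial {
        final_table
            .entry(key)
            // Key already present: merge the states (here: add the counters).
            .and_modify(|final_state| *final_state += partial_state)
            // New key: adopt the partial state directly.
            .or_insert(partial_state);
    }
}
// -----------------------------------------------------------------------------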
let group_columns = Self::group_columns(&block, &self.params.group_columns); @@ -275,9 +297,9 @@ impl> } } } - - self.hash_table.clear(); drop(self.area.take()); + drop(self.area_holder.take()); + self.hash_table.clear(); self.states_dropped = true; } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs index 3d112f8889723..d5e665346108b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_twolevel.rs @@ -33,6 +33,7 @@ use tracing::info; use super::estimated_key_size; use crate::pipelines::processors::transforms::aggregator::aggregate_info::AggregateInfo; use crate::pipelines::processors::transforms::aggregator::aggregator_final_parallel::ParallelFinalAggregator; +use crate::pipelines::processors::transforms::aggregator::AggregateHashStateInfo; use crate::pipelines::processors::transforms::aggregator::PartialAggregator; use crate::pipelines::processors::transforms::aggregator::SingleStateAggregator; use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder; @@ -109,11 +110,15 @@ where Ok(TwoLevelAggregator:: { inner: PartialAggregator::> { area: self.area.take(), + area_holder: None, params: self.params.clone(), states_dropped: false, method: two_level_method, hash_table: two_level_hashtable, generated: false, + input_rows: self.input_rows, + pass_state_to_final: self.pass_state_to_final, + two_level_mode: true, }, }) } @@ -155,6 +160,19 @@ where continue; } + if agg.pass_state_to_final { + let table = std::mem::replace(inner_table, agg.method.method.create_hash_table()?); + let rows = table.len(); + agg.try_holder_state(); + let meta = AggregateHashStateInfo::create( + bucket, + Box::new(table), + agg.area_holder.clone(), + ); + let block = DataBlock::new_with_meta(vec![], rows, Some(meta)); + return Ok(vec![block]); + } + let capacity = inner_table.len(); let iterator = inner_table.iter(); @@ -213,8 +231,11 @@ where return Ok(data_blocks); } - drop(agg.area.take()); - agg.states_dropped = true; + if !agg.pass_state_to_final { + drop(agg.area.take()); + drop(agg.area_holder.take()); + } + Ok(data_blocks) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs index 0a74b193b18df..9844a00225659 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
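// --- Illustrative sketch (editor's addition, not part of this patch) --------
// `AggregateHashStateInfo` carries the partial bucket's hash table type-erased
// (the stripped generic in its `hash_state: Box<...>` field is presumably
// something like `Box<dyn Any + Send + Sync>`), and the final aggregator
// recovers the concrete table with `downcast_mut` before merging, as in
// `info.hash_state.downcast_mut::<...>()` above (the elided turbofish is
// presumably the concrete `Method::HashTable`). A minimal std-only version of
// that recovery step:
use std::any::Any;
use std::collections::HashMap;

fn recover_table(mut erased: Box<dyn Any>) {
    // Succeeds only if the box really holds the concrete table type.
    if let Some(table) = erased.downcast_mut::<HashMap<u64, i64>>() {
        table.insert(1, 10);
    }
}
// -----------------------------------------------------------------------------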
+mod aggregate_hashstate_info; mod aggregate_info; mod aggregator_final; mod aggregator_final_parallel; @@ -21,6 +22,7 @@ mod aggregator_single_key; mod aggregator_twolevel; mod utils; +pub use aggregate_hashstate_info::AggregateHashStateInfo; pub use aggregate_info::AggregateInfo; pub use aggregate_info::OverflowInfo; pub use aggregator_final_parallel::BucketAggregator; diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs index d2b7fc667beda..74c9077793d80 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_keys_builder.rs @@ -49,25 +49,25 @@ impl<'a, T: Number> KeysColumnBuilder for FixedKeysColumnBuilder<'a, T> { } } -pub struct SerializedKeysColumnBuilder<'a> { +pub struct StringKeysColumnBuilder<'a> { pub inner_builder: StringColumnBuilder, - initial: usize, + _initial: usize, _phantom: PhantomData<&'a ()>, } -impl<'a> SerializedKeysColumnBuilder<'a> { +impl<'a> StringKeysColumnBuilder<'a> { pub fn create(capacity: usize, value_capacity: usize) -> Self { - SerializedKeysColumnBuilder { + StringKeysColumnBuilder { inner_builder: StringColumnBuilder::with_capacity(capacity, value_capacity), _phantom: PhantomData, - initial: value_capacity, + _initial: value_capacity, } } } -impl<'a> KeysColumnBuilder for SerializedKeysColumnBuilder<'a> { +impl<'a> KeysColumnBuilder for StringKeysColumnBuilder<'a> { type T = &'a [u8]; fn append_value(&mut self, v: &'a [u8]) { @@ -76,7 +76,7 @@ impl<'a> KeysColumnBuilder for SerializedKeysColumnBuilder<'a> { } fn finish(self) -> Column { - debug_assert!(self.initial == self.inner_builder.data.len()); + debug_assert_eq!(self._initial, self.inner_builder.data.len()); Column::String(self.inner_builder.build()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs index a4a18e774da72..b396520770054 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs @@ -45,7 +45,7 @@ use crate::pipelines::processors::transforms::group_by::aggregator_groups_builde use crate::pipelines::processors::transforms::group_by::aggregator_groups_builder::SerializedKeysGroupColumnsBuilder; use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::FixedKeysColumnBuilder; use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::KeysColumnBuilder; -use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::SerializedKeysColumnBuilder; +use crate::pipelines::processors::transforms::group_by::aggregator_keys_builder::StringKeysColumnBuilder; use crate::pipelines::processors::transforms::group_by::aggregator_keys_iter::FixedKeysColumnIter; use crate::pipelines::processors::transforms::group_by::aggregator_keys_iter::KeysColumnIter; use crate::pipelines::processors::transforms::group_by::aggregator_keys_iter::SerializedKeysColumnIter; @@ -66,7 +66,7 @@ use crate::pipelines::processors::AggregatorParams; // use common_expression::HashMethodSerializer; // use databend_query::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; // use 
databend_query::pipelines::processors::transforms::group_by::aggregator_state::SerializedKeysAggregatorState; -// use databend_query::pipelines::processors::transforms::group_by::aggregator_keys_builder::SerializedKeysColumnBuilder; +// use databend_query::pipelines::processors::transforms::group_by::aggregator_keys_builder::StringKeysColumnBuilder; // // impl PolymorphicKeysHelper for HashMethodSerializer { // type State = SerializedKeysAggregatorState; @@ -78,9 +78,9 @@ use crate::pipelines::processors::AggregatorParams; // } // } // -// type ColumnBuilder = SerializedKeysColumnBuilder; +// type ColumnBuilder = StringKeysColumnBuilder; // fn state_array_builder(&self, capacity: usize) -> Self::ColumnBuilder { -// SerializedKeysColumnBuilder { +// StringKeysColumnBuilder { // inner_builder: MutableStringColumn::with_capacity(capacity), // } // } @@ -89,7 +89,7 @@ use crate::pipelines::processors::AggregatorParams; pub trait PolymorphicKeysHelper { const SUPPORT_TWO_LEVEL: bool; - type HashTable: HashtableLike + Send; + type HashTable: HashtableLike + Send + Sync + 'static; fn create_hash_table(&self) -> Result; type ColumnBuilder<'a>: KeysColumnBuilder @@ -402,13 +402,13 @@ impl PolymorphicKeysHelper for HashMethodSingleString { Ok(UnsizedHashMap::new()) } - type ColumnBuilder<'a> = SerializedKeysColumnBuilder<'a>; + type ColumnBuilder<'a> = StringKeysColumnBuilder<'a>; fn keys_column_builder( &self, capacity: usize, value_capacity: usize, - ) -> SerializedKeysColumnBuilder<'_> { - SerializedKeysColumnBuilder::create(capacity, value_capacity) + ) -> StringKeysColumnBuilder<'_> { + StringKeysColumnBuilder::create(capacity, value_capacity) } type KeysColumnIter = SerializedKeysColumnIter; @@ -442,13 +442,13 @@ impl PolymorphicKeysHelper for HashMethodSerializer { Ok(SimpleUnsizedHashMap::new()) } - type ColumnBuilder<'a> = SerializedKeysColumnBuilder<'a>; + type ColumnBuilder<'a> = StringKeysColumnBuilder<'a>; fn keys_column_builder( &self, capacity: usize, value_capacity: usize, - ) -> SerializedKeysColumnBuilder<'_> { - SerializedKeysColumnBuilder::create(capacity, value_capacity) + ) -> StringKeysColumnBuilder<'_> { + StringKeysColumnBuilder::create(capacity, value_capacity) } type KeysColumnIter = SerializedKeysColumnIter; @@ -475,7 +475,7 @@ impl PolymorphicKeysHelper for HashMethodSerializer { #[derive(Clone)] pub struct TwoLevelHashMethod { - method: Method, + pub(crate) method: Method, } impl TwoLevelHashMethod { diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs index 95dfbb31ba107..cf94f65c35284 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_state.rs @@ -13,7 +13,9 @@ // limitations under the License. 
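// --- Illustrative sketch (editor's addition, not part of this patch) --------
// The hunk below introduces `ArenaHolder`, an Arc-wrapped bump arena (the
// stripped field type is presumably `Arc<Option<Area>>`). When the partial
// aggregator hands a raw hash table to the final aggregator through
// `AggregateHashStateInfo`, the holder keeps the arena, and therefore every
// state pointer allocated in it, alive until the last clone is dropped. The
// keep-alive pattern itself, with a plain buffer standing in for the arena:
use std::sync::Arc;

#[derive(Clone)]
struct Holder {
    // Never read directly; it only pins the allocation's lifetime.
    _data: Arc<Option<Vec<u8>>>,
}

fn hand_over(arena: Option<Vec<u8>>) -> (Holder, Holder) {
    let holder = Holder { _data: Arc::new(arena) };
    // The partial side and the final side each keep a clone; the memory is
    // released only after both have dropped theirs.
    (holder.clone(), holder)
}
// -----------------------------------------------------------------------------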
use std::alloc::Layout; +use std::fmt::Debug; use std::ptr::NonNull; +use std::sync::Arc; use bumpalo::Bump; @@ -32,3 +34,26 @@ impl Area { } unsafe impl Send for Area {} + +#[derive(Clone)] +pub struct ArenaHolder { + _data: Arc>, +} + +impl ArenaHolder { + pub fn create(area: Option) -> ArenaHolder { + tracing::info!("Putting one arena into holder"); + ArenaHolder { + _data: Arc::new(area), + } + } +} + +impl Debug for ArenaHolder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArenaHolder").finish() + } +} + +unsafe impl Send for ArenaHolder {} +unsafe impl Sync for ArenaHolder {} diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs b/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs index f839227d36ad8..61984a0083a6c 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/mod.rs @@ -26,5 +26,6 @@ pub use aggregator_keys_iter::KeysColumnIter; pub use aggregator_polymorphic_keys::PolymorphicKeysHelper; pub use aggregator_polymorphic_keys::TwoLevelHashMethod; pub use aggregator_state::Area; +pub use aggregator_state::ArenaHolder; pub use aggregator_state_entity::StateEntityMutRef; pub use aggregator_state_entity::StateEntityRef; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs index 394c2f972b3f9..319fee9df4d9e 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_aggregator.rs @@ -69,6 +69,7 @@ impl TransformAggregator { pub fn try_create_partial( transform_params: AggregatorTransformParams, ctx: Arc, + pass_state_to_final: bool, ) -> Result { let aggregator_params = transform_params.aggregator_params.clone(); @@ -86,14 +87,22 @@ impl TransformAggregator { HashMethodKind::T(method) => AggregatorTransform::create( ctx, transform_params, - PartialAggregator::::create(method, aggregator_params)?, + PartialAggregator::::create( + method, + aggregator_params, + pass_state_to_final, + )?, ), }), false => with_mappedhash_method!(|T| match transform_params.method.clone() { HashMethodKind::T(method) => AggregatorTransform::create( ctx, transform_params, - PartialAggregator::::create(method, aggregator_params)?, + PartialAggregator::::create( + method, + aggregator_params, + pass_state_to_final, + )?, ), }), } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 68c55417526e6..b7b653bf4522a 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -37,6 +37,7 @@ use serde::Deserializer; use serde::Serialize; use serde::Serializer; +use super::aggregator::AggregateHashStateInfo; use crate::pipelines::processors::transforms::aggregator::AggregateInfo; use crate::pipelines::processors::transforms::aggregator::BucketAggregator; use crate::pipelines::processors::transforms::group_by::KeysColumnIter; @@ -101,6 +102,8 @@ struct InputPortState { bucket: isize, } +/// A helper class that Map +/// AggregateInfo/AggregateHashStateInfo ---> ConvertGroupingMetaInfo { meta: blocks with Option } pub struct TransformConvertGrouping> { 
output: Arc, inputs: Vec, @@ -186,11 +189,10 @@ impl> TransformConvertGroupin } fn add_bucket(&mut self, data_block: DataBlock) -> isize { - let data_block_meta: Option<&AggregateInfo> = data_block + if let Some(info) = data_block .get_meta() - .and_then(|meta| meta.as_any().downcast_ref::()); - - if let Some(info) = data_block_meta { + .and_then(|meta| meta.as_any().downcast_ref::()) + { if info.overflow.is_none() && info.bucket > SINGLE_LEVEL_BUCKET_NUM { let bucket = info.bucket; match self.buckets_blocks.entry(bucket) { @@ -206,6 +208,23 @@ impl> TransformConvertGroupin } } + // check if it's local state + if let Some(info) = data_block + .get_meta() + .and_then(|meta| meta.as_any().downcast_ref::()) + { + let bucket = info.bucket as isize; + match self.buckets_blocks.entry(bucket) { + Entry::Vacant(v) => { + v.insert(vec![data_block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(data_block); + } + }; + return bucket; + } + self.unsplitted_blocks.push(data_block); SINGLE_LEVEL_BUCKET_NUM } @@ -505,9 +524,9 @@ impl + Send + 'static> Proces fn process(&mut self) -> Result<()> { if let Some(mut data_block) = self.input_block.take() { let mut blocks = vec![]; - if let Some(meta) = data_block.take_meta() { - if let Some(meta) = meta.as_any().downcast_ref::() { - blocks.extend(meta.blocks.iter().cloned()); + if let Some(mut meta) = data_block.take_meta() { + if let Some(meta) = meta.as_mut_any().downcast_mut::() { + std::mem::swap(&mut blocks, &mut meta.blocks); } } diff --git a/src/query/storages/fuse/src/operations/operation_log.rs b/src/query/storages/fuse/src/operations/operation_log.rs index d5df741ebd2d2..0fbd1323cf364 100644 --- a/src/query/storages/fuse/src/operations/operation_log.rs +++ b/src/query/storages/fuse/src/operations/operation_log.rs @@ -53,10 +53,10 @@ impl TryFrom<&DataBlock> for AppendOperationLogEntry { fn try_from(block: &DataBlock) -> Result { let err = ErrorCode::Internal(format!( "invalid data block meta of AppendOperation log, {:?}", - block.meta() + block.get_meta() )); - if let Some(meta) = block.meta()? { + if let Some(meta) = block.get_meta() { let cast = meta.as_any().downcast_ref::(); return match cast { None => Err(err),
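Illustrative sketch (editor's addition, not part of this patch): the final hunk switches `AppendOperationLogEntry::try_from` to the borrowing `get_meta()` while keeping the usual metadata inspection pattern: take the optional meta pointer, call `as_any()`, then `downcast_ref` to the concrete meta type, and fall back to an error when the block carries no meta or the wrong meta. A minimal std-only sketch of that shape, with `as_any` written out by hand since the real `BlockMetaInfo` trait is databend-internal:

use std::any::Any;

trait MetaInfo {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for the concrete append-log meta carried by the block.
struct AppendLogMeta {}

impl MetaInfo for AppendLogMeta {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn read_log_meta(meta: Option<&Box<dyn MetaInfo>>) -> Result<&AppendLogMeta, String> {
    meta.and_then(|m| m.as_any().downcast_ref::<AppendLogMeta>())
        .ok_or_else(|| "invalid data block meta of AppendOperation log".to_string())
}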