diff --git a/Cargo.lock b/Cargo.lock index fe49ac2627c3b..03fabef631af6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4022,7 +4022,6 @@ dependencies = [ "databend-common-storages-view", "databend-common-users", "databend-enterprise-data-mask-feature", - "databend-enterprise-hilbert-clustering", "databend-storages-common-cache", "databend-storages-common-io", "databend-storages-common-session", @@ -5109,6 +5108,7 @@ dependencies = [ "databend-enterprise-attach-table", "databend-enterprise-background-service", "databend-enterprise-data-mask-feature", + "databend-enterprise-hilbert-clustering", "databend-enterprise-inverted-index", "databend-enterprise-resources-management", "databend-enterprise-stream-handler", diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index e041cbf07a8db..fd509d25b8914 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -427,7 +427,6 @@ build_exceptions! { InvalidSessionState(4004), // recluster error codes - NoNeedToRecluster(4011), NoNeedToCompact(4012), UnsupportedClusterType(4013), diff --git a/src/query/ee/src/hilbert_clustering/handler.rs b/src/query/ee/src/hilbert_clustering/handler.rs index 627cd88b19b6a..b2f99ac96d799 100644 --- a/src/query/ee/src/hilbert_clustering/handler.rs +++ b/src/query/ee/src/hilbert_clustering/handler.rs @@ -20,11 +20,14 @@ use databend_common_catalog::plan::ReclusterInfoSideCar; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::BlockThresholds; +use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; +use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT; use databend_common_storages_fuse::pruning::create_segment_location_vector; use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::SegmentLocation; +use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD; +use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; use databend_enterprise_hilbert_clustering::HilbertClusteringHandler; use databend_enterprise_hilbert_clustering::HilbertClusteringHandlerWrapper; use databend_storages_common_table_meta::meta::ClusterStatistics; @@ -53,18 +56,18 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler { return Ok(None); }; - let block_thresholds = fuse_table.get_block_thresholds(); - let thresholds = BlockThresholds { - max_rows_per_block: block_thresholds.block_per_segment - * block_thresholds.max_rows_per_block, - min_rows_per_block: block_thresholds.block_per_segment - * block_thresholds.min_rows_per_block, - max_bytes_per_block: block_thresholds.block_per_segment - * block_thresholds.max_bytes_per_block, - max_bytes_per_file: block_thresholds.block_per_segment - * block_thresholds.max_bytes_per_file, - block_per_segment: block_thresholds.block_per_segment, - }; + let block_per_seg = + fuse_table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); + let hilbert_clustering_min_bytes = + ctx.get_settings().get_hilbert_clustering_min_bytes()? 
as usize; + let max_bytes_per_block = fuse_table.get_option( + FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD, + DEFAULT_BLOCK_BUFFER_SIZE, + ); + let hilbert_min_bytes = std::cmp::max( + hilbert_clustering_min_bytes, + max_bytes_per_block * block_per_seg, + ); let segment_locations = snapshot.segments.clone(); let segment_locations = create_segment_location_vector(segment_locations, None); @@ -72,7 +75,7 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler { let chunk_size = max_threads * 4; let mut checker = ReclusterChecker::new( cluster_key_id, - thresholds, + hilbert_min_bytes, push_downs.as_ref().is_none_or(|v| v.filters.is_none()), ); 'FOR: for chunk in segment_locations.chunks(chunk_size) { @@ -99,12 +102,6 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler { return Ok(None); } - let rows_per_block = - block_thresholds.calc_rows_per_block(checker.total_size, checker.total_rows, 0) as u64; - let block_size = ctx.get_settings().get_max_block_size()?; - ctx.get_settings() - .set_max_block_size(rows_per_block.min(block_size))?; - let mut removed_statistics = Statistics::default(); let mut removed_segment_indexes = Vec::with_capacity(target_segments.len()); for (segment_loc, segment) in target_segments { @@ -137,11 +134,11 @@ impl RealHilbertClusteringHandler { struct ReclusterChecker { segments: Vec<(SegmentLocation, Arc)>, + last_segment: Option<(SegmentLocation, Arc)>, default_cluster_id: u32, - thresholds: BlockThresholds, - total_rows: usize, - total_size: usize, + hilbert_min_bytes: usize, + total_bytes: usize, finished: bool, // Whether the target segments is at the head of snapshot. @@ -149,70 +146,48 @@ struct ReclusterChecker { } impl ReclusterChecker { - fn new(default_cluster_id: u32, thresholds: BlockThresholds, head_of_snapshot: bool) -> Self { + fn new(default_cluster_id: u32, hilbert_min_bytes: usize, head_of_snapshot: bool) -> Self { Self { segments: vec![], + last_segment: None, default_cluster_id, - thresholds, - total_rows: 0, - total_size: 0, + hilbert_min_bytes, + total_bytes: 0, finished: false, head_of_snapshot, } } fn add(&mut self, location: SegmentLocation, segment: Arc) -> bool { - let row_count = segment.summary.row_count as usize; - let byte_size = segment.summary.uncompressed_byte_size as usize; - self.total_rows += row_count; - self.total_size += byte_size; - if !self - .thresholds - .check_large_enough(self.total_rows, self.total_size) - { - // totals < N - self.segments.push((location, segment)); - return false; - } - let segment_should_recluster = self.should_recluster(&segment, |v| { v.cluster_key_id != self.default_cluster_id || v.level != -1 }); - let mut retained = false; - if !self.head_of_snapshot || segment_should_recluster { - if self - .thresholds - .check_for_compact(self.total_rows, self.total_size) - { - // N <= totals < 2N - self.segments.push((location, segment)); - retained = true; - } else if segment_should_recluster { - // totals >= 2N - self.segments = vec![(location, segment)]; - self.total_rows = row_count; - self.total_size = byte_size; - self.finished = true; - return true; - } + + if segment_should_recluster || !self.head_of_snapshot { + self.total_bytes += segment.summary.uncompressed_byte_size as usize; + self.segments.push((location.clone(), segment.clone())); } - if self.check_for_recluster() { - if !retained { - self.total_rows -= row_count; - self.total_size -= byte_size; + if !segment_should_recluster || self.total_bytes >= self.hilbert_min_bytes { + if self.check_for_recluster() { + self.finished = 
true; + return true; } - self.finished = true; - return true; + self.last_segment = Some((location, segment)); + self.reset(); } - self.reset(); false } fn finalize(&mut self) -> Vec<(SegmentLocation, Arc)> { - if !self.finished && !self.check_for_recluster() { - return vec![]; + if !self.finished { + if let Some((location, segment)) = self.last_segment.take() { + self.segments.push((location, segment)); + } + if !self.check_for_recluster() { + return vec![]; + } } std::mem::take(&mut self.segments) } @@ -233,8 +208,7 @@ impl ReclusterChecker { } fn reset(&mut self) { - self.total_rows = 0; - self.total_size = 0; + self.total_bytes = 0; self.head_of_snapshot = false; self.segments.clear(); } diff --git a/src/query/functions/src/scalars/hilbert.rs b/src/query/functions/src/scalars/hilbert.rs index 615f2d6dab004..fa15a5416abc5 100644 --- a/src/query/functions/src/scalars/hilbert.rs +++ b/src/query/functions/src/scalars/hilbert.rs @@ -12,102 +12,172 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_expression::hilbert_index; +use databend_common_expression::types::ArgType; use databend_common_expression::types::ArrayType; use databend_common_expression::types::BinaryType; +use databend_common_expression::types::DataType; use databend_common_expression::types::GenericType; use databend_common_expression::types::NullableType; -use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberType; -use databend_common_expression::types::StringType; -use databend_common_expression::types::ALL_NUMERICS_TYPES; -use databend_common_expression::vectorize_with_builder_1_arg; +use databend_common_expression::types::ValueType; use databend_common_expression::vectorize_with_builder_2_arg; -use databend_common_expression::with_number_mapped_type; +use databend_common_expression::Column; use databend_common_expression::FixedLengthEncoding; +use databend_common_expression::Function; use databend_common_expression::FunctionDomain; +use databend_common_expression::FunctionEval; use databend_common_expression::FunctionRegistry; +use databend_common_expression::FunctionSignature; +use databend_common_expression::ScalarRef; +use databend_common_expression::Value; +/// Registers Hilbert curve related functions with the function registry. 
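+/// +/// An illustrative SQL sketch (assumed example values) of the functions registered below: `hilbert_range_index(c1, [10, 20], c2, [100, 200])` takes one (value, boundaries) pair per dimension and returns the Hilbert index as a binary value, and `range_partition_id(3, [1, 6])` returns `1` because `3` falls into the second range.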
pub fn register(registry: &mut FunctionRegistry) { - registry.register_passthrough_nullable_1_arg::( - "hilbert_key", - |_, _| FunctionDomain::Full, - vectorize_with_builder_1_arg::(|val, builder, _| { - let bytes = val.as_bytes(); - builder.put_slice(bytes); - builder.commit_row(); - }), - ); + // Register the hilbert_range_index function that calculates Hilbert indices for multi-dimensional data + registry.register_function_factory("hilbert_range_index", |_, args_type| { + let args_num = args_type.len(); + // The function supports 2, 3, 4, or 5 dimensions (each dimension requires 2 arguments) + if ![4, 6, 8, 10].contains(&args_num) { + return None; + } - for ty in ALL_NUMERICS_TYPES { - with_number_mapped_type!(|NUM_TYPE| match ty { - NumberDataType::NUM_TYPE => { - registry - .register_passthrough_nullable_1_arg::, BinaryType, _, _>( - "hilbert_key", - |_, _| FunctionDomain::Full, - vectorize_with_builder_1_arg::, BinaryType>( - |val, builder, _| { - let encoded = val.encode(); - builder.put_slice(&encoded); - builder.commit_row(); - }, - ), - ); - } - }) - } + // Create the function signature with appropriate argument types + // For each dimension, we need: + // 1. A value (the point coordinate in that dimension) + // 2. An array of boundaries (for partitioning that dimension) + let sig_args_type = (0..args_num / 2) + .flat_map(|idx| { + [ + DataType::Nullable(Box::new(DataType::Generic(idx))), + DataType::Nullable(Box::new(DataType::Array(Box::new(DataType::Generic(idx))))), + ] + }) + .collect(); - registry.register_combine_nullable_2_arg::>, NumberType, BinaryType, _, _>( - "hilbert_index", - |_, _, _| FunctionDomain::Full, - vectorize_with_builder_2_arg::>, NumberType, NullableType>( - |val, len, builder, ctx| { - let mut points = Vec::with_capacity(val.len()); - for a in val.iter() { - if a.is_none() { - builder.push_null(); - return; + Some(Arc::new(Function { + signature: FunctionSignature { + name: "hilbert_range_index".to_string(), + args_type: sig_args_type, + return_type: DataType::Binary, + }, + eval: FunctionEval::Scalar { + calc_domain: Box::new(|_, _| FunctionDomain::Full), + eval: Box::new(move |args, ctx| { + // Determine if we're processing scalar values or columns + let input_all_scalars = args.iter().all(|arg| arg.as_scalar().is_some()); + let process_rows = if input_all_scalars { 1 } else { ctx.num_rows }; + let mut builder = BinaryType::create_builder(process_rows, &[]); + + for index in 0..process_rows { + let mut points = Vec::with_capacity(args_num / 2); + + // Process each dimension (each dimension has a value and boundaries array) + for i in (0..args_num).step_by(2) { + let arg1 = &args[i]; // The value in this dimension + let arg2 = &args[i + 1]; // The boundaries array for this dimension + + // Get the value and boundaries for this row + let val = unsafe { arg1.index_unchecked(index) }; + let arr = unsafe { arg2.index_unchecked(index) }; + + // Calculate the partition ID for this dimension (capped at 65535, i.e. 
2 bytes or 16 bits) + // This effectively discretizes the continuous dimension into buckets + let id = arr + .as_array() + .map(|arr| calc_range_partition_id(val, arr).min(65535) as u16) + .unwrap_or(0); + + // Encode the partition ID as bytes + let key = id.encode(); + points.push(key); + } + + // Convert the multi-dimensional point to a Hilbert index + // This maps the n-dimensional point to a 1-dimensional value + let points = points + .iter() + .map(|array| array.as_slice()) + .collect::>(); + let slice = hilbert_index(&points, 2); + + // Store the Hilbert index in the result + builder.put_slice(&slice); + builder.commit_row(); + } + + // Return the appropriate result type based on input + if input_all_scalars { + Value::Scalar(BinaryType::upcast_scalar(BinaryType::build_scalar(builder))) + } else { + Value::Column(BinaryType::upcast_column(BinaryType::build_column(builder))) } - points.push(a.unwrap()); - } - let dimension = points.len(); - - if std::intrinsics::unlikely(len > 64) { - ctx.set_error(builder.len(), "Width must be less than or equal to 64"); - builder.push_null(); - } else if std::intrinsics::unlikely(!(2..=5).contains(&dimension)) { - ctx.set_error(builder.len(), "Dimension must between 2 and 5"); - builder.push_null(); - } else { - let slice = hilbert_index(&points, len as usize); - builder.push(&slice); - } + }), }, - ), - ); + })) + }); // This `range_partition_id(col, range_bounds)` function calculates the partition ID for each value // in the column based on the specified partition boundaries. // The column values are conceptually divided into multiple partitions defined by the range_bounds. // For example, given the column values (0, 1, 3, 6, 8) and a partition configuration with 3 partitions, // the range_bounds might be [1, 6]. The function would then return partition IDs as (0, 0, 1, 1, 2). - registry.register_passthrough_nullable_2_arg::, ArrayType>, NumberType, _, _>( + registry + .register_2_arg_core::, ArrayType>, NumberType, _, _>( + "range_partition_id", + |_, _, _| FunctionDomain::Full, + vectorize_with_builder_2_arg::< + GenericType<0>, + ArrayType>, + NumberType, + >(|val, arr, builder, _| { + let id = calc_range_partition_id(val, &arr); + builder.push(id); + }), + ); + + registry.register_2_arg_core::>, NullableType>>, NumberType, _, _>( "range_partition_id", |_, _, _| FunctionDomain::Full, - vectorize_with_builder_2_arg::, ArrayType>, NumberType>(|val, arr, builder, _| { - let mut low = 0; - let mut high = arr.len(); - while low < high { - let mid = low + ((high - low) / 2); - let bound = unsafe {arr.index_unchecked(mid)}; - if val > bound { - low = mid + 1; - } else { - high = mid; - } - } - builder.push(low as u64); + vectorize_with_builder_2_arg::>, NullableType>>, NumberType>(|val, arr, builder, _| { + let id = match (val, arr) { + (Some(val), Some(arr)) => calc_range_partition_id(val, &arr), + (None, Some(arr)) => arr.len() as u64, + _ => 0, + }; + builder.push(id); }), ); } + +/// Calculates the partition ID for a value based on range boundaries. 
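+/// The boundary array is binary-searched and each boundary acts as an inclusive upper bound for its partition.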
+/// +/// # Arguments +/// * `val` - The value to find the partition for +/// * `arr` - The array of boundary values that define the partitions +/// +/// # Returns +/// * The partition ID as a u64 (0 to arr.len()) +/// +/// # Example +/// For boundaries [10, 20, 30]: +/// - Values <= 10 get partition ID 0 +/// - Values > 10 and <= 20 get partition ID 1 +/// - Values > 20 and <= 30 get partition ID 2 +/// - Values > 30 get partition ID 3 +fn calc_range_partition_id(val: ScalarRef, arr: &Column) -> u64 { + let mut low = 0; + let mut high = arr.len(); + while low < high { + let mid = low + ((high - low) / 2); + let bound = unsafe { arr.index_unchecked(mid) }; + if val > bound { + low = mid + 1; + } else { + high = mid; + } + } + low as u64 +} diff --git a/src/query/functions/tests/it/scalars/testdata/function_list.txt index 118ca79c3aa67..be3c1bd506fb6 100644 --- a/src/query/functions/tests/it/scalars/testdata/function_list.txt +++ b/src/query/functions/tests/it/scalars/testdata/function_list.txt @@ -1962,30 +1962,7 @@ Functions overloads: 1 h3_unidirectional_edge_is_valid(UInt64 NULL) :: Boolean NULL 0 haversine(Float64, Float64, Float64, Float64) :: Float64 1 haversine(Float64 NULL, Float64 NULL, Float64 NULL, Float64 NULL) :: Float64 NULL -0 hilbert_index(Array(Binary NULL), UInt64) :: Binary NULL -1 hilbert_index(Array(Binary NULL) NULL, UInt64 NULL) :: Binary NULL -0 hilbert_key(String) :: Binary -1 hilbert_key(String NULL) :: Binary NULL -2 hilbert_key(UInt8) :: Binary -3 hilbert_key(UInt8 NULL) :: Binary NULL -4 hilbert_key(UInt16) :: Binary -5 hilbert_key(UInt16 NULL) :: Binary NULL -6 hilbert_key(UInt32) :: Binary -7 hilbert_key(UInt32 NULL) :: Binary NULL -8 hilbert_key(UInt64) :: Binary -9 hilbert_key(UInt64 NULL) :: Binary NULL -10 hilbert_key(Int8) :: Binary -11 hilbert_key(Int8 NULL) :: Binary NULL -12 hilbert_key(Int16) :: Binary -13 hilbert_key(Int16 NULL) :: Binary NULL -14 hilbert_key(Int32) :: Binary -15 hilbert_key(Int32 NULL) :: Binary NULL -16 hilbert_key(Int64) :: Binary -17 hilbert_key(Int64 NULL) :: Binary NULL -18 hilbert_key(Float32) :: Binary -19 hilbert_key(Float32 NULL) :: Binary NULL -20 hilbert_key(Float64) :: Binary -21 hilbert_key(Float64 NULL) :: Binary NULL +0 hilbert_range_index FACTORY 0 humanize_number(Float64) :: String 1 humanize_number(Float64 NULL) :: String NULL 0 humanize_size(Float64) :: String @@ -3227,7 +3204,7 @@ Functions overloads: 0 range(UInt64, UInt64) :: Array(UInt64) 1 range(UInt64 NULL, UInt64 NULL) :: Array(UInt64) NULL 0 range_partition_id(T0, Array(T0)) :: UInt64 -1 range_partition_id(T0 NULL, Array(T0) NULL) :: UInt64 NULL +1 range_partition_id(T0 NULL, Array(T0) NULL) :: UInt64 0 regexp(String, String) :: Boolean 1 regexp(String NULL, String NULL) :: Boolean NULL 0 regexp_instr FACTORY diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 24e594ce13305..a215a2c11f6f8 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -107,6 +107,7 @@ databend-enterprise-aggregating-index = { workspace = true } databend-enterprise-attach-table = { workspace = true } databend-enterprise-background-service = { workspace = true } databend-enterprise-data-mask-feature = { workspace = true } +databend-enterprise-hilbert-clustering = { workspace = true } databend-enterprise-inverted-index = { workspace = true } databend-enterprise-resources-management = { workspace = true } databend-enterprise-stream-handler = { workspace = true } diff
--git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 362ed5e2ceb14..239f661b05830 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -40,7 +40,6 @@ use databend_common_sql::plans::InsertInputSource; use databend_common_sql::plans::Mutation; use databend_common_sql::plans::OptimizeCompactBlock; use databend_common_sql::plans::PresignAction; -use databend_common_sql::plans::Recluster; use databend_common_sql::plans::RewriteKind; use databend_common_sql::Planner; use databend_common_users::RoleCacheManager; @@ -1023,9 +1022,7 @@ impl AccessChecker for PrivilegeAccess { Plan::DropTableClusterKey(plan) => { self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Drop, false, false).await? } - Plan::ReclusterTable{s_expr, ..} => { - let plan: Recluster = s_expr.plan().clone().try_into()?; - // UDF has been disabled in recluster, no need to check udf privilege access. + Plan::ReclusterTable(plan) => { self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false, false).await? } Plan::TruncateTable(plan) => { diff --git a/src/query/service/src/interpreters/common/table_option_validation.rs b/src/query/service/src/interpreters/common/table_option_validation.rs index 2ce3bc929e5a2..6dd810d758b77 100644 --- a/src/query/service/src/interpreters/common/table_option_validation.rs +++ b/src/query/service/src/interpreters/common/table_option_validation.rs @@ -110,6 +110,7 @@ pub static UNSET_TABLE_OPTIONS_WHITE_LIST: LazyLock> = Laz r.insert(FUSE_OPT_KEY_BLOCK_PER_SEGMENT); r.insert(FUSE_OPT_KEY_ROW_PER_BLOCK); r.insert(FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD); + r.insert(FUSE_OPT_KEY_FILE_SIZE); r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD); r.insert(FUSE_OPT_KEY_FILE_SIZE); r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS); diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 199212b02006d..775d07066cade 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -26,7 +26,7 @@ use databend_common_pipeline_core::Pipeline; use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::optimizer::SExpr; use databend_common_sql::plans::OptimizeCompactBlock; -use databend_common_sql::plans::Recluster; +use databend_common_sql::plans::ReclusterPlan; use databend_common_sql::plans::RelOperator; use databend_storages_common_table_meta::table::ClusterType; use log::info; @@ -218,21 +218,16 @@ async fn compact_table( &compact_target.table, )?; ctx.set_enable_sort_spill(false); - let recluster = RelOperator::Recluster(Recluster { + let recluster = ReclusterPlan { catalog: compact_target.catalog, database: compact_target.database, table: compact_target.table, limit: Some(settings.get_auto_compaction_segments_limit()? 
as usize), - filters: None, - }); - let s_expr = SExpr::create_leaf(Arc::new(recluster)); - let recluster_interpreter = ReclusterTableInterpreter::try_create( - ctx.clone(), - s_expr, - None, - lock_opt, - false, - )?; + selection: None, + is_final: false, + }; + let recluster_interpreter = + ReclusterTableInterpreter::try_create(ctx.clone(), recluster, lock_opt)?; // Recluster will be done in `ReclusterTableInterpreter::execute2` directly, // we do not need to use `PipelineCompleteExecutor` to execute it. let build_res = recluster_interpreter.execute2().await?; diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 7cdf2e4867511..ca3794c5575ad 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -250,7 +250,7 @@ async fn plan_sql( let extras = planner.parse_sql(sql)?; if !acquire_queue { // If queue guard is not required, plan the statement directly. - let plan = planner.plan_stmt(&extras.statement, true).await?; + let plan = planner.plan_stmt(&extras.statement).await?; return Ok((plan, extras, AcquireQueueGuard::create(None))); } @@ -261,11 +261,11 @@ async fn plan_sql( // See PR https://github.com/databendlabs/databend/pull/16632 let query_entry = QueryEntry::create_entry(&ctx, &extras, true)?; let guard = QueriesQueueManager::instance().acquire(query_entry).await?; - let plan = planner.plan_stmt(&extras.statement, true).await?; + let plan = planner.plan_stmt(&extras.statement).await?; Ok((plan, extras, guard)) } else { // No lock is needed, plan the statement first, then acquire the queue guard. - let plan = planner.plan_stmt(&extras.statement, true).await?; + let plan = planner.plan_stmt(&extras.statement).await?; let query_entry = QueryEntry::create(&ctx, &plan, &extras)?; let guard = QueriesQueueManager::instance().acquire(query_entry).await?; Ok((plan, extras, guard)) diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 4e91fd985d565..2bda356ad91ec 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -338,16 +338,10 @@ impl InterpreterFactory { Plan::DropTableClusterKey(drop_table_cluster_key) => Ok(Arc::new( DropTableClusterKeyInterpreter::try_create(ctx, *drop_table_cluster_key.clone())?, )), - Plan::ReclusterTable { - s_expr, - hilbert_query, - is_final, - } => Ok(Arc::new(ReclusterTableInterpreter::try_create( + Plan::ReclusterTable(recluster) => Ok(Arc::new(ReclusterTableInterpreter::try_create( ctx, - *s_expr.clone(), - hilbert_query.clone(), + *recluster.clone(), LockTableOption::LockWithRetry, - *is_final, )?)), Plan::TruncateTable(truncate_table) => Ok(Arc::new( TruncateTableInterpreter::try_create(ctx, *truncate_table.clone())?, diff --git a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs index eff2c034798b6..c3bd900836bc1 100644 --- a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs +++ b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs @@ -338,7 +338,10 @@ fn not(expr: ScalarExpr) -> ScalarExpr { }) } -fn scalar_expr_to_remote_expr(expr: &ScalarExpr, block_schema: &DataSchema) -> Result { +pub(crate) fn scalar_expr_to_remote_expr( + expr: &ScalarExpr, + block_schema: &DataSchema, +) -> Result { let expr = expr .as_expr()? 
.project_column_ref(|col| block_schema.index_of(&col.index.to_string()).unwrap()); diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 9c9f08633d285..99d7a7012e9e8 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -12,59 +12,92 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; -use databend_common_ast::ast::Query; -use databend_common_ast::ast::Statement; use databend_common_catalog::lock::LockTableOption; +use databend_common_catalog::plan::Filters; +use databend_common_catalog::plan::PartInfoType; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::ReclusterInfoSideCar; +use databend_common_catalog::plan::ReclusterParts; +use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::type_check::check_function; +use databend_common_expression::DataBlock; +use databend_common_expression::Scalar; +use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_license::license::Feature; +use databend_common_license::license_manager::LicenseManagerSwitch; +use databend_common_meta_app::schema::TableInfo; +use databend_common_pipeline_core::always_callback; +use databend_common_pipeline_core::ExecutionInfo; +use databend_common_sql::bind_table; +use databend_common_sql::executor::cast_expr_to_non_null_boolean; +use databend_common_sql::executor::physical_plans::CommitSink; +use databend_common_sql::executor::physical_plans::CommitType; +use databend_common_sql::executor::physical_plans::CompactSource; +use databend_common_sql::executor::physical_plans::Exchange; +use databend_common_sql::executor::physical_plans::FragmentKind; +use databend_common_sql::executor::physical_plans::HilbertPartition; +use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_sql::executor::physical_plans::Recluster; +use databend_common_sql::executor::PhysicalPlan; use databend_common_sql::executor::PhysicalPlanBuilder; -use databend_common_sql::optimizer::SExpr; +use databend_common_sql::plans::plan_hilbert_sql; +use databend_common_sql::plans::replace_with_constant; use databend_common_sql::plans::set_update_stream_columns; +use databend_common_sql::plans::BoundColumnRef; use databend_common_sql::plans::Plan; -use databend_common_sql::plans::Recluster; +use databend_common_sql::plans::ReclusterPlan; +use databend_common_sql::query_executor::QueryExecutor; +use databend_common_sql::IdentifierNormalizer; use databend_common_sql::MetadataRef; -use databend_common_sql::Planner; +use databend_common_sql::NameResolutionContext; +use databend_common_sql::ScalarExpr; +use databend_common_sql::TypeChecker; +use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler; +use databend_storages_common_table_meta::meta::TableMetaTimestamps; +use databend_storages_common_table_meta::meta::TableSnapshot; +use databend_storages_common_table_meta::table::ClusterType; +use derive_visitor::DriveMut; use log::error; use log::warn; +use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table; use 
crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir; use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; +use crate::interpreters::interpreter_insert_multi_table::scalar_expr_to_remote_expr; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; +use crate::schedulers::ServiceQueryExecutor; use crate::sessions::QueryContext; use crate::sessions::TableContext; pub struct ReclusterTableInterpreter { ctx: Arc, - s_expr: SExpr, - hilbert_query: Option>, + plan: ReclusterPlan, lock_opt: LockTableOption, - is_final: bool, } impl ReclusterTableInterpreter { pub fn try_create( ctx: Arc, - s_expr: SExpr, - hilbert_query: Option>, + plan: ReclusterPlan, lock_opt: LockTableOption, - is_final: bool, ) -> Result { Ok(Self { ctx, - s_expr, - hilbert_query, + plan, lock_opt, - is_final, }) } } @@ -85,9 +118,11 @@ impl Interpreter for ReclusterTableInterpreter { let recluster_timeout_secs = ctx.get_settings().get_recluster_timeout_secs()?; let mut times = 0; + let mut push_downs = None; + let mut hilbert_info = None; let start = SystemTime::now(); let timeout = Duration::from_secs(recluster_timeout_secs); - let plan: Recluster = self.s_expr.plan().clone().try_into()?; + let is_final = self.plan.is_final; loop { if let Err(err) = ctx.check_aborting() { error!( @@ -96,7 +131,9 @@ impl Interpreter for ReclusterTableInterpreter { return Err(err.with_context("failed to execute")); } - let res = self.execute_recluster(plan.clone()).await; + let res = self + .execute_recluster(&mut push_downs, &mut hilbert_info) + .await; match res { Ok(is_break) => { @@ -105,7 +142,7 @@ impl Interpreter for ReclusterTableInterpreter { } } Err(e) => { - if self.is_final + if is_final && matches!( e.code(), ErrorCode::TABLE_LOCK_EXPIRED @@ -132,7 +169,7 @@ impl Interpreter for ReclusterTableInterpreter { ctx.set_status_info(&status); } - if !self.is_final { + if !is_final { break; } @@ -143,10 +180,6 @@ impl Interpreter for ReclusterTableInterpreter { ); break; } - - self.ctx.clear_selected_segment_locations(); - self.ctx - .evict_table_from_cache(&plan.catalog, &plan.database, &plan.table)?; } Ok(PipelineBuildResult::create()) @@ -154,60 +187,85 @@ impl Interpreter for ReclusterTableInterpreter { } impl ReclusterTableInterpreter { - async fn execute_recluster(&self, op: Recluster) -> Result { + async fn execute_recluster( + &self, + push_downs: &mut Option, + hilbert_info: &mut Option, + ) -> Result { let start = SystemTime::now(); + let settings = self.ctx.get_settings(); + let ReclusterPlan { + catalog, + database, + table, + limit, + .. + } = &self.plan; // try to add lock table. let lock_guard = self .ctx .clone() - .acquire_table_lock(&op.catalog, &op.database, &op.table, &self.lock_opt) + .acquire_table_lock(catalog, database, table, &self.lock_opt) .await?; - let tbl = self - .ctx - .get_table(&op.catalog, &op.database, &op.table) - .await?; - let (s_expr, metadata, required) = if let Some(hilbert) = &self.hilbert_query { - let mut planner = Planner::new(self.ctx.clone()); - let plan = planner - .plan_stmt(&Statement::Query(hilbert.clone()), false) - .await?; - let Plan::Query { - mut s_expr, - metadata, - bind_context, - .. 
- } = plan - else { - unreachable!() - }; - if tbl.change_tracking_enabled() { - *s_expr = set_update_stream_columns(&s_expr)?; - } - let s_expr = self.s_expr.replace_children(vec![Arc::new(*s_expr)]); - (s_expr, metadata, bind_context.column_set()) - } else { - (self.s_expr.clone(), MetadataRef::default(), HashSet::new()) + let tbl = self.ctx.get_table(catalog, database, table).await?; + // check mutability + tbl.check_mutable()?; + let Some(cluster_type) = tbl.cluster_type() else { + return Err(ErrorCode::UnclusteredTable(format!( + "Unclustered table '{}.{}'", + database, table, + ))); }; - let mut builder = PhysicalPlanBuilder::new(metadata, self.ctx.clone(), false); - let physical_plan = match builder.build(&s_expr, required).await { - Ok(res) => res, - Err(e) => { - return if e.code() == ErrorCode::NO_NEED_TO_RECLUSTER { - Ok(true) - } else { - Err(e) - }; + self.build_push_downs(push_downs, &tbl)?; + + let physical_plan = match cluster_type { + ClusterType::Hilbert => { + self.build_hilbert_plan(&tbl, push_downs, hilbert_info) + .await? } + ClusterType::Linear => self.build_linear_plan(&tbl, push_downs, *limit).await?, }; - + let Some(mut physical_plan) = physical_plan else { + return Ok(true); + }; + physical_plan.adjust_plan_id(&mut 0); let mut build_res = build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; + { + let ctx = self.ctx.clone(); + let catalog = self.plan.catalog.clone(); + let database = self.plan.database.clone(); + let table = self.plan.table.clone(); + build_res.main_pipeline.set_on_finished(always_callback( + move |info: &ExecutionInfo| { + ctx.clear_written_segment_locations()?; + ctx.clear_selected_segment_locations(); + ctx.evict_table_from_cache(&catalog, &database, &table)?; + + ctx.unload_spill_meta(); + hook_clear_m_cte_temp_table(&ctx)?; + hook_vacuum_temp_files(&ctx)?; + hook_disk_temp_dir(&ctx)?; + match &info.res { + Ok(_) => { + InterpreterClusteringHistory::write_log( + &ctx, start, &database, &table, + )?; + + Ok(()) + } + Err(error) => Err(error.clone()), + } + }, + )); + } + debug_assert!(build_res.main_pipeline.is_complete_pipeline()?); - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let max_threads = settings.get_max_threads()? as usize; build_res.set_max_threads(max_threads); let executor_settings = ExecutorSettings::try_create(self.ctx.clone())?; @@ -217,19 +275,475 @@ impl ReclusterTableInterpreter { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; - self.ctx.clear_written_segment_locations()?; self.ctx.set_executor(complete_executor.get_inner())?; complete_executor.execute()?; + // make sure the executor is dropped before the next loop. drop(complete_executor); // make sure the lock guard is dropped before the next loop. drop(lock_guard); - // vacuum temp files. - hook_vacuum_temp_files(&self.ctx)?; - hook_disk_temp_dir(&self.ctx)?; - - InterpreterClusteringHistory::write_log(&self.ctx, start, &op.database, &op.table)?; Ok(false) } + + /// Builds physical plan for Hilbert clustering. 
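+ /// + /// The produced plan has roughly the shape (sketch): recluster source query -> [Exchange(Normal) keyed on the range partition id, when distributed] -> HilbertPartition -> [Exchange(Merge), when distributed] -> CommitSink(Recluster).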
+ /// # Arguments + /// * `tbl` - Reference to the table being reclustered + /// * `push_downs` - Optional filter conditions to push down to storage + /// * `hilbert_info` - Cached Hilbert mapping information (built if None) + /// # Returns + /// * `Result>` - The physical plan if reclustering is needed, None otherwise + async fn build_hilbert_plan( + &self, + tbl: &Arc, + push_downs: &mut Option, + hilbert_info: &mut Option, + ) -> Result> { + LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Feature::HilbertClustering)?; + let handler = get_hilbert_clustering_handler(); + let Some((recluster_info, snapshot)) = handler + .do_hilbert_clustering(tbl.clone(), self.ctx.clone(), push_downs.clone()) + .await? + else { + // No reclustering needed (e.g., table already optimally clustered) + return Ok(None); + }; + + let settings = self.ctx.get_settings(); + let table_info = tbl.get_table_info().clone(); + let scan_progress_value = self.ctx.get_scan_progress_value(); + + let block_thresholds = tbl.get_block_thresholds(); + let total_bytes = recluster_info.removed_statistics.uncompressed_byte_size as usize; + let total_rows = recluster_info.removed_statistics.row_count as usize; + let total_compressed = recluster_info.removed_statistics.compressed_byte_size as usize; + + // Determine rows per block based on data size and compression ratio + let rows_per_block = + block_thresholds.calc_rows_per_block(total_bytes, total_rows, total_compressed); + + // Calculate initial partition count based on data volume and block size + let mut total_partitions = std::cmp::max(total_rows / rows_per_block, 1); + + // Adjust number of partitions according to the block size thresholds + if total_partitions < block_thresholds.block_per_segment + && block_thresholds.check_perfect_segment( + block_thresholds.block_per_segment, // this effectively bypasses the total_blocks criteria + total_rows, + total_bytes, + total_compressed, + ) + { + total_partitions = block_thresholds.block_per_segment; + } + + warn!( + "Do hilbert recluster, total_bytes: {}, total_rows: {}, total_partitions: {}", + total_bytes, total_rows, total_partitions + ); + + // Create a subquery executor for running Hilbert mapping calculations + let subquery_executor = Arc::new(ServiceQueryExecutor::new(QueryContext::create_from( + self.ctx.as_ref(), + ))); + + let partitions = settings.get_hilbert_num_range_ids()? as usize; + + // Ensure Hilbert mapping information is built (if not already) + self.build_hilbert_info(tbl, hilbert_info).await?; + let HilbertBuildInfo { + keys_bound, + index_bound, + query, + } = hilbert_info.as_ref().unwrap(); + + // Variables will store the calculated bounds for Hilbert mapping + let mut variables = VecDeque::new(); + + // Execute the `keys_bound` plan to calculate bounds for each clustering key + let keys_bounds = self + .execute_hilbert_plan( + &subquery_executor, + keys_bound, + std::cmp::max(total_partitions, partitions), + &variables, + tbl, + ) + .await?; + + // Store each clustering key's bounds in the variables collection + for entry in keys_bounds.columns().iter() { + let v = entry.value.index(0).unwrap().to_owned(); + variables.push_back(v); + } + + // Execute the `index_bound` plan to calculate the Hilbert index bounds + // i.e.
`range_bound(..)(hilbert_range_index(..))` + let index_bounds = self + .execute_hilbert_plan( + &subquery_executor, + index_bound, + total_partitions, + &variables, + tbl, + ) + .await?; + + // Add the Hilbert index bound to the front of variables + let val = index_bounds.value_at(0, 0).unwrap().to_owned(); + variables.push_front(val); + + // Reset the scan progress to its original value + self.ctx.get_scan_progress().set(&scan_progress_value); + + let Plan::Query { + s_expr, + metadata, + bind_context, + .. + } = query + else { + unreachable!("Expected a Query plan, but got {:?}", query.kind()); + }; + + // Replace placeholders in the expression + // `range_partition_id(hilbert_range_index(cluster_key, [$key_range_bound], ..), [$hilbert_index_range_bound])` + // with calculated constants. + let mut s_expr = replace_with_constant(s_expr, &variables, total_partitions as u16); + + if tbl.change_tracking_enabled() { + s_expr = set_update_stream_columns(&s_expr)?; + } + + metadata.write().replace_all_tables(tbl.clone()); + let mut builder = PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false); + let mut plan = Box::new(builder.build(&s_expr, bind_context.column_set()).await?); + + // Check if the plan already has an exchange operator + let mut is_exchange = false; + if let PhysicalPlan::Exchange(Exchange { + input, + kind: FragmentKind::Merge, + .. + }) = plan.as_ref() + { + is_exchange = true; + plan = input.clone(); + } + + // Determine if we need distributed execution + let cluster = self.ctx.get_cluster(); + let is_distributed = is_exchange || !cluster.is_empty(); + + // For distributed execution, add an exchange operator to distribute work + if is_distributed { + // Create an expression for the partition column, + // i.e.`range_partition_id(hilbert_range_index({hilbert_keys_str}), [...]) AS _predicate` + let expr = scalar_expr_to_remote_expr( + &ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: bind_context.columns.last().unwrap().clone(), + }), + plan.output_schema()?.as_ref(), + )?; + + // Add exchange operator for data distribution, + // shuffling data based on the hash of range partition IDs derived from the Hilbert index. + plan = Box::new(PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: plan, + kind: FragmentKind::Normal, + keys: vec![expr], + allow_adjust_parallelism: true, + ignore_exchange: false, + })); + } + + let table_meta_timestamps = self + .ctx + .get_table_meta_timestamps(tbl.as_ref(), Some(snapshot.clone()))?; + + // Create the Hilbert partition physical plan, + // collecting data into partitions and persist them + let plan = PhysicalPlan::HilbertPartition(Box::new(HilbertPartition { + plan_id: 0, + input: plan, + table_info: table_info.clone(), + num_partitions: total_partitions, + table_meta_timestamps, + })); + + // Finally, commit the newly clustered table + Ok(Some(Self::add_commit_sink( + plan, + is_distributed, + table_info, + snapshot, + false, + Some(recluster_info), + table_meta_timestamps, + ))) + } + + async fn build_linear_plan( + &self, + tbl: &Arc, + push_downs: &mut Option, + limit: Option, + ) -> Result> { + let Some((parts, snapshot)) = tbl + .recluster(self.ctx.clone(), push_downs.clone(), limit) + .await? 
+ else { + return Ok(None); + }; + if parts.is_empty() { + return Ok(None); + } + let table_meta_timestamps = self + .ctx + .get_table_meta_timestamps(tbl.as_ref(), Some(snapshot.clone()))?; + + let table_info = tbl.get_table_info().clone(); + let is_distributed = parts.is_distributed(self.ctx.clone()); + let plan = match parts { + ReclusterParts::Recluster { + tasks, + remained_blocks, + removed_segment_indexes, + removed_segment_summary, + } => { + let root = PhysicalPlan::Recluster(Box::new(Recluster { + tasks, + table_info: table_info.clone(), + plan_id: u32::MAX, + table_meta_timestamps, + })); + + Self::add_commit_sink( + root, + is_distributed, + table_info, + snapshot, + false, + Some(ReclusterInfoSideCar { + merged_blocks: remained_blocks, + removed_segment_indexes, + removed_statistics: removed_segment_summary, + }), + table_meta_timestamps, + ) + } + ReclusterParts::Compact(parts) => { + let merge_meta = parts.partitions_type() == PartInfoType::LazyLevel; + let root = PhysicalPlan::CompactSource(Box::new(CompactSource { + parts, + table_info: table_info.clone(), + column_ids: snapshot.schema.to_leaf_column_id_set(), + plan_id: u32::MAX, + table_meta_timestamps, + })); + + Self::add_commit_sink( + root, + is_distributed, + table_info, + snapshot, + merge_meta, + None, + table_meta_timestamps, + ) + } + }; + Ok(Some(plan)) + } + + fn build_push_downs( + &self, + push_downs: &mut Option, + tbl: &Arc, + ) -> Result<()> { + if push_downs.is_none() { + if let Some(expr) = &self.plan.selection { + let settings = self.ctx.get_settings(); + let (mut bind_context, metadata) = bind_table(tbl.clone())?; + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let mut type_checker = TypeChecker::try_create( + &mut bind_context, + self.ctx.clone(), + &name_resolution_ctx, + metadata, + &[], + true, + )?; + let (scalar, _) = *type_checker.resolve(expr)?; + // prepare the filter expression + let filter = cast_expr_to_non_null_boolean( + scalar + .as_expr()? 
+ .project_column_ref(|col| col.column_name.clone()), + )?; + // prepare the inverse filter expression + let inverted_filter = + check_function(None, "not", &[], &[filter.clone()], &BUILTIN_FUNCTIONS)?; + *push_downs = Some(PushDownInfo { + filters: Some(Filters { + filter: filter.as_remote_expr(), + inverted_filter: inverted_filter.as_remote_expr(), + }), + ..PushDownInfo::default() + }); + } + } + Ok(()) + } + + async fn build_hilbert_info( + &self, + tbl: &Arc, + hilbert_info: &mut Option, + ) -> Result<()> { + if hilbert_info.is_some() { + return Ok(()); + } + + let database = &self.plan.database; + let table = &self.plan.table; + let settings = self.ctx.get_settings(); + let sample_size = settings.get_hilbert_sample_size_per_block()?; + + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let ast_exprs = tbl.resolve_cluster_keys(self.ctx.clone()).unwrap(); + let cluster_keys_len = ast_exprs.len(); + let mut cluster_key_strs = Vec::with_capacity(cluster_keys_len); + for mut ast in ast_exprs { + let mut normalizer = IdentifierNormalizer { + ctx: &name_resolution_ctx, + }; + ast.drive_mut(&mut normalizer); + cluster_key_strs.push(format!("{:#}", &ast)); + } + + let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len()); + let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len()); + for cluster_key_str in cluster_key_strs.into_iter() { + keys_bounds.push(format!( + "range_bound(1000, {sample_size})({cluster_key_str})" + )); + + hilbert_keys.push(format!("{table}.{cluster_key_str}, []")); + } + let hilbert_keys_str = hilbert_keys.join(", "); + + let keys_bounds_query = + format!("SELECT {} FROM {database}.{table}", keys_bounds.join(", ")); + let keys_bound = + plan_hilbert_sql(self.ctx.clone(), MetadataRef::default(), &keys_bounds_query).await?; + + let index_bound_query = format!( + "SELECT \ + range_bound(1000, {sample_size})(hilbert_range_index({hilbert_keys_str})) \ + FROM {database}.{table}" + ); + let index_bound = + plan_hilbert_sql(self.ctx.clone(), MetadataRef::default(), &index_bound_query).await?; + + let quote = settings.get_sql_dialect()?.default_ident_quote(); + let schema = tbl.schema_with_stream(); + let mut output_with_table = Vec::with_capacity(schema.fields.len()); + for field in &schema.fields { + output_with_table.push(format!( + "{quote}{table}{quote}.{quote}{}{quote}", + field.name + )); + } + let output_with_table_str = output_with_table.join(", "); + let query = format!( + "SELECT \ + {output_with_table_str}, \ + range_partition_id(hilbert_range_index({hilbert_keys_str}), [])AS _predicate \ + FROM {database}.{table}" + ); + let query = plan_hilbert_sql(self.ctx.clone(), MetadataRef::default(), &query).await?; + + *hilbert_info = Some(HilbertBuildInfo { + keys_bound, + index_bound, + query, + }); + Ok(()) + } + + async fn execute_hilbert_plan( + &self, + executor: &Arc, + plan: &Plan, + partitions: usize, + variables: &VecDeque, + tbl: &Arc, + ) -> Result { + let Plan::Query { + s_expr, + metadata, + bind_context, + .. 
+ } = plan + else { + unreachable!() + }; + + let s_expr = replace_with_constant(s_expr, variables, partitions as u16); + metadata.write().replace_all_tables(tbl.clone()); + let mut builder = PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false); + let plan = builder.build(&s_expr, bind_context.column_set()).await?; + let data_blocks = executor.execute_query_with_physical_plan(&plan).await?; + DataBlock::concat(&data_blocks) + } + + fn add_commit_sink( + input: PhysicalPlan, + is_distributed: bool, + table_info: TableInfo, + snapshot: Arc, + merge_meta: bool, + recluster_info: Option, + table_meta_timestamps: TableMetaTimestamps, + ) -> PhysicalPlan { + let plan = if is_distributed { + PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(input), + kind: FragmentKind::Merge, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + }) + } else { + input + }; + + let kind = if recluster_info.is_some() { + MutationKind::Recluster + } else { + MutationKind::Compact + }; + PhysicalPlan::CommitSink(Box::new(CommitSink { + input: Box::new(plan), + table_info, + snapshot: Some(snapshot), + commit_type: CommitType::Mutation { kind, merge_meta }, + update_stream_meta: vec![], + deduplicated_label: None, + table_meta_timestamps, + plan_id: u32::MAX, + recluster_info, + })) + } +} + +struct HilbertBuildInfo { + keys_bound: Plan, + index_bound: Plan, + query: Plan, } diff --git a/src/query/service/src/pipelines/builders/builder_hilbert_partition.rs b/src/query/service/src/pipelines/builders/builder_hilbert_partition.rs new file mode 100644 index 0000000000000..0c1d0ed61c1b5 --- /dev/null +++ b/src/query/service/src/pipelines/builders/builder_hilbert_partition.rs @@ -0,0 +1,108 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; + +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::MemorySettings; +use databend_common_sql::executor::physical_plans::HilbertPartition; +use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_storages_fuse::operations::TransformSerializeBlock; +use databend_common_storages_fuse::statistics::ClusterStatsGenerator; +use databend_common_storages_fuse::FuseTable; +use databend_storages_common_cache::TempDirManager; +use opendal::services::Fs; +use opendal::Operator; + +use crate::pipelines::memory_settings::MemorySettingsExt; +use crate::pipelines::processors::transforms::CompactStrategy; +use crate::pipelines::processors::transforms::HilbertPartitionExchange; +use crate::pipelines::processors::transforms::TransformWindowPartitionCollect; +use crate::pipelines::PipelineBuilder; +use crate::spillers::SpillerDiskConfig; + +impl PipelineBuilder { + pub(crate) fn build_hilbert_partition(&mut self, partition: &HilbertPartition) -> Result<()> { + self.build_pipeline(&partition.input)?; + let num_processors = self.main_pipeline.output_len(); + let table = self + .ctx + .build_table_by_table_info(&partition.table_info, None)?; + let table = FuseTable::try_from_table(table.as_ref())?; + + self.main_pipeline.exchange( + num_processors, + HilbertPartitionExchange::create(partition.num_partitions), + ); + + let settings = self.ctx.get_settings(); + let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; + let temp_dir_manager = TempDirManager::instance(); + + let enable_dio = settings.get_enable_dio()?; + let disk_spill = + match temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) { + Some(temp_dir) if !enable_dio => { + let builder = Fs::default().root(temp_dir.path().to_str().unwrap()); + Some(SpillerDiskConfig { + temp_dir, + local_operator: Some(Operator::new(builder)?.finish()), + }) + } + Some(temp_dir) => Some(SpillerDiskConfig { + temp_dir, + local_operator: None, + }), + None => None, + }; + + let window_spill_settings = MemorySettings::from_window_settings(&self.ctx)?; + let processor_id = AtomicUsize::new(0); + let thresholds = table.get_block_thresholds(); + self.main_pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create(Box::new( + TransformWindowPartitionCollect::new( + self.ctx.clone(), + input, + output, + &settings, + processor_id.fetch_add(1, atomic::Ordering::AcqRel), + num_processors, + partition.num_partitions, + window_spill_settings.clone(), + disk_spill.clone(), + CompactStrategy::new(thresholds), + )?, + ))) + })?; + + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + let proc = TransformSerializeBlock::try_create( + self.ctx.clone(), + transform_input_port, + transform_output_port, + table, + ClusterStatsGenerator::default(), + MutationKind::Recluster, + partition.table_meta_timestamps, + )?; + proc.into_processor() + }) + } +} diff --git a/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs b/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs deleted file mode 100644 index d46af383ed46e..0000000000000 --- a/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed 
under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use databend_common_exception::Result; -use databend_common_pipeline_transforms::processors::build_compact_block_no_split_pipeline; -use databend_common_sql::executor::physical_plans::HilbertSerialize; -use databend_common_sql::executor::physical_plans::MutationKind; -use databend_common_storages_factory::Table; -use databend_common_storages_fuse::operations::TransformSerializeBlock; -use databend_common_storages_fuse::statistics::ClusterStatsGenerator; -use databend_common_storages_fuse::FuseTable; -use databend_common_storages_fuse::TableContext; - -use crate::pipelines::PipelineBuilder; - -impl PipelineBuilder { - pub(crate) fn build_hilbert_serialize(&mut self, serialize: &HilbertSerialize) -> Result<()> { - self.build_pipeline(&serialize.input)?; - let table = self - .ctx - .build_table_by_table_info(&serialize.table_info, None)?; - let table = FuseTable::try_from_table(table.as_ref())?; - - let block_thresholds = table.get_block_thresholds(); - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; - build_compact_block_no_split_pipeline( - &mut self.main_pipeline, - block_thresholds, - max_threads, - )?; - - self.main_pipeline - .add_transform(|transform_input_port, transform_output_port| { - let proc = TransformSerializeBlock::try_create( - self.ctx.clone(), - transform_input_port, - transform_output_port, - table, - ClusterStatsGenerator::default(), - MutationKind::Recluster, - serialize.table_meta_timestamps, - )?; - proc.into_processor() - }) - } -} diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index aa7b90c2534c2..ae0dffc1dc73b 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -32,6 +32,7 @@ use opendal::Operator; use crate::pipelines::memory_settings::MemorySettingsExt; use crate::pipelines::processors::transforms::FrameBound; +use crate::pipelines::processors::transforms::SortStrategy; use crate::pipelines::processors::transforms::TransformWindow; use crate::pipelines::processors::transforms::TransformWindowPartitionCollect; use crate::pipelines::processors::transforms::WindowFunctionInfo; @@ -216,6 +217,12 @@ impl PipelineBuilder { let processor_id = AtomicUsize::new(0); self.main_pipeline.add_transform(|input, output| { + let strategy = SortStrategy::try_create( + &settings, + sort_desc.clone(), + plan_schema.clone(), + have_order_col, + )?; Ok(ProcessorPtr::create(Box::new( TransformWindowPartitionCollect::new( self.ctx.clone(), @@ -227,9 +234,7 @@ impl PipelineBuilder { num_partitions, window_spill_settings.clone(), disk_spill.clone(), - sort_desc.clone(), - plan_schema.clone(), - have_order_col, + strategy, )?, ))) }) diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index 707ed7c7e4b58..8a2e5a481a349 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ 
b/src/query/service/src/pipelines/builders/mod.rs @@ -25,7 +25,7 @@ mod builder_distributed_insert_select; mod builder_exchange; mod builder_fill_missing_columns; mod builder_filter; -mod builder_hilbert_serialize; +mod builder_hilbert_partition; mod builder_insert_multi_table; mod builder_join; mod builder_limit; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index e796d2cb4d705..1763a9cc6dfc6 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -229,7 +229,7 @@ impl PipelineBuilder { // Recluster. PhysicalPlan::Recluster(recluster) => self.build_recluster(recluster), - PhysicalPlan::HilbertSerialize(serialize) => self.build_hilbert_serialize(serialize), + PhysicalPlan::HilbertPartition(partition) => self.build_hilbert_partition(partition), PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate), PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle), diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs index 390f3eda48ad1..f357612ae7ed6 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs @@ -340,7 +340,7 @@ async fn create_memory_table_for_cte_scan( | PhysicalPlan::CompactSource(_) | PhysicalPlan::CommitSink(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::HilbertSerialize(_) + | PhysicalPlan::HilbertPartition(_) | PhysicalPlan::Duplicate(_) | PhysicalPlan::ChunkFilter(_) | PhysicalPlan::ChunkEvalScalar(_) diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs new file mode 100644 index 0000000000000..0635c071d75c1 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs @@ -0,0 +1,145 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
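+//! Post-processing strategies used by `TransformWindowPartitionCollect` once a partition's blocks have been restored: `SortStrategy` (window functions) sorts each block and merge-sorts the results, while `CompactStrategy` (Hilbert recluster) concatenates undersized blocks until they meet the table's block thresholds.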
+ +use databend_common_exception::Result; +use databend_common_expression::BlockThresholds; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::SortColumnDescription; +use databend_common_pipeline_transforms::memory_size; +use databend_common_pipeline_transforms::sort_merge; +use databend_common_settings::Settings; + +pub trait DataProcessorStrategy: Send + Sync + 'static { + const NAME: &'static str; + fn process_data_blocks(&self, data_blocks: Vec) -> Result>; +} + +pub struct CompactStrategy { + thresholds: BlockThresholds, +} + +impl CompactStrategy { + pub fn new(thresholds: BlockThresholds) -> Self { + Self { thresholds } + } + + fn concat_blocks(blocks: Vec) -> Result { + DataBlock::concat(&blocks) + } +} + +impl DataProcessorStrategy for CompactStrategy { + const NAME: &'static str = "Compact"; + + fn process_data_blocks(&self, data_blocks: Vec) -> Result> { + let blocks_num = data_blocks.len(); + if blocks_num < 2 { + return Ok(data_blocks); + } + + let mut accumulated_rows = 0; + let mut accumulated_bytes = 0; + let mut pending_blocks = Vec::with_capacity(blocks_num); + let mut staged_blocks = Vec::with_capacity(blocks_num); + let mut result = Vec::with_capacity(blocks_num); + for block in data_blocks { + accumulated_rows += block.num_rows(); + accumulated_bytes += memory_size(&block); + if !self + .thresholds + .check_large_enough(accumulated_rows, accumulated_bytes) + { + pending_blocks.push(block); + continue; + } + + if !staged_blocks.is_empty() { + result.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?); + } + + if pending_blocks.is_empty() + || self + .thresholds + .check_for_compact(accumulated_rows, accumulated_bytes) + { + std::mem::swap(&mut staged_blocks, &mut pending_blocks); + } else { + result.push(Self::concat_blocks(std::mem::take(&mut pending_blocks))?); + } + + staged_blocks.push(block); + accumulated_rows = 0; + accumulated_bytes = 0; + } + + staged_blocks.append(&mut pending_blocks); + if !staged_blocks.is_empty() { + result.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?); + } + + Ok(result) + } +} + +pub struct SortStrategy { + sort_desc: Vec, + schema: DataSchemaRef, + max_block_size: usize, + sort_spilling_batch_bytes: usize, + enable_loser_tree: bool, + have_order_col: bool, +} + +impl SortStrategy { + pub fn try_create( + settings: &Settings, + sort_desc: Vec, + schema: DataSchemaRef, + have_order_col: bool, + ) -> Result { + let max_block_size = settings.get_max_block_size()? 
as usize; + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; + let sort_spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; + Ok(Self { + sort_desc, + schema, + max_block_size, + sort_spilling_batch_bytes, + enable_loser_tree, + have_order_col, + }) + } +} + +impl DataProcessorStrategy for SortStrategy { + const NAME: &'static str = "Sort"; + + fn process_data_blocks(&self, data_blocks: Vec) -> Result> { + let data_blocks = data_blocks + .into_iter() + .map(|data_block| DataBlock::sort(&data_block, &self.sort_desc, None)) + .collect::>>()?; + + sort_merge( + self.schema.clone(), + self.max_block_size, + self.sort_desc.clone(), + data_blocks, + self.sort_spilling_batch_bytes, + self.enable_loser_tree, + self.have_order_col, + ) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/hilbert_partition_exchange.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/hilbert_partition_exchange.rs new file mode 100644 index 0000000000000..93a6ce2aa4b6e --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/hilbert_partition_exchange.rs @@ -0,0 +1,74 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Some variables and functions are named and designed with reference to ClickHouse. +// - https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Transforms/WindowTransform.h +// - https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Transforms/WindowTransform.cpp + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::Exchange; + +use crate::pipelines::processors::transforms::WindowPartitionMeta; + +pub struct HilbertPartitionExchange { + num_partitions: usize, +} + +impl HilbertPartitionExchange { + pub fn create(num_partitions: usize) -> Arc { + Arc::new(HilbertPartitionExchange { num_partitions }) + } +} + +impl Exchange for HilbertPartitionExchange { + const NAME: &'static str = "Hilbert"; + fn partition(&self, data_block: DataBlock, n: usize) -> Result> { + let mut data_block = data_block; + let range_ids = data_block + .get_last_column() + .as_number() + .unwrap() + .as_u_int64() + .unwrap(); + + // Scatter the data block to different partitions. + let indices = range_ids + .iter() + .map(|&id| (id % self.num_partitions as u64) as u16) + .collect::>(); + data_block.pop_columns(1); + let scatter_indices = + DataBlock::divide_indices_by_scatter_size(&indices, self.num_partitions); + // Partition the data blocks to different processors. 
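+        // `num_partitions` can exceed the number of downstream processors `n`, so partition ids
+        // are folded onto processors round-robin via `partition_id % n`.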
+ let mut output_data_blocks = vec![vec![]; n]; + for (partition_id, indices) in scatter_indices.iter().take(self.num_partitions).enumerate() + { + if indices.is_empty() { + continue; + } + let block = data_block.take_with_optimize_size(indices)?; + output_data_blocks[partition_id % n].push((partition_id, block)); + } + + // Union data blocks for each processor. + Ok(output_data_blocks + .into_iter() + .map(WindowPartitionMeta::create) + .map(DataBlock::empty_with_meta) + .collect()) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs index 3eebde7955a8b..5aa4562c98865 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod data_processor_strategy; +mod hilbert_partition_exchange; mod transform_window_partition_collect; mod window_partition_buffer; mod window_partition_exchange; mod window_partition_meta; mod window_partition_partial_top_n_exchange; +pub use data_processor_strategy::*; +pub use hilbert_partition_exchange::*; pub use transform_window_partition_collect::*; pub use window_partition_buffer::*; pub use window_partition_exchange::*; diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index b2252ccd2d1f4..3051a2f0f018c 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -23,19 +23,17 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; -use databend_common_expression::DataSchemaRef; -use databend_common_expression::SortColumnDescription; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_transforms::processors::sort_merge; use databend_common_pipeline_transforms::MemorySettings; use databend_common_settings::Settings; use databend_common_storage::DataOperator; use super::WindowPartitionBuffer; use super::WindowPartitionMeta; +use crate::pipelines::processors::transforms::DataProcessorStrategy; use crate::sessions::QueryContext; use crate::spillers::Spiller; use crate::spillers::SpillerConfig; @@ -52,7 +50,7 @@ pub enum Step { #[derive(Debug, Clone, Copy)] pub enum SyncStep { Collect, - Sort, + Process, } #[derive(Debug, Clone, Copy)] @@ -61,7 +59,7 @@ pub enum AsyncStep { Restore, } -pub struct TransformWindowPartitionCollect { +pub struct TransformWindowPartitionCollect { input: Arc, output: Arc, @@ -73,21 +71,14 @@ pub struct TransformWindowPartitionCollect { // The buffer is used to control the memory usage of the window operator. buffer: WindowPartitionBuffer, - // Sort variables. 
- sort_desc: Vec, - schema: DataSchemaRef, - max_block_size: usize, - sort_spilling_batch_bytes: usize, - enable_loser_tree: bool, - have_order_col: bool, + strategy: S, // Event variables. step: Step, is_collect_finished: bool, } -impl TransformWindowPartitionCollect { - #[expect(clippy::too_many_arguments)] +impl TransformWindowPartitionCollect { pub fn new( ctx: Arc, input: Arc, @@ -98,9 +89,7 @@ impl TransformWindowPartitionCollect { num_partitions: usize, memory_settings: MemorySettings, disk_spill: Option, - sort_desc: Vec, - schema: DataSchemaRef, - have_order_col: bool, + strategy: S, ) -> Result { // Calculate the partition ids collected by the processor. let partitions: Vec = (0..num_partitions) @@ -134,21 +123,12 @@ impl TransformWindowPartitionCollect { memory_settings, )?; - let max_block_size = settings.get_max_block_size()? as usize; - let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; - let sort_spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; - Ok(Self { input, output, partition_id, buffer, - sort_desc, - schema, - max_block_size, - have_order_col, - enable_loser_tree, - sort_spilling_batch_bytes, + strategy, is_collect_finished: false, output_data_blocks: VecDeque::new(), restored_data_blocks: Vec::new(), @@ -229,9 +209,9 @@ impl TransformWindowPartitionCollect { } #[async_trait::async_trait] -impl Processor for TransformWindowPartitionCollect { +impl Processor for TransformWindowPartitionCollect { fn name(&self) -> String { - "TransformWindowPartitionCollect".to_string() + format!("TransformWindowPartitionCollect({})", S::NAME) } fn as_any(&mut self) -> &mut dyn Any { @@ -239,16 +219,16 @@ impl Processor for TransformWindowPartitionCollect { } fn event(&mut self) -> Result { - // (collect <--> spill) -> (sort <--> restore) -> finish + // (collect <--> spill) -> (process <--> restore) -> finish match self.step { Step::Sync(sync_step) => match sync_step { SyncStep::Collect => self.collect(), - SyncStep::Sort => self.output(), + SyncStep::Process => self.output(), }, Step::Async(async_step) => match async_step { AsyncStep::Spill => match self.is_collect_finished { true => { - self.step = Step::Sync(SyncStep::Sort); + self.step = Step::Sync(SyncStep::Process); self.output() } false => { @@ -259,7 +239,7 @@ impl Processor for TransformWindowPartitionCollect { }, AsyncStep::Restore => match self.restored_data_blocks.is_empty() { true => self.next_step(Step::Finish), - false => self.next_step(Step::Sync(SyncStep::Sort)), + false => self.next_step(Step::Sync(SyncStep::Process)), }, }, Step::Finish => Ok(Event::Finished), @@ -268,25 +248,10 @@ impl Processor for TransformWindowPartitionCollect { fn process(&mut self) -> Result<()> { match self.step { - Step::Sync(SyncStep::Sort) => { + Step::Sync(SyncStep::Process) => { let restored_data_blocks = std::mem::take(&mut self.restored_data_blocks); - - let data_blocks = restored_data_blocks - .into_iter() - .map(|data_block| DataBlock::sort(&data_block, &self.sort_desc, None)) - .collect::>>()?; - - let sorted_data_blocks = sort_merge( - self.schema.clone(), - self.max_block_size, - self.sort_desc.clone(), - data_blocks, - self.sort_spilling_batch_bytes, - self.enable_loser_tree, - self.have_order_col, - )?; - - self.output_data_blocks.extend(sorted_data_blocks); + let processed_blocks = self.strategy.process_data_blocks(restored_data_blocks)?; + self.output_data_blocks.extend(processed_blocks); } _ => unreachable!(), } @@ -306,7 +271,7 @@ impl Processor for TransformWindowPartitionCollect { } } 
-impl TransformWindowPartitionCollect { +impl TransformWindowPartitionCollect { fn collect_data_block( data_block: DataBlock, partition_ids: &[usize], diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 5cccee701d104..bd74d9f265b6a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -159,8 +159,6 @@ pub struct QueryContext { fragment_id: Arc, // Used by synchronized generate aggregating indexes when new data written. written_segment_locs: Arc>>, - // Used by hilbert clustering when do recluster. - selected_segment_locs: Arc>>, // Temp table for materialized CTE, first string is the database_name, second string is the table_name // All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it. m_cte_temp_table: Arc>>, @@ -190,7 +188,6 @@ impl QueryContext { written_segment_locs: Default::default(), block_threshold: Default::default(), m_cte_temp_table: Default::default(), - selected_segment_locs: Default::default(), }) } @@ -1289,16 +1286,21 @@ impl TableContext for QueryContext { } fn add_selected_segment_location(&self, segment_loc: Location) { - let mut segment_locations = self.selected_segment_locs.write(); + let mut segment_locations = self.shared.selected_segment_locs.write(); segment_locations.insert(segment_loc); } fn get_selected_segment_locations(&self) -> Vec { - self.selected_segment_locs.read().iter().cloned().collect() + self.shared + .selected_segment_locs + .read() + .iter() + .cloned() + .collect() } fn clear_selected_segment_locations(&self) { - let mut segment_locations = self.selected_segment_locs.write(); + let mut segment_locations = self.shared.selected_segment_locs.write(); segment_locations.clear(); } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 240de47ba0bf1..cbaa01d22c3b9 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -14,6 +14,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -57,6 +58,7 @@ use databend_common_storage::MutationStatus; use databend_common_storage::StorageMetrics; use databend_common_storages_stream::stream_table::StreamTable; use databend_common_users::UserApiProvider; +use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use parking_lot::Mutex; use parking_lot::RwLock; @@ -162,6 +164,9 @@ pub struct QueryContextShared { pub(in crate::sessions) unload_callbacked: AtomicBool, pub(in crate::sessions) mem_stat: Arc>>>, pub(in crate::sessions) node_memory_usage: Arc>>>, + + // Used by hilbert clustering when do recluster. 
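+    // Stored on the shared context (rather than on `QueryContext`) so the segment locations
+    // selected by the Hilbert clustering handler are visible to every `QueryContext` derived
+    // from this query.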
+ pub(in crate::sessions) selected_segment_locs: Arc>>, } impl QueryContextShared { @@ -226,6 +231,7 @@ impl QueryContextShared { warehouse_cache: Arc::new(RwLock::new(None)), mem_stat: Arc::new(RwLock::new(None)), node_memory_usage: Arc::new(RwLock::new(HashMap::new())), + selected_segment_locs: Default::default(), })) } diff --git a/src/query/service/src/sessions/queue_mgr.rs b/src/query/service/src/sessions/queue_mgr.rs index 231e05aff5b3b..bd305c83cb7d5 100644 --- a/src/query/service/src/sessions/queue_mgr.rs +++ b/src/query/service/src/sessions/queue_mgr.rs @@ -411,7 +411,7 @@ impl QueryEntry { | Plan::VacuumTable(_) | Plan::VacuumTemporaryFiles(_) | Plan::RefreshIndex(_) - | Plan::ReclusterTable { .. } + | Plan::ReclusterTable(_) | Plan::TruncateTable(_) => { return true; } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index b84dc5aba20c7..098025eceac69 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1159,7 +1159,7 @@ impl DefaultSettings { range: None, }), ("hilbert_num_range_ids", DefaultSettingValue { - value: UserSettingValue::UInt64(1024), + value: UserSettingValue::UInt64(1000), desc: "Specifies the domain of range IDs in Hilbert clustering. A larger value provides finer granularity, but may incur a performance cost.", mode: SettingMode::Both, scope: SettingScope::Both, @@ -1172,6 +1172,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(1..=u64::MAX)), }), + ("hilbert_clustering_min_bytes", DefaultSettingValue { + value: UserSettingValue::UInt64(100 * 1024 * 1024 * 1024), + desc: "Sets the minimum byte size of blocks for Hilbert Clustering.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(1..=3200 * 1024 * 1024 * 1024)), + }), ("enable_prune_cache", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Enable to cache the pruning result", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 6e77ce6ab1933..d1691ec244513 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -869,6 +869,10 @@ impl Settings { self.try_get_u64("hilbert_sample_size_per_block") } + pub fn get_hilbert_clustering_min_bytes(&self) -> Result { + self.try_get_u64("hilbert_clustering_min_bytes") + } + pub fn get_copy_dedup_full_path_by_default(&self) -> Result { Ok(self.try_get_u64("copy_dedup_full_path_by_default")? 
== 1) } diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index 0cdb7cf0e5a88..ba707dc16a72b 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -35,7 +35,6 @@ databend-common-storages-result-cache = { workspace = true } databend-common-storages-view = { workspace = true } databend-common-users = { workspace = true } databend-enterprise-data-mask-feature = { workspace = true } -databend-enterprise-hilbert-clustering = { workspace = true } databend-storages-common-cache = { workspace = true } databend-storages-common-io = { workspace = true } databend-storages-common-session = { workspace = true } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index 162205bf81a4d..e407d5fda3be8 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -367,8 +367,8 @@ fn to_format_tree( distributed_insert_to_format_tree(plan.as_ref(), metadata, profs) } PhysicalPlan::Recluster(_) => Ok(FormatTreeNode::new("Recluster".to_string())), - PhysicalPlan::HilbertSerialize(_) => { - Ok(FormatTreeNode::new("HilbertSerialize".to_string())) + PhysicalPlan::HilbertPartition(_) => { + Ok(FormatTreeNode::new("HilbertPartition".to_string())) } PhysicalPlan::CompactSource(_) => Ok(FormatTreeNode::new("CompactSource".to_string())), PhysicalPlan::CommitSink(plan) => commit_sink_to_format_tree(plan, metadata, profs), diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index e259d37be1375..a9126dc5e8f7a 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -25,7 +25,7 @@ use enum_as_inner::EnumAsInner; use itertools::Itertools; use super::physical_plans::AddStreamColumn; -use super::physical_plans::HilbertSerialize; +use super::physical_plans::HilbertPartition; use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSource; @@ -138,7 +138,7 @@ pub enum PhysicalPlan { /// Recluster Recluster(Box), - HilbertSerialize(Box), + HilbertPartition(Box), /// Multi table insert Duplicate(Box), @@ -356,7 +356,7 @@ impl PhysicalPlan { plan.plan_id = *next_id; *next_id += 1; } - PhysicalPlan::HilbertSerialize(plan) => { + PhysicalPlan::HilbertPartition(plan) => { plan.plan_id = *next_id; *next_id += 1; } @@ -450,7 +450,7 @@ impl PhysicalPlan { PhysicalPlan::ReplaceInto(v) => v.plan_id, PhysicalPlan::CompactSource(v) => v.plan_id, PhysicalPlan::Recluster(v) => v.plan_id, - PhysicalPlan::HilbertSerialize(v) => v.plan_id, + PhysicalPlan::HilbertPartition(v) => v.plan_id, PhysicalPlan::Duplicate(v) => v.plan_id, PhysicalPlan::Shuffle(v) => v.plan_id, PhysicalPlan::ChunkFilter(v) => v.plan_id, @@ -506,7 +506,7 @@ impl PhysicalPlan { | PhysicalPlan::CommitSink(_) | PhysicalPlan::DistributedInsertSelect(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::HilbertSerialize(_) => Ok(DataSchemaRef::default()), + | PhysicalPlan::HilbertPartition(_) => Ok(DataSchemaRef::default()), PhysicalPlan::Duplicate(plan) => plan.input.output_schema(), PhysicalPlan::Shuffle(plan) => plan.input.output_schema(), PhysicalPlan::ChunkFilter(plan) => plan.input.output_schema(), @@ -566,7 +566,7 @@ impl PhysicalPlan { PhysicalPlan::ExpressionScan(_) => "ExpressionScan".to_string(), PhysicalPlan::CacheScan(_) => "CacheScan".to_string(), PhysicalPlan::Recluster(_) => "Recluster".to_string(), - PhysicalPlan::HilbertSerialize(_) => "HilbertSerialize".to_string(), + 
PhysicalPlan::HilbertPartition(_) => "HilbertPartition".to_string(), PhysicalPlan::Udf(_) => "Udf".to_string(), PhysicalPlan::Duplicate(_) => "Duplicate".to_string(), PhysicalPlan::Shuffle(_) => "Shuffle".to_string(), @@ -590,7 +590,7 @@ impl PhysicalPlan { | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), - PhysicalPlan::HilbertSerialize(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::AggregateExpand(plan) => Box::new(std::iter::once(plan.input.as_ref())), @@ -695,7 +695,7 @@ impl PhysicalPlan { | PhysicalPlan::CacheScan(_) | PhysicalPlan::RecursiveCteScan(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::HilbertSerialize(_) + | PhysicalPlan::HilbertPartition(_) | PhysicalPlan::Duplicate(_) | PhysicalPlan::Shuffle(_) | PhysicalPlan::ChunkFilter(_) diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index b03bc1ae32852..56289d465b4e0 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -128,9 +128,6 @@ impl PhysicalPlanBuilder { RelOperator::MutationSource(mutation_source) => { self.build_mutation_source(mutation_source).await } - RelOperator::Recluster(recluster) => { - self.build_recluster(s_expr, recluster, required).await - } RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await, } } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 66559493bc90a..35b4c0bc73178 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -17,7 +17,7 @@ use databend_common_exception::Result; use super::physical_plans::AddStreamColumn; use super::physical_plans::CacheScan; use super::physical_plans::ExpressionScan; -use super::physical_plans::HilbertSerialize; +use super::physical_plans::HilbertPartition; use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSplit; @@ -108,7 +108,7 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::ExpressionScan(plan) => self.replace_expression_scan(plan), PhysicalPlan::CacheScan(plan) => self.replace_cache_scan(plan), PhysicalPlan::Recluster(plan) => self.replace_recluster(plan), - PhysicalPlan::HilbertSerialize(plan) => self.replace_hilbert_serialize(plan), + PhysicalPlan::HilbertPartition(plan) => self.replace_hilbert_serialize(plan), PhysicalPlan::Udf(plan) => self.replace_udf(plan), PhysicalPlan::AsyncFunction(plan) => self.replace_async_function(plan), PhysicalPlan::Duplicate(plan) => self.replace_duplicate(plan), @@ -127,13 +127,11 @@ pub trait PhysicalPlanReplacer { Ok(PhysicalPlan::Recluster(Box::new(plan.clone()))) } - fn replace_hilbert_serialize(&mut self, plan: &HilbertSerialize) -> Result { + fn replace_hilbert_serialize(&mut self, plan: &HilbertPartition) -> Result { let input = self.replace(&plan.input)?; - Ok(PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize { - plan_id: plan.plan_id, + Ok(PhysicalPlan::HilbertPartition(Box::new(HilbertPartition { input: Box::new(input), - table_info: plan.table_info.clone(), - 
table_meta_timestamps: plan.table_meta_timestamps, + ..plan.clone() }))) } @@ -646,7 +644,7 @@ impl PhysicalPlan { | PhysicalPlan::ExpressionScan(_) | PhysicalPlan::CacheScan(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::HilbertSerialize(_) + | PhysicalPlan::HilbertPartition(_) | PhysicalPlan::ExchangeSource(_) | PhysicalPlan::CompactSource(_) | PhysicalPlan::MutationSource(_) => {} diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index 5f3a01b27fdae..2ef35daca8bfa 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -90,7 +90,7 @@ pub use physical_mutation_source::*; pub use physical_project_set::ProjectSet; pub use physical_r_cte_scan::RecursiveCteScan; pub use physical_range_join::*; -pub use physical_recluster::HilbertSerialize; +pub use physical_recluster::HilbertPartition; pub use physical_recluster::Recluster; pub use physical_refresh_index::RefreshIndex; pub use physical_replace_async_source::ReplaceAsyncSourcer; diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster.rs b/src/query/sql/src/executor/physical_plans/physical_recluster.rs index ed085ce940d4a..bd9b38c10e114 100644 --- a/src/query/sql/src/executor/physical_plans/physical_recluster.rs +++ b/src/query/sql/src/executor/physical_plans/physical_recluster.rs @@ -12,27 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_catalog::plan::PartInfoType; -use databend_common_catalog::plan::PushDownInfo; -use databend_common_catalog::plan::ReclusterInfoSideCar; -use databend_common_catalog::plan::ReclusterParts; use databend_common_catalog::plan::ReclusterTask; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; use databend_common_meta_app::schema::TableInfo; -use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler; use databend_storages_common_table_meta::meta::TableMetaTimestamps; -use crate::executor::physical_plans::CommitSink; -use crate::executor::physical_plans::CommitType; -use crate::executor::physical_plans::CompactSource; -use crate::executor::physical_plans::Exchange; -use crate::executor::physical_plans::FragmentKind; -use crate::executor::physical_plans::MutationKind; use crate::executor::PhysicalPlan; -use crate::executor::PhysicalPlanBuilder; -use crate::optimizer::SExpr; -use crate::ColumnSet; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Recluster { @@ -43,171 +27,10 @@ pub struct Recluster { } #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub struct HilbertSerialize { +pub struct HilbertPartition { pub plan_id: u32, pub input: Box, pub table_info: TableInfo, + pub num_partitions: usize, pub table_meta_timestamps: TableMetaTimestamps, } - -impl PhysicalPlanBuilder { - pub async fn build_recluster( - &mut self, - s_expr: &SExpr, - recluster: &crate::plans::Recluster, - required: ColumnSet, - ) -> Result { - let crate::plans::Recluster { - catalog, - database, - table, - filters, - limit, - } = recluster; - - let tbl = self.ctx.get_table(catalog, database, table).await?; - let push_downs = filters.clone().map(|v| PushDownInfo { - filters: Some(v), - ..PushDownInfo::default() - }); - let table_info = tbl.get_table_info().clone(); - let is_hilbert = !s_expr.children.is_empty(); - let commit_type = CommitType::Mutation { - kind: MutationKind::Recluster, - merge_meta: false, - }; - let mut plan = 
if is_hilbert { - let handler = get_hilbert_clustering_handler(); - let Some((recluster_info, snapshot)) = handler - .do_hilbert_clustering(tbl.clone(), self.ctx.clone(), push_downs) - .await? - else { - return Err(ErrorCode::NoNeedToRecluster(format!( - "No need to do recluster for '{database}'.'{table}'" - ))); - }; - - let table_meta_timestamps = self - .ctx - .get_table_meta_timestamps(tbl.as_ref(), Some(snapshot.clone()))?; - - let plan = self.build(s_expr.child(0)?, required).await?; - let plan = PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize { - plan_id: 0, - input: Box::new(plan), - table_info: table_info.clone(), - table_meta_timestamps, - })); - PhysicalPlan::CommitSink(Box::new(CommitSink { - input: Box::new(plan), - table_info, - snapshot: Some(snapshot), - commit_type, - update_stream_meta: vec![], - deduplicated_label: None, - plan_id: u32::MAX, - recluster_info: Some(recluster_info), - table_meta_timestamps, - })) - } else { - let Some((parts, snapshot)) = - tbl.recluster(self.ctx.clone(), push_downs, *limit).await? - else { - return Err(ErrorCode::NoNeedToRecluster(format!( - "No need to do recluster for '{database}'.'{table}'" - ))); - }; - - let table_meta_timestamps = self - .ctx - .get_table_meta_timestamps(tbl.as_ref(), Some(snapshot.clone()))?; - - if parts.is_empty() { - return Err(ErrorCode::NoNeedToRecluster(format!( - "No need to do recluster for '{database}'.'{table}'" - ))); - } - - let is_distributed = parts.is_distributed(self.ctx.clone()); - match parts { - ReclusterParts::Recluster { - tasks, - remained_blocks, - removed_segment_indexes, - removed_segment_summary, - } => { - let mut root = PhysicalPlan::Recluster(Box::new(Recluster { - tasks, - table_info: table_info.clone(), - plan_id: u32::MAX, - table_meta_timestamps, - })); - - if is_distributed { - root = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(root), - kind: FragmentKind::Merge, - keys: vec![], - allow_adjust_parallelism: true, - ignore_exchange: false, - }); - } - PhysicalPlan::CommitSink(Box::new(CommitSink { - input: Box::new(root), - table_info, - snapshot: Some(snapshot), - commit_type, - update_stream_meta: vec![], - deduplicated_label: None, - plan_id: u32::MAX, - recluster_info: Some(ReclusterInfoSideCar { - merged_blocks: remained_blocks, - removed_segment_indexes, - removed_statistics: removed_segment_summary, - }), - table_meta_timestamps, - })) - } - ReclusterParts::Compact(parts) => { - let merge_meta = parts.partitions_type() == PartInfoType::LazyLevel; - let mut root = PhysicalPlan::CompactSource(Box::new(CompactSource { - parts, - table_info: table_info.clone(), - column_ids: snapshot.schema.to_leaf_column_id_set(), - plan_id: u32::MAX, - table_meta_timestamps, - })); - - if is_distributed { - root = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(root), - kind: FragmentKind::Merge, - keys: vec![], - allow_adjust_parallelism: true, - ignore_exchange: false, - }); - } - - PhysicalPlan::CommitSink(Box::new(CommitSink { - input: Box::new(root), - table_info, - snapshot: Some(snapshot), - commit_type: CommitType::Mutation { - kind: MutationKind::Compact, - merge_meta, - }, - update_stream_meta: vec![], - deduplicated_label: None, - plan_id: u32::MAX, - recluster_info: None, - table_meta_timestamps, - })) - } - } - }; - plan.adjust_plan_id(&mut 0); - Ok(plan) - } -} diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 00caa00765d97..0871b2285edbe 100644 --- 
a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -32,7 +32,6 @@ use databend_common_ast::ast::DescribeTableStmt; use databend_common_ast::ast::DropTableStmt; use databend_common_ast::ast::Engine; use databend_common_ast::ast::ExistsTableStmt; -use databend_common_ast::ast::Expr; use databend_common_ast::ast::Identifier; use databend_common_ast::ast::InvertedIndexDefinition; use databend_common_ast::ast::ModifyColumnAction; @@ -61,15 +60,12 @@ use databend_common_base::base::uuid::Uuid; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::lock::LockTableOption; -use databend_common_catalog::plan::Filters; use databend_common_catalog::table::CompactionLimits; -use databend_common_catalog::table::TableExt; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::infer_schema_type; use databend_common_expression::infer_table_schema; -use databend_common_expression::type_check::check_function; use databend_common_expression::types::DataType; use databend_common_expression::ComputedExpr; use databend_common_expression::DataField; @@ -90,7 +86,6 @@ use databend_common_storage::init_operator; use databend_common_storages_view::view_table::QUERY; use databend_common_storages_view::view_table::VIEW_ENGINE; use databend_storages_common_table_meta::table::is_reserved_opt_key; -use databend_storages_common_table_meta::table::ClusterType; use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; use databend_storages_common_table_meta::table::OPT_KEY_ENGINE_META; @@ -103,14 +98,12 @@ use derive_visitor::DriveMut; use log::debug; use opendal::Operator; -use crate::bind_table; use crate::binder::get_storage_params_from_options; use crate::binder::parse_storage_params_from_uri; use crate::binder::scalar::ScalarBinder; use crate::binder::Binder; use crate::binder::ColumnBindingBuilder; use crate::binder::Visibility; -use crate::executor::cast_expr_to_non_null_boolean; use crate::optimizer::SExpr; use crate::parse_computed_expr_to_string; use crate::parse_default_expr_to_string; @@ -134,7 +127,7 @@ use crate::plans::OptimizeCompactBlock; use crate::plans::OptimizeCompactSegmentPlan; use crate::plans::OptimizePurgePlan; use crate::plans::Plan; -use crate::plans::Recluster; +use crate::plans::ReclusterPlan; use crate::plans::RelOperator; use crate::plans::RenameTableColumnPlan; use crate::plans::RenameTablePlan; @@ -151,10 +144,8 @@ use crate::plans::VacuumTableOption; use crate::plans::VacuumTablePlan; use crate::plans::VacuumTemporaryFilesPlan; use crate::BindContext; -use crate::NameResolutionContext; use crate::Planner; use crate::SelectBuilder; -use crate::TypeChecker; impl Binder { #[async_backtrace::framed] @@ -1038,17 +1029,14 @@ impl Binder { is_final, selection, limit, - } => { - self.bind_recluster_table( - catalog, - database, - table, - limit.map(|v| v as usize), - selection, - *is_final, - ) - .await - } + } => Ok(Plan::ReclusterTable(Box::new(ReclusterPlan { + catalog, + database, + table, + limit: limit.map(|v| v as usize), + selection: selection.clone(), + is_final: *is_final, + }))), AlterTableAction::FlashbackTo { point } => { let point = self.resolve_data_travel_point(bind_context, point)?; Ok(Plan::RevertTable(Box::new(RevertTablePlan { @@ -1078,147 +1066,6 @@ impl Binder { } } - 
#[async_backtrace::framed] - pub async fn bind_recluster_table( - &mut self, - catalog: String, - database: String, - table: String, - limit: Option, - selection: &Option, - is_final: bool, - ) -> Result { - let tbl = self.ctx.get_table(&catalog, &database, &table).await?; - // check mutability - tbl.check_mutable()?; - let Some(cluster_type) = tbl.cluster_type() else { - return Err(ErrorCode::UnclusteredTable(format!( - "Unclustered table '{}.{}'", - database, table, - ))); - }; - - let filters = if let Some(expr) = selection { - let (mut context, metadata) = bind_table(tbl.clone())?; - let mut type_checker = TypeChecker::try_create( - &mut context, - self.ctx.clone(), - &self.name_resolution_ctx, - metadata, - &[], - true, - )?; - let (scalar, _) = *type_checker.resolve(expr)?; - - // prepare the filter expression - let filter = cast_expr_to_non_null_boolean( - scalar - .as_expr()? - .project_column_ref(|col| col.column_name.clone()), - )?; - // prepare the inverse filter expression - let inverted_filter = - check_function(None, "not", &[], &[filter.clone()], &BUILTIN_FUNCTIONS)?; - - Some(Filters { - filter: filter.as_remote_expr(), - inverted_filter: inverted_filter.as_remote_expr(), - }) - } else { - None - }; - - let hilbert_query = if matches!(cluster_type, ClusterType::Hilbert) { - LicenseManagerSwitch::instance() - .check_enterprise_enabled(self.ctx.get_license_key(), Feature::HilbertClustering)?; - let ast_exprs = tbl.resolve_cluster_keys(self.ctx.clone()).unwrap(); - let cluster_keys_len = ast_exprs.len(); - let settings = self.ctx.get_settings(); - let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; - let cluster_key_strs = ast_exprs.into_iter().fold( - Vec::with_capacity(cluster_keys_len), - |mut acc, mut ast| { - let mut normalizer = IdentifierNormalizer { - ctx: &name_resolution_ctx, - }; - ast.drive_mut(&mut normalizer); - acc.push(format!("{:#}", &ast)); - acc - }, - ); - - let partitions = settings.get_hilbert_num_range_ids()?; - let sample_size = settings.get_hilbert_sample_size_per_block()?; - let mut keys_bounds = Vec::with_capacity(cluster_key_strs.len()); - let mut hilbert_keys = Vec::with_capacity(cluster_key_strs.len()); - for (index, cluster_key_str) in cluster_key_strs.into_iter().enumerate() { - keys_bounds.push(format!( - "range_bound({partitions}, {sample_size})({cluster_key_str}) AS bound_{index}" - )); - hilbert_keys.push(format!( - "hilbert_key(cast(ifnull(range_partition_id({table}.{cluster_key_str}, \ - _keys_bound.bound_{index}), {partitions}) as uint16))" - )); - } - let keys_bounds_str = keys_bounds.join(", "); - let hilbert_keys_str = hilbert_keys.join(", "); - - let quote = settings.get_sql_dialect()?.default_ident_quote(); - let schema = tbl.schema_with_stream(); - let mut output_with_table = Vec::with_capacity(schema.fields.len()); - let mut output = Vec::with_capacity(schema.fields.len()); - for field in &schema.fields { - output_with_table.push(format!( - "{quote}{table}{quote}.{quote}{}{quote}", - field.name - )); - output.push(format!("{quote}{}{quote}", field.name)); - } - let output_with_table_str = output_with_table.join(", "); - let output_str = output.join(", "); - - let query = format!( - "WITH _keys_bound AS ( \ - SELECT \ - {keys_bounds_str} \ - FROM {database}.{table} \ - ), \ - _source_data AS ( \ - SELECT \ - {output_with_table_str}, \ - hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \ - FROM {database}.{table}, _keys_bound \ - ) \ - SELECT \ - {output_str} \ - FROM _source_data \ - ORDER BY 
_hilbert_index" - ); - let tokens = tokenize_sql(query.as_str())?; - let (stmt, _) = parse_sql(&tokens, self.dialect)?; - let Statement::Query(query) = stmt else { - unreachable!() - }; - Some(query) - } else { - None - }; - - let recluster = RelOperator::Recluster(Recluster { - catalog, - database, - table, - limit, - filters, - }); - - Ok(Plan::ReclusterTable { - s_expr: Box::new(SExpr::create_leaf(Arc::new(recluster))), - hilbert_query, - is_final, - }) - } - #[async_backtrace::framed] pub(in crate::planner::binder) async fn bind_rename_table( &mut self, diff --git a/src/query/sql/src/planner/binder/util.rs b/src/query/sql/src/planner/binder/util.rs index 82e9981bcb5c9..52d747eb13a8a 100644 --- a/src/query/sql/src/planner/binder/util.rs +++ b/src/query/sql/src/planner/binder/util.rs @@ -82,7 +82,6 @@ impl Binder { | RelOperator::Aggregate(_) | RelOperator::Window(_) | RelOperator::Mutation(_) - | RelOperator::Recluster(_) | RelOperator::MutationSource(_) | RelOperator::CompactBlock(_) => { return Err(ErrorCode::SyntaxException(format!( diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index b8c5e5c359bfc..0f7c845546278 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -72,7 +72,7 @@ impl Plan { Plan::DropTableColumn(_) => Ok("DropTableColumn".to_string()), Plan::AlterTableClusterKey(_) => Ok("AlterTableClusterKey".to_string()), Plan::DropTableClusterKey(_) => Ok("DropTableClusterKey".to_string()), - Plan::ReclusterTable { .. } => Ok("ReclusterTable".to_string()), + Plan::ReclusterTable(_) => Ok("ReclusterTable".to_string()), Plan::TruncateTable(_) => Ok("TruncateTable".to_string()), Plan::OptimizePurge(_) => Ok("OptimizePurge".to_string()), Plan::OptimizeCompactSegment(_) => Ok("OptimizeCompactSegment".to_string()), diff --git a/src/query/sql/src/planner/metadata.rs b/src/query/sql/src/planner/metadata.rs index f775e2374eb27..a2a8310a1552d 100644 --- a/src/query/sql/src/planner/metadata.rs +++ b/src/query/sql/src/planner/metadata.rs @@ -498,6 +498,12 @@ impl Metadata { } table_name } + + pub fn replace_all_tables(&mut self, table: Arc) { + for entry in self.tables.iter_mut() { + entry.table = table.clone(); + } + } } #[derive(Clone)] diff --git a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs index b325b167a84de..c4ca0048faa31 100644 --- a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs +++ b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs @@ -201,7 +201,6 @@ impl SubqueryRewriter { | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) - | RelOperator::Recluster(_) | RelOperator::CompactBlock(_) => Ok(s_expr.clone()), } } diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs index a7b21ae4579fe..1d531a64a2b7a 100644 --- a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs @@ -91,7 +91,6 @@ pub async fn dynamic_sample( | RelOperator::ExpressionScan(_) | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) - | RelOperator::Recluster(_) | RelOperator::CompactBlock(_) | RelOperator::MutationSource(_) => { s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)) diff --git 
a/src/query/sql/src/planner/optimizer/format.rs b/src/query/sql/src/planner/optimizer/format.rs index b7aaa58ece778..543524d439a86 100644 --- a/src/query/sql/src/planner/optimizer/format.rs +++ b/src/query/sql/src/planner/optimizer/format.rs @@ -73,7 +73,6 @@ pub fn display_rel_op(rel_op: &RelOperator) -> String { RelOperator::AsyncFunction(_) => "AsyncFunction".to_string(), RelOperator::Mutation(_) => "MergeInto".to_string(), RelOperator::MutationSource(_) => "MutationSource".to_string(), - RelOperator::Recluster(_) => "Recluster".to_string(), RelOperator::CompactBlock(_) => "CompactBlock".to_string(), } } diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs index e741f6ac81959..c5e89758c1288 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -296,7 +296,6 @@ impl DPhpy { | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) - | RelOperator::Recluster(_) | RelOperator::CompactBlock(_) => Ok((Arc::new(s_expr.clone()), true)), } } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs index ac3e316e94139..e62af3f3d8184 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs @@ -152,7 +152,6 @@ fn find_group_by_keys( | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) - | RelOperator::Recluster(_) | RelOperator::CompactBlock(_) => {} } Ok(()) diff --git a/src/query/sql/src/planner/optimizer/s_expr.rs b/src/query/sql/src/planner/optimizer/s_expr.rs index 2092155cd4248..dfe9b21d277ff 100644 --- a/src/query/sql/src/planner/optimizer/s_expr.rs +++ b/src/query/sql/src/planner/optimizer/s_expr.rs @@ -343,7 +343,6 @@ impl SExpr { | RelOperator::CacheScan(_) | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) - | RelOperator::Recluster(_) | RelOperator::CompactBlock(_) => {} }; for child in &self.children { @@ -442,7 +441,6 @@ fn find_subquery(rel_op: &RelOperator) -> bool { | RelOperator::AsyncFunction(_) | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) - | RelOperator::Recluster(_) | RelOperator::CompactBlock(_) => false, RelOperator::Join(op) => { op.equi_conditions.iter().any(|condition| { diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 66b738992daa3..e2063d02d41fb 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -85,7 +85,7 @@ impl Planner { #[fastrace::trace] pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, PlanExtras)> { let extras = self.parse_sql(sql)?; - let plan = self.plan_stmt(&extras.statement, true).await?; + let plan = self.plan_stmt(&extras.statement).await?; Ok((plan, extras)) } @@ -221,7 +221,7 @@ impl Planner { #[async_backtrace::framed] #[fastrace::trace] - pub async fn plan_stmt(&mut self, stmt: &Statement, attach_query: bool) -> Result { + pub async fn plan_stmt(&mut self, stmt: &Statement) -> Result { let start = Instant::now(); let query_kind = get_query_kind(stmt); let settings = self.ctx.get_settings(); @@ -242,10 +242,8 @@ impl Planner { ); if let Some(plan) = plan { info!("logical plan from cache, time used: {:?}", start.elapsed()); - if attach_query { - // update for clickhouse handler - 
self.ctx.attach_query_str(query_kind, stmt.to_mask_sql()); - } + // update for clickhouse handler + self.ctx.attach_query_str(query_kind, stmt.to_mask_sql()); return Ok(plan.plan); } enable_planner_cache = c; @@ -261,14 +259,10 @@ impl Planner { .with_subquery_executor(self.query_executor.clone()); // must attach before bind, because ParquetRSTable::create used it. - if attach_query { - self.ctx.attach_query_str(query_kind, stmt.to_mask_sql()); - } + self.ctx.attach_query_str(query_kind, stmt.to_mask_sql()); let plan = binder.bind(stmt).await?; // attach again to avoid the query kind is overwritten by the subquery - if attach_query { - self.ctx.attach_query_str(query_kind, stmt.to_mask_sql()); - } + self.ctx.attach_query_str(query_kind, stmt.to_mask_sql()); // Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr let opt_ctx = OptimizerContext::new(self.ctx.clone(), metadata.clone()) diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 1bf8b9bc0ee3c..b01daefbd6550 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -40,7 +40,6 @@ use crate::plans::Limit; use crate::plans::Mutation; use crate::plans::OptimizeCompactBlock; use crate::plans::ProjectSet; -use crate::plans::Recluster; use crate::plans::Scan; use crate::plans::Sort; use crate::plans::Udf; @@ -116,7 +115,6 @@ pub enum RelOp { AsyncFunction, RecursiveCteScan, MergeInto, - Recluster, CompactBlock, MutationSource, @@ -153,7 +151,6 @@ pub enum RelOperator { RecursiveCteScan(RecursiveCteScan), AsyncFunction(AsyncFunction), Mutation(Mutation), - Recluster(Recluster), CompactBlock(OptimizeCompactBlock), MutationSource(MutationSource), } @@ -180,7 +177,6 @@ impl Operator for RelOperator { RelOperator::RecursiveCteScan(rel_op) => rel_op.rel_op(), RelOperator::AsyncFunction(rel_op) => rel_op.rel_op(), RelOperator::Mutation(rel_op) => rel_op.rel_op(), - RelOperator::Recluster(rel_op) => rel_op.rel_op(), RelOperator::CompactBlock(rel_op) => rel_op.rel_op(), RelOperator::MutationSource(rel_op) => rel_op.rel_op(), } @@ -207,7 +203,6 @@ impl Operator for RelOperator { RelOperator::RecursiveCteScan(rel_op) => rel_op.arity(), RelOperator::AsyncFunction(rel_op) => rel_op.arity(), RelOperator::Mutation(rel_op) => rel_op.arity(), - RelOperator::Recluster(rel_op) => rel_op.arity(), RelOperator::CompactBlock(rel_op) => rel_op.arity(), RelOperator::MutationSource(rel_op) => rel_op.arity(), } @@ -234,7 +229,6 @@ impl Operator for RelOperator { RelOperator::RecursiveCteScan(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::AsyncFunction(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::Mutation(rel_op) => rel_op.derive_relational_prop(rel_expr), - RelOperator::Recluster(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::CompactBlock(rel_op) => rel_op.derive_relational_prop(rel_expr), RelOperator::MutationSource(rel_op) => rel_op.derive_relational_prop(rel_expr), } @@ -261,7 +255,6 @@ impl Operator for RelOperator { RelOperator::RecursiveCteScan(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::AsyncFunction(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::Mutation(rel_op) => rel_op.derive_physical_prop(rel_expr), - RelOperator::Recluster(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::CompactBlock(rel_op) => rel_op.derive_physical_prop(rel_expr), RelOperator::MutationSource(rel_op) => rel_op.derive_physical_prop(rel_expr), } @@ 
-288,7 +281,6 @@ impl Operator for RelOperator { RelOperator::RecursiveCteScan(rel_op) => rel_op.derive_stats(rel_expr), RelOperator::AsyncFunction(rel_op) => rel_op.derive_stats(rel_expr), RelOperator::Mutation(rel_op) => rel_op.derive_stats(rel_expr), - RelOperator::Recluster(rel_op) => rel_op.derive_stats(rel_expr), RelOperator::CompactBlock(rel_op) => rel_op.derive_stats(rel_expr), RelOperator::MutationSource(rel_op) => rel_op.derive_stats(rel_expr), } @@ -359,9 +351,6 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => { rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) } - RelOperator::Recluster(rel_op) => { - rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) - } RelOperator::CompactBlock(rel_op) => { rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required) } @@ -435,9 +424,6 @@ impl Operator for RelOperator { RelOperator::Mutation(rel_op) => { rel_op.compute_required_prop_children(ctx, rel_expr, required) } - RelOperator::Recluster(rel_op) => { - rel_op.compute_required_prop_children(ctx, rel_expr, required) - } RelOperator::CompactBlock(rel_op) => { rel_op.compute_required_prop_children(ctx, rel_expr, required) } @@ -793,26 +779,6 @@ impl TryFrom for Mutation { } } -impl From for RelOperator { - fn from(v: Recluster) -> Self { - Self::Recluster(v) - } -} - -impl TryFrom for Recluster { - type Error = ErrorCode; - fn try_from(value: RelOperator) -> Result { - if let RelOperator::Recluster(value) = value { - Ok(value) - } else { - Err(ErrorCode::Internal(format!( - "Cannot downcast {:?} to Recluster", - value.rel_op() - ))) - } - } -} - impl From for RelOperator { fn from(v: OptimizeCompactBlock) -> Self { Self::CompactBlock(v) diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index bc7164efaca24..dd8c1022bc7f9 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -17,7 +17,6 @@ use std::fmt::Formatter; use std::sync::Arc; use databend_common_ast::ast::ExplainKind; -use databend_common_ast::ast::Query; use databend_common_catalog::query_kind::QueryKind; use databend_common_expression::types::DataType; use databend_common_expression::DataField; @@ -122,6 +121,7 @@ use crate::plans::ModifyTableCommentPlan; use crate::plans::OptimizeCompactSegmentPlan; use crate::plans::OptimizePurgePlan; use crate::plans::PresignPlan; +use crate::plans::ReclusterPlan; use crate::plans::RefreshIndexPlan; use crate::plans::RefreshTableIndexPlan; use crate::plans::RefreshVirtualColumnPlan; @@ -247,11 +247,7 @@ pub enum Plan { ModifyTableColumn(Box), AlterTableClusterKey(Box), DropTableClusterKey(Box), - ReclusterTable { - s_expr: Box, - hilbert_query: Option>, - is_final: bool, - }, + ReclusterTable(Box), RevertTable(Box), TruncateTable(Box), VacuumTable(Box), diff --git a/src/query/sql/src/planner/plans/recluster.rs b/src/query/sql/src/planner/plans/recluster.rs index efe48c6cacd5b..5bd247509c1d4 100644 --- a/src/query/sql/src/planner/plans/recluster.rs +++ b/src/query/sql/src/planner/plans/recluster.rs @@ -12,35 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::VecDeque; use std::sync::Arc; -use databend_common_catalog::plan::Filters; -use educe::Educe; +use databend_common_ast::ast::Expr; +use databend_common_ast::parser::parse_sql; +use databend_common_ast::parser::tokenize_sql; +use databend_common_catalog::catalog::CatalogManager; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::types::NumberScalar; +use databend_common_expression::Scalar; +use crate::optimizer::optimize; +use crate::optimizer::OptimizerContext; use crate::optimizer::SExpr; -use crate::plans::Operator; -use crate::plans::RelOp; +use crate::plans::ConstantExpr; +use crate::plans::Plan; use crate::plans::RelOperator; +use crate::Binder; +use crate::MetadataRef; +use crate::NameResolutionContext; +use crate::ScalarExpr; -#[derive(Debug, PartialEq, Clone, Educe)] -#[educe(Eq, Hash)] -pub struct Recluster { +#[derive(Debug, Clone)] +pub struct ReclusterPlan { pub catalog: String, pub database: String, pub table: String, pub limit: Option, - #[educe(Hash(ignore))] - pub filters: Option, + pub selection: Option, + pub is_final: bool, } -impl Operator for Recluster { - fn rel_op(&self) -> RelOp { - RelOp::Recluster - } -} - -pub fn set_update_stream_columns(s_expr: &SExpr) -> databend_common_exception::Result { +pub fn set_update_stream_columns(s_expr: &SExpr) -> Result { match s_expr.plan() { RelOperator::Scan(scan) if scan.table_index == 0 => { let mut scan = scan.clone(); @@ -57,3 +63,104 @@ pub fn set_update_stream_columns(s_expr: &SExpr) -> databend_common_exception::R } } } + +#[async_backtrace::framed] +#[fastrace::trace] +pub async fn plan_hilbert_sql( + ctx: Arc, + metadata: MetadataRef, + sql: &str, +) -> Result { + let settings = ctx.get_settings(); + let tokens = tokenize_sql(sql)?; + let sql_dialect = settings.get_sql_dialect().unwrap_or_default(); + let (stmt, _) = parse_sql(&tokens, sql_dialect)?; + + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let binder = Binder::new( + ctx.clone(), + CatalogManager::instance(), + name_resolution_ctx, + metadata.clone(), + ); + let plan = binder.bind(&stmt).await?; + + let opt_ctx = OptimizerContext::new(ctx.clone(), metadata) + .with_enable_distributed_optimization(!ctx.get_cluster().is_empty()) + .with_enable_join_reorder(unsafe { !settings.get_disable_join_reorder()? }) + .with_enable_dphyp(settings.get_enable_dphyp()?) 
+ .with_max_push_down_limit(settings.get_max_push_down_limit()?); + optimize(opt_ctx, plan).await +} + +pub fn replace_with_constant(expr: &SExpr, variables: &VecDeque, partitions: u16) -> SExpr { + #[recursive::recursive] + fn visit_expr_column(expr: &mut ScalarExpr, variables: &mut VecDeque) { + if let ScalarExpr::FunctionCall(call) = expr { + match call.func_name.as_str() { + "range_partition_id" => { + let last = call.arguments.last_mut().unwrap(); + let value = variables.pop_front().unwrap(); + *last = ScalarExpr::ConstantExpr(ConstantExpr { span: None, value }); + + let first = call.arguments.first_mut().unwrap(); + visit_expr_column(first, variables); + } + "hilbert_range_index" => { + for idx in (1..call.arguments.len()).step_by(2) { + let val = call.arguments.get_mut(idx).unwrap(); + let value = variables.pop_front().unwrap(); + *val = ScalarExpr::ConstantExpr(ConstantExpr { span: None, value }); + } + } + _ => (), + } + } + } + + #[recursive::recursive] + fn replace_with_constant_into_child( + s_expr: &SExpr, + variables: &mut VecDeque, + partitions: u16, + ) -> SExpr { + let mut s_expr = s_expr.clone(); + s_expr.plan = match s_expr.plan.as_ref() { + RelOperator::EvalScalar(expr) if !variables.is_empty() => { + let mut expr = expr.clone(); + for item in &mut expr.items { + visit_expr_column(&mut item.scalar, variables); + } + Arc::new(expr.into()) + } + RelOperator::Aggregate(aggr) => { + let mut aggr = aggr.clone(); + for item in &mut aggr.aggregate_functions { + if let ScalarExpr::AggregateFunction(func) = &mut item.scalar { + if func.func_name == "range_bound" { + func.params[0] = Scalar::Number(NumberScalar::UInt16(partitions)); + } + } + } + Arc::new(aggr.into()) + } + _ => s_expr.plan.clone(), + }; + + if s_expr.children.is_empty() { + s_expr + } else { + let mut children = Vec::with_capacity(s_expr.children.len()); + for child in s_expr.children.iter() { + children.push(Arc::new(replace_with_constant_into_child( + child, variables, partitions, + ))); + } + s_expr.children = children; + s_expr + } + } + + let mut variables = variables.clone(); + replace_with_constant_into_child(expr, &mut variables, partitions) +} diff --git a/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test b/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test index 10405c1a2aa16..35b4cb72bd6ed 100644 --- a/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test +++ b/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test @@ -19,10 +19,10 @@ statement ok USE test_hilbert statement ok -set enable_parallel_multi_merge_sort = 0; +create or replace table t(a int, b int) cluster by hilbert(a, b) row_per_block=2 block_per_segment=2 block_size_threshold = 18; statement ok -create or replace table t(a int, b int) cluster by hilbert(a, b) row_per_block=2 block_per_segment=2; +set hilbert_clustering_min_bytes = 35; statement ok insert into t values(1, 1), (3, 3); diff --git a/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test b/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test index a2926ad814ead..12fbf29fe9c89 100644 --- a/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test +++ b/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test @@ -12,9 +12,6 @@ ## See the License for the specific language governing permissions and ## limitations under the License. 
-statement ok
-set enable_parallel_multi_merge_sort = 0;
-
 statement ok
 CREATE OR REPLACE TABLE test_hilbert(a int, b int) cluster by hilbert(a,b) row_per_block = 2;
 
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test
index dcfac800f61c2..f4dadf7600324 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/window.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test
@@ -56,7 +56,7 @@ CompoundBlockOperator(Project) × 1
       SortPartialTransform × 4
         Merge to Resize × 4
           Transform Window × 1
-            TransformWindowPartitionCollect × 1
+            TransformWindowPartitionCollect(Sort) × 1
               ShuffleMergePartition(Window) × 1
                 ShufflePartition(Window) × 1
                   DeserializeDataTransform × 1
@@ -78,7 +78,7 @@ CompoundBlockOperator(Project) × 1
       SortPartialTransform × 4
         Merge to Resize × 4
           Transform Window × 1
-            TransformWindowPartitionCollect × 1
+            TransformWindowPartitionCollect(Sort) × 1
               ShuffleMergePartition(Window) × 1
                 ShufflePartition(Window) × 1
                   DeserializeDataTransform × 1
@@ -368,7 +368,7 @@ explain pipeline select a, sum(a) over (partition by a order by a desc) from t l
 CompoundBlockOperator(Project) × 1
   LimitTransform × 1
     Transform Window × 1
-      TransformWindowPartitionCollect × 1
+      TransformWindowPartitionCollect(Sort) × 1
        ShuffleMergePartition(Window) × 1
          ShufflePartition(Window) × 1
            DeserializeDataTransform × 1
@@ -386,7 +386,7 @@ explain pipeline select a, sum(a) over (partition by a order by a desc) from t l
 CompoundBlockOperator(Project) × 1
   LimitTransform × 1
     Transform Window × 1
-      TransformWindowPartitionCollect × 1
+      TransformWindowPartitionCollect(Sort) × 1
        ShuffleMergePartition(Window) × 1
          ShufflePartition(Window) × 1
            DeserializeDataTransform × 1
@@ -405,7 +405,7 @@ explain pipeline select a, dense_rank() over (partition by a order by a desc) fr
 CompoundBlockOperator(Project) × 1
   LimitTransform × 1
     Transform Window × 1
-      TransformWindowPartitionCollect × 1
+      TransformWindowPartitionCollect(Sort) × 1
        ShuffleMergePartition(Window) × 1
          ShufflePartition(Window) × 1
            DeserializeDataTransform × 1
@@ -419,7 +419,7 @@ explain pipeline select a, sum(a) over (partition by a order by a desc rows betw
 CompoundBlockOperator(Project) × 1
   LimitTransform × 1
     Transform Window × 1
-      TransformWindowPartitionCollect × 1
+      TransformWindowPartitionCollect(Sort) × 1
        ShuffleMergePartition(Window) × 1
          ShufflePartition(Window) × 1
            DeserializeDataTransform × 1
@@ -433,7 +433,7 @@ explain pipeline select a, sum(a) over (partition by a order by a desc rows betw
 CompoundBlockOperator(Project) × 1
   LimitTransform × 1
     Transform Window × 1
-      TransformWindowPartitionCollect × 1
+      TransformWindowPartitionCollect(Sort) × 1
        ShuffleMergePartition(Window) × 1
          ShufflePartition(Window) × 1
            DeserializeDataTransform × 1
@@ -453,7 +453,7 @@ CompoundBlockOperator(Project) × 1
       SortPartialTransform × 4
         Merge to Resize × 4
           Transform Window × 1
-            TransformWindowPartitionCollect × 1
+            TransformWindowPartitionCollect(Sort) × 1
              ShuffleMergePartition(Window) × 1
                ShufflePartition(Window) × 1
                  DeserializeDataTransform × 1
@@ -472,7 +472,7 @@ CompoundBlockOperator(Project) × 1
       SortPartialTransform × 4
         Merge to Resize × 4
          Transform Window × 1
-            TransformWindowPartitionCollect × 1
+            TransformWindowPartitionCollect(Sort) × 1
              ShuffleMergePartition(Window) × 1
                ShufflePartition(Window) × 1
                  TransformFilter × 1
@@ -536,7 +536,7 @@ CompoundBlockOperator(Project) × 1
   LimitTransform × 1
     TransformFilter × 1
       Transform Window × 1
-        TransformWindowPartitionCollect × 1
+        TransformWindowPartitionCollect(Sort) × 1
          ShuffleMergePartition(WindowTopN) × 1
            ShufflePartition(WindowTopN) × 1
              DeserializeDataTransform × 1
@@ -574,7 +574,7 @@ explain pipeline select number, lead(number,1, 0) over (partition by number % 3
 CompoundBlockOperator(Project) × 1
   Transform Window × 1
     Transform Window × 1
-      TransformWindowPartitionCollect × 1
+      TransformWindowPartitionCollect(Sort) × 1
        ShuffleMergePartition(Window) × 1
          ShufflePartition(Window) × 1
            CompoundBlockOperator(Map) × 1
diff --git a/tests/sqllogictests/suites/query/functions/02_0077_function_hilbert.test b/tests/sqllogictests/suites/query/functions/02_0077_function_hilbert.test
index c2593e1beba37..bbf979ca057aa 100644
--- a/tests/sqllogictests/suites/query/functions/02_0077_function_hilbert.test
+++ b/tests/sqllogictests/suites/query/functions/02_0077_function_hilbert.test
@@ -1,53 +1,67 @@
 query T
-select to_hex(hilbert_key('abc'));
+select hilbert_range_index(1, [1,2], 2, [1,2]);
 ----
-616263
+00000003
 
 query T
-select to_hex(hilbert_key(-1));
+select hilbert_range_index(null, [1,2], 2, [1,2]);
 ----
-7f
+0000000D
 
 query T
-select to_hex(hilbert_key(1));
+select hilbert_range_index(3, [1,2], 2, [1,2]);
 ----
-01
+0000000D
 
 query T
-select to_hex(hilbert_key(-1.5));
+select hilbert_range_index(1, [], 2, [1,2]);
 ----
-4007ffffffffffff
+00000003
 
 query T
-select to_hex(hilbert_key(1.5));
+select hilbert_range_index(1, [], 2, null);
 ----
-bff8000000000000
+00000000
 
 statement error 1006
-SELECT hilbert_index([to_binary('6tkz2eT'), to_binary('3wnehCt')], 17165323156917005607);
+select hilbert_range_index(1, ['a'], 2, [1,3]);
 
-statement error 1006
-SELECT to_hex(hilbert_index([hilbert_key(1), hilbert_key(1), hilbert_key(1), hilbert_key(1),hilbert_key(1), hilbert_key(1)], 1));
+statement ok
+create table t_hilbert(a int, b int);
 
-statement error 1006
-SELECT to_hex(hilbert_index([hilbert_key(1)], 1));
+statement ok
+insert into t_hilbert values(0, 0), (1, 2), (4, 3), (5, 6), (8, 7), (9, 10), (null, 8), (3, null);
 
 query T
-SELECT to_hex(hilbert_index([to_binary('6tkz2eT'), to_binary('3wnehCt')], 7));
+select range_bound(4)(a) as a_bound, range_bound(4)(b) as b_bound from t_hilbert;
 ----
-0a1b808f82118177b9e68c34848a
+[1,4,8] [2,6,8]
 
 query T
-SELECT to_hex(hilbert_index([hilbert_key(1), hilbert_key(-1)], 1));
+with tt as (select range_bound(4)(a) as a_bound, range_bound(4)(b) as b_bound from t_hilbert)
+select a, range_partition_id(a, tt.a_bound), b, range_partition_id(b, tt.b_bound) from t_hilbert, tt order by t_hilbert.a;
 ----
-3ffe
+0 0 0 0
+1 0 2 0
+3 1 NULL 3
+4 1 3 1
+5 2 6 1
+8 2 7 2
+9 3 10 3
+NULL 3 8 2
 
 query T
-SELECT to_hex(hilbert_index([hilbert_key(1.5), hilbert_key(-1.5)], 8));
+with tt as (select range_bound(4)(a) as a_bound, range_bound(4)(b) as b_bound from t_hilbert)
+select t.a, t.b, hilbert_range_index(t.a, tt.a_bound, t.b, tt.b_bound) as index from t_hilbert t, tt order by index;
 ----
-dfffffd5555555555555555555555555
+0 0 00000000
+1 2 00000000
+4 3 00000002
+3 NULL 00000006
+8 7 00000008
+9 10 0000000A
+NULL 8 0000000B
+5 6 0000000D
 
-query T
-SELECT to_hex(hilbert_index([hilbert_key(to_uint64(1)), hilbert_key('abc')], 8));
-----
-3c02140ebeafaaaaaaaaaaaaaaaaaaaa
+statement ok
+drop table t_hilbert all;
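
Reviewer note (illustrative, not part of the patch): the two helpers added above appear designed to be combined when the hilbert recluster plan is built — plan_hilbert_sql binds and optimizes the generated hilbert statement once, and replace_with_constant later substitutes the runtime-collected range bounds and partition count into that plan. Below is a minimal sketch of the intended call order; the function name rewritten_hilbert_sexpr, the bound_values/partitions parameters, and the exact import paths are assumptions for illustration, not code from this diff.

    // Sketch only: module paths and the surrounding driver are assumed, not taken from this diff.
    use std::collections::VecDeque;
    use std::sync::Arc;

    use databend_common_catalog::table_context::TableContext;
    use databend_common_exception::ErrorCode;
    use databend_common_exception::Result;
    use databend_common_expression::Scalar;
    use databend_common_sql::optimizer::SExpr;
    use databend_common_sql::plans::Plan;
    // The two helpers introduced by this patch (module path assumed).
    use databend_common_sql::plans::plan_hilbert_sql;
    use databend_common_sql::plans::replace_with_constant;
    use databend_common_sql::MetadataRef;

    /// Plan the generated hilbert SQL once, then patch the collected range-bound
    /// constants and the partition count into the optimized SExpr.
    async fn rewritten_hilbert_sexpr(
        ctx: Arc<dyn TableContext>,
        metadata: MetadataRef,
        sql: &str,
        bound_values: VecDeque<Scalar>, // constants for range_partition_id / hilbert_range_index
        partitions: u16,                // pushed into the range_bound(...) aggregate
    ) -> Result<SExpr> {
        let plan = plan_hilbert_sql(ctx, metadata, sql).await?;
        match plan {
            // Only Query plans carry an SExpr that replace_with_constant can rewrite.
            Plan::Query { s_expr, .. } => Ok(replace_with_constant(&s_expr, &bound_values, partitions)),
            _ => Err(ErrorCode::Internal("hilbert recluster expects a SELECT plan")),
        }
    }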