Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
33951cc
fix
zhyass Feb 7, 2025
9cf5738
fix
zhyass Feb 7, 2025
068005c
fix
zhyass Feb 7, 2025
2c5dabb
fix
zhyass Feb 7, 2025
323a9b3
fix
zhyass Feb 8, 2025
09bd1c3
fix
zhyass Feb 8, 2025
40dcb5f
update
zhyass Feb 10, 2025
af376e2
for test
zhyass Feb 10, 2025
41e75ae
for test
zhyass Feb 10, 2025
68ef653
for test
zhyass Feb 11, 2025
a94c18b
for test
zhyass Feb 11, 2025
98a6c3f
fix
zhyass Feb 12, 2025
01b3d2c
fix
zhyass Feb 12, 2025
a33c780
fix
zhyass Feb 12, 2025
f5d0d77
remove m_cte
zhyass Feb 12, 2025
886e257
fix
zhyass Feb 13, 2025
f5e4e92
fix
zhyass Feb 14, 2025
82cecfd
fix
zhyass Feb 14, 2025
06852d7
fix
zhyass Feb 14, 2025
5304e7d
fix
zhyass Feb 14, 2025
c7abf46
restore m cte
zhyass Feb 14, 2025
da49488
fix
zhyass Feb 15, 2025
662859e
fix
zhyass Feb 15, 2025
a0812d7
fix
zhyass Feb 15, 2025
3965e34
remove m_cte
zhyass Feb 15, 2025
d779cc2
fix
zhyass Feb 16, 2025
40a5565
fix
zhyass Feb 16, 2025
4b0f544
fix
zhyass Feb 16, 2025
5ab214d
fix
zhyass Feb 16, 2025
2de7307
fix
zhyass Feb 16, 2025
612ab71
fix
zhyass Feb 16, 2025
6b171a3
fix
zhyass Feb 17, 2025
2e2ceb4
fix
zhyass Feb 18, 2025
dc693c6
for test
zhyass Feb 19, 2025
06d7501
fix
zhyass Feb 21, 2025
196aef2
fix
zhyass Feb 25, 2025
50df9ce
fix
zhyass Feb 25, 2025
9b4560e
fix
zhyass Feb 26, 2025
58ebdea
for test
zhyass Feb 26, 2025
9eb7065
fix
zhyass Feb 26, 2025
f0da210
fix memory size
zhyass Mar 2, 2025
7303712
fix
zhyass Mar 2, 2025
ea14b64
fix
zhyass Mar 3, 2025
cecbf72
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 3, 2025
6f056e5
fix
zhyass Mar 3, 2025
2c42fe5
fix
zhyass Mar 3, 2025
a4066a8
fix
zhyass Mar 3, 2025
d6e5e11
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 3, 2025
acba086
fix
zhyass Mar 3, 2025
40a37e2
fix
zhyass Mar 4, 2025
29ce651
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 4, 2025
d0207c1
recover
zhyass Mar 4, 2025
5f08d01
fix
zhyass Mar 4, 2025
fcc46ae
fix
zhyass Mar 4, 2025
498dab7
fix
zhyass Mar 4, 2025
e084d6f
fix
zhyass Mar 5, 2025
9dd0fbd
fix
zhyass Mar 5, 2025
c4bd2e9
fix
zhyass Mar 5, 2025
9aabddd
fix test
zhyass Mar 6, 2025
1b18a8c
fix test
zhyass Mar 6, 2025
ce07f1c
fix test
zhyass Mar 6, 2025
5069c7d
fix test
zhyass Mar 6, 2025
ee427ce
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 6, 2025
a29ecc9
add hilbert_range_index
zhyass Mar 9, 2025
963c0f7
fix
zhyass Mar 9, 2025
1e14bc8
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 9, 2025
8623d2b
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 10, 2025
4916ec3
fix
zhyass Mar 10, 2025
9cc2caa
fix
zhyass Mar 10, 2025
a26cad4
fix
zhyass Mar 10, 2025
e9aec97
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 12, 2025
df874d0
fix
zhyass Mar 12, 2025
a7d42d5
Merge remote-tracking branch 'upstream/main' into feature_cluster
zhyass Mar 13, 2025
70c0d28
fix
zhyass Mar 13, 2025
3b32360
Merge branch 'main' into feature_cluster
zhyass Mar 17, 2025
add55c2
Merge branch 'main' into feature_cluster
zhyass Mar 18, 2025
871da0e
fix
zhyass Mar 19, 2025
3084292
chore: add some extra code comments
dantengsky Mar 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ build_exceptions! {
InvalidSessionState(4004),

// recluster error codes
NoNeedToRecluster(4011),
NoNeedToCompact(4012),
UnsupportedClusterType(4013),

Expand Down
110 changes: 42 additions & 68 deletions src/query/ee/src/hilbert_clustering/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ use databend_common_catalog::plan::ReclusterInfoSideCar;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
use databend_common_storages_fuse::pruning::create_segment_location_vector;
use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::SegmentLocation;
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
use databend_enterprise_hilbert_clustering::HilbertClusteringHandler;
use databend_enterprise_hilbert_clustering::HilbertClusteringHandlerWrapper;
use databend_storages_common_table_meta::meta::ClusterStatistics;
Expand Down Expand Up @@ -53,26 +56,26 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
return Ok(None);
};

let block_thresholds = fuse_table.get_block_thresholds();
let thresholds = BlockThresholds {
max_rows_per_block: block_thresholds.block_per_segment
* block_thresholds.max_rows_per_block,
min_rows_per_block: block_thresholds.block_per_segment
* block_thresholds.min_rows_per_block,
max_bytes_per_block: block_thresholds.block_per_segment
* block_thresholds.max_bytes_per_block,
max_bytes_per_file: block_thresholds.block_per_segment
* block_thresholds.max_bytes_per_file,
block_per_segment: block_thresholds.block_per_segment,
};
let block_per_seg =
fuse_table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
let hilbert_clustering_min_bytes =
ctx.get_settings().get_hilbert_clustering_min_bytes()? as usize;
let max_bytes_per_block = fuse_table.get_option(
FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD,
DEFAULT_BLOCK_BUFFER_SIZE,
);
let hilbert_min_bytes = std::cmp::max(
hilbert_clustering_min_bytes,
max_bytes_per_block * block_per_seg,
);
let segment_locations = snapshot.segments.clone();
let segment_locations = create_segment_location_vector(segment_locations, None);

let max_threads = ctx.get_settings().get_max_threads()? as usize;
let chunk_size = max_threads * 4;
let mut checker = ReclusterChecker::new(
cluster_key_id,
thresholds,
hilbert_min_bytes,
push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
);
'FOR: for chunk in segment_locations.chunks(chunk_size) {
Expand All @@ -99,12 +102,6 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
return Ok(None);
}

let rows_per_block =
block_thresholds.calc_rows_per_block(checker.total_size, checker.total_rows, 0) as u64;
let block_size = ctx.get_settings().get_max_block_size()?;
ctx.get_settings()
.set_max_block_size(rows_per_block.min(block_size))?;

let mut removed_statistics = Statistics::default();
let mut removed_segment_indexes = Vec::with_capacity(target_segments.len());
for (segment_loc, segment) in target_segments {
Expand Down Expand Up @@ -137,82 +134,60 @@ impl RealHilbertClusteringHandler {

struct ReclusterChecker {
segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
last_segment: Option<(SegmentLocation, Arc<CompactSegmentInfo>)>,
default_cluster_id: u32,
thresholds: BlockThresholds,

total_rows: usize,
total_size: usize,
hilbert_min_bytes: usize,
total_bytes: usize,

finished: bool,
// Whether the target segments is at the head of snapshot.
head_of_snapshot: bool,
}

impl ReclusterChecker {
fn new(default_cluster_id: u32, thresholds: BlockThresholds, head_of_snapshot: bool) -> Self {
fn new(default_cluster_id: u32, hilbert_min_bytes: usize, head_of_snapshot: bool) -> Self {
Self {
segments: vec![],
last_segment: None,
default_cluster_id,
thresholds,
total_rows: 0,
total_size: 0,
hilbert_min_bytes,
total_bytes: 0,
finished: false,
head_of_snapshot,
}
}

fn add(&mut self, location: SegmentLocation, segment: Arc<CompactSegmentInfo>) -> bool {
let row_count = segment.summary.row_count as usize;
let byte_size = segment.summary.uncompressed_byte_size as usize;
self.total_rows += row_count;
self.total_size += byte_size;
if !self
.thresholds
.check_large_enough(self.total_rows, self.total_size)
{
// totals < N
self.segments.push((location, segment));
return false;
}

let segment_should_recluster = self.should_recluster(&segment, |v| {
v.cluster_key_id != self.default_cluster_id || v.level != -1
});
let mut retained = false;
if !self.head_of_snapshot || segment_should_recluster {
if self
.thresholds
.check_for_compact(self.total_rows, self.total_size)
{
// N <= totals < 2N
self.segments.push((location, segment));
retained = true;
} else if segment_should_recluster {
// totals >= 2N
self.segments = vec![(location, segment)];
self.total_rows = row_count;
self.total_size = byte_size;
self.finished = true;
return true;
}

if segment_should_recluster || !self.head_of_snapshot {
self.total_bytes += segment.summary.uncompressed_byte_size as usize;
self.segments.push((location.clone(), segment.clone()));
}

if self.check_for_recluster() {
if !retained {
self.total_rows -= row_count;
self.total_size -= byte_size;
if !segment_should_recluster || self.total_bytes >= self.hilbert_min_bytes {
if self.check_for_recluster() {
self.finished = true;
return true;
}
self.finished = true;
return true;
self.last_segment = Some((location, segment));
self.reset();
}

self.reset();
false
}

fn finalize(&mut self) -> Vec<(SegmentLocation, Arc<CompactSegmentInfo>)> {
if !self.finished && !self.check_for_recluster() {
return vec![];
if !self.finished {
if let Some((location, segment)) = self.last_segment.take() {
self.segments.push((location, segment));
}
if !self.check_for_recluster() {
return vec![];
}
}
std::mem::take(&mut self.segments)
}
Expand All @@ -233,8 +208,7 @@ impl ReclusterChecker {
}

fn reset(&mut self) {
self.total_rows = 0;
self.total_size = 0;
self.total_bytes = 0;
self.head_of_snapshot = false;
self.segments.clear();
}
Expand Down
Loading