1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -103,6 +103,7 @@ runtime-format = "0.1.3"

# other dependencies
anyhow = "1.0.79"
bitmask-enum = "2.2.3"
bytes = "1.5.0"
bytesize = "1.3.0"
chrono = { version = "0.4.33", default-features = false, features = ["clock", "serde"] }
27 changes: 24 additions & 3 deletions crates/core/src/blob/packer.rs
@@ -102,12 +102,33 @@ impl PackSizer {
/// * `size` - The size to check
#[must_use]
pub fn size_ok(&self, size: u32) -> bool {
!self.is_too_small(size) && !self.is_too_large(size)
}

/// Evaluates whether the given size is too small
///
/// # Arguments
///
/// * `size` - The size to check
#[must_use]
pub fn is_too_small(&self, size: u32) -> bool {
let target_size = self.pack_size();
// Note: we cast to u64 so that no overflow can occur in the multiplications
u64::from(size) * 100
< u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent)
}

/// Evaluates whether the given size is too large
///
/// # Arguments
///
/// * `size` - The size to check
#[must_use]
pub fn is_too_large(&self, size: u32) -> bool {
let target_size = self.pack_size();
// Note: we cast to u64 so that no overflow can occur in the multiplications
u64::from(size) * 100
- >= u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent)
- && u64::from(size) * 100
- <= u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent)
+ > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent)
}

/// Adds the given size to the current size.
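The refactoring above only splits the old inclusive range check from `size_ok` into two named predicates; the accepted sizes are unchanged. As a rough standalone sketch (free functions with hypothetical parameters standing in for the `PackSizer` fields, not the crate's actual API), the relationship between the three checks is:

fn is_too_small(size: u32, target_size: u32, min_pct: u32) -> bool {
    // cast to u64 so the multiplication cannot overflow
    u64::from(size) * 100 < u64::from(target_size) * u64::from(min_pct)
}

fn is_too_large(size: u32, target_size: u32, max_pct: u32) -> bool {
    u64::from(size) * 100 > u64::from(target_size) * u64::from(max_pct)
}

fn size_ok(size: u32, target_size: u32, min_pct: u32, max_pct: u32) -> bool {
    // same as the old check: size lies between min_pct% and max_pct% of the target size
    !is_too_small(size, target_size, min_pct) && !is_too_large(size, target_size, max_pct)
}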
147 changes: 120 additions & 27 deletions crates/core/src/commands/prune.rs
@@ -12,9 +12,9 @@ use std::{
sync::{Arc, Mutex},
};

use bitmask_enum::bitmask;
use bytesize::ByteSize;
use chrono::{DateTime, Duration, Local};

use derive_more::Add;
use itertools::Itertools;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
@@ -269,6 +269,52 @@ impl FromStr for LimitOption {
}
}

#[bitmask(u8)]
#[bitmask_config(vec_debug)]
pub enum PackStatus {
NotCompressed,
TooYoung,
TimeNotSet,
TooLarge,
TooSmall,
HasUnusedBlobs,
HasUsedBlobs,
Marked,
}

#[derive(Debug, Clone, Copy)]
pub struct DebugDetailedStats {
pub packs: u64,
pub unused_blobs: u64,
pub unused_size: u64,
pub used_blobs: u64,
pub used_size: u64,
}

#[derive(Debug, Default)]
pub struct DebugStats(pub BTreeMap<(PackToDo, BlobType, PackStatus), DebugDetailedStats>);

impl DebugStats {
fn add(&mut self, pi: &PackInfo, todo: PackToDo, status: PackStatus) {
let blob_type = pi.blob_type;
let details = self
.0
.entry((todo, blob_type, status))
.or_insert(DebugDetailedStats {
packs: 0,
unused_blobs: 0,
unused_size: 0,
used_blobs: 0,
used_size: 0,
});
details.packs += 1;
details.unused_blobs += u64::from(pi.unused_blobs);
details.unused_size += u64::from(pi.unused_size);
details.used_blobs += u64::from(pi.used_blobs);
details.used_size += u64::from(pi.used_size);
}
}
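// Illustration, not part of this diff: with the `bitmask` derive above, a `PackStatus`
// value is a set of flags. It starts empty (`PackStatus::none()`) and flags are OR-ed in
// while a pack is analyzed; `DebugStats::add` then counts the pack under its
// (`PackToDo`, `BlobType`, `PackStatus`) combination. `contains` is assumed from the
// bitmask-enum 2.x API and is not used in this diff:
//
//     let mut status = PackStatus::none();
//     status |= PackStatus::TooSmall | PackStatus::HasUsedBlobs;
//     assert!(status.contains(PackStatus::TooSmall));
//     assert!(!status.contains(PackStatus::Marked));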

/// Statistics about what is deleted or kept within `prune`
#[derive(Default, Debug, Clone, Copy)]
pub struct DeleteStats {
@@ -354,6 +400,8 @@ pub struct PruneStats {
pub index_files: u64,
/// Number of index files which will be rebuilt during the prune
pub index_files_rebuild: u64,
/// Collected debug statistics, keyed by (`PackToDo`, `BlobType`, `PackStatus`)
pub debug: DebugStats,
}

impl PruneStats {
@@ -393,8 +441,8 @@ impl PruneIndex {
}

/// Task to be executed by a `PrunePlan` on Packs
- #[derive(Debug, Clone, Copy, PartialEq, Eq)]
- enum PackToDo {
+ #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+ pub enum PackToDo {
// TODO: Add documentation
Undecided,
/// The pack should be kept
@@ -506,8 +554,15 @@ impl PrunePack {
/// * `todo` - The task to be executed on the pack
/// * `pi` - The `PackInfo` of the pack
/// * `stats` - The `PruneStats` of the `PrunePlan`
- fn set_todo(&mut self, todo: PackToDo, pi: &PackInfo, stats: &mut PruneStats) {
+ fn set_todo(
+     &mut self,
+     todo: PackToDo,
+     pi: &PackInfo,
+     status: PackStatus,
+     stats: &mut PruneStats,
+ ) {
let tpe = self.blob_type;
stats.debug.add(pi, todo, status);
match todo {
PackToDo::Undecided => panic!("not possible"),
PackToDo::Keep => {
@@ -570,7 +625,7 @@ pub struct PrunePlan {
/// The ids of the existing packs
existing_packs: BTreeMap<Id, u32>,
/// The packs which should be repacked
- repack_candidates: Vec<(PackInfo, RepackReason, usize, usize)>,
+ repack_candidates: Vec<(PackInfo, PackStatus, RepackReason, usize, usize)>,
/// The index files
index_files: Vec<PruneIndex>,
/// `prune` statistics
@@ -726,79 +781,116 @@ impl PrunePlan {
self.stats.blobs[pi.blob_type].unused += u64::from(pi.unused_blobs);
self.stats.size[pi.blob_type].used += u64::from(pi.used_size);
self.stats.size[pi.blob_type].unused += u64::from(pi.unused_size);
let mut status = PackStatus::none();

// Various checks to determine if packs need to be kept
let too_young = pack.time > Some(self.time - keep_pack);
if too_young && !pack.delete_mark {
status |= PackStatus::TooYoung;
}
let keep_uncacheable = repack_cacheable_only && !pack.blob_type.is_cacheable();

let to_compress = repack_uncompressed && !pack.is_compressed();
if to_compress {
status |= PackStatus::NotCompressed;
}
let size_mismatch = !pack_sizer[pack.blob_type].size_ok(pack.size);

if pack_sizer[pack.blob_type].is_too_small(pack.size) {
status |= PackStatus::TooSmall;
}
if pack_sizer[pack.blob_type].is_too_large(pack.size) {
status |= PackStatus::TooLarge;
}
match (pack.delete_mark, pi.used_blobs, pi.unused_blobs) {
(false, 0, _) => {
// unused pack
self.stats.packs.unused += 1;
status |= PackStatus::HasUnusedBlobs;
if too_young {
// keep packs which are too young
- pack.set_todo(PackToDo::Keep, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats);
} else {
- pack.set_todo(PackToDo::MarkDelete, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::MarkDelete, &pi, status, &mut self.stats);
}
}
(false, 1.., 0) => {
// used pack
self.stats.packs.used += 1;
status |= PackStatus::HasUsedBlobs;
if too_young || keep_uncacheable {
- pack.set_todo(PackToDo::Keep, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats);
} else if to_compress || repack_all {
self.repack_candidates.push((
pi,
status,
RepackReason::ToCompress,
index_num,
pack_num,
));
} else if size_mismatch {
self.repack_candidates.push((
pi,
status,
RepackReason::SizeMismatch,
index_num,
pack_num,
));
} else {
- pack.set_todo(PackToDo::Keep, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats);
}
}

(false, 1.., 1..) => {
// partly used pack
self.stats.packs.partly_used += 1;
status |= PackStatus::HasUsedBlobs | PackStatus::HasUnusedBlobs;

if too_young || keep_uncacheable {
// keep packs which are too young and non-cacheable packs if requested
- pack.set_todo(PackToDo::Keep, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats);
} else {
// other partly used pack => candidate for repacking
self.repack_candidates.push((
pi,
status,
RepackReason::PartlyUsed,
index_num,
pack_num,
));
}
}
- (true, 0, _) => match pack.time {
-     Some(local_date_time) if self.time - local_date_time >= keep_delete => {
-         pack.set_todo(PackToDo::Delete, &pi, &mut self.stats);
-     }
-     None => {
-         warn!("pack to delete {}: no time set, this should not happen! Keeping this pack.", pack.id);
-         pack.set_todo(PackToDo::KeepMarkedAndCorrect, &pi, &mut self.stats);
-     }
-     Some(_) => pack.set_todo(PackToDo::KeepMarked, &pi, &mut self.stats),
- },
+ (true, 0, _) => {
+     status |= PackStatus::Marked;
+     match pack.time {
+         // unneeded and marked pack => check if we can remove it.
+         Some(local_date_time)
+             if self.time - local_date_time >= keep_delete =>
+         {
+             status |= PackStatus::TooYoung;
+             pack.set_todo(PackToDo::Delete, &pi, status, &mut self.stats);
+         }
+         None => {
+             warn!("pack to delete {}: no time set, this should not happen! Keeping this pack.", pack.id);
+             status |= PackStatus::TimeNotSet;
+             pack.set_todo(
+                 PackToDo::KeepMarkedAndCorrect,
+                 &pi,
+                 status,
+                 &mut self.stats,
+             );
+         }
+         Some(_) => pack.set_todo(
+             PackToDo::KeepMarked,
+             &pi,
+             status,
+             &mut self.stats,
+         ),
+     }
+ }
(true, 1.., _) => {
status |= PackStatus::Marked | PackStatus::HasUsedBlobs;
// needed blobs; mark this pack for recovery
- pack.set_todo(PackToDo::Recover, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Recover, &pi, status, &mut self.stats);
}
}
}
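As a simplified sketch of the decision logic above (not part of the diff; the real code additionally weighs pack age, cacheability, compression and size before settling on a task or a repack candidate), the coarse triage over `(delete_mark, used_blobs, unused_blobs)` is:

fn triage(delete_mark: bool, used_blobs: u64, unused_blobs: u64) -> PackToDo {
    match (delete_mark, used_blobs, unused_blobs) {
        (false, 0, _) => PackToDo::MarkDelete,  // completely unused pack
        (false, 1.., 0) => PackToDo::Keep,      // fully used pack
        (false, 1.., 1..) => PackToDo::Repack,  // partly used pack => repack candidate
        (true, 0, _) => PackToDo::KeepMarked,   // marked pack; deleted once it is old enough
        (true, 1.., _) => PackToDo::Recover,    // marked pack that still contains needed blobs
    }
}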
@@ -849,7 +941,8 @@ impl PrunePlan {
let mut do_repack = BlobTypeMap::default();
let mut repack_size = BlobTypeMap::<u64>::default();

- for (pi, repack_reason, index_num, pack_num) in std::mem::take(&mut self.repack_candidates)
+ for (pi, status, repack_reason, index_num, pack_num) in
+     std::mem::take(&mut self.repack_candidates)
{
let pack = &mut self.index_files[index_num].packs[pack_num];
let blob_type = pi.blob_type;
@@ -861,12 +954,12 @@ impl PrunePlan {
&& blob_type == BlobType::Data)
|| (repack_reason == RepackReason::SizeMismatch && no_resize)
{
- pack.set_todo(PackToDo::Keep, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats);
} else if repack_reason == RepackReason::SizeMismatch {
- resize_packs[blob_type].push((pi, index_num, pack_num));
+ resize_packs[blob_type].push((pi, status, index_num, pack_num));
repack_size[blob_type] += u64::from(pi.used_size);
} else {
- pack.set_todo(PackToDo::Repack, &pi, &mut self.stats);
+ pack.set_todo(PackToDo::Repack, &pi, status, &mut self.stats);
repack_size[blob_type] += u64::from(pi.used_size);
do_repack[blob_type] = true;
}
@@ -881,9 +974,9 @@ impl PrunePlan {
} else {
PackToDo::Keep
};
- for (pi, index_num, pack_num) in resize_packs {
+ for (pi, status, index_num, pack_num) in resize_packs {
let pack = &mut self.index_files[index_num].packs[pack_num];
- pack.set_todo(todo, &pi, &mut self.stats);
+ pack.set_todo(todo, &pi, status, &mut self.stats);
}
}
}
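A hypothetical consumer of the new statistics (not part of this PR) could dump the collected map after planning. This sketch assumes the plan's `stats` field and the `DebugStats` tuple field are reachable from the caller and that `BlobType` implements `Debug`; `PackStatus` prints its set flags thanks to the `vec_debug` config:

for ((todo, blob_type, status), details) in &plan.stats.debug.0 {
    println!(
        "{todo:?} / {blob_type:?} / {status:?}: {} packs, {} used blobs, {} unused blobs",
        details.packs, details.used_blobs, details.unused_blobs
    );
}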