Skip to content

Commit 3ce36e9

Browse files
committed
feat(iceberg): introduce remove snapshot action (#21)
* feat(iceberg): basic remove snapshot * feat(iceberg): introduce new properties for remove snapshots * feat(iceberg): support remove schemas * refactor(iceberg): refactor file org * address comments * refactor(iceberg): refactor and ut * fix(iceberg): fix integration-test typo fix: fix ut fix: fix test address comments address comments
1 parent 16a4a21 commit 3ce36e9

File tree

11 files changed

+857
-5
lines changed

11 files changed

+857
-5
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ serde_with = { workspace = true }
8989
strum = { workspace = true, features = ["derive"] }
9090
thrift = { workspace = true }
9191
tokio = { workspace = true, optional = false, features = ["sync"] }
92+
tracing = { workspace = true }
9293
typed-builder = { workspace = true }
9394
url = { workspace = true }
9495
uuid = { workspace = true }

crates/iceberg/src/spec/snapshot.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ impl Snapshot {
175175
}
176176

177177
/// Get parent snapshot.
178-
#[cfg(test)]
179178
pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
180179
match self.parent_snapshot_id {
181180
Some(id) => table_metadata.snapshot_by_id(id).cloned(),
@@ -393,6 +392,33 @@ impl SnapshotRetention {
393392
}
394393
}
395394

395+
/// An iterator over the ancestors of a snapshot.
396+
pub struct AncestorIterator<'a> {
397+
current: Option<SnapshotRef>,
398+
table_metadata: &'a TableMetadata,
399+
}
400+
401+
impl Iterator for AncestorIterator<'_> {
402+
type Item = SnapshotRef;
403+
404+
fn next(&mut self) -> Option<Self::Item> {
405+
let current = self.current.take()?;
406+
407+
let next = current.parent_snapshot(self.table_metadata);
408+
self.current = next;
409+
410+
Some(current)
411+
}
412+
}
413+
414+
/// Returns an iterator over the ancestors of a snapshot.
415+
pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> {
416+
AncestorIterator {
417+
current: Some(snapshot),
418+
table_metadata,
419+
}
420+
}
421+
396422
#[cfg(test)]
397423
mod tests {
398424
use std::collections::HashMap;

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
8282
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
8383
/// Default value for the max number of partitions to keep summary stats for.
8484
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
85+
/// Property key for max snapshot age in milliseconds.
86+
pub const MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms";
87+
/// Default value for max snapshot age in milliseconds.
88+
pub const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000; // 5 days
89+
/// Property key for min snapshots to keep.
90+
pub const MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep";
91+
/// Default value for min snapshots to keep.
92+
pub const MIN_SNAPSHOTS_TO_KEEP_DEFAULT: i32 = 1;
93+
/// Property key for max reference age in milliseconds.
94+
pub const MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms";
95+
/// Default value for max reference age in milliseconds.
96+
pub const MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX;
8597

8698
/// Reserved Iceberg table properties list.
8799
///

crates/iceberg/src/transaction/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use std::collections::HashMap;
5656

5757
pub use action::*;
5858
mod append;
59+
pub mod remove_snapshots;
5960
mod snapshot;
6061
mod sort_order;
6162
mod update_location;
@@ -78,6 +79,7 @@ use crate::spec::{
7879
use crate::table::Table;
7980
use crate::transaction::action::BoxedTransactionAction;
8081
use crate::transaction::append::FastAppendAction;
82+
use crate::transaction::remove_snapshots::RemoveSnapshotAction;
8183
use crate::transaction::sort_order::ReplaceSortOrderAction;
8284
use crate::transaction::update_location::UpdateLocationAction;
8385
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -158,11 +160,16 @@ impl Transaction {
158160
UpdateLocationAction::new()
159161
}
160162

163+
/// Creates remove snapshot action.
164+
pub fn expire_snapshot(&self) -> RemoveSnapshotAction {
165+
RemoveSnapshotAction::new()
166+
}
167+
161168
/// Update the statistics of table
162169
pub fn update_statistics(&self) -> UpdateStatisticsAction {
163170
UpdateStatisticsAction::new()
164171
}
165-
172+
166173
/// Commit transaction.
167174
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
168175
if self.actions.is_empty() {
@@ -276,8 +283,8 @@ mod tests {
276283
use std::collections::HashMap;
277284
use std::fs::File;
278285
use std::io::BufReader;
279-
use std::sync::Arc;
280286
use std::sync::atomic::{AtomicU32, Ordering};
287+
use std::sync::Arc;
281288

282289
use crate::catalog::MockCatalog;
283290
use crate::io::FileIOBuilder;

0 commit comments

Comments
 (0)