Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ serde_with = { workspace = true }
strum = { workspace = true, features = ["derive"] }
thrift = { workspace = true }
tokio = { workspace = true, optional = false, features = ["sync"] }
tracing = { workspace = true }
typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
Expand Down
28 changes: 27 additions & 1 deletion crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ impl Snapshot {
}

/// Get parent snapshot.
#[cfg(test)]
pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
match self.parent_snapshot_id {
Some(id) => table_metadata.snapshot_by_id(id).cloned(),
Expand Down Expand Up @@ -393,6 +392,33 @@ impl SnapshotRetention {
}
}

/// An iterator over the ancestors of a snapshot.
pub struct AncestorIterator<'a> {
current: Option<SnapshotRef>,
table_metadata: &'a TableMetadata,
}

impl Iterator for AncestorIterator<'_> {
type Item = SnapshotRef;

fn next(&mut self) -> Option<Self::Item> {
let current = self.current.take()?;

let next = current.parent_snapshot(self.table_metadata);
self.current = next;

Some(current)
}
}

/// Returns an iterator over the ancestors of a snapshot.
pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> {
AncestorIterator {
current: Some(snapshot),
table_metadata,
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
12 changes: 12 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
/// Default value for the max number of partitions to keep summary stats for.
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
/// Property key for max snapshot age in milliseconds.
pub const MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms";
/// Default value for max snapshot age in milliseconds.
pub const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000; // 5 days
/// Property key for min snapshots to keep.
pub const MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep";
/// Default value for min snapshots to keep.
pub const MIN_SNAPSHOTS_TO_KEEP_DEFAULT: i32 = 1;
/// Property key for max reference age in milliseconds.
pub const MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms";
/// Default value for max reference age in milliseconds.
pub const MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX;

/// Reserved Iceberg table properties list.
///
Expand Down
11 changes: 9 additions & 2 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use std::collections::HashMap;

pub use action::*;
mod append;
pub mod remove_snapshots;
mod snapshot;
mod sort_order;
mod update_location;
Expand All @@ -78,6 +79,7 @@ use crate::spec::{
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
use crate::transaction::remove_snapshots::RemoveSnapshotAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
Expand Down Expand Up @@ -158,11 +160,16 @@ impl Transaction {
UpdateLocationAction::new()
}

/// Creates remove snapshot action.
pub fn expire_snapshot(&self) -> RemoveSnapshotAction {
RemoveSnapshotAction::new()
}

/// Update the statistics of table
pub fn update_statistics(&self) -> UpdateStatisticsAction {
UpdateStatisticsAction::new()
}

/// Commit transaction.
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
if self.actions.is_empty() {
Expand Down Expand Up @@ -276,8 +283,8 @@ mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crate::catalog::MockCatalog;
use crate::io::FileIOBuilder;
Expand Down
Loading
Loading