diff --git a/Cargo.lock b/Cargo.lock
index 8eaa2d1eb0..457540c01c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3551,6 +3551,7 @@ dependencies = [
  "tera",
  "thrift",
  "tokio",
+ "tracing",
  "typed-builder 0.20.1",
  "url",
  "uuid",
@@ -3692,6 +3693,7 @@ version = "0.6.0"
 dependencies = [
  "arrow-array",
  "arrow-schema",
+ "chrono",
  "ctor",
  "datafusion",
  "futures",
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 8e562487b8..99d716141b 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -89,6 +89,7 @@ serde_with = { workspace = true }
 strum = { workspace = true, features = ["derive"] }
 thrift = { workspace = true }
 tokio = { workspace = true, optional = false, features = ["sync"] }
+tracing = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 uuid = { workspace = true }
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 04a9e15b34..1f587bac4f 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -175,7 +175,6 @@ impl Snapshot {
     }
 
     /// Get parent snapshot.
-    #[cfg(test)]
     pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
         match self.parent_snapshot_id {
             Some(id) => table_metadata.snapshot_by_id(id).cloned(),
@@ -393,6 +392,33 @@ impl SnapshotRetention {
     }
 }
 
+/// An iterator over the ancestors of a snapshot.
+pub struct AncestorIterator<'a> {
+    current: Option<SnapshotRef>,
+    table_metadata: &'a TableMetadata,
+}
+
+impl Iterator for AncestorIterator<'_> {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let current = self.current.take()?;
+
+        let next = current.parent_snapshot(self.table_metadata);
+        self.current = next;
+
+        Some(current)
+    }
+}
+
+/// Returns an iterator over the ancestors of a snapshot.
+pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> {
+    AncestorIterator {
+        current: Some(snapshot),
+        table_metadata,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 3b89f54674..b04c7fb43c 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -82,6 +82,18 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
 /// Default value for the max number of partitions to keep summary stats for.
 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
+/// Property key for max snapshot age in milliseconds.
+pub const MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms";
+/// Default value for max snapshot age in milliseconds.
+pub const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000; // 5 days
+/// Property key for min snapshots to keep.
+pub const MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep";
+/// Default value for min snapshots to keep.
+pub const MIN_SNAPSHOTS_TO_KEEP_DEFAULT: i32 = 1;
+/// Property key for max reference age in milliseconds.
+pub const MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms";
+/// Default value for max reference age in milliseconds.
+pub const MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX;
 /// Reserved Iceberg table properties list.
 ///
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 06549a95c5..07d864f81b 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -56,6 +56,7 @@ use std::collections::HashMap;
 pub use action::*;
 
 mod append;
+pub mod remove_snapshots;
 mod snapshot;
 mod sort_order;
 mod update_location;
@@ -78,6 +79,7 @@ use crate::spec::{
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
+use crate::transaction::remove_snapshots::RemoveSnapshotAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -158,11 +160,16 @@ impl Transaction {
         UpdateLocationAction::new()
     }
 
+    /// Creates a [`RemoveSnapshotAction`] for expiring snapshots.
+    pub fn expire_snapshot(&self) -> RemoveSnapshotAction {
+        RemoveSnapshotAction::new()
+    }
+
     /// Update the statistics of table
     pub fn update_statistics(&self) -> UpdateStatisticsAction {
         UpdateStatisticsAction::new()
     }
-    
+
     /// Commit transaction.
     pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         if self.actions.is_empty() {
@@ -276,8 +283,8 @@ mod tests {
     use std::collections::HashMap;
     use std::fs::File;
     use std::io::BufReader;
-    use std::sync::Arc;
     use std::sync::atomic::{AtomicU32, Ordering};
+    use std::sync::Arc;
 
     use crate::catalog::MockCatalog;
     use crate::io::FileIOBuilder;
diff --git a/crates/iceberg/src/transaction/remove_snapshots.rs b/crates/iceberg/src/transaction/remove_snapshots.rs
new file mode 100644
index 0000000000..0e18f9ece7
--- /dev/null
+++ b/crates/iceberg/src/transaction/remove_snapshots.rs
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Transaction action for removing snapshots.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use itertools::Itertools;
+
+use crate::error::Result;
+use crate::spec::{
+    ancestors_of, SnapshotReference, SnapshotRetention, TableMetadata, MAIN_BRANCH,
+    MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
+};
+use crate::table::Table;
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
+
+/// RemoveSnapshotAction is a transaction action for removing snapshots.
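+///
+/// A minimal usage sketch (illustrative only: assumes a `table` and a
+/// `catalog` are already in scope, with `apply` provided by
+/// `ApplyTransactionAction`, mirroring the integration test below):
+///
+/// ```ignore
+/// let tx = Transaction::new(&table);
+/// let now = chrono::Utc::now().timestamp_millis();
+/// // Keep at least five snapshots; expire everything older than `now`.
+/// let remove_action = tx.expire_snapshot().retain_last(5).expire_older_than(now);
+/// let tx = remove_action.apply(tx)?;
+/// let table = tx.commit(&catalog).await?;
+/// ```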
+pub struct RemoveSnapshotAction {
+    clear_expire_files: bool,
+    ids_to_remove: HashSet<i64>,
+    default_expired_older_than: i64,
+    default_min_num_snapshots: i32,
+    default_max_ref_age_ms: i64,
+    clear_expired_meta_data: bool,
+
+    now: i64,
+}
+
+impl Default for RemoveSnapshotAction {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RemoveSnapshotAction {
+    /// Creates a new action.
+    pub fn new() -> Self {
+        let now = chrono::Utc::now().timestamp_millis();
+
+        Self {
+            clear_expire_files: false,
+            ids_to_remove: HashSet::new(),
+            default_expired_older_than: now - MAX_SNAPSHOT_AGE_MS_DEFAULT,
+            default_min_num_snapshots: MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
+            default_max_ref_age_ms: MAX_REF_AGE_MS_DEFAULT,
+            now,
+            clear_expired_meta_data: false,
+        }
+    }
+
+    /// Sets whether expired data files should also be removed (currently
+    /// recorded but not yet acted on by this action's commit).
+    pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self {
+        self.clear_expire_files = clear_expire_files;
+        self
+    }
+
+    /// Marks a specific snapshot id for removal.
+    pub fn expire_snapshot_id(mut self, expire_snapshot_id: i64) -> Self {
+        self.ids_to_remove.insert(expire_snapshot_id);
+        self
+    }
+
+    /// Expires snapshots older than the given timestamp in milliseconds.
+    pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self {
+        self.default_expired_older_than = timestamp_ms;
+        self
+    }
+
+    /// Sets the minimum number of ancestor snapshots to retain on each branch.
+    pub fn retain_last(mut self, min_num_snapshots: i32) -> Self {
+        self.default_min_num_snapshots = min_num_snapshots;
+        self
+    }
+
+    /// Sets whether partition specs and schemas that are no longer reachable
+    /// should also be removed.
+    pub fn clear_expired_meta_data(mut self, clear_expired_meta_data: bool) -> Self {
+        self.clear_expired_meta_data = clear_expired_meta_data;
+        self
+    }
+
+    fn compute_retained_refs(
+        &self,
+        snapshot_refs: &HashMap<String, SnapshotReference>,
+        table_meta: &TableMetadata,
+    ) -> HashMap<String, SnapshotReference> {
+        let mut retained_refs = HashMap::new();
+
+        for (ref_name, snapshot_ref) in snapshot_refs {
+            if ref_name == MAIN_BRANCH {
+                retained_refs.insert(ref_name.clone(), snapshot_ref.clone());
+                continue;
+            }
+
+            let snapshot = table_meta.snapshot_by_id(snapshot_ref.snapshot_id);
+            let max_ref_age_ms = match &snapshot_ref.retention {
+                SnapshotRetention::Branch {
+                    min_snapshots_to_keep: _,
+                    max_snapshot_age_ms: _,
+                    max_ref_age_ms,
+                } => max_ref_age_ms,
+                SnapshotRetention::Tag { max_ref_age_ms } => max_ref_age_ms,
+            }
+            .unwrap_or(self.default_max_ref_age_ms);
+
+            if let Some(snapshot) = snapshot {
+                let ref_age_ms = self.now - snapshot.timestamp_ms();
+                if ref_age_ms <= max_ref_age_ms {
+                    retained_refs.insert(ref_name.clone(), snapshot_ref.clone());
+                }
+            } else {
+                tracing::warn!(
+                    "Snapshot {} for reference {} not found, removing the reference",
+                    snapshot_ref.snapshot_id,
+                    ref_name
+                );
+            }
+        }
+
+        retained_refs
+    }
+
+    fn compute_all_branch_snapshots_to_retain(
+        &self,
+        refs: impl Iterator<Item = SnapshotReference>,
+        table_meta: &TableMetadata,
+    ) -> HashSet<i64> {
+        let mut branch_snapshots_to_retain = HashSet::new();
+        for snapshot_ref in refs {
+            if snapshot_ref.is_branch() {
+                let max_snapshot_age_ms = match snapshot_ref.retention {
+                    SnapshotRetention::Branch {
+                        min_snapshots_to_keep: _,
+                        max_snapshot_age_ms,
+                        max_ref_age_ms: _,
+                    } => max_snapshot_age_ms,
+                    SnapshotRetention::Tag { max_ref_age_ms: _ } => None,
+                };
+
+                let expire_snapshot_older_than =
+                    if let Some(max_snapshot_age_ms) = max_snapshot_age_ms {
+                        self.now - max_snapshot_age_ms
+                    } else {
+                        self.default_expired_older_than
+                    };
+
+                let min_snapshots_to_keep = match snapshot_ref.retention {
+                    SnapshotRetention::Branch {
+                        min_snapshots_to_keep,
+                        max_snapshot_age_ms: _,
+                        max_ref_age_ms: _,
+                    } => min_snapshots_to_keep,
+                    SnapshotRetention::Tag { max_ref_age_ms: _ } => None,
+                }
+                .unwrap_or(self.default_min_num_snapshots);
+
+                branch_snapshots_to_retain.extend(self.compute_branch_snapshots_to_retain(
+                    snapshot_ref.snapshot_id,
+                    expire_snapshot_older_than,
+                    min_snapshots_to_keep as usize,
+                    table_meta,
+                ));
+            }
+        }
+
+        branch_snapshots_to_retain
+    }
+
+    fn compute_branch_snapshots_to_retain(
+        &self,
+        snapshot_id: i64,
+        expire_snapshots_older_than: i64,
+        min_snapshots_to_keep: usize,
+        table_meta: &TableMetadata,
+    ) -> HashSet<i64> {
+        let mut ids_to_retain = HashSet::new();
+        if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) {
+            let ancestors = ancestors_of(snapshot.clone(), table_meta);
+            for ancestor in ancestors {
+                if ids_to_retain.len() < min_snapshots_to_keep
+                    || ancestor.timestamp_ms() >= expire_snapshots_older_than
+                {
+                    ids_to_retain.insert(ancestor.snapshot_id());
+                } else {
+                    return ids_to_retain;
+                }
+            }
+        }
+
+        ids_to_retain
+    }
+
+    fn unreferenced_snapshots_to_retain(
+        &self,
+        refs: impl Iterator<Item = SnapshotReference>,
+        table_meta: &TableMetadata,
+    ) -> HashSet<i64> {
+        let mut ids_to_retain = HashSet::new();
+        let mut referenced_snapshots = HashSet::new();
+
+        for snapshot_ref in refs {
+            if snapshot_ref.is_branch() {
+                if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_ref.snapshot_id) {
+                    let ancestors = ancestors_of(snapshot.clone(), table_meta);
+                    for ancestor in ancestors {
+                        referenced_snapshots.insert(ancestor.snapshot_id());
+                    }
+                }
+            } else {
+                referenced_snapshots.insert(snapshot_ref.snapshot_id);
+            }
+        }
+
+        for snapshot in table_meta.snapshots() {
+            if !referenced_snapshots.contains(&snapshot.snapshot_id())
+                && snapshot.timestamp_ms() >= self.default_expired_older_than
+            {
+                ids_to_retain.insert(snapshot.snapshot_id());
+            }
+        }
+
+        ids_to_retain
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RemoveSnapshotAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        if table.metadata().refs.is_empty() {
+            return Ok(ActionCommit::new(vec![], vec![]));
+        }
+
+        let table_meta = table.metadata().clone();
+
+        let mut ids_to_retain = HashSet::new();
+        let retained_refs = self.compute_retained_refs(&table_meta.refs, &table_meta);
+        let mut retained_id_to_refs = HashMap::new();
+        for (ref_name, snapshot_ref) in &retained_refs {
+            let snapshot_id = snapshot_ref.snapshot_id;
+            retained_id_to_refs
+                .entry(snapshot_id)
+                .or_insert_with(Vec::new)
+                .push(ref_name.clone());
+
+            ids_to_retain.insert(snapshot_id);
+        }
+
+        for id_to_remove in &self.ids_to_remove {
+            if let Some(refs_for_id) = retained_id_to_refs.get(id_to_remove) {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot remove snapshot {:?} with retained references: {:?}",
+                        id_to_remove, refs_for_id
+                    ),
+                ));
+            }
+        }
+
+        ids_to_retain.extend(self.compute_all_branch_snapshots_to_retain(
+            table_meta.refs.values().cloned(),
+            &table_meta,
+        ));
+        ids_to_retain.extend(
+            self.unreferenced_snapshots_to_retain(table_meta.refs.values().cloned(), &table_meta),
+        );
+
+        let mut updates = vec![];
+        let mut requirements = vec![];
+
+        for ref_name in table_meta.refs.keys() {
+            if !retained_refs.contains_key(ref_name) {
+                updates.push(TableUpdate::RemoveSnapshotRef {
+                    ref_name: ref_name.clone(),
+                });
+            }
+        }
+
+        let mut snapshot_to_remove = Vec::from_iter(self.ids_to_remove.iter().cloned());
+        for snapshot in table_meta.snapshots() {
+            if !ids_to_retain.contains(&snapshot.snapshot_id()) {
+                snapshot_to_remove.push(snapshot.snapshot_id());
+            }
+        }
+
+        if !snapshot_to_remove.is_empty() {
+            // TODO: batch remove when server supports it
+            for snapshot_id in snapshot_to_remove {
+                updates.push(TableUpdate::RemoveSnapshots {
+                    snapshot_ids: vec![snapshot_id],
+                });
+            }
+        }
+
+        if self.clear_expired_meta_data {
+            let mut reachable_specs = HashSet::new();
+            reachable_specs.insert(table_meta.default_partition_spec().spec_id());
+            let mut reachable_schemas = HashSet::new();
+            reachable_schemas.insert(table_meta.current_schema_id());
+
+            // TODO: parallelize loading manifest list and get reachable specs and schemas to reduce latency
+            for snapshot in table_meta.snapshots() {
+                if ids_to_retain.contains(&snapshot.snapshot_id()) {
+                    let manifest_list = snapshot
+                        .load_manifest_list(table.file_io(), &table_meta)
+                        .await?;
+
+                    for manifest in manifest_list.entries() {
+                        reachable_specs.insert(manifest.partition_spec_id);
+                    }
+
+                    if let Some(schema_id) = snapshot.schema_id() {
+                        reachable_schemas.insert(schema_id);
+                    }
+                }
+            }
+
+            let spec_to_remove: Vec<i32> = table_meta
+                .partition_specs_iter()
+                .filter_map(|spec| {
+                    if !reachable_specs.contains(&spec.spec_id()) {
+                        Some(spec.spec_id())
+                    } else {
+                        None
+                    }
+                })
+                .unique()
+                .collect();
+
+            if !spec_to_remove.is_empty() {
+                updates.push(TableUpdate::RemovePartitionSpecs {
+                    spec_ids: spec_to_remove,
+                });
+            }
+
+            let schema_to_remove: Vec<i32> = table_meta
+                .schemas_iter()
+                .filter_map(|schema| {
+                    if !reachable_schemas.contains(&schema.schema_id()) {
+                        Some(schema.schema_id())
+                    } else {
+                        None
+                    }
+                })
+                .unique()
+                .collect();
+
+            if !schema_to_remove.is_empty() {
+                updates.push(TableUpdate::RemoveSchemas {
+                    schema_ids: schema_to_remove,
+                });
+            }
+        }
+
+        requirements.push(TableRequirement::UuidMatch {
+            uuid: table_meta.uuid(),
+        });
+        requirements.push(TableRequirement::RefSnapshotIdMatch {
+            r#ref: MAIN_BRANCH.to_string(),
+            snapshot_id: table_meta.current_snapshot_id(),
+        });
+
+        Ok(ActionCommit::new(updates, requirements))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::File;
+    use std::io::BufReader;
+    use std::sync::Arc;
+
+    use crate::io::FileIOBuilder;
+    use crate::spec::{TableMetadata, MAIN_BRANCH};
+    use crate::table::Table;
+    use crate::transaction::{Transaction, TransactionAction};
+    use crate::{TableIdent, TableRequirement};
+
+    fn make_v2_table_with_multi_snapshot() -> Table {
+        let file = File::open(format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV2ValidMultiSnapshot.json"
+        ))
+        .unwrap();
+        let reader = BufReader::new(file);
+        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+        Table::builder()
+            .metadata(resp)
+            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_remove_snapshot_action() {
+        let table = make_v2_table_with_multi_snapshot();
+        let table_meta = table.metadata().clone();
+        assert_eq!(5, table_meta.snapshots().count());
+        {
+            let tx = Transaction::new(&table);
+            let act = tx.expire_snapshot();
+            let mut v = Arc::new(act).commit(&table).await.unwrap();
+
+            let updates = v.take_updates();
+            assert_eq!(4, updates.len());
+
+            let requirements = v.take_requirements();
+            assert_eq!(2, requirements.len());
+
+            assert_eq!(
+                vec![
+                    TableRequirement::UuidMatch {
+                        uuid: table.metadata().uuid()
+                    },
+                    TableRequirement::RefSnapshotIdMatch {
+                        r#ref: MAIN_BRANCH.to_string(),
+                        snapshot_id: table.metadata().current_snapshot_id()
+                    }
+                ],
+                requirements
+            );
+        }
+
+        {
+            let tx = Transaction::new(&table);
+            let act = tx.expire_snapshot().retain_last(2);
+            let mut action_commit = Arc::new(act).commit(&table).await.unwrap();
+
+            let updates = action_commit.take_updates();
+            let requirements = action_commit.take_requirements();
+
+            assert_eq!(3, updates.len());
+            assert_eq!(
+                vec![
+                    TableRequirement::UuidMatch {
+                        uuid: table.metadata().uuid()
+                    },
+                    TableRequirement::RefSnapshotIdMatch {
+                        r#ref: MAIN_BRANCH.to_string(),
+                        snapshot_id: table.metadata().current_snapshot_id(),
+                    }
+                ],
+                requirements
+            );
+        }
+
+        {
+            let tx = Transaction::new(&table);
+            let act = tx.expire_snapshot().retain_last(100).expire_older_than(100);
+
+            let mut action_commit = Arc::new(act).commit(&table).await.unwrap();
+
+            let updates = action_commit.take_updates();
+            let requirements = action_commit.take_requirements();
+
+            assert_eq!(0, updates.len());
+            assert_eq!(
+                vec![
+                    TableRequirement::UuidMatch {
+                        uuid: table.metadata().uuid()
+                    },
+                    TableRequirement::RefSnapshotIdMatch {
+                        r#ref: MAIN_BRANCH.to_string(),
+                        snapshot_id: table.metadata().current_snapshot_id(),
+                    }
+                ],
+                requirements
+            );
+        }
+
+        {
+            // test remove main current snapshot
+            let tx = Transaction::new(&table);
+            let act = tx
+                .expire_snapshot()
+                .expire_snapshot_id(table.metadata().current_snapshot_id().unwrap());
+
+            let err = Arc::new(act).commit(&table).await.err().unwrap();
+            assert_eq!(
+                "DataInvalid => Cannot remove snapshot 3067729675574597004 with retained references: [\"main\"]",
+                err.to_string()
+            )
+        }
+    }
+}
diff --git a/crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json b/crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json
new file mode 100644
index 0000000000..a52fa2a906
--- /dev/null
+++ b/crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json
@@ -0,0 +1,167 @@
+{
+  "format-version": 2,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "s3://bucket/test/location",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1675100955770,
+  "last-column-id": 3,
+  "current-schema-id": 1,
+  "schemas": [
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    },
+    {
+      "type": "struct",
+      "schema-id": 1,
+      "identifier-field-ids": [
+        1,
+        2
+      ],
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long",
+          "doc": "comment"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    }
+  ],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    }
+  ],
+  "last-partition-id": 1000,
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {
+          "transform": "identity",
+          "source-id": 2,
+          "direction": "asc",
+          "null-order": "nulls-first"
+        },
+        {
+          "transform": "bucket[4]",
+          "source-id": 3,
+          "direction": "desc",
+          "null-order": "nulls-last"
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": 3067729675574597004,
+  "snapshots": [
+    {
+      "snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1515100955770,
+      "sequence-number": 0,
+      "summary": {
+        "operation": "append"
+      },
+ "manifest-list": "s3://a/b/1.avro" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3059729675574597004, + "parent-snapshot-id": 3055729675574597004, + "timestamp-ms": 1595100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/3.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3063729675574597004, + "parent-snapshot-id": 3059729675574597004, + "timestamp-ms": 1635100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/4.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3067729675574597004, + "parent-snapshot-id": 3063729675574597004, + "timestamp-ms": 1675100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/5.avro", + "schema-id": 1 + } + ], + "snapshot-log": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770 + }, + { + "snapshot-id": 3055729675574597004, + "timestamp-ms": 1555100955770 + }, + { + "snapshot-id": 3059729675574597004, + "timestamp-ms": 1595100955770 + }, + { + "snapshot-id": 3063729675574597004, + "timestamp-ms": 1635100955770 + }, + { + "snapshot-id": 3067729675574597004, + "timestamp-ms": 1675100955770 + } + ], + "metadata-log": [] +} \ No newline at end of file diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 07eea5f375..0f2a7563aa 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -27,6 +27,7 @@ version = { workspace = true } [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +chrono = { workspace = true } ctor = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 4121c5210b..7755b745dd 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -76,7 +76,7 @@ async fn test_append_data_file() { file_name_generator.clone(), ); let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); - let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); @@ -131,7 +131,13 @@ async fn test_append_data_file() { // commit result again let tx = Transaction::new(&table); - let append_action = tx.fast_append().add_data_files(data_file.clone()); + + // Create a new data file for the second commit + let mut data_file_writer_2 = data_file_writer_builder.clone().build().await.unwrap(); + data_file_writer_2.write(batch.clone()).await.unwrap(); + let data_file_2 = data_file_writer_2.close().await.unwrap(); + + let append_action = tx.fast_append().add_data_files(data_file_2); let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); diff --git 
a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs index feb1c4e585..4bab534ee6 100644 --- a/crates/integration_tests/tests/shared_tests/mod.rs +++ b/crates/integration_tests/tests/shared_tests/mod.rs @@ -29,6 +29,7 @@ mod conflict_commit_test; mod datafusion; mod read_evolved_schema; mod read_positional_deletes; +mod remove_snapshots_test; mod scan_all_type; pub async fn random_ns() -> Namespace { diff --git a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs new file mode 100644 index 0000000000..fddb2dc37e --- /dev/null +++ b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, TableCreation}; +use iceberg_catalog_rest::RestCatalog; +use parquet::file::properties::WriterProperties; + +use crate::get_shared_containers; +use crate::shared_tests::{random_ns, test_schema}; + +#[tokio::test] +async fn test_expire_snapshots_by_count() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // commit result + for i in 0..10 { + // Create a new data file writer 
for each iteration + let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap(); + + // Create different data for each iteration + let col1 = StringArray::from(vec![ + Some(format!("foo_{}", i)), + Some(format!("bar_{}", i)), + None, + Some(format!("baz_{}", i)) + ]); + let col2 = Int32Array::from(vec![Some(i), Some(i+1), Some(i+2), Some(i+3)]); + let col3 = BooleanArray::from(vec![Some(i % 2 == 0), Some(i % 2 == 1), None, Some(i % 3 == 0)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + + // Write the unique data and get the data file + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + let tx = Transaction::new(&table); + let append_action = tx.fast_append(); + let tx = append_action + .add_data_files(data_file) + .apply(tx) + .unwrap(); + table = tx.commit(&rest_catalog).await.unwrap(); + } + + // check snapshot count + let snapshot_counts = table.metadata().snapshots().count(); + assert_eq!(10, snapshot_counts); + + let tx = Transaction::new(&table); + let now = chrono::Utc::now().timestamp_millis(); + let remove_action = tx.expire_snapshot().retain_last(5).expire_older_than(now); + let tx = remove_action.apply(tx).unwrap(); + let t = tx.commit(&rest_catalog).await.unwrap(); + assert_eq!(5, t.metadata().snapshots().count()); +}
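
The new `ancestors_of` helper introduced in `spec/snapshot.rs` walks the parent chain recorded in the table metadata, yielding the starting snapshot first and stopping at the first snapshot whose parent is unknown. A minimal sketch of calling it from outside the crate (assuming it is re-exported from `iceberg::spec`, as the in-crate `crate::spec::ancestors_of` import above suggests):

```rust
use iceberg::spec::{ancestors_of, TableMetadata};

// Print the current snapshot's ancestry, newest first.
fn print_ancestry(table_metadata: &TableMetadata) {
    if let Some(current) = table_metadata.current_snapshot() {
        for snapshot in ancestors_of(current.clone(), table_metadata) {
            println!(
                "snapshot {} committed at {} ms",
                snapshot.snapshot_id(),
                snapshot.timestamp_ms()
            );
        }
    }
}
```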