Skip to content

Commit fe85402

Browse files
authored
cherry pick fast append api (#25)
* allow specify snapshot id for fast append (#14) * allow specify snapshot id for fast append * fix tests * fix
1 parent 5ed930f commit fe85402

File tree

5 files changed

+29
-12
lines changed

5 files changed

+29
-12
lines changed

crates/iceberg/src/transaction.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ impl<'a> Transaction<'a> {
111111
Ok(self)
112112
}
113113

114-
fn generate_unique_snapshot_id(&self) -> i64 {
114+
/// Generate a unique snapshot id.
115+
pub fn generate_unique_snapshot_id(&self) -> i64 {
115116
let generate_random_id = || -> i64 {
116117
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
117118
let snapshot_id = (lhs ^ rhs) as i64;
@@ -136,10 +137,26 @@ impl<'a> Transaction<'a> {
136137
/// Creates a fast append action.
137138
pub fn fast_append(
138139
self,
140+
snapshot_id: Option<i64>,
139141
commit_uuid: Option<Uuid>,
140142
key_metadata: Vec<u8>,
141143
) -> Result<FastAppendAction<'a>> {
142-
let snapshot_id = self.generate_unique_snapshot_id();
144+
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
145+
if self
146+
.table
147+
.metadata()
148+
.snapshots()
149+
.any(|s| s.snapshot_id() == snapshot_id)
150+
{
151+
return Err(Error::new(
152+
ErrorKind::DataInvalid,
153+
format!("Snapshot id {} already exists", snapshot_id),
154+
));
155+
}
156+
snapshot_id
157+
} else {
158+
self.generate_unique_snapshot_id()
159+
};
143160
FastAppendAction::new(
144161
self,
145162
snapshot_id,
@@ -894,7 +911,7 @@ mod tests {
894911
async fn test_fast_append_action() {
895912
let table = make_v2_minimal_table();
896913
let tx = Transaction::new(&table);
897-
let mut action = tx.fast_append(None, vec![]).unwrap();
914+
let mut action = tx.fast_append(None, None, vec![]).unwrap();
898915

899916
// check add data file with incompatible partition value
900917
let data_file = DataFileBuilder::default()
@@ -1001,7 +1018,7 @@ mod tests {
10011018
format!("{}/3.parquet", &fixture.table_location),
10021019
];
10031020

1004-
let fast_append_action = tx.fast_append(None, vec![]).unwrap();
1021+
let fast_append_action = tx.fast_append(None, None, vec![]).unwrap();
10051022

10061023
// Attempt to add the existing Parquet files with fast append.
10071024
let new_tx = fast_append_action

crates/integration_tests/tests/shared_tests/append_data_file_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ async fn test_append_data_file() {
112112

113113
// commit result
114114
let tx = Transaction::new(&table);
115-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
115+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
116116
append_action.add_data_files(data_file.clone()).unwrap();
117117
let tx = append_action.apply().await.unwrap();
118118
let table = tx.commit(&rest_catalog).await.unwrap();
@@ -132,7 +132,7 @@ async fn test_append_data_file() {
132132

133133
// commit result again
134134
let tx = Transaction::new(&table);
135-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
135+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
136136
append_action.add_data_files(data_file.clone()).unwrap();
137137
let tx = append_action.apply().await.unwrap();
138138
let table = tx.commit(&rest_catalog).await.unwrap();

crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async fn test_append_partition_data_file() {
119119

120120
// commit result
121121
let tx = Transaction::new(&table);
122-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
122+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
123123
append_action
124124
.add_data_files(data_file_valid.clone())
125125
.unwrap();
@@ -178,7 +178,7 @@ async fn test_schema_incompatible_partition_type(
178178
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
179179

180180
let tx = Transaction::new(&table);
181-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
181+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
182182
if append_action
183183
.add_data_files(data_file_invalid.clone())
184184
.is_ok()
@@ -217,7 +217,7 @@ async fn test_schema_incompatible_partition_fields(
217217
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
218218

219219
let tx = Transaction::new(&table);
220-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
220+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
221221
if append_action
222222
.add_data_files(data_file_invalid.clone())
223223
.is_ok()

crates/integration_tests/tests/shared_tests/conflict_commit_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ async fn test_append_data_file_conflict() {
9090

9191
// start two transaction and commit one of them
9292
let tx1 = Transaction::new(&table);
93-
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
93+
let mut append_action = tx1.fast_append(None, None, vec![]).unwrap();
9494
append_action.add_data_files(data_file.clone()).unwrap();
9595
let tx1 = append_action.apply().await.unwrap();
9696

9797
let tx2 = Transaction::new(&table);
98-
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
98+
let mut append_action = tx2.fast_append(None, None, vec![]).unwrap();
9999
append_action.add_data_files(data_file.clone()).unwrap();
100100
let tx2 = append_action.apply().await.unwrap();
101101
let table = tx2

crates/integration_tests/tests/shared_tests/scan_all_type.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ async fn test_scan_all_type() {
309309

310310
// commit result
311311
let tx = Transaction::new(&table);
312-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
312+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
313313
append_action.add_data_files(data_file.clone()).unwrap();
314314
let tx = append_action.apply().await.unwrap();
315315
let table = tx.commit(&rest_catalog).await.unwrap();

0 commit comments

Comments
 (0)