Skip to content

Commit 0a2d86a

Browse files
xxchanLi0k
authored andcommitted
feat: allow specify snapshot id for fast append (#25)(#14)
Signed-off-by: xxchan <[email protected]> typo
1 parent 1535d08 commit 0a2d86a

File tree

6 files changed

+37
-7
lines changed

6 files changed

+37
-7
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct FastAppendAction {
3838
snapshot_properties: HashMap<String, String>,
3939
added_data_files: Vec<DataFile>,
4040
added_delete_files: Vec<DataFile>,
41+
snapshot_id: Option<i64>,
4142
}
4243

4344
impl FastAppendAction {
@@ -49,6 +50,7 @@ impl FastAppendAction {
4950
snapshot_properties: HashMap::default(),
5051
added_data_files: vec![],
5152
added_delete_files: vec![],
53+
snapshot_id: None,
5254
}
5355
}
5456

@@ -90,6 +92,12 @@ impl FastAppendAction {
9092
self.snapshot_properties = snapshot_properties;
9193
self
9294
}
95+
96+
/// Set snapshot id
97+
pub fn set_snapshot_id(mut self, snapshot_id: i64) -> Self {
98+
self.snapshot_id = Some(snapshot_id);
99+
self
100+
}
93101
}
94102

95103
#[async_trait]
@@ -102,6 +110,7 @@ impl TransactionAction for FastAppendAction {
102110
self.snapshot_properties.clone(),
103111
self.added_data_files.clone(),
104112
self.added_delete_files.clone(),
113+
self.snapshot_id,
105114
);
106115

107116
// validate added files

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,11 @@ impl<'a> SnapshotProducer<'a> {
9090
snapshot_properties: HashMap<String, String>,
9191
added_data_files: Vec<DataFile>,
9292
added_delete_files: Vec<DataFile>,
93+
snapshot_id: Option<i64>,
9394
) -> Self {
9495
Self {
9596
table,
96-
snapshot_id: Self::generate_unique_snapshot_id(table),
97+
snapshot_id: snapshot_id.unwrap_or_else(|| Self::generate_unique_snapshot_id(table)),
9798
commit_uuid,
9899
key_metadata,
99100
snapshot_properties,

crates/integration_tests/tests/shared_tests/append_data_file_test.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
2323
use futures::TryStreamExt;
2424
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2525
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
26-
use iceberg::writer::file_writer::ParquetWriterBuilder;
2726
use iceberg::writer::file_writer::location_generator::{
2827
DefaultFileNameGenerator, DefaultLocationGenerator,
2928
};
29+
use iceberg::writer::file_writer::ParquetWriterBuilder;
3030
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
3131
use iceberg::{Catalog, TableCreation};
3232
use iceberg_catalog_rest::RestCatalog;
@@ -128,4 +128,24 @@ async fn test_append_data_file() {
128128
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
129129
assert_eq!(batches.len(), 1);
130130
assert_eq!(batches[0], batch);
131+
132+
// commit result again
133+
let tx = Transaction::new(&table);
134+
let append_action = tx.fast_append().add_data_files(data_file.clone());
135+
let tx = append_action.apply(tx).unwrap();
136+
let table = tx.commit(&rest_catalog).await.unwrap();
137+
138+
// check result again
139+
let batch_stream = table
140+
.scan()
141+
.select_all()
142+
.build()
143+
.unwrap()
144+
.to_arrow()
145+
.await
146+
.unwrap();
147+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
148+
assert_eq!(batches.len(), 2);
149+
assert_eq!(batches[0], batch);
150+
assert_eq!(batches[1], batch);
131151
}

crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartiti
2525
use iceberg::table::Table;
2626
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2727
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
28-
use iceberg::writer::file_writer::ParquetWriterBuilder;
2928
use iceberg::writer::file_writer::location_generator::{
3029
DefaultFileNameGenerator, DefaultLocationGenerator,
3130
};
31+
use iceberg::writer::file_writer::ParquetWriterBuilder;
3232
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
3333
use iceberg::{Catalog, TableCreation};
3434
use iceberg_catalog_rest::RestCatalog;

crates/integration_tests/tests/shared_tests/conflict_commit_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
2323
use futures::TryStreamExt;
2424
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2525
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
26-
use iceberg::writer::file_writer::ParquetWriterBuilder;
2726
use iceberg::writer::file_writer::location_generator::{
2827
DefaultFileNameGenerator, DefaultLocationGenerator,
2928
};
29+
use iceberg::writer::file_writer::ParquetWriterBuilder;
3030
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
3131
use iceberg::{Catalog, TableCreation};
3232
use iceberg_catalog_rest::RestCatalog;

crates/integration_tests/tests/shared_tests/scan_all_type.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ use arrow_schema::{DataType, Field, Fields};
3030
use futures::TryStreamExt;
3131
use iceberg::arrow::{DEFAULT_MAP_FIELD_NAME, UTC_TIME_ZONE};
3232
use iceberg::spec::{
33-
LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField,
34-
PrimitiveType, Schema, StructType, Type,
33+
ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, LIST_FIELD_NAME,
34+
MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
3535
};
3636
use iceberg::transaction::{ApplyTransactionAction, Transaction};
3737
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
38-
use iceberg::writer::file_writer::ParquetWriterBuilder;
3938
use iceberg::writer::file_writer::location_generator::{
4039
DefaultFileNameGenerator, DefaultLocationGenerator,
4140
};
41+
use iceberg::writer::file_writer::ParquetWriterBuilder;
4242
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
4343
use iceberg::{Catalog, TableCreation};
4444
use iceberg_catalog_rest::RestCatalog;

0 commit comments

Comments
 (0)