Skip to content

Commit 38080d3

Browse files
Li0kZENOTME
andauthored
feat: support merge append (#29) (#78)
Signed-off-by: xxchan <[email protected]> Co-authored-by: ZENOTME <[email protected]> Co-authored-by: ZENOTME <[email protected]>
1 parent 3c63a85 commit 38080d3

File tree

8 files changed

+794
-19
lines changed

8 files changed

+794
-19
lines changed

crates/iceberg/src/spec/manifest/entry.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ impl ManifestEntry {
147147
pub fn data_file(&self) -> &DataFile {
148148
&self.data_file
149149
}
150+
151+
/// File sequence number indicating when the file was added. Inherited when null and status is 1 (added).
152+
#[inline]
153+
pub fn file_sequence_number(&self) -> Option<i64> {
154+
self.file_sequence_number
155+
}
150156
}
151157

152158
/// Used to track additions and deletions in ManifestEntry.

crates/iceberg/src/transaction/append.rs

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,22 @@ use crate::error::Result;
2525
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
2626
use crate::table::Table;
2727
use crate::transaction::snapshot::{
28-
generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
28+
generate_unique_snapshot_id, DefaultManifestProcess, MergeManifestProcess,
29+
SnapshotProduceOperation, SnapshotProducer,
2930
};
3031
use crate::transaction::{ActionCommit, TransactionAction};
3132
use crate::{Error, ErrorKind};
3233

34+
/// Target size of manifest file when merging manifests.
35+
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
36+
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
37+
/// Minimum number of manifests to merge.
38+
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
39+
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
40+
/// Whether allow to merge manifests.
41+
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
42+
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
43+
3344
/// FastAppendAction is a transaction action for fast append data files to the table.
3445
pub struct FastAppendAction {
3546
check_duplicate: bool,
@@ -189,6 +200,141 @@ impl SnapshotProduceOperation for FastAppendOperation {
189200
}
190201
}
191202

203+
/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
204+
/// based on the target size.
205+
pub struct MergeAppendAction {
206+
// snapshot_produce_action: SnapshotProducer<'_>,
207+
target_size_bytes: u32,
208+
min_count_to_merge: u32,
209+
merge_enabled: bool,
210+
211+
check_duplicate: bool,
212+
// below are properties used to create SnapshotProducer when commit
213+
commit_uuid: Option<Uuid>,
214+
key_metadata: Option<Vec<u8>>,
215+
snapshot_properties: HashMap<String, String>,
216+
added_data_files: Vec<DataFile>,
217+
added_delete_files: Vec<DataFile>,
218+
snapshot_id: Option<i64>,
219+
}
220+
221+
impl MergeAppendAction {
222+
#[allow(clippy::too_many_arguments)]
223+
pub(crate) fn new() -> Self {
224+
Self {
225+
target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
226+
min_count_to_merge: MANIFEST_MIN_MERGE_COUNT_DEFAULT,
227+
merge_enabled: MANIFEST_MERGE_ENABLED_DEFAULT,
228+
check_duplicate: true,
229+
commit_uuid: None,
230+
key_metadata: None,
231+
snapshot_properties: HashMap::default(),
232+
added_data_files: vec![],
233+
added_delete_files: vec![],
234+
snapshot_id: None,
235+
}
236+
}
237+
238+
pub fn set_target_size_bytes(mut self, v: u32) -> Self {
239+
self.target_size_bytes = v;
240+
self
241+
}
242+
243+
pub fn set_min_count_to_merge(mut self, v: u32) -> Self {
244+
self.min_count_to_merge = v;
245+
self
246+
}
247+
248+
pub fn set_merge_enabled(mut self, v: bool) -> Self {
249+
self.merge_enabled = v;
250+
self
251+
}
252+
253+
pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
254+
let target_size_bytes: u32 = snapshot_properties
255+
.get(MANIFEST_TARGET_SIZE_BYTES)
256+
.and_then(|s| s.parse().ok())
257+
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
258+
let min_count_to_merge: u32 = snapshot_properties
259+
.get(MANIFEST_MIN_MERGE_COUNT)
260+
.and_then(|s| s.parse().ok())
261+
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
262+
let merge_enabled = snapshot_properties
263+
.get(MANIFEST_MERGE_ENABLED)
264+
.and_then(|s| s.parse().ok())
265+
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
266+
267+
self.snapshot_properties = snapshot_properties;
268+
self.target_size_bytes = target_size_bytes;
269+
self.min_count_to_merge = min_count_to_merge;
270+
self.merge_enabled = merge_enabled;
271+
272+
self
273+
}
274+
275+
/// Add data files to the snapshot.
276+
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
277+
self.added_data_files.extend(data_files);
278+
self
279+
}
280+
}
281+
282+
#[async_trait]
283+
impl TransactionAction for MergeAppendAction {
284+
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
285+
let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
286+
if table
287+
.metadata()
288+
.snapshots()
289+
.any(|s| s.snapshot_id() == snapshot_id)
290+
{
291+
return Err(Error::new(
292+
ErrorKind::DataInvalid,
293+
format!("Snapshot id {} already exists", snapshot_id),
294+
));
295+
}
296+
snapshot_id
297+
} else {
298+
generate_unique_snapshot_id(table)
299+
};
300+
301+
let snapshot_producer = SnapshotProducer::new(
302+
table,
303+
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
304+
self.key_metadata.clone(),
305+
self.snapshot_properties.clone(),
306+
self.added_data_files.clone(),
307+
self.added_delete_files.clone(),
308+
snapshot_id,
309+
);
310+
311+
// validate added files
312+
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
313+
snapshot_producer.validate_added_data_files(&self.added_delete_files)?;
314+
315+
// Checks duplicate files
316+
if self.check_duplicate {
317+
snapshot_producer
318+
.validate_duplicate_files(&self.added_data_files)
319+
.await?;
320+
321+
snapshot_producer
322+
.validate_duplicate_files(&self.added_delete_files)
323+
.await?;
324+
}
325+
326+
if self.merge_enabled {
327+
let process =
328+
MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);
329+
snapshot_producer.commit(FastAppendOperation, process).await
330+
} else {
331+
snapshot_producer
332+
.commit(FastAppendOperation, DefaultManifestProcess)
333+
.await
334+
}
335+
}
336+
}
337+
192338
#[cfg(test)]
193339
mod tests {
194340
use std::collections::HashMap;

crates/iceberg/src/transaction/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use std::collections::HashMap;
5656

5757
pub use action::*;
5858
mod append;
59-
pub mod remove_snapshots;
59+
mod remove_snapshots;
6060
mod snapshot;
6161
mod sort_order;
6262
mod update_location;
@@ -67,7 +67,9 @@ mod upgrade_format_version;
6767
use std::sync::Arc;
6868
use std::time::Duration;
6969

70+
pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
7071
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
72+
use remove_snapshots::RemoveSnapshotAction;
7173

7274
use crate::error::Result;
7375
use crate::spec::{
@@ -77,9 +79,7 @@ use crate::spec::{
7779
PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
7880
};
7981
use crate::table::Table;
80-
use crate::transaction::action::BoxedTransactionAction;
81-
use crate::transaction::append::FastAppendAction;
82-
use crate::transaction::remove_snapshots::RemoveSnapshotAction;
82+
use crate::transaction::append::{FastAppendAction, MergeAppendAction};
8383
use crate::transaction::sort_order::ReplaceSortOrderAction;
8484
use crate::transaction::update_location::UpdateLocationAction;
8585
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -150,6 +150,11 @@ impl Transaction {
150150
FastAppendAction::new()
151151
}
152152

153+
/// Creates a merge append action.
154+
pub fn merge_append(&self) -> MergeAppendAction {
155+
MergeAppendAction::new()
156+
}
157+
153158
/// Creates replace sort order action.
154159
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
155160
ReplaceSortOrderAction::new()
@@ -169,7 +174,7 @@ impl Transaction {
169174
pub fn update_statistics(&self) -> UpdateStatisticsAction {
170175
UpdateStatisticsAction::new()
171176
}
172-
177+
173178
/// Commit transaction.
174179
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
175180
if self.actions.is_empty() {

0 commit comments

Comments
 (0)