Skip to content

Commit 1535d08

Browse files
Li0kZENOTME
andauthored
feat: support append delete file (#19) (#68)
Signed-off-by: xxchan <[email protected]> Co-authored-by: ZENOTME <[email protected]> Co-authored-by: ZENOTME <[email protected]>
1 parent fc3a964 commit 1535d08

File tree

2 files changed

+63
-21
lines changed

2 files changed

+63
-21
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct FastAppendAction {
3737
key_metadata: Option<Vec<u8>>,
3838
snapshot_properties: HashMap<String, String>,
3939
added_data_files: Vec<DataFile>,
40+
added_delete_files: Vec<DataFile>,
4041
}
4142

4243
impl FastAppendAction {
@@ -47,6 +48,7 @@ impl FastAppendAction {
4748
key_metadata: None,
4849
snapshot_properties: HashMap::default(),
4950
added_data_files: vec![],
51+
added_delete_files: vec![],
5052
}
5153
}
5254

@@ -58,7 +60,16 @@ impl FastAppendAction {
5860

5961
/// Add data files to the snapshot.
6062
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
61-
self.added_data_files.extend(data_files);
63+
for file in data_files {
64+
match file.content_type() {
65+
crate::spec::DataContentType::Data => self.added_data_files.push(file),
66+
crate::spec::DataContentType::PositionDeletes
67+
| crate::spec::DataContentType::EqualityDeletes => {
68+
self.added_delete_files.push(file)
69+
}
70+
}
71+
}
72+
6273
self
6374
}
6475

@@ -90,16 +101,22 @@ impl TransactionAction for FastAppendAction {
90101
self.key_metadata.clone(),
91102
self.snapshot_properties.clone(),
92103
self.added_data_files.clone(),
104+
self.added_delete_files.clone(),
93105
);
94106

95107
// validate added files
96108
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
109+
snapshot_producer.validate_added_data_files(&self.added_delete_files)?;
97110

98111
// Checks duplicate files
99112
if self.check_duplicate {
100113
snapshot_producer
101114
.validate_duplicate_files(&self.added_data_files)
102115
.await?;
116+
117+
snapshot_producer
118+
.validate_duplicate_files(&self.added_delete_files)
119+
.await?;
103120
}
104121

105122
snapshot_producer
@@ -152,7 +169,7 @@ mod tests {
152169
use std::sync::Arc;
153170

154171
use crate::spec::{
155-
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
172+
DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH,
156173
};
157174
use crate::transaction::tests::make_v2_minimal_table;
158175
use crate::transaction::{Transaction, TransactionAction};

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ use uuid::Uuid;
2323

2424
use crate::error::Result;
2525
use crate::spec::{
26-
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
27-
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation,
26+
update_snapshot_summaries, DataContentType, DataFile, DataFileFormat, FormatVersion,
27+
ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter,
28+
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
29+
SnapshotSummaryCollector, Struct, StructType, Summary, MAIN_BRANCH,
2830
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
29-
Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType,
30-
Summary, update_snapshot_summaries,
3131
};
3232
use crate::table::Table;
3333
use crate::transaction::ActionCommit;
@@ -74,7 +74,8 @@ pub(crate) struct SnapshotProducer<'a> {
7474
commit_uuid: Uuid,
7575
key_metadata: Option<Vec<u8>>,
7676
snapshot_properties: HashMap<String, String>,
77-
added_data_files: Vec<DataFile>,
77+
pub added_data_files: Vec<DataFile>,
78+
added_delete_files: Vec<DataFile>,
7879
// A counter used to generate unique manifest file names.
7980
// It starts from 0 and increments for each new manifest file.
8081
// Note: This counter is limited to the range of (0..u64::MAX).
@@ -88,6 +89,7 @@ impl<'a> SnapshotProducer<'a> {
8889
key_metadata: Option<Vec<u8>>,
8990
snapshot_properties: HashMap<String, String>,
9091
added_data_files: Vec<DataFile>,
92+
added_delete_files: Vec<DataFile>,
9193
) -> Self {
9294
Self {
9395
table,
@@ -96,18 +98,13 @@ impl<'a> SnapshotProducer<'a> {
9698
key_metadata,
9799
snapshot_properties,
98100
added_data_files,
101+
added_delete_files,
99102
manifest_counter: (0..),
100103
}
101104
}
102105

103106
pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> {
104107
for data_file in added_data_files {
105-
if data_file.content_type() != crate::spec::DataContentType::Data {
106-
return Err(Error::new(
107-
ErrorKind::DataInvalid,
108-
"Only data content type is allowed for fast append",
109-
));
110-
}
111108
// Check if the data file partition spec id matches the table default partition spec id.
112109
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
113110
return Err(Error::new(
@@ -249,18 +246,42 @@ impl<'a> SnapshotProducer<'a> {
249246
}
250247

251248
// Write manifest file for added data files and return the ManifestFile for ManifestList.
252-
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
253-
let added_data_files = std::mem::take(&mut self.added_data_files);
254-
if added_data_files.is_empty() {
249+
async fn write_added_manifest(&mut self, added_files: Vec<DataFile>) -> Result<ManifestFile> {
250+
if added_files.is_empty() {
255251
return Err(Error::new(
256252
ErrorKind::PreconditionFailed,
257253
"No added data files found when write an added manifest file",
258254
));
259255
}
260256

257+
let file_count = added_files.len();
258+
259+
let manifest_content_type = {
260+
let mut data_num = 0;
261+
let mut delete_num = 0;
262+
for f in &added_files {
263+
match f.content_type() {
264+
DataContentType::Data => data_num += 1,
265+
DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
266+
delete_num += 1
267+
}
268+
}
269+
}
270+
if data_num == file_count {
271+
ManifestContentType::Data
272+
} else if delete_num == file_count {
273+
ManifestContentType::Deletes
274+
} else {
275+
return Err(Error::new(
276+
ErrorKind::DataInvalid,
277+
"added DataFile for a ManifestFile should be same type (Data or Delete)",
278+
));
279+
}
280+
};
281+
261282
let snapshot_id = self.snapshot_id;
262283
let format_version = self.table.metadata().format_version();
263-
let manifest_entries = added_data_files.into_iter().map(|data_file| {
284+
let manifest_entries = added_files.into_iter().map(|data_file| {
264285
let builder = ManifestEntry::builder()
265286
.status(crate::spec::ManifestStatus::Added)
266287
.data_file(data_file);
@@ -272,7 +293,7 @@ impl<'a> SnapshotProducer<'a> {
272293
builder.build()
273294
}
274295
});
275-
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
296+
let mut writer = self.new_manifest_writer(manifest_content_type)?;
276297
for entry in manifest_entries {
277298
writer.add_entry(entry)?;
278299
}
@@ -301,12 +322,16 @@ impl<'a> SnapshotProducer<'a> {
301322

302323
// Process added entries.
303324
if !self.added_data_files.is_empty() {
304-
let added_manifest = self.write_added_manifest().await?;
325+
let added_data_files = std::mem::take(&mut self.added_data_files);
326+
let added_manifest = self.write_added_manifest(added_data_files).await?;
305327
manifest_files.push(added_manifest);
306328
}
307329

308-
// # TODO
309-
// Support process delete entries.
330+
if !self.added_delete_files.is_empty() {
331+
let added_delete_files = std::mem::take(&mut self.added_delete_files);
332+
let added_manifest = self.write_added_manifest(added_delete_files).await?;
333+
manifest_files.push(added_manifest);
334+
}
310335

311336
let manifest_files = manifest_process.process_manifests(self, manifest_files);
312337
Ok(manifest_files)

0 commit comments

Comments
 (0)