Skip to content

Commit 0527497

Browse files
Li0k, ZENOTME, Copilot
authored
feat: add process delete entry in snapshot produce (#33) (#79)
* support spec id in data file * support process delete entry * fulfill partition spec id * fix * fix spelling mistake --------- Signed-off-by: xxchan <[email protected]> Co-authored-by: ZENOTME <[email protected]> Co-authored-by: ZENOTME <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 38080d3 commit 0527497

File tree

10 files changed

+209
-88
lines changed

10 files changed

+209
-88
lines changed

crates/iceberg/src/arrow/record_batch_partition_spliter.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
2727

2828
use super::record_batch_projector::RecordBatchProjector;
2929
use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
30-
use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
30+
use crate::spec::{Literal, PartitionSpec, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
3131
use crate::transform::{create_transform_function, BoxedTransformFunction};
3232
use crate::{Error, ErrorKind, Result};
3333

@@ -186,6 +186,10 @@ impl RecordBatchPartitionSpliter {
186186
})
187187
}
188188

189+
pub(crate) fn partition_spec(&self) -> &PartitionSpec {
190+
self.partition_spec.as_ref()
191+
}
192+
189193
/// Split the record batch into multiple record batches by the partition spec.
190194
pub(crate) fn split(&self, batch: &RecordBatch) -> Result<Vec<(OwnedRow, RecordBatch)>> {
191195
// get array using partition spec

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,10 @@ impl DataFile {
284284
pub(crate) fn rewrite_partition(&mut self, partition: Struct) {
285285
self.partition = partition;
286286
}
287+
288+
pub(crate) fn rewrite_partition_id(&mut self, partition_spec_id: i32) {
289+
self.partition_spec_id = partition_spec_id;
290+
}
287291
}
288292

289293
/// Convert data files to avro bytes and write to writer.

crates/iceberg/src/transaction/append.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ mod tests {
435435

436436
// check add data file with incompatible partition value
437437
let data_file = DataFileBuilder::default()
438+
.partition_spec_id(0)
438439
.content(DataContentType::Data)
439440
.file_path("test/3.parquet".to_string())
440441
.file_format(DataFileFormat::Parquet)
@@ -457,6 +458,7 @@ mod tests {
457458
let action = tx.fast_append();
458459

459460
let data_file = DataFileBuilder::default()
461+
.partition_spec_id(0)
460462
.content(DataContentType::Data)
461463
.file_path("test/3.parquet".to_string())
462464
.file_format(DataFileFormat::Parquet)

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ const META_ROOT_PATH: &str = "metadata";
4141

4242
pub(crate) trait SnapshotProduceOperation: Send + Sync {
4343
fn operation(&self) -> Operation;
44-
#[allow(unused)]
4544
fn delete_entries(
4645
&self,
4746
snapshot_produce: &SnapshotProducer,
@@ -168,7 +167,11 @@ impl<'a> SnapshotProducer<'a> {
168167
Ok(())
169168
}
170169

171-
fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
170+
fn new_manifest_writer(
171+
&mut self,
172+
content: ManifestContentType,
173+
partition_spec_id: i32,
174+
) -> Result<ManifestWriter> {
172175
let new_manifest_path = format!(
173176
"{}/{}/{}-m{}.{}",
174177
self.table.metadata().location(),
@@ -185,7 +188,14 @@ impl<'a> SnapshotProducer<'a> {
185188
self.table.metadata().current_schema().clone(),
186189
self.table
187190
.metadata()
188-
.default_partition_spec()
191+
.partition_spec_by_id(partition_spec_id)
192+
.ok_or_else(|| {
193+
Error::new(
194+
ErrorKind::DataInvalid,
195+
"Invalid partition spec id for new manifest writer",
196+
)
197+
.with_context("partition spec id", partition_spec_id.to_string())
198+
})?
189199
.as_ref()
190200
.clone(),
191201
);
@@ -278,13 +288,72 @@ impl<'a> SnapshotProducer<'a> {
278288
builder.build()
279289
}
280290
});
281-
let mut writer = self.new_manifest_writer(manifest_content_type)?;
291+
292+
let mut writer = self.new_manifest_writer(
293+
manifest_content_type,
294+
self.table.metadata().default_partition_spec_id(),
295+
)?;
282296
for entry in manifest_entries {
283297
writer.add_entry(entry)?;
284298
}
285299
writer.write_manifest_file().await
286300
}
287301

302+
async fn write_delete_manifest(
303+
&mut self,
304+
deleted_entries: Vec<ManifestEntry>,
305+
) -> Result<Vec<ManifestFile>> {
306+
if deleted_entries.is_empty() {
307+
return Ok(vec![]);
308+
}
309+
310+
// Group deleted entries by spec_id
311+
let mut partition_groups = HashMap::new();
312+
for entry in deleted_entries {
313+
partition_groups
314+
.entry(entry.data_file().partition_spec_id)
315+
.or_insert_with(Vec::new)
316+
.push(entry);
317+
}
318+
319+
// Write a delete manifest per spec_id group
320+
let mut deleted_manifests = Vec::new();
321+
for (spec_id, entries) in partition_groups {
322+
let mut data_file_writer: Option<ManifestWriter> = None;
323+
let mut delete_file_writer: Option<ManifestWriter> = None;
324+
for entry in entries {
325+
match entry.content_type() {
326+
DataContentType::Data => {
327+
if data_file_writer.is_none() {
328+
data_file_writer =
329+
Some(self.new_manifest_writer(ManifestContentType::Data, spec_id)?);
330+
}
331+
data_file_writer.as_mut().unwrap().add_delete_entry(entry)?;
332+
}
333+
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
334+
if delete_file_writer.is_none() {
335+
delete_file_writer = Some(
336+
self.new_manifest_writer(ManifestContentType::Deletes, spec_id)?,
337+
);
338+
}
339+
delete_file_writer
340+
.as_mut()
341+
.unwrap()
342+
.add_delete_entry(entry)?;
343+
}
344+
}
345+
}
346+
if let Some(writer) = data_file_writer {
347+
deleted_manifests.push(writer.write_manifest_file().await?);
348+
}
349+
if let Some(writer) = delete_file_writer {
350+
deleted_manifests.push(writer.write_manifest_file().await?);
351+
}
352+
}
353+
354+
Ok(deleted_manifests)
355+
}
356+
288357
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
289358
&mut self,
290359
snapshot_produce_operation: &OP,
@@ -318,6 +387,11 @@ impl<'a> SnapshotProducer<'a> {
318387
manifest_files.push(added_manifest);
319388
}
320389

390+
let delete_manifests = self
391+
.write_delete_manifest(snapshot_produce_operation.delete_entries(self).await?)
392+
.await?;
393+
manifest_files.extend(delete_manifests);
394+
321395
manifest_process
322396
.process_manifests(self, manifest_files)
323397
.await
@@ -618,7 +692,7 @@ impl MergeManifestManager {
618692
Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
619693
>)
620694
} else {
621-
let writer = snapshot_produce.new_manifest_writer(self.content)?;
695+
let writer = snapshot_produce.new_manifest_writer(self.content, snapshot_produce.table.metadata().default_partition_spec_id())?;
622696
let snapshot_id = snapshot_produce.snapshot_id;
623697
let file_io = snapshot_produce.table.file_io().clone();
624698
Ok((Box::pin(async move {

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ mod test {
182182
use arrow_schema::{DataType, Field, Fields};
183183
use arrow_select::concat::concat_batches;
184184
use itertools::Itertools;
185-
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
186185
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
186+
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
187187
use parquet::file::properties::WriterProperties;
188188
use tempfile::TempDir;
189189
use uuid::Uuid;
@@ -197,9 +197,9 @@ mod test {
197197
use crate::writer::base_writer::equality_delete_writer::{
198198
EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
199199
};
200-
use crate::writer::file_writer::ParquetWriterBuilder;
201-
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
202200
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
201+
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
202+
use crate::writer::file_writer::ParquetWriterBuilder;
203203
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
204204

205205
async fn check_parquet_data_file_with_equality_delete_write(
@@ -296,10 +296,12 @@ mod test {
296296
NestedField::required(
297297
1,
298298
"col1",
299-
Type::Struct(StructType::new(vec![
300-
NestedField::required(5, "sub_col", Type::Primitive(PrimitiveType::Int))
301-
.into(),
302-
])),
299+
Type::Struct(StructType::new(vec![NestedField::required(
300+
5,
301+
"sub_col",
302+
Type::Primitive(PrimitiveType::Int),
303+
)
304+
.into()])),
303305
)
304306
.into(),
305307
NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
@@ -315,21 +317,17 @@ mod test {
315317
NestedField::required(
316318
4,
317319
"col4",
318-
Type::Struct(StructType::new(vec![
319-
NestedField::required(
320-
7,
321-
"sub_col",
322-
Type::Struct(StructType::new(vec![
323-
NestedField::required(
324-
8,
325-
"sub_sub_col",
326-
Type::Primitive(PrimitiveType::Int),
327-
)
328-
.into(),
329-
])),
320+
Type::Struct(StructType::new(vec![NestedField::required(
321+
7,
322+
"sub_col",
323+
Type::Struct(StructType::new(vec![NestedField::required(
324+
8,
325+
"sub_sub_col",
326+
Type::Primitive(PrimitiveType::Int),
330327
)
331-
.into(),
332-
])),
328+
.into()])),
329+
)
330+
.into()])),
333331
)
334332
.into(),
335333
])
@@ -441,27 +439,23 @@ mod test {
441439
NestedField::required(
442440
3,
443441
"col3",
444-
Type::Struct(StructType::new(vec![
445-
NestedField::required(
446-
4,
447-
"sub_col",
448-
Type::Primitive(PrimitiveType::Int),
449-
)
450-
.into(),
451-
])),
442+
Type::Struct(StructType::new(vec![NestedField::required(
443+
4,
444+
"sub_col",
445+
Type::Primitive(PrimitiveType::Int),
446+
)
447+
.into()])),
452448
)
453449
.into(),
454450
NestedField::optional(
455451
5,
456452
"col4",
457-
Type::Struct(StructType::new(vec![
458-
NestedField::required(
459-
6,
460-
"sub_col2",
461-
Type::Primitive(PrimitiveType::Int),
462-
)
463-
.into(),
464-
])),
453+
Type::Struct(StructType::new(vec![NestedField::required(
454+
6,
455+
"sub_col2",
456+
Type::Primitive(PrimitiveType::Int),
457+
)
458+
.into()])),
465459
)
466460
.into(),
467461
NestedField::required(
@@ -680,30 +674,28 @@ mod test {
680674
NestedField::optional(
681675
1,
682676
"col1",
683-
Type::Struct(StructType::new(vec![
684-
NestedField::optional(2, "sub_col", Type::Primitive(PrimitiveType::Int))
685-
.into(),
686-
])),
677+
Type::Struct(StructType::new(vec![NestedField::optional(
678+
2,
679+
"sub_col",
680+
Type::Primitive(PrimitiveType::Int),
681+
)
682+
.into()])),
687683
)
688684
.into(),
689685
NestedField::optional(
690686
3,
691687
"col2",
692-
Type::Struct(StructType::new(vec![
693-
NestedField::optional(
694-
4,
695-
"sub_struct_col",
696-
Type::Struct(StructType::new(vec![
697-
NestedField::optional(
698-
5,
699-
"sub_sub_col",
700-
Type::Primitive(PrimitiveType::Int),
701-
)
702-
.into(),
703-
])),
688+
Type::Struct(StructType::new(vec![NestedField::optional(
689+
4,
690+
"sub_struct_col",
691+
Type::Struct(StructType::new(vec![NestedField::optional(
692+
5,
693+
"sub_sub_col",
694+
Type::Primitive(PrimitiveType::Int),
704695
)
705-
.into(),
706-
])),
696+
.into()])),
697+
)
698+
.into()])),
707699
)
708700
.into(),
709701
])
@@ -730,14 +722,11 @@ mod test {
730722
let inner_col = {
731723
let nulls = NullBuffer::from(vec![true, false, true]);
732724
Arc::new(StructArray::new(
733-
Fields::from(vec![
734-
Field::new("sub_sub_col", DataType::Int32, true).with_metadata(
735-
HashMap::from([(
736-
PARQUET_FIELD_ID_META_KEY.to_string(),
737-
"5".to_string(),
738-
)]),
739-
),
740-
]),
725+
Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true)
726+
.with_metadata(HashMap::from([(
727+
PARQUET_FIELD_ID_META_KEY.to_string(),
728+
"5".to_string(),
729+
)]))]),
741730
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
742731
Some(nulls),
743732
))

0 commit comments

Comments
 (0)