Skip to content

Commit 2473c3d

Browse files
committed
refine SnapshotProduceAction
1 parent 328fa5f commit 2473c3d

File tree

2 files changed

+36
-72
lines changed

2 files changed

+36
-72
lines changed

crates/e2e_test/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717

1818
[package]
19-
name = "iceberg-e2e_test"
19+
name = "iceberg-e2e-test"
2020
version = { workspace = true }
2121
edition = { workspace = true }
2222
homepage = { workspace = true }

crates/iceberg/src/transaction.rs

Lines changed: 35 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,16 @@ use std::collections::HashMap;
2222
use std::future::Future;
2323
use std::mem::discriminant;
2424
use std::ops::RangeFrom;
25-
use std::sync::Arc;
2625

2726
use uuid::Uuid;
2827

2928
use crate::error::Result;
3029
use crate::io::OutputFile;
3130
use crate::spec::{
32-
BoundPartitionSpec, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry,
33-
ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation,
34-
Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
35-
Struct, StructType, Summary, Transform, MAIN_BRANCH,
31+
DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
32+
ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot,
33+
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType,
34+
Summary, Transform, MAIN_BRANCH,
3635
};
3736
use crate::table::Table;
3837
use crate::TableUpdate::UpgradeFormatVersion;
@@ -137,28 +136,12 @@ impl<'a> Transaction<'a> {
137136
commit_uuid: Option<Uuid>,
138137
key_metadata: Vec<u8>,
139138
) -> Result<FastAppendAction<'a>> {
140-
let parent_snapshot_id = self
141-
.table
142-
.metadata()
143-
.current_snapshot()
144-
.map(|s| s.snapshot_id());
145139
let snapshot_id = self.generate_unique_snapshot_id();
146-
let schema = self.table.metadata().current_schema().as_ref().clone();
147-
let schema_id = schema.schema_id();
148-
let format_version = self.table.metadata().format_version();
149-
let partition_spec = self.table.metadata().default_partition_spec().clone();
150-
let commit_uuid = commit_uuid.unwrap_or_else(Uuid::new_v4);
151-
152140
FastAppendAction::new(
153141
self,
154-
parent_snapshot_id,
155142
snapshot_id,
156-
schema,
157-
schema_id,
158-
format_version,
159-
partition_spec,
143+
commit_uuid.unwrap_or_else(Uuid::new_v4),
160144
key_metadata,
161-
commit_uuid,
162145
HashMap::new(),
163146
)
164147
}
@@ -198,25 +181,15 @@ impl<'a> FastAppendAction<'a> {
198181
#[allow(clippy::too_many_arguments)]
199182
pub(crate) fn new(
200183
tx: Transaction<'a>,
201-
parent_snapshot_id: Option<i64>,
202184
snapshot_id: i64,
203-
schema: Schema,
204-
schema_id: i32,
205-
format_version: FormatVersion,
206-
partition_spec: Arc<BoundPartitionSpec>,
207-
key_metadata: Vec<u8>,
208185
commit_uuid: Uuid,
186+
key_metadata: Vec<u8>,
209187
snapshot_properties: HashMap<String, String>,
210188
) -> Result<Self> {
211189
Ok(Self {
212190
snapshot_produce_action: SnapshotProduceAction::new(
213191
tx,
214192
snapshot_id,
215-
parent_snapshot_id,
216-
schema_id,
217-
format_version,
218-
partition_spec,
219-
schema,
220193
key_metadata,
221194
commit_uuid,
222195
snapshot_properties,
@@ -259,10 +232,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
259232
&self,
260233
snapshot_produce: &SnapshotProduceAction<'_>,
261234
) -> Result<Vec<ManifestFile>> {
262-
let Some(snapshot) = snapshot_produce
263-
.parent_snapshot_id
264-
.and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
265-
else {
235+
let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
266236
return Ok(vec![]);
267237
};
268238

@@ -309,52 +279,32 @@ trait ManifestProcess: Send + Sync {
309279

310280
struct SnapshotProduceAction<'a> {
311281
tx: Transaction<'a>,
312-
313-
parent_snapshot_id: Option<i64>,
314282
snapshot_id: i64,
315-
schema_id: i32,
316-
format_version: FormatVersion,
317-
partition_spec: Arc<BoundPartitionSpec>,
318-
schema: Schema,
319283
key_metadata: Vec<u8>,
320-
321284
commit_uuid: Uuid,
322-
323285
snapshot_properties: HashMap<String, String>,
324286
added_data_files: Vec<DataFile>,
325-
326287
// A counter used to generate unique manifest file names.
327288
// It starts from 0 and increments for each new manifest file.
328289
// Note: This counter is limited to the range of (0..u64::MAX).
329290
manifest_counter: RangeFrom<u64>,
330291
}
331292

332293
impl<'a> SnapshotProduceAction<'a> {
333-
#[allow(clippy::too_many_arguments)]
334294
pub(crate) fn new(
335295
tx: Transaction<'a>,
336296
snapshot_id: i64,
337-
parent_snapshot_id: Option<i64>,
338-
schema_id: i32,
339-
format_version: FormatVersion,
340-
partition_spec: Arc<BoundPartitionSpec>,
341-
schema: Schema,
342297
key_metadata: Vec<u8>,
343298
commit_uuid: Uuid,
344299
snapshot_properties: HashMap<String, String>,
345300
) -> Result<Self> {
346301
Ok(Self {
347302
tx,
348-
parent_snapshot_id,
349303
snapshot_id,
350-
schema_id,
351-
format_version,
352304
commit_uuid,
353305
snapshot_properties,
354306
added_data_files: vec![],
355307
manifest_counter: (0..),
356-
partition_spec,
357-
schema,
358308
key_metadata,
359309
})
360310
}
@@ -374,12 +324,12 @@ impl<'a> SnapshotProduceAction<'a> {
374324
.fields()
375325
.iter()
376326
.zip(partition_type.fields())
377-
.any(|(field_from_value, field_from_type)| {
378-
!field_from_type
327+
.any(|(value, field)| {
328+
!field
379329
.field_type
380330
.as_primitive_type()
381331
.unwrap()
382-
.compatible(&field_from_value.as_primitive_literal().unwrap())
332+
.compatible(&value.as_primitive_literal().unwrap())
383333
})
384334
{
385335
return Err(Error::new(
@@ -405,7 +355,11 @@ impl<'a> SnapshotProduceAction<'a> {
405355
}
406356
Self::validate_partition_value(
407357
data_file.partition(),
408-
self.partition_spec.partition_type(),
358+
self.tx
359+
.table
360+
.metadata()
361+
.default_partition_spec()
362+
.partition_type(),
409363
)?;
410364
}
411365
self.added_data_files.extend(data_files);
@@ -433,7 +387,7 @@ impl<'a> SnapshotProduceAction<'a> {
433387
let builder = ManifestEntry::builder()
434388
.status(crate::spec::ManifestStatus::Added)
435389
.data_file(data_file);
436-
if self.format_version as u8 == 1u8 {
390+
if self.tx.table.metadata().format_version() as u8 == 1u8 {
437391
builder.snapshot_id(self.snapshot_id).build()
438392
} else {
439393
// For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
@@ -442,11 +396,19 @@ impl<'a> SnapshotProduceAction<'a> {
442396
}
443397
})
444398
.collect();
399+
let schema = self.tx.table.metadata().current_schema();
445400
let manifest_meta = ManifestMetadata::builder()
446-
.schema(self.schema.clone().into())
447-
.schema_id(self.schema_id)
448-
.format_version(self.format_version)
449-
.partition_spec(self.partition_spec.as_ref().clone())
401+
.schema(schema.clone())
402+
.schema_id(schema.schema_id())
403+
.format_version(self.tx.table.metadata().format_version())
404+
.partition_spec(
405+
self.tx
406+
.table
407+
.metadata()
408+
.default_partition_spec()
409+
.as_ref()
410+
.clone(),
411+
)
450412
.content(crate::spec::ManifestContentType::Data)
451413
.build();
452414
let manifest = Manifest::new(manifest_meta, manifest_entries);
@@ -465,6 +427,8 @@ impl<'a> SnapshotProduceAction<'a> {
465427
) -> Result<Vec<ManifestFile>> {
466428
let added_manifest = self.write_added_manifest().await?;
467429
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
430+
// # TODO
431+
// Support process delete entries.
468432

469433
let mut manifest_files = vec![added_manifest];
470434
manifest_files.extend(existing_manifests);
@@ -515,15 +479,15 @@ impl<'a> SnapshotProduceAction<'a> {
515479
.file_io()
516480
.new_output(manifest_list_path.clone())?,
517481
self.snapshot_id,
518-
self.parent_snapshot_id,
482+
self.tx.table.metadata().current_snapshot_id(),
519483
),
520484
FormatVersion::V2 => ManifestListWriter::v2(
521485
self.tx
522486
.table
523487
.file_io()
524488
.new_output(manifest_list_path.clone())?,
525489
self.snapshot_id,
526-
self.parent_snapshot_id,
490+
self.tx.table.metadata().current_snapshot_id(),
527491
next_seq_num,
528492
),
529493
};
@@ -534,10 +498,10 @@ impl<'a> SnapshotProduceAction<'a> {
534498
let new_snapshot = Snapshot::builder()
535499
.with_manifest_list(manifest_list_path)
536500
.with_snapshot_id(self.snapshot_id)
537-
.with_parent_snapshot_id(self.parent_snapshot_id)
501+
.with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
538502
.with_sequence_number(next_seq_num)
539503
.with_summary(summary)
540-
.with_schema_id(self.schema_id)
504+
.with_schema_id(self.tx.table.metadata().current_schema_id())
541505
.with_timestamp_ms(commit_ts)
542506
.build();
543507

@@ -559,7 +523,7 @@ impl<'a> SnapshotProduceAction<'a> {
559523
},
560524
TableRequirement::RefSnapshotIdMatch {
561525
r#ref: MAIN_BRANCH.to_string(),
562-
snapshot_id: self.parent_snapshot_id,
526+
snapshot_id: self.tx.table.metadata().current_snapshot_id(),
563527
},
564528
])?;
565529
Ok(self.tx)

0 commit comments

Comments
 (0)