Commit 64e7e21

fix(iceberg): fix rewrite-files partition-spec-id (#54)
* fix(iceberg): fix rewrite-files partition-spec-id
* fix(docker): update docker file
* add test
* update minio
* Revert "update minio". This reverts commit 4464d90.
1 parent d5efb44 commit 64e7e21

File tree

5 files changed: +122 -3 lines changed

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 6 additions & 0 deletions
@@ -274,6 +274,12 @@ impl DataFile {
     pub fn referenced_data_file(&self) -> Option<String> {
         self.referenced_data_file.clone()
     }
+
+    /// Get the partition spec id of the data file.
+    pub fn partition_spec_id(&self) -> i32 {
+        self.partition_spec_id
+    }
+
     /// Get the offset in the file where the blob content starts.
     /// Only meaningful for puffin blobs, and required for deletion vectors.
     pub fn content_offset(&self) -> Option<i64> {
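
The new accessor simply exposes the spec id already stored on each DataFile. As a rough sketch of how a caller might use it, the helper below is hypothetical and not part of this commit; it groups files by the partition spec they were written under:

use std::collections::HashMap;

use iceberg::spec::DataFile;

// Hypothetical helper, not part of this commit: bucket data files by the
// partition spec they were written with, using the new accessor.
fn group_by_spec_id(files: &[DataFile]) -> HashMap<i32, Vec<&DataFile>> {
    let mut groups: HashMap<i32, Vec<&DataFile>> = HashMap::new();
    for file in files {
        groups.entry(file.partition_spec_id()).or_default().push(file);
    }
    groups
}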

crates/iceberg/src/transaction/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -183,7 +183,7 @@ impl Transaction {
     }
 
     /// Creates rewrite files action.
-    pub fn rewrite_files(self) -> RewriteFilesAction {
+    pub fn rewrite_files(&self) -> RewriteFilesAction {
         RewriteFilesAction::new()
     }
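
Taking &self instead of self means building the rewrite action no longer consumes the transaction, so the same tx can still be handed to apply. A minimal sketch of that calling pattern, assuming the same imports the new integration test uses:

use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};

// Sketch only: create a rewrite action from a borrowed transaction, then apply
// it back onto that same transaction, mirroring the new test.
fn plan_rewrite(table: &Table, files_to_delete: Vec<DataFile>) -> iceberg::Result<Transaction> {
    let tx = Transaction::new(table);
    let rewrite_action = tx.rewrite_files().delete_files(files_to_delete);
    rewrite_action.apply(tx)
}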

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 1 addition & 1 deletion
@@ -266,7 +266,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
         {
             let mut manifest_writer = snapshot_produce.new_manifest_writer(
                 ManifestContentType::Data,
-                table_metadata_ref.default_partition_spec_id(),
+                manifest_file.partition_spec_id,
             )?;
 
             for entry in manifest.entries() {
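
Writing the rewritten manifest with the source manifest's own partition_spec_id, rather than the table's default spec id, keeps the spec id that the underlying data files were written with, even if the table's default spec has changed since. A sketch of the invariant the new test asserts, using only calls that appear in that test (error handling simplified):

use iceberg::table::Table;

// Sketch of the invariant restored by this fix: after a rewrite, every manifest
// in the current snapshot still reports the spec id its files were written with.
async fn assert_spec_id_preserved(table: &Table, expected_spec_id: i32) -> iceberg::Result<()> {
    let snapshot = table.metadata().current_snapshot().expect("table has a snapshot");
    let manifest_list = snapshot
        .load_manifest_list(table.file_io(), table.metadata())
        .await?;
    for manifest_file in manifest_list.entries() {
        assert_eq!(manifest_file.partition_spec_id, expected_spec_id);
    }
    Ok(())
}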

crates/integration_tests/testdata/spark/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ ENV SPARK_VERSION=3.5.7
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
 ENV ICEBERG_VERSION=1.10.0
 
-RUN curl --retry 5 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
+RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
     && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
     && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz

crates/integration_tests/tests/shared_tests/rewrite_files_test.rs

Lines changed: 113 additions & 0 deletions
@@ -21,6 +21,8 @@ use std::sync::Arc;
 
 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
+use iceberg::spec::DataFile;
+use iceberg::table::Table;
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -453,3 +455,114 @@ async fn test_sequence_number_in_manifest_entry() {
         }
     }
 }
+
+#[tokio::test]
+async fn test_partition_spec_id_in_manifest() {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalogBuilder::default()
+        .load("rest", fixture.catalog_config.clone())
+        .await
+        .unwrap();
+    let ns = random_ns().await;
+    let schema = test_schema();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let mut table = rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+
+    // commit result
+    let mut data_files_vec = Vec::default();
+
+    async fn build_data_file_f(
+        schema: Arc<arrow_schema::Schema>,
+        table: &Table,
+        location_generator: DefaultLocationGenerator,
+        file_name_generator: DefaultFileNameGenerator,
+    ) -> DataFile {
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::default(),
+            table.metadata().current_schema().clone(),
+        );
+        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            table.file_io().clone(),
+            location_generator.clone(),
+            file_name_generator.clone(),
+        );
+        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
+
+        let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();
+        let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+        let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+        let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+        let batch = RecordBatch::try_new(schema.clone(), vec![
+            Arc::new(col1) as ArrayRef,
+            Arc::new(col2) as ArrayRef,
+            Arc::new(col3) as ArrayRef,
+        ])
+        .unwrap();
+        data_file_writer.write(batch.clone()).await.unwrap();
+        data_file_writer.close().await.unwrap()[0].clone()
+    }
+
+    for _ in 0..10 {
+        let data_file = build_data_file_f(
+            schema.clone(),
+            &table,
+            location_generator.clone(),
+            file_name_generator.clone(),
+        )
+        .await;
+        data_files_vec.push(data_file.clone());
+        let tx = Transaction::new(&table);
+        let append_action = tx.fast_append().add_data_files(vec![data_file]);
+        let tx = append_action.apply(tx).unwrap();
+        table = tx.commit(&rest_catalog).await.unwrap();
+    }
+
+    let last_data_files = data_files_vec.last().unwrap();
+    let partition_spec_id = last_data_files.partition_spec_id();
+
+    // remove the data files by RewriteAction
+    for data_file in &data_files_vec {
+        let tx = Transaction::new(&table);
+        let rewrite_action = tx.rewrite_files().delete_files(vec![data_file.clone()]);
+        let tx = rewrite_action.apply(tx).unwrap();
+        table = tx.commit(&rest_catalog).await.unwrap();
+    }
+
+    // TODO: test update partition spec
+    // Verify that the partition spec ID is correctly set
+
+    let last_snapshot = table.metadata().current_snapshot().unwrap();
+    let manifest_list = last_snapshot
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list.entries().len(), 1);
+    for manifest_file in manifest_list.entries() {
+        assert_eq!(manifest_file.partition_spec_id, partition_spec_id);
+    }
+}
