Skip to content

Commit 5036ca8

Browse files
rokadamreeve
and authored
Support multi-threaded writing of Parquet files with modular encryption (#8029)
- Closes #7359. # Rationale for this change This is to enable concurrent column writing with encryption downstream (e.g. with datafusion). See #7359 for more. See https://github.com/apache/arrow-rs/pull/7111/files#r2015196618 # What changes are included in this PR? * `ArrowWriter` now has a `pub get_column_writers` method that can be used to write columns concurrently. * Minor change to how encryption tests read test data. # Are these changes tested? Yes. # Are there any user-facing changes? `pub ArrowWriter.get_column_writers` and `pub ArrowWriter.append_row_group` are added. Both to enable concurrent use of column writers. `WriterPropertiesBuilder` now implements `Default`. --------- Co-authored-by: Adam Reeve <[email protected]>
1 parent 554cafa commit 5036ca8

File tree

6 files changed

+328
-102
lines changed

6 files changed

+328
-102
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,12 @@ impl<W: Write + Send> ArrowWriter<W> {
236236

237237
let max_row_group_size = props.max_row_group_size();
238238

239+
let props_ptr = Arc::new(props);
239240
let file_writer =
240-
SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
241+
SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
241242

242-
let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
243+
let row_group_writer_factory =
244+
ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
243245

244246
Ok(Self {
245247
writer: file_writer,
@@ -310,12 +312,10 @@ impl<W: Write + Send> ArrowWriter<W> {
310312

311313
let in_progress = match &mut self.in_progress {
312314
Some(in_progress) => in_progress,
313-
x => x.insert(self.row_group_writer_factory.create_row_group_writer(
314-
self.writer.schema_descr(),
315-
self.writer.properties(),
316-
&self.arrow_schema,
317-
self.writer.flushed_row_groups().len(),
318-
)?),
315+
x => x.insert(
316+
self.row_group_writer_factory
317+
.create_row_group_writer(self.writer.flushed_row_groups().len())?,
318+
),
319319
};
320320

321321
// If would exceed max_row_group_size, split batch
@@ -402,6 +402,25 @@ impl<W: Write + Send> ArrowWriter<W> {
402402
pub fn close(mut self) -> Result<crate::format::FileMetaData> {
403403
self.finish()
404404
}
405+
406+
/// Create a new row group writer and return its column writers.
407+
pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
408+
self.flush()?;
409+
let in_progress = self
410+
.row_group_writer_factory
411+
.create_row_group_writer(self.writer.flushed_row_groups().len())?;
412+
Ok(in_progress.writers)
413+
}
414+
415+
/// Append the given column chunks to the file as a new row group.
416+
pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
417+
let mut row_group_writer = self.writer.next_row_group()?;
418+
for chunk in chunks {
419+
chunk.append_to_row_group(&mut row_group_writer)?;
420+
}
421+
row_group_writer.close()?;
422+
Ok(())
423+
}
405424
}
406425

407426
impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
@@ -828,51 +847,59 @@ impl ArrowRowGroupWriter {
828847
}
829848

830849
struct ArrowRowGroupWriterFactory {
850+
schema: SchemaDescriptor,
851+
arrow_schema: SchemaRef,
852+
props: WriterPropertiesPtr,
831853
#[cfg(feature = "encryption")]
832854
file_encryptor: Option<Arc<FileEncryptor>>,
833855
}
834856

835857
impl ArrowRowGroupWriterFactory {
836858
#[cfg(feature = "encryption")]
837-
fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
859+
fn new<W: Write + Send>(
860+
file_writer: &SerializedFileWriter<W>,
861+
schema: SchemaDescriptor,
862+
arrow_schema: SchemaRef,
863+
props: WriterPropertiesPtr,
864+
) -> Self {
838865
Self {
866+
schema,
867+
arrow_schema,
868+
props,
839869
file_encryptor: file_writer.file_encryptor(),
840870
}
841871
}
842872

843873
#[cfg(not(feature = "encryption"))]
844-
fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
845-
Self {}
874+
fn new<W: Write + Send>(
875+
_file_writer: &SerializedFileWriter<W>,
876+
schema: SchemaDescriptor,
877+
arrow_schema: SchemaRef,
878+
props: WriterPropertiesPtr,
879+
) -> Self {
880+
Self {
881+
schema,
882+
arrow_schema,
883+
props,
884+
}
846885
}
847886

848887
#[cfg(feature = "encryption")]
849-
fn create_row_group_writer(
850-
&self,
851-
parquet: &SchemaDescriptor,
852-
props: &WriterPropertiesPtr,
853-
arrow: &SchemaRef,
854-
row_group_index: usize,
855-
) -> Result<ArrowRowGroupWriter> {
888+
fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
856889
let writers = get_column_writers_with_encryptor(
857-
parquet,
858-
props,
859-
arrow,
890+
&self.schema,
891+
&self.props,
892+
&self.arrow_schema,
860893
self.file_encryptor.clone(),
861894
row_group_index,
862895
)?;
863-
Ok(ArrowRowGroupWriter::new(writers, arrow))
896+
Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
864897
}
865898

866899
#[cfg(not(feature = "encryption"))]
867-
fn create_row_group_writer(
868-
&self,
869-
parquet: &SchemaDescriptor,
870-
props: &WriterPropertiesPtr,
871-
arrow: &SchemaRef,
872-
_row_group_index: usize,
873-
) -> Result<ArrowRowGroupWriter> {
874-
let writers = get_column_writers(parquet, props, arrow)?;
875-
Ok(ArrowRowGroupWriter::new(writers, arrow))
900+
fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
901+
let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
902+
Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
876903
}
877904
}
878905

parquet/src/file/properties.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl WriterProperties {
190190
/// Returns a new default [`WriterPropertiesBuilder`] for creating writer
191191
/// properties.
192192
pub fn builder() -> WriterPropertiesBuilder {
193-
WriterPropertiesBuilder::with_defaults()
193+
WriterPropertiesBuilder::default()
194194
}
195195

196196
/// Returns data page size limit.
@@ -455,9 +455,9 @@ pub struct WriterPropertiesBuilder {
455455
file_encryption_properties: Option<FileEncryptionProperties>,
456456
}
457457

458-
impl WriterPropertiesBuilder {
458+
impl Default for WriterPropertiesBuilder {
459459
/// Returns default state of the builder.
460-
fn with_defaults() -> Self {
460+
fn default() -> Self {
461461
Self {
462462
data_page_size_limit: DEFAULT_PAGE_SIZE,
463463
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
@@ -478,7 +478,9 @@ impl WriterPropertiesBuilder {
478478
file_encryption_properties: None,
479479
}
480480
}
481+
}
481482

483+
impl WriterPropertiesBuilder {
482484
/// Finalizes the configuration and returns immutable writer properties struct.
483485
pub fn build(self) -> WriterProperties {
484486
WriterProperties {

parquet/src/file/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ fn write_bloom_filters<W: Write + Send>(
486486
/// more columns are available to write.
487487
/// - Once done writing a column, close column writer with `close`
488488
/// - Once all columns have been written, close row group writer with `close`
489-
/// method. THe close method will return row group metadata and is no-op
489+
/// method. The close method will return row group metadata and is no-op
490490
/// on already closed row group.
491491
pub struct SerializedRowGroupWriter<'a, W: Write> {
492492
descr: SchemaDescPtr,

parquet/tests/encryption/encryption.rs

Lines changed: 8 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
//! This module contains tests for reading encrypted Parquet files with the Arrow API
1919
2020
use crate::encryption_util::{
21-
verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
21+
read_and_roundtrip_to_encrypted_file, verify_column_indexes, verify_encryption_test_file_read,
22+
TestKeyRetriever,
2223
};
2324
use arrow::array::*;
2425
use arrow::error::Result as ArrowResult;
@@ -377,21 +378,6 @@ fn test_uniform_encryption_with_key_retriever() {
377378
verify_encryption_test_file_read(file, decryption_properties);
378379
}
379380

380-
fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
381-
let options =
382-
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
383-
let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
384-
let metadata = reader_metadata.metadata();
385-
386-
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
387-
let record_reader = builder.build().unwrap();
388-
let record_batches = record_reader
389-
.map(|x| x.unwrap())
390-
.collect::<Vec<RecordBatch>>();
391-
392-
verify_encryption_test_data(record_batches, metadata);
393-
}
394-
395381
fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
396382
metadata.row_groups().iter().map(|x| x.num_rows()).collect()
397383
}
@@ -630,6 +616,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> parquet::errors::Result
630616
fn test_write_non_uniform_encryption() {
631617
let testdata = arrow::util::test_util::parquet_test_data();
632618
let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
619+
let file = File::open(path).unwrap();
633620

634621
let footer_key = b"0123456789012345".to_vec(); // 128bit/16
635622
let column_names = vec!["double_field", "float_field"];
@@ -647,13 +634,14 @@ fn test_write_non_uniform_encryption() {
647634
.build()
648635
.unwrap();
649636

650-
read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
637+
read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties);
651638
}
652639

653640
#[test]
654641
fn test_write_uniform_encryption_plaintext_footer() {
655642
let testdata = arrow::util::test_util::parquet_test_data();
656643
let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
644+
let file = File::open(path).unwrap();
657645

658646
let footer_key = b"0123456789012345".to_vec(); // 128bit/16
659647
let wrong_footer_key = b"0000000000000000".to_vec(); // 128bit/16
@@ -679,7 +667,7 @@ fn test_write_uniform_encryption_plaintext_footer() {
679667

680668
// Try writing plaintext footer and then reading it with the correct footer key
681669
read_and_roundtrip_to_encrypted_file(
682-
&path,
670+
&file,
683671
decryption_properties.clone(),
684672
file_encryption_properties.clone(),
685673
);
@@ -688,7 +676,6 @@ fn test_write_uniform_encryption_plaintext_footer() {
688676
let temp_file = tempfile::tempfile().unwrap();
689677

690678
// read example data
691-
let file = File::open(path).unwrap();
692679
let options = ArrowReaderOptions::default()
693680
.with_file_decryption_properties(decryption_properties.clone());
694681
let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
@@ -730,6 +717,7 @@ fn test_write_uniform_encryption_plaintext_footer() {
730717
fn test_write_uniform_encryption() {
731718
let testdata = arrow::util::test_util::parquet_test_data();
732719
let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
720+
let file = File::open(path).unwrap();
733721

734722
let footer_key = b"0123456789012345".to_vec(); // 128bit/16
735723

@@ -741,7 +729,7 @@ fn test_write_uniform_encryption() {
741729
.build()
742730
.unwrap();
743731

744-
read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
732+
read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties);
745733
}
746734

747735
#[test]
@@ -1061,43 +1049,3 @@ fn test_decrypt_page_index(
10611049

10621050
Ok(())
10631051
}
1064-
1065-
fn read_and_roundtrip_to_encrypted_file(
1066-
path: &str,
1067-
decryption_properties: FileDecryptionProperties,
1068-
encryption_properties: FileEncryptionProperties,
1069-
) {
1070-
let temp_file = tempfile::tempfile().unwrap();
1071-
1072-
// read example data
1073-
let file = File::open(path).unwrap();
1074-
let options = ArrowReaderOptions::default()
1075-
.with_file_decryption_properties(decryption_properties.clone());
1076-
let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
1077-
1078-
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
1079-
let batch_reader = builder.build().unwrap();
1080-
let batches = batch_reader
1081-
.collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
1082-
.unwrap();
1083-
1084-
// write example data
1085-
let props = WriterProperties::builder()
1086-
.with_file_encryption_properties(encryption_properties)
1087-
.build();
1088-
1089-
let mut writer = ArrowWriter::try_new(
1090-
temp_file.try_clone().unwrap(),
1091-
metadata.schema().clone(),
1092-
Some(props),
1093-
)
1094-
.unwrap();
1095-
for batch in batches {
1096-
writer.write(&batch).unwrap();
1097-
}
1098-
1099-
writer.close().unwrap();
1100-
1101-
// check re-written example data
1102-
verify_encryption_test_file_read(temp_file, decryption_properties);
1103-
}

0 commit comments

Comments (0)