diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 1cf53baeedd9..28d690d353b5 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -155,6 +155,11 @@ name = "arrow_reader"
 required-features = ["arrow"]
 path = "./tests/arrow_reader/mod.rs"
 
+[[test]]
+name = "encryption"
+required-features = ["arrow"]
+path = "./tests/encryption/mod.rs"
+
 [[bin]]
 name = "parquet-read"
 required-features = ["cli"]
diff --git a/parquet/README.md b/parquet/README.md
index 9245664b4ef0..8fc72bfbc32a 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -84,7 +84,7 @@ The `parquet` crate provides the following features which may be enabled in your
 - [ ] Row record writer
 - [x] Arrow record writer
 - [x] Async support
-- [ ] Encrypted files
+- [x] Encrypted files
 - [x] Predicate pushdown
 - [x] Parquet format 4.0.0 support
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index e0392f65d418..1e1054c9a063 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -35,11 +35,14 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
 use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
 use crate::arrow::ArrowSchemaConverter;
 use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
+use crate::column::page_encryption::PageEncryptor;
 use crate::column::writer::encoder::ColumnValueEncoder;
 use crate::column::writer::{
     get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
 };
 use crate::data_type::{ByteArray, FixedLenByteArray};
+#[cfg(feature = "encryption")]
+use crate::encryption::encrypt::FileEncryptor;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{KeyValue, RowGroupMetaData};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
@@ -137,6 +140,9 @@ pub struct ArrowWriter<W: Write> {
     /// The schema is used to verify that each record batch written has the correct schema
     arrow_schema: SchemaRef,
 
+    /// Creates new [`ArrowRowGroupWriter`] instances as required
+    row_group_writer_factory: ArrowRowGroupWriterFactory,
+
     /// The length of arrays to write to each row group
     max_row_group_size: usize,
 }
@@ -195,10 +201,13 @@ impl<W: Write + Send> ArrowWriter<W> {
         let file_writer =
             SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
 
+        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
+
         Ok(Self {
             writer: file_writer,
             in_progress: None,
             arrow_schema,
+            row_group_writer_factory,
             max_row_group_size,
         })
     }
@@ -263,10 +272,11 @@
         let in_progress = match &mut self.in_progress {
             Some(in_progress) => in_progress,
-            x => x.insert(ArrowRowGroupWriter::new(
+            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
                 self.writer.schema_descr(),
                 self.writer.properties(),
                 &self.arrow_schema,
+                self.writer.flushed_row_groups().len(),
             )?),
         };
@@ -458,19 +468,57 @@ type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
 #[derive(Default)]
 struct ArrowPageWriter {
     buffer: SharedColumnChunk,
+    #[cfg(feature = "encryption")]
+    page_encryptor: Option<PageEncryptor>,
+}
+
+impl ArrowPageWriter {
+    #[cfg(feature = "encryption")]
+    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
+        self.page_encryptor = page_encryptor;
+        self
+    }
+
+    #[cfg(feature = "encryption")]
+    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
+        self.page_encryptor.as_mut()
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
+        None
+    }
 }
 
 impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
-        let mut buf = self.buffer.try_lock().unwrap();
+        let page = match self.page_encryptor_mut() {
+            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
+            None => page,
+        };
+
         let page_header = page.to_thrift_header();
         let header = {
             let mut header = Vec::with_capacity(1024);
-            let mut protocol = TCompactOutputProtocol::new(&mut header);
-            page_header.write_to_out_protocol(&mut protocol)?;
+
+            match self.page_encryptor_mut() {
+                Some(page_encryptor) => {
+                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
+                    if page.compressed_page().is_data_page() {
+                        page_encryptor.increment_page();
+                    }
+                }
+                None => {
+                    let mut protocol = TCompactOutputProtocol::new(&mut header);
+                    page_header.write_to_out_protocol(&mut protocol)?;
+                }
+            };
+
             Bytes::from(header)
         };
 
+        let mut buf = self.buffer.try_lock().unwrap();
+
         let data = page.compressed_page().buffer().clone();
         let compressed_size = data.len() + header.len();
@@ -702,17 +750,12 @@ struct ArrowRowGroupWriter {
 }
 
 impl ArrowRowGroupWriter {
-    fn new(
-        parquet: &SchemaDescriptor,
-        props: &WriterPropertiesPtr,
-        arrow: &SchemaRef,
-    ) -> Result<Self> {
-        let writers = get_column_writers(parquet, props, arrow)?;
-        Ok(Self {
+    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
+        Self {
             writers,
             schema: arrow.clone(),
            buffered_rows: 0,
-        })
+        }
     }
 
     fn write(&mut self, batch: &RecordBatch) -> Result<()> {
@@ -734,6 +777,55 @@ impl ArrowRowGroupWriter {
     }
 }
 
+struct ArrowRowGroupWriterFactory {
+    #[cfg(feature = "encryption")]
+    file_encryptor: Option<Arc<FileEncryptor>>,
+}
+
+impl ArrowRowGroupWriterFactory {
+    #[cfg(feature = "encryption")]
+    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
+        Self {
+            file_encryptor: file_writer.file_encryptor(),
+        }
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
+        Self {}
+    }
+
+    #[cfg(feature = "encryption")]
+    fn create_row_group_writer(
+        &self,
+        parquet: &SchemaDescriptor,
+        props: &WriterPropertiesPtr,
+        arrow: &SchemaRef,
+        row_group_index: usize,
+    ) -> Result<ArrowRowGroupWriter> {
+        let writers = get_column_writers_with_encryptor(
+            parquet,
+            props,
+            arrow,
+            self.file_encryptor.clone(),
+            row_group_index,
+        )?;
+        Ok(ArrowRowGroupWriter::new(writers, arrow))
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn create_row_group_writer(
+        &self,
+        parquet: &SchemaDescriptor,
+        props: &WriterPropertiesPtr,
+        arrow: &SchemaRef,
+        _row_group_index: usize,
+    ) -> Result<ArrowRowGroupWriter> {
+        let writers = get_column_writers(parquet, props, arrow)?;
+        Ok(ArrowRowGroupWriter::new(writers, arrow))
+    }
+}
+
 /// Returns the [`ArrowColumnWriter`] for a given schema
 pub fn get_column_writers(
     parquet: &SchemaDescriptor,
@@ -742,85 +834,173 @@
 ) -> Result<Vec<ArrowColumnWriter>> {
     let mut writers = Vec::with_capacity(arrow.fields.len());
     let mut leaves = parquet.columns().iter();
+    let column_factory = ArrowColumnWriterFactory::new();
     for field in &arrow.fields {
-        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
+        column_factory.get_arrow_column_writer(
+            field.data_type(),
+            props,
+            &mut leaves,
+            &mut writers,
+        )?;
     }
     Ok(writers)
 }
 
-/// Gets the [`ArrowColumnWriter`] for the given `data_type`
-fn get_arrow_column_writer(
-    data_type: &ArrowDataType,
+/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
+#[cfg(feature = "encryption")]
+fn get_column_writers_with_encryptor(
+    parquet: &SchemaDescriptor,
     props: &WriterPropertiesPtr,
-    leaves: &mut Iter<'_, ColumnDescPtr>,
-    out: &mut Vec<ArrowColumnWriter>,
-) -> Result<()> {
-    let col = |desc: &ColumnDescPtr| {
-        let page_writer = Box::<ArrowPageWriter>::default();
-        let chunk = page_writer.buffer.clone();
-        let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
-        ArrowColumnWriter {
-            chunk,
-            writer: ArrowColumnWriterImpl::Column(writer),
-        }
-    };
+    arrow: &SchemaRef,
+    file_encryptor: Option<Arc<FileEncryptor>>,
+    row_group_index: usize,
+) -> Result<Vec<ArrowColumnWriter>> {
+    let mut writers = Vec::with_capacity(arrow.fields.len());
+    let mut leaves = parquet.columns().iter();
+    let column_factory =
+        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
+    for field in &arrow.fields {
+        column_factory.get_arrow_column_writer(
+            field.data_type(),
+            props,
+            &mut leaves,
+            &mut writers,
+        )?;
+    }
+    Ok(writers)
+}
 
-    let bytes = |desc: &ColumnDescPtr| {
-        let page_writer = Box::<ArrowPageWriter>::default();
-        let chunk = page_writer.buffer.clone();
-        let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
-        ArrowColumnWriter {
-            chunk,
-            writer: ArrowColumnWriterImpl::ByteArray(writer),
-        }
-    };
+/// Gets [`ArrowColumnWriter`] instances for different data types
+struct ArrowColumnWriterFactory {
+    #[cfg(feature = "encryption")]
+    row_group_index: usize,
+    #[cfg(feature = "encryption")]
+    file_encryptor: Option<Arc<FileEncryptor>>,
+}
 
-    match data_type {
-        _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
-        ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())),
-        ArrowDataType::LargeBinary
-        | ArrowDataType::Binary
-        | ArrowDataType::Utf8
-        | ArrowDataType::LargeUtf8
-        | ArrowDataType::BinaryView
-        | ArrowDataType::Utf8View => {
-            out.push(bytes(leaves.next().unwrap()))
-        }
-        ArrowDataType::List(f)
-        | ArrowDataType::LargeList(f)
-        | ArrowDataType::FixedSizeList(f, _) => {
-            get_arrow_column_writer(f.data_type(), props, leaves, out)?
+impl ArrowColumnWriterFactory {
+    pub fn new() -> Self {
+        Self {
+            #[cfg(feature = "encryption")]
+            row_group_index: 0,
+            #[cfg(feature = "encryption")]
+            file_encryptor: None,
         }
-        ArrowDataType::Struct(fields) => {
-            for field in fields {
-                get_arrow_column_writer(field.data_type(), props, leaves, out)?
+    }
+
+    #[cfg(feature = "encryption")]
+    pub fn with_file_encryptor(
+        mut self,
+        row_group_index: usize,
+        file_encryptor: Option<Arc<FileEncryptor>>,
+    ) -> Self {
+        self.row_group_index = row_group_index;
+        self.file_encryptor = file_encryptor;
+        self
+    }
+
+    #[cfg(feature = "encryption")]
+    fn create_page_writer(
+        &self,
+        column_descriptor: &ColumnDescPtr,
+        column_index: usize,
+    ) -> Result<Box<ArrowPageWriter>> {
+        let column_path = column_descriptor.path().string();
+        let page_encryptor = PageEncryptor::create_if_column_encrypted(
+            &self.file_encryptor,
+            self.row_group_index,
+            column_index,
+            &column_path,
+        )?;
+        Ok(Box::new(
+            ArrowPageWriter::default().with_encryptor(page_encryptor),
+        ))
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn create_page_writer(
+        &self,
+        _column_descriptor: &ColumnDescPtr,
+        _column_index: usize,
+    ) -> Result<Box<ArrowPageWriter>> {
+        Ok(Box::<ArrowPageWriter>::default())
+    }
+
+    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
+    fn get_arrow_column_writer(
+        &self,
+        data_type: &ArrowDataType,
+        props: &WriterPropertiesPtr,
+        leaves: &mut Iter<'_, ColumnDescPtr>,
+        out: &mut Vec<ArrowColumnWriter>,
+    ) -> Result<()> {
+        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
+            let page_writer = self.create_page_writer(desc, out.len())?;
+            let chunk = page_writer.buffer.clone();
+            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
+            Ok(ArrowColumnWriter {
+                chunk,
+                writer: ArrowColumnWriterImpl::Column(writer),
+            })
+        };
+
+        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
+            let page_writer = self.create_page_writer(desc, out.len())?;
+            let chunk = page_writer.buffer.clone();
+            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
+            Ok(ArrowColumnWriter {
+                chunk,
+                writer: ArrowColumnWriterImpl::ByteArray(writer),
+            })
+        };
+
+        match data_type {
+            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
+            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
+            ArrowDataType::LargeBinary
+            | ArrowDataType::Binary
+            | ArrowDataType::Utf8
+            | ArrowDataType::LargeUtf8
+            | ArrowDataType::BinaryView
+            | ArrowDataType::Utf8View => {
+                out.push(bytes(leaves.next().unwrap())?)
             }
-        }
-        ArrowDataType::Map(f, _) => match f.data_type() {
-            ArrowDataType::Struct(f) => {
-                get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
-                get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
+            ArrowDataType::List(f)
+            | ArrowDataType::LargeList(f)
+            | ArrowDataType::FixedSizeList(f, _) => {
+                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
             }
-            _ => unreachable!("invalid map type"),
-        }
-        ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
-            ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
-                out.push(bytes(leaves.next().unwrap()))
+            ArrowDataType::Struct(fields) => {
+                for field in fields {
+                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
+                }
             }
-            ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
-                out.push(bytes(leaves.next().unwrap()))
+            ArrowDataType::Map(f, _) => match f.data_type() {
+                ArrowDataType::Struct(f) => {
+                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
+                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
+                }
+                _ => unreachable!("invalid map type"),
+            }
-            _ => {
-                out.push(col(leaves.next().unwrap()))
-            }
-        }
+            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
+                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
+                    out.push(bytes(leaves.next().unwrap())?)
+                }
+                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
+                    out.push(bytes(leaves.next().unwrap())?)
+                }
+                _ => {
+                    out.push(col(leaves.next().unwrap())?)
+                }
+            }
+            _ => return Err(ParquetError::NYI(
+                format!(
+                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
+                )
+            ))
+        }
-        _ => return Err(ParquetError::NYI(
-            format!(
-                "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
-            )
-        ))
-    }
-    Ok(())
-}
+        Ok(())
+    }
+}
 
 fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index fc3c5cf34221..1e534bdd6b77 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -118,5 +118,10 @@
 //! ```
 
 pub mod page;
+#[cfg(feature = "encryption")]
+pub(crate) mod page_encryption;
+#[cfg(not(feature = "encryption"))]
+#[path = "page_encryption_disabled.rs"]
+pub(crate) mod page_encryption;
 pub mod reader;
 pub mod writer;
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index b5afe6b93389..1dabe6794f07 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -263,6 +263,25 @@ impl CompressedPage {
         }
         page_header
     }
+
+    /// Update the compressed buffer for a page.
+    /// This might be required when encrypting page data for example.
+    /// The size of uncompressed data must not change.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
+        match &mut self.compressed_page {
+            Page::DataPage { buf, .. } => {
+                *buf = new_buffer;
+            }
+            Page::DataPageV2 { buf, .. } => {
+                *buf = new_buffer;
+            }
+            Page::DictionaryPage { buf, .. } => {
+                *buf = new_buffer;
+            }
+        }
+        self
+    }
 }
 
 /// Contains page write metrics.
diff --git a/parquet/src/column/page_encryption.rs b/parquet/src/column/page_encryption.rs
new file mode 100644
index 000000000000..0fb7c8942675
--- /dev/null
+++ b/parquet/src/column/page_encryption.rs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
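+
+//! Encryption of column pages and page headers for Parquet Modular Encryption.
+//! A [`PageEncryptor`] is created per encrypted column chunk; each page and page
+//! header is encrypted with a module AAD that binds the ciphertext to its
+//! position (row group, column and page ordinal) in the file.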
+
+use crate::column::page::CompressedPage;
+use crate::encryption::ciphers::BlockEncryptor;
+use crate::encryption::encrypt::{encrypt_object, FileEncryptor};
+use crate::encryption::modules::{create_module_aad, ModuleType};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::format::PageHeader;
+use crate::format::PageType;
+use bytes::Bytes;
+use std::io::Write;
+use std::sync::Arc;
+
+#[derive(Debug)]
+/// Encrypts page headers and page data for columns
+pub(crate) struct PageEncryptor {
+    file_encryptor: Arc<FileEncryptor>,
+    block_encryptor: Box<dyn BlockEncryptor>,
+    row_group_index: usize,
+    column_index: usize,
+    page_index: usize,
+}
+
+impl PageEncryptor {
+    /// Create a [`PageEncryptor`] for a column if it should be encrypted
+    pub fn create_if_column_encrypted(
+        file_encryptor: &Option<Arc<FileEncryptor>>,
+        row_group_index: usize,
+        column_index: usize,
+        column_path: &str,
+    ) -> Result<Option<Self>> {
+        match file_encryptor {
+            Some(file_encryptor) if file_encryptor.is_column_encrypted(column_path) => {
+                let block_encryptor = file_encryptor.get_column_encryptor(column_path)?;
+                Ok(Some(Self {
+                    file_encryptor: file_encryptor.clone(),
+                    block_encryptor,
+                    row_group_index,
+                    column_index,
+                    page_index: 0,
+                }))
+            }
+            _ => Ok(None),
+        }
+    }
+
+    /// Update the page index after a data page has been processed
+    pub fn increment_page(&mut self) {
+        self.page_index += 1;
+    }
+
+    fn encrypt_page(&mut self, page: &CompressedPage) -> Result<Vec<u8>> {
+        let module_type = if page.compressed_page().is_data_page() {
+            ModuleType::DataPage
+        } else {
+            ModuleType::DictionaryPage
+        };
+        let aad = create_module_aad(
+            self.file_encryptor.file_aad(),
+            module_type,
+            self.row_group_index,
+            self.column_index,
+            Some(self.page_index),
+        )?;
+        let encrypted_buffer = self.block_encryptor.encrypt(page.data(), &aad)?;
+
+        Ok(encrypted_buffer)
+    }
+
+    /// Encrypt compressed column page data
+    pub fn encrypt_compressed_page(&mut self, page: CompressedPage) -> Result<CompressedPage> {
+        let encrypted_page = self.encrypt_page(&page)?;
+        Ok(page.with_new_compressed_buffer(Bytes::from(encrypted_page)))
+    }
+
+    /// Encrypt a column page header
+    pub fn encrypt_page_header<W: Write>(
+        &mut self,
+        page_header: &PageHeader,
+        sink: &mut W,
+    ) -> Result<()> {
+        let module_type = match page_header.type_ {
+            PageType::DATA_PAGE => ModuleType::DataPageHeader,
+            PageType::DATA_PAGE_V2 => ModuleType::DataPageHeader,
+            PageType::DICTIONARY_PAGE => ModuleType::DictionaryPageHeader,
+            _ => {
+                return Err(general_err!(
+                    "Unsupported page type for page header encryption: {:?}",
+                    page_header.type_
+                ))
+            }
+        };
+        let aad = create_module_aad(
+            self.file_encryptor.file_aad(),
+            module_type,
+            self.row_group_index,
+            self.column_index,
+            Some(self.page_index),
+        )?;
+
+        encrypt_object(page_header, &mut self.block_encryptor, sink, &aad)
+    }
+}
diff --git a/parquet/src/column/page_encryption_disabled.rs b/parquet/src/column/page_encryption_disabled.rs
new file mode 100644
index 000000000000..e85b0281168a
--- /dev/null
+++ b/parquet/src/column/page_encryption_disabled.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::column::page::CompressedPage;
+use crate::errors::Result;
+use crate::format::PageHeader;
+use std::io::Write;
+
+#[derive(Debug)]
+/// Dummy PageEncryptor struct that can never be instantiated,
+/// provided to support compilation without the encryption feature enabled.
+pub(crate) struct PageEncryptor {
+    _empty: (),
+}
+
+impl PageEncryptor {
+    pub fn increment_page(&mut self) {}
+
+    pub fn encrypt_compressed_page(&mut self, _page: CompressedPage) -> Result<CompressedPage> {
+        unreachable!("The encryption feature is disabled")
+    }
+
+    pub fn encrypt_page_header<W: Write>(
+        &mut self,
+        _page_header: &PageHeader,
+        _sink: &mut W,
+    ) -> Result<()> {
+        unreachable!("The encryption feature is disabled")
+    }
+}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index ae418237515e..18373fe7f6fd 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -32,8 +32,12 @@ use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::encodings::levels::LevelEncoder;
+#[cfg(feature = "encryption")]
+use crate::encryption::encrypt::get_column_crypto_metadata;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
+use crate::file::metadata::{
+    ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder,
+};
 use crate::file::properties::EnabledStatistics;
 use crate::file::statistics::{Statistics, ValueStatistics};
 use crate::file::{
@@ -1199,6 +1203,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             );
         }
 
+        builder = self.set_column_chunk_encryption_properties(builder);
+
         let metadata = builder.build()?;
         Ok(metadata)
     }
@@ -1292,6 +1298,31 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             _ => {}
         }
     }
+
+    #[inline]
+    #[cfg(feature = "encryption")]
+    fn set_column_chunk_encryption_properties(
+        &self,
+        builder: ColumnChunkMetaDataBuilder,
+    ) -> ColumnChunkMetaDataBuilder {
+        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
+            builder.set_column_crypto_metadata(get_column_crypto_metadata(
+                encryption_properties,
+                &self.descr,
+            ))
+        } else {
+            builder
+        }
+    }
+
+    #[inline]
+    #[cfg(not(feature = "encryption"))]
+    fn set_column_chunk_encryption_properties(
+        &self,
+        builder: ColumnChunkMetaDataBuilder,
+    ) -> ColumnChunkMetaDataBuilder {
+        builder
+    }
 }
 
 fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs
index 9b5a04d622f0..18a6f5776d6b 100644
--- a/parquet/src/encryption/ciphers.rs
+++ b/parquet/src/encryption/ciphers.rs
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::errors::ParquetError;
 use crate::errors::ParquetError::General;
 use crate::errors::Result;
-use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM};
+use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
+use ring::rand::{SecureRandom, SystemRandom};
 use std::fmt::Debug;
 
+const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff;
 const NONCE_LEN: usize = 12;
 const TAG_LEN: usize = 16;
 const SIZE_LEN: usize = 4;
 
-pub trait BlockDecryptor: Debug + Send + Sync {
+pub(crate) trait BlockDecryptor: Debug + Send + Sync {
     fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Result<Vec<u8>>;
 }
 
@@ -61,3 +64,121 @@ impl BlockDecryptor for RingGcmBlockDecryptor {
         Ok(result)
     }
 }
+
+pub(crate) trait BlockEncryptor: Debug + Send + Sync {
+    fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>>;
+}
+
+#[derive(Debug, Clone)]
+struct CounterNonce {
+    start: u128,
+    counter: u128,
+}
+
+impl CounterNonce {
+    pub fn new(rng: &SystemRandom) -> Result<Self> {
+        let mut buf = [0; 16];
+        rng.fill(&mut buf)?;
+
+        // Since this is a random seed value, endianness doesn't matter at all,
+        // and we can use whatever is platform-native.
+        let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE;
+        let counter = start.wrapping_add(1);
+
+        Ok(Self { start, counter })
+    }
+
+    /// One accessor for the nonce bytes to avoid potentially flipping endianness
+    #[inline]
+    pub fn get_bytes(&self) -> [u8; NONCE_LEN] {
+        self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap()
+    }
+}
+
+impl NonceSequence for CounterNonce {
+    fn advance(&mut self) -> Result<ring::aead::Nonce, ring::error::Unspecified> {
+        // If we've wrapped around, we've exhausted this nonce sequence
+        if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) {
+            Err(ring::error::Unspecified)
+        } else {
+            // Otherwise, just advance and return the new value
+            let buf: [u8; NONCE_LEN] = self.get_bytes();
+            self.counter = self.counter.wrapping_add(1);
+            Ok(ring::aead::Nonce::assume_unique_for_key(buf))
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct RingGcmBlockEncryptor {
+    key: LessSafeKey,
+    nonce_sequence: CounterNonce,
+}
+
+impl RingGcmBlockEncryptor {
+    /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce.
+    /// The nonce will advance appropriately with each block encryption and
+    /// return an error if it wraps around.
+    pub(crate) fn new(key_bytes: &[u8]) -> Result<Self> {
+        let rng = SystemRandom::new();
+
+        // todo support other key sizes
+        let key = UnboundKey::new(&AES_128_GCM, key_bytes)
+            .map_err(|e| general_err!("Error creating AES key: {}", e))?;
+        let nonce = CounterNonce::new(&rng)?;
+
+        Ok(Self {
+            key: LessSafeKey::new(key),
+            nonce_sequence: nonce,
+        })
+    }
+}
+
+impl BlockEncryptor for RingGcmBlockEncryptor {
+    fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>> {
+        // Create encrypted buffer.
+        // Format is: [ciphertext size, nonce, ciphertext, authentication tag]
+        let ciphertext_length: u32 = (NONCE_LEN + plaintext.len() + TAG_LEN)
+            .try_into()
+            .map_err(|err| General(format!("Plaintext data too long. {:?}", err)))?;
+        // Not checking for overflow here because we've already checked for it with ciphertext_length
+        let mut ciphertext = Vec::with_capacity(SIZE_LEN + ciphertext_length as usize);
+        ciphertext.extend((ciphertext_length).to_le_bytes());
+
+        let nonce = self.nonce_sequence.advance()?;
+        ciphertext.extend(nonce.as_ref());
+        ciphertext.extend(plaintext);
+
+        let tag = self.key.seal_in_place_separate_tag(
+            nonce,
+            Aad::from(aad),
+            &mut ciphertext[SIZE_LEN + NONCE_LEN..],
+        )?;
+
+        ciphertext.extend(tag.as_ref());
+
+        debug_assert_eq!(SIZE_LEN + ciphertext_length as usize, ciphertext.len());
+
+        Ok(ciphertext)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_round_trip() {
+        let key = [0u8; 16];
+        let mut encryptor = RingGcmBlockEncryptor::new(&key).unwrap();
+        let decryptor = RingGcmBlockDecryptor::new(&key).unwrap();
+
+        let plaintext = b"hello, world!";
+        let aad = b"some aad";
+
+        let ciphertext = encryptor.encrypt(plaintext, aad).unwrap();
+        let decrypted = decryptor.decrypt(&ciphertext, aad).unwrap();
+
+        assert_eq!(plaintext, decrypted.as_slice());
+    }
+}
diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs
index 7674619f4d4c..41e5757f3670 100644
--- a/parquet/src/encryption/decrypt.rs
+++ b/parquet/src/encryption/decrypt.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Configuration and utilities for decryption of files using Parquet Modular Encryption
+
 use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
 use crate::encryption::modules::{create_module_aad, ModuleType};
 use crate::errors::{ParquetError, Result};
@@ -27,10 +29,11 @@ use std::sync::Arc;
 
 /// Trait for retrieving an encryption key using the key's metadata
 pub trait KeyRetriever: Send + Sync {
+    /// Retrieve a decryption key given the key metadata
     fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;
 }
 
-pub fn read_and_decrypt<T: Read>(
+pub(crate) fn read_and_decrypt<T: Read>(
     decryptor: &Arc<dyn BlockDecryptor>,
     input: &mut T,
     aad: &[u8],
@@ -196,7 +199,7 @@ impl PartialEq for DecryptionKeys {
 
 #[derive(Clone, PartialEq)]
 pub struct FileDecryptionProperties {
     keys: DecryptionKeys,
-    pub(crate) aad_prefix: Option<Vec<u8>>,
+    aad_prefix: Option<Vec<u8>>,
 }
 
 impl FileDecryptionProperties {
@@ -212,9 +215,14 @@ impl FileDecryptionProperties {
         DecryptionPropertiesBuilder::new_with_key_retriever(key_retriever)
     }
 
+    /// AAD prefix string uniquely identifies the file and prevents file swapping
+    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
+        self.aad_prefix.as_ref()
+    }
+
     /// Get the encryption key for decrypting a file's footer,
     /// and also column data if uniform encryption is used.
-    pub(crate) fn footer_key(&self, key_metadata: Option<&[u8]>) -> Result<Cow<Vec<u8>>> {
+    pub fn footer_key(&self, key_metadata: Option<&[u8]>) -> Result<Cow<Vec<u8>>> {
         match &self.keys {
             DecryptionKeys::Explicit(keys) => Ok(Cow::Borrowed(&keys.footer_key)),
             DecryptionKeys::ViaRetriever(retriever) => {
@@ -225,7 +233,7 @@ impl FileDecryptionProperties {
     }
 
     /// Get the column-specific encryption key for decrypting column data and metadata within a file
-    pub(crate) fn column_key(
+    pub fn column_key(
         &self,
         column_name: &str,
         key_metadata: Option<&[u8]>,
@@ -233,7 +241,7 @@ impl FileDecryptionProperties {
         match &self.keys {
             DecryptionKeys::Explicit(keys) => match keys.column_keys.get(column_name) {
                 None => Err(general_err!(
-                    "No column decryption key set for column '{}'",
+                    "No column decryption key set for encrypted column '{}'",
                     column_name
                 )),
                 Some(key) => Ok(Cow::Borrowed(key)),
@@ -244,6 +252,22 @@ impl FileDecryptionProperties {
         }
     }
+
+    /// Get the column names and associated decryption keys that have been configured.
+    /// If a key retriever is used rather than explicit decryption keys, the result
+    /// will be empty.
+    /// Provided for testing consumer code.
+    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>) {
+        let mut column_names: Vec<String> = Vec::new();
+        let mut column_keys: Vec<Vec<u8>> = Vec::new();
+        if let DecryptionKeys::Explicit(keys) = &self.keys {
+            for (key, value) in keys.column_keys.iter() {
+                column_names.push(key.clone());
+                column_keys.push(value.clone());
+            }
+        }
+        (column_names, column_keys)
+    }
 }
 
 impl std::fmt::Debug for FileDecryptionProperties {
@@ -324,6 +348,21 @@ impl DecryptionPropertiesBuilder {
             .insert(column_name.to_string(), decryption_key);
         self
     }
+
+    /// Specify multiple column decryption keys
+    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
+        if column_names.len() != keys.len() {
+            return Err(general_err!(
+                "The number of column names ({}) does not match the number of keys ({})",
+                column_names.len(),
+                keys.len()
+            ));
+        }
+        for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {
+            self.column_keys.insert(column_name.to_string(), key);
+        }
+        Ok(self)
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs
new file mode 100644
index 000000000000..13cab64fa6a1
--- /dev/null
+++ b/parquet/src/encryption/encrypt.rs
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Configuration and utilities for Parquet Modular Encryption
+
+use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor};
+use crate::errors::{ParquetError, Result};
+use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
+use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+use crate::thrift::TSerializable;
+use ring::rand::{SecureRandom, SystemRandom};
+use std::collections::{HashMap, HashSet};
+use std::io::Write;
+use thrift::protocol::TCompactOutputProtocol;
+
+#[derive(Debug, Clone, PartialEq)]
+struct EncryptionKey {
+    key: Vec<u8>,
+    key_metadata: Option<Vec<u8>>,
+}
+
+impl EncryptionKey {
+    fn new(key: Vec<u8>) -> EncryptionKey {
+        Self {
+            key,
+            key_metadata: None,
+        }
+    }
+
+    fn with_metadata(mut self, metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(metadata);
+        self
+    }
+
+    fn key(&self) -> &Vec<u8> {
+        &self.key
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Defines how data in a Parquet file should be encrypted
+pub struct FileEncryptionProperties {
+    encrypt_footer: bool,
+    footer_key: EncryptionKey,
+    column_keys: HashMap<String, EncryptionKey>,
+    aad_prefix: Option<Vec<u8>>,
+    store_aad_prefix: bool,
+}
+
+impl FileEncryptionProperties {
+    /// Create a new builder for encryption properties with the given footer encryption key
+    pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
+        EncryptionPropertiesBuilder::new(footer_key)
+    }
+
+    /// Should the footer be encrypted
+    pub fn encrypt_footer(&self) -> bool {
+        self.encrypt_footer
+    }
+
+    /// Retrieval metadata of key used for encryption of footer and (possibly) columns
+    pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> {
+        self.footer_key.key_metadata.as_ref()
+    }
+
+    /// Retrieval of key used for encryption of footer and (possibly) columns
+    pub fn footer_key(&self) -> &Vec<u8> {
+        &self.footer_key.key
+    }
+
+    /// Get the column names, keys, and metadata for columns to be encrypted
+    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
+        let mut column_names: Vec<String> = Vec::with_capacity(self.column_keys.len());
+        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
+        let mut meta: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
+        for (key, value) in self.column_keys.iter() {
+            column_names.push(key.clone());
+            keys.push(value.key.clone());
+            if let Some(metadata) = value.key_metadata.as_ref() {
+                meta.push(metadata.clone());
+            }
+        }
+        (column_names, keys, meta)
+    }
+
+    /// AAD prefix string uniquely identifies the file and prevents file swapping
+    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
+        self.aad_prefix.as_ref()
+    }
+
+    /// Should the AAD prefix be stored in the file
+    pub fn store_aad_prefix(&self) -> bool {
+        self.store_aad_prefix && self.aad_prefix.is_some()
+    }
+
+    /// Checks if columns that are to be encrypted are present in schema
+    pub(crate) fn validate_encrypted_column_names(
+        &self,
+        schema: &SchemaDescriptor,
+    ) -> std::result::Result<(), ParquetError> {
+        let column_paths = schema
+            .columns()
+            .iter()
+            .map(|c| c.path().string())
+            .collect::<HashSet<String>>();
+        let encryption_columns = self
+            .column_keys
+            .keys()
+            .cloned()
+            .collect::<HashSet<String>>();
+        if !encryption_columns.is_subset(&column_paths) {
+            let mut columns_missing_in_schema = encryption_columns
+                .difference(&column_paths)
+                .cloned()
+                .collect::<Vec<String>>();
+            columns_missing_in_schema.sort();
+            return Err(ParquetError::General(
+                format!(
+                    "The following columns with encryption keys specified were not found in the schema: {}",
+                    columns_missing_in_schema.join(", ")
+                )
+                .to_string(),
+            ));
+        }
+        Ok(())
+    }
+}
+
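+// Example (sketch): building encryption properties with a footer key and a key
+// for a hypothetical column "x", then enabling encryption on the writer
+// properties defined later in this change:
+//
+//     let encryption_properties = FileEncryptionProperties::builder(footer_key)
+//         .with_column_key("x", column_key)
+//         .build()?;
+//     let props = WriterProperties::builder()
+//         .with_file_encryption_properties(encryption_properties)
+//         .build();
+//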
+/// Builder for [`FileEncryptionProperties`]
+pub struct EncryptionPropertiesBuilder {
+    encrypt_footer: bool,
+    footer_key: EncryptionKey,
+    column_keys: HashMap<String, EncryptionKey>,
+    aad_prefix: Option<Vec<u8>>,
+    store_aad_prefix: bool,
+}
+
+impl EncryptionPropertiesBuilder {
+    /// Create a new [`EncryptionPropertiesBuilder`] with the given footer encryption key
+    pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
+        Self {
+            footer_key: EncryptionKey::new(footer_key),
+            column_keys: HashMap::default(),
+            aad_prefix: None,
+            encrypt_footer: true,
+            store_aad_prefix: false,
+        }
+    }
+
+    /// Set if the footer should be stored in plaintext (not encrypted). Defaults to false.
+    pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self {
+        self.encrypt_footer = !plaintext_footer;
+        self
+    }
+
+    /// Set retrieval metadata of key used for encryption of footer and (possibly) columns
+    pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
+        self.footer_key = self.footer_key.with_metadata(metadata);
+        self
+    }
+
+    /// Set the key used for encryption of a column. Note that if no column keys are configured then
+    /// all columns will be encrypted with the footer key.
+    /// If any column keys are configured then only the columns with a key will be encrypted.
+    pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self {
+        self.column_keys
+            .insert(column_name.to_string(), EncryptionKey::new(key));
+        self
+    }
+
+    /// Set the key used for encryption of a column and its metadata. The key's metadata field is
+    /// intended to enable file readers to recover the key. For example, the metadata can keep a
+    /// serialized ID of a data key. Note that if no column keys are configured then all columns
+    /// will be encrypted with the footer key. If any column keys are configured then only the
+    /// columns with a key will be encrypted.
+    pub fn with_column_key_and_metadata(
+        mut self,
+        column_name: &str,
+        key: Vec<u8>,
+        metadata: Vec<u8>,
+    ) -> Self {
+        self.column_keys.insert(
+            column_name.to_string(),
+            EncryptionKey::new(key).with_metadata(metadata),
+        );
+        self
+    }
+
+    /// Set the keys used for encryption of columns. Analogous to
+    /// `with_column_key` but for multiple columns. This will add the provided column keys to the
+    /// existing column keys. If column keys were already provided for some columns, the new keys
+    /// will overwrite the old ones.
+    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
+        if column_names.len() != keys.len() {
+            return Err(general_err!(
+                "The number of column names ({}) does not match the number of keys ({})",
+                column_names.len(),
+                keys.len()
+            ));
+        }
+        for (i, column_name) in column_names.into_iter().enumerate() {
+            self.column_keys
+                .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone()));
+        }
+        Ok(self)
+    }
+
+    /// The AAD prefix uniquely identifies the file and allows differentiating it, e.g. from
+    /// older versions of the file or from other partition files in the same data set (table).
+    /// These bytes are optionally passed by a writer upon file creation. When not specified, no
+    /// AAD prefix is used.
+    pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self {
+        self.aad_prefix = Some(aad_prefix);
+        self
+    }
+
+    /// Should the AAD prefix be stored in the file. If false, readers will need to provide the
+    /// AAD prefix to be able to decrypt data. Defaults to false.
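+    /// When stored, the prefix is written into the file's `AesGcmV1` encryption
+    /// algorithm metadata.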
+    pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self {
+        self.store_aad_prefix = store_aad_prefix;
+        self
+    }
+
+    /// Build the encryption properties
+    pub fn build(self) -> Result<FileEncryptionProperties> {
+        Ok(FileEncryptionProperties {
+            encrypt_footer: self.encrypt_footer,
+            footer_key: self.footer_key,
+            column_keys: self.column_keys,
+            aad_prefix: self.aad_prefix,
+            store_aad_prefix: self.store_aad_prefix,
+        })
+    }
+}
+
+#[derive(Debug)]
+/// The encryption configuration for a single Parquet file
+pub(crate) struct FileEncryptor {
+    properties: FileEncryptionProperties,
+    aad_file_unique: Vec<u8>,
+    file_aad: Vec<u8>,
+}
+
+impl FileEncryptor {
+    pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
+        // Generate unique AAD for file
+        let rng = SystemRandom::new();
+        let mut aad_file_unique = vec![0u8; 8];
+        rng.fill(&mut aad_file_unique)?;
+
+        let file_aad = match properties.aad_prefix.as_ref() {
+            None => aad_file_unique.clone(),
+            Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(),
+        };
+
+        Ok(Self {
+            properties,
+            aad_file_unique,
+            file_aad,
+        })
+    }
+
+    /// Get the encryptor's file encryption properties
+    pub fn properties(&self) -> &FileEncryptionProperties {
+        &self.properties
+    }
+
+    /// The combined AAD prefix and unique suffix generated for the file
+    pub fn file_aad(&self) -> &[u8] {
+        &self.file_aad
+    }
+
+    /// Unique file identifier part of the AAD suffix. The full AAD suffix is generated per module
+    /// by concatenating aad_file_unique, the module type, the row group ordinal (all except
+    /// footer), the column ordinal (all except footer) and the page ordinal (data page and
+    /// header only).
+    pub fn aad_file_unique(&self) -> &Vec<u8> {
+        &self.aad_file_unique
+    }
+
+    /// Returns whether data for the specified column should be encrypted
+    pub fn is_column_encrypted(&self, column_path: &str) -> bool {
+        if self.properties.column_keys.is_empty() {
+            // Uniform encryption
+            true
+        } else {
+            self.properties.column_keys.contains_key(column_path)
+        }
+    }
+
+    /// Get the BlockEncryptor for the footer
+    pub(crate) fn get_footer_encryptor(&self) -> Result<Box<dyn BlockEncryptor>> {
+        Ok(Box::new(RingGcmBlockEncryptor::new(
+            &self.properties.footer_key.key,
+        )?))
+    }
+
+    /// Get the encryptor for a column.
+    /// Will return an error if the column is not an encrypted column.
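+    /// With uniform encryption (no per-column keys configured), the footer encryptor is returned.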
+    pub(crate) fn get_column_encryptor(
+        &self,
+        column_path: &str,
+    ) -> Result<Box<dyn BlockEncryptor>> {
+        if self.properties.column_keys.is_empty() {
+            return self.get_footer_encryptor();
+        }
+        match self.properties.column_keys.get(column_path) {
+            None => Err(general_err!("Column '{}' is not encrypted", column_path)),
+            Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
+        }
+    }
+}
+
+/// Write an encrypted Thrift serializable object
+pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
+    object: &T,
+    encryptor: &mut Box<dyn BlockEncryptor>,
+    sink: &mut W,
+    module_aad: &[u8],
+) -> Result<()> {
+    let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
+    sink.write_all(&encrypted_buffer)?;
+    Ok(())
+}
+
+/// Encrypt a Thrift serializable object to a byte vector
+pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
+    object: &T,
+    encryptor: &mut Box<dyn BlockEncryptor>,
+    module_aad: &[u8],
+) -> Result<Vec<u8>> {
+    let mut buffer: Vec<u8> = vec![];
+    {
+        let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
+        object.write_to_out_protocol(&mut unencrypted_protocol)?;
+    }
+
+    encryptor.encrypt(buffer.as_ref(), module_aad)
+}
+
+/// Get the crypto metadata for a column from the file encryption properties
+pub(crate) fn get_column_crypto_metadata(
+    properties: &FileEncryptionProperties,
+    column: &ColumnDescPtr,
+) -> Option<ColumnCryptoMetaData> {
+    if properties.column_keys.is_empty() {
+        // Uniform encryption
+        Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
+    } else {
+        properties
+            .column_keys
+            .get(&column.path().string())
+            .map(|encryption_key| {
+                // Column is encrypted with a column specific key
+                ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
+                    path_in_schema: column.path().parts().to_vec(),
+                    key_metadata: encryption_key.key_metadata.clone(),
+                })
+            })
+    }
+}
diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs
index b4f4d6d221be..062c351ac1cd 100644
--- a/parquet/src/encryption/mod.rs
+++ b/parquet/src/encryption/mod.rs
@@ -20,4 +20,5 @@
 
 pub(crate) mod ciphers;
 pub mod decrypt;
+pub mod encrypt;
 pub(crate) mod modules;
diff --git a/parquet/src/encryption/modules.rs b/parquet/src/encryption/modules.rs
index 6bf9306b256d..418c3827c077 100644
--- a/parquet/src/encryption/modules.rs
+++ b/parquet/src/encryption/modules.rs
@@ -25,6 +25,10 @@ pub(crate) enum ModuleType {
     DictionaryPage = 3,
     DataPageHeader = 4,
     DictionaryPageHeader = 5,
+    ColumnIndex = 6,
+    OffsetIndex = 7,
+    _BloomFilterHeader = 8,
+    _BloomFilterBitset = 9,
 }
 
 pub fn create_footer_aad(file_aad: &[u8]) -> crate::errors::Result<Vec<u8>> {
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 81157399ac58..233a55778721 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -109,7 +109,6 @@ use crate::file::page_encoding_stats::{self, PageEncodingStats};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::statistics::{self, Statistics};
-#[cfg(feature = "encryption")]
 use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData;
 use crate::format::{
     BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
@@ -1261,7 +1260,7 @@ impl ColumnChunkMetaData {
             offset_index_length: self.offset_index_length,
             column_index_offset: self.column_index_offset,
             column_index_length: self.column_index_length,
-            crypto_metadata: None,
+            crypto_metadata: self.column_crypto_metadata_thrift(),
             encrypted_column_metadata: None,
         }
     }
@@ -1318,6 +1317,18 @@ impl ColumnChunkMetaData {
     pub fn into_builder(self) -> ColumnChunkMetaDataBuilder {
         ColumnChunkMetaDataBuilder::from(self)
     }
+
+    #[cfg(feature = "encryption")]
+    fn column_crypto_metadata_thrift(&self) -> Option<TColumnCryptoMetaData> {
+        self.column_crypto_metadata
+            .as_ref()
+            .map(column_crypto_metadata::to_thrift)
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn column_crypto_metadata_thrift(&self) -> Option<TColumnCryptoMetaData> {
+        None
+    }
 }
 
 /// Builder for [`ColumnChunkMetaData`]
@@ -1519,6 +1530,13 @@ impl ColumnChunkMetaDataBuilder {
         self
     }
 
+    #[cfg(feature = "encryption")]
+    /// Set the encryption metadata for an encrypted column
+    pub fn set_column_crypto_metadata(mut self, value: Option<ColumnCryptoMetaData>) -> Self {
+        self.0.column_crypto_metadata = value;
+        self
+    }
+
     /// Builds column chunk metadata.
     pub fn build(self) -> Result<ColumnChunkMetaData> {
         Ok(self.0)
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index f0eedd2bb99d..f03325964235 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -815,6 +815,17 @@ impl ParquetMetaDataReader {
         let t_file_crypto_metadata: TFileCryptoMetaData =
             TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                 .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
+        let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
+            EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
+            _ => Some(false),
+        }
+        .unwrap_or(false);
+        if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
+            return Err(general_err!(
+                "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
+                but no AAD prefix was provided in the file decryption properties"
+            ));
+        }
         let decryptor = get_file_decryptor(
             t_file_crypto_metadata.encryption_algorithm,
             t_file_crypto_metadata.key_metadata.as_deref(),
@@ -959,8 +970,8 @@ fn get_file_decryptor(
         let aad_file_unique = algo
             .aad_file_unique
             .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
-        let aad_prefix = if file_decryption_properties.aad_prefix.is_some() {
-            file_decryption_properties.aad_prefix.clone().unwrap()
+        let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
+            aad_prefix.clone()
         } else {
             algo.aad_prefix.unwrap_or_default()
         };
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 87f1fdebd91e..c1fc41314415 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -15,12 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#[cfg(feature = "encryption")]
+use crate::encryption::encrypt::{encrypt_object, encrypt_object_to_vec, FileEncryptor};
+#[cfg(feature = "encryption")]
+use crate::encryption::modules::{create_footer_aad, create_module_aad, ModuleType};
+#[cfg(feature = "encryption")]
+use crate::errors::ParquetError;
 use crate::errors::Result;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::page_index::index::Index;
-use crate::file::writer::TrackedWrite;
-use crate::file::PARQUET_MAGIC;
-use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use crate::file::writer::{get_file_magic, TrackedWrite};
+#[cfg(feature = "encryption")]
+use crate::format::{AesGcmV1, ColumnCryptoMetaData, EncryptionAlgorithm};
+use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
 use crate::schema::types;
 use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
 use crate::thrift::TSerializable;
@@ -40,6 +47,7 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
     offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
     key_value_metadata: Option<Vec<KeyValue>>,
     created_by: Option<String>,
+    object_writer: MetadataObjectWriter,
     writer_version: i32,
 }
 
@@ -57,8 +65,13 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
             if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
                 let start_offset = self.buf.bytes_written();
-                let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                offset_index.write_to_out_protocol(&mut protocol)?;
+                self.object_writer.write_offset_index(
+                    offset_index,
+                    column_metadata,
+                    row_group_idx,
+                    column_idx,
+                    &mut self.buf,
+                )?;
                 let end_offset = self.buf.bytes_written();
                 // set offset and index for offset index
                 column_metadata.offset_index_offset = Some(start_offset as i64);
@@ -82,8 +95,13 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
             if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
                 let start_offset = self.buf.bytes_written();
-                let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                column_index.write_to_out_protocol(&mut protocol)?;
+                self.object_writer.write_column_index(
+                    column_index,
+                    column_metadata,
+                    row_group_idx,
+                    column_idx,
+                    &mut self.buf,
+                )?;
                 let end_offset = self.buf.bytes_written();
                 // set offset and index for offset index
                 column_metadata.column_index_offset = Some(start_offset as i64);
@@ -119,9 +137,13 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         // But for simplicity we always set this field.
         let column_orders = Some(column_orders);
 
-        let file_metadata = crate::format::FileMetaData {
+        let (row_groups, unencrypted_row_groups) = self
+            .object_writer
+            .apply_row_group_encryption(self.row_groups)?;
+
+        let mut file_metadata = FileMetaData {
             num_rows,
-            row_groups: self.row_groups,
+            row_groups,
             key_value_metadata: self.key_value_metadata.clone(),
             version: self.writer_version,
             schema: types::to_thrift(self.schema.as_ref())?,
@@ -133,17 +155,24 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
 
         // Write file metadata
         let start_pos = self.buf.bytes_written();
-        {
-            let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-            file_metadata.write_to_out_protocol(&mut protocol)?;
-        }
+        self.object_writer
+            .write_file_metadata(&file_metadata, &mut self.buf)?;
         let end_pos = self.buf.bytes_written();
 
         // Write footer
         let metadata_len = (end_pos - start_pos) as u32;
 
         self.buf.write_all(&metadata_len.to_le_bytes())?;
-        self.buf.write_all(&PARQUET_MAGIC)?;
+        self.buf.write_all(self.object_writer.get_file_magic())?;
+
+        if let Some(row_groups) = unencrypted_row_groups {
+            // If row group metadata was encrypted, we replace the encrypted row groups with
+            // unencrypted metadata before it is returned to users. This allows the metadata
+            // to be usable for retrieving the row group statistics for example, without users
+            // needing to decrypt the metadata.
+            file_metadata.row_groups = row_groups;
+        }
+
         Ok(file_metadata)
     }
 
@@ -164,6 +193,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
             offset_indexes: None,
             key_value_metadata: None,
             created_by,
+            object_writer: Default::default(),
             writer_version,
         }
     }
@@ -182,6 +212,12 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         self.key_value_metadata = Some(key_value_metadata);
         self
     }
+
+    #[cfg(feature = "encryption")]
+    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
+        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
+        self
+    }
 }
 
 /// Writes [`ParquetMetaData`] to a byte stream
@@ -378,3 +414,295 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
         }
     }
 }
+
+#[derive(Debug, Default)]
+struct MetadataObjectWriter {
+    #[cfg(feature = "encryption")]
+    file_encryptor: Option<Arc<FileEncryptor>>,
+}
+
+impl MetadataObjectWriter {
+    #[inline]
+    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
+        let mut protocol = TCompactOutputProtocol::new(sink);
+        object.write_to_out_protocol(&mut protocol)?;
+        Ok(())
+    }
+}
+
+/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
+#[cfg(not(feature = "encryption"))]
+impl MetadataObjectWriter {
+    /// Write [`FileMetaData`] in Thrift format
+    fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
+        Self::write_object(file_metadata, sink)
+    }
+
+    /// Write a column [`OffsetIndex`] in Thrift format
+    fn write_offset_index(
+        &self,
+        offset_index: &OffsetIndex,
+        _column_chunk: &ColumnChunk,
+        _row_group_idx: usize,
+        _column_idx: usize,
+        sink: impl Write,
+    ) -> Result<()> {
+        Self::write_object(offset_index, sink)
+    }
+
+    /// Write a column [`ColumnIndex`] in Thrift format
+    fn write_column_index(
+        &self,
+        column_index: &ColumnIndex,
+        _column_chunk: &ColumnChunk,
+        _row_group_idx: usize,
+        _column_idx: usize,
+        sink: impl Write,
+    ) -> Result<()> {
+        Self::write_object(column_index, sink)
+    }
+
+    /// No-op implementation of row-group metadata encryption
+    fn apply_row_group_encryption(
+        &self,
+        row_groups: Vec<RowGroup>,
+    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
+        Ok((row_groups, None))
+    }
+
+    /// Get the "magic" bytes identifying the file type
+    pub fn get_file_magic(&self) -> &[u8; 4] {
+        get_file_magic()
+    }
+}
+
+/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
+#[cfg(feature = "encryption")]
+impl MetadataObjectWriter {
+    /// Set the file encryptor to use
+    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
+        self.file_encryptor = encryptor;
+        self
+    }
+
+    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
+    fn write_file_metadata(
+        &self,
+        file_metadata: &FileMetaData,
+        mut sink: impl Write,
+    ) -> Result<()> {
+        match self.file_encryptor.as_ref() {
+            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
+                // First write FileCryptoMetadata
+                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
+                let mut protocol = TCompactOutputProtocol::new(&mut sink);
+                crypto_metadata.write_to_out_protocol(&mut protocol)?;
+
+                // Then write encrypted footer
+                let aad = create_footer_aad(file_encryptor.file_aad())?;
+                let mut encryptor = file_encryptor.get_footer_encryptor()?;
+                encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
+            }
+            _ => Self::write_object(file_metadata, &mut sink),
+        }
+    }
+
+    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
+    fn write_offset_index(
+        &self,
+        offset_index: &OffsetIndex,
+        column_chunk: &ColumnChunk,
+        row_group_idx: usize,
+        column_idx: usize,
+        sink: impl Write,
+    ) -> Result<()> {
+        match &self.file_encryptor {
+            Some(file_encryptor) => Self::write_object_with_encryption(
+                offset_index,
+                sink,
+                file_encryptor,
+                column_chunk,
+                ModuleType::OffsetIndex,
+                row_group_idx,
+                column_idx,
+            ),
+            None => Self::write_object(offset_index, sink),
+        }
+    }
+
+    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
+    fn write_column_index(
+        &self,
+        column_index: &ColumnIndex,
+        column_chunk: &ColumnChunk,
+        row_group_idx: usize,
+        column_idx: usize,
+        sink: impl Write,
+    ) -> Result<()> {
+        match &self.file_encryptor {
+            Some(file_encryptor) => Self::write_object_with_encryption(
+                column_index,
+                sink,
+                file_encryptor,
+                column_chunk,
+                ModuleType::ColumnIndex,
+                row_group_idx,
+                column_idx,
+            ),
+            None => Self::write_object(column_index, sink),
+        }
+    }
+
+    /// If encryption is enabled and configured, encrypt row group metadata.
+    /// Returns a tuple of the row group metadata to write,
+    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
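+    /// Keeping the unencrypted copy allows the returned [`FileMetaData`] to be read
+    /// without decryption.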
+    fn apply_row_group_encryption(
+        &self,
+        row_groups: Vec<RowGroup>,
+    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
+        match &self.file_encryptor {
+            Some(file_encryptor) => {
+                let unencrypted_row_groups = row_groups.clone();
+                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
+                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
+            }
+            None => Ok((row_groups, None)),
+        }
+    }
+
+    /// Get the "magic" bytes identifying the file type
+    fn get_file_magic(&self) -> &[u8; 4] {
+        get_file_magic(
+            self.file_encryptor
+                .as_ref()
+                .map(|encryptor| encryptor.properties()),
+        )
+    }
+
+    fn write_object_with_encryption(
+        object: &impl TSerializable,
+        mut sink: impl Write,
+        file_encryptor: &FileEncryptor,
+        column_metadata: &ColumnChunk,
+        module_type: ModuleType,
+        row_group_index: usize,
+        column_index: usize,
+    ) -> Result<()> {
+        let column_path_vec = &column_metadata
+            .meta_data
+            .as_ref()
+            .ok_or_else(|| {
+                general_err!(
+                    "Column metadata not set for column {} when encrypting object",
+                    column_index
+                )
+            })?
+            .path_in_schema;
+
+        let joined_column_path;
+        let column_path = if column_path_vec.len() == 1 {
+            &column_path_vec[0]
+        } else {
+            joined_column_path = column_path_vec.join(".");
+            &joined_column_path
+        };
+
+        if file_encryptor.is_column_encrypted(column_path) {
+            let aad = create_module_aad(
+                file_encryptor.file_aad(),
+                module_type,
+                row_group_index,
+                column_index,
+                None,
+            )?;
+            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
+            encrypt_object(object, &mut encryptor, &mut sink, &aad)
+        } else {
+            Self::write_object(object, sink)
+        }
+    }
+
+    fn file_crypto_metadata(
+        file_encryptor: &FileEncryptor,
+    ) -> Result<crate::format::FileCryptoMetaData> {
+        let properties = file_encryptor.properties();
+        let supply_aad_prefix = properties
+            .aad_prefix()
+            .map(|_| !properties.store_aad_prefix());
+        let encryption_algorithm = AesGcmV1 {
+            aad_prefix: if properties.store_aad_prefix() {
+                properties.aad_prefix().cloned()
+            } else {
+                None
+            },
+            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
+            supply_aad_prefix,
+        };
+
+        Ok(crate::format::FileCryptoMetaData {
+            encryption_algorithm: EncryptionAlgorithm::AESGCMV1(encryption_algorithm),
+            key_metadata: properties.footer_key_metadata().cloned(),
+        })
+    }
+
+    fn encrypt_row_groups(
+        row_groups: Vec<RowGroup>,
+        file_encryptor: &Arc<FileEncryptor>,
+    ) -> Result<Vec<RowGroup>> {
+        row_groups
+            .into_iter()
+            .enumerate()
+            .map(|(rg_idx, mut rg)| {
+                let cols: Result<Vec<ColumnChunk>> = rg
+                    .columns
+                    .into_iter()
+                    .enumerate()
+                    .map(|(col_idx, c)| {
+                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
+                    })
+                    .collect();
+                rg.columns = cols?;
+                Ok(rg)
+            })
+            .collect()
+    }
+
+    /// Apply column encryption to column chunk metadata
+    fn encrypt_column_chunk(
+        mut column_chunk: ColumnChunk,
+        file_encryptor: &Arc<FileEncryptor>,
+        row_group_index: usize,
+        column_index: usize,
+    ) -> Result<ColumnChunk> {
+        // Column crypto metadata should have already been set when the column was created.
+        // Here we apply the encryption by encrypting the column metadata if required.
+        match column_chunk.crypto_metadata.as_ref() {
+            None => {}
+            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+                // When uniform encryption is used the footer is already encrypted,
+                // so the column chunk does not need additional encryption.
+    /// Apply column encryption to column chunk metadata
+    fn encrypt_column_chunk(
+        mut column_chunk: ColumnChunk,
+        file_encryptor: &Arc<FileEncryptor>,
+        row_group_index: usize,
+        column_index: usize,
+    ) -> Result<ColumnChunk> {
+        // Column crypto metadata should have already been set when the column was created.
+        // Here we apply the encryption by encrypting the column metadata if required.
+        match column_chunk.crypto_metadata.as_ref() {
+            None => {}
+            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+                // When uniform encryption is used the footer is already encrypted,
+                // so the column chunk does not need additional encryption.
+            }
+            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
+                let column_path = col_key.path_in_schema.join(".");
+                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
+                let meta_data = column_chunk
+                    .meta_data
+                    .take()
+                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
+                let aad = create_module_aad(
+                    file_encryptor.file_aad(),
+                    ModuleType::ColumnMetaData,
+                    row_group_index,
+                    column_index,
+                    None,
+                )?;
+                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;
+
+                column_chunk.encrypted_column_metadata = Some(ciphertext);
+                debug_assert!(column_chunk.meta_data.is_none());
+            }
+        }
+
+        Ok(column_chunk)
+    }
+}
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 8c812cc17dd7..a69a854df608 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -18,6 +18,8 @@
 //! Configuration via [`WriterProperties`] and [`ReaderProperties`]
 use crate::basic::{Compression, Encoding};
 use crate::compression::{CodecOptions, CodecOptionsBuilder};
+#[cfg(feature = "encryption")]
+use crate::encryption::encrypt::FileEncryptionProperties;
 use crate::file::metadata::KeyValue;
 use crate::format::SortingColumn;
 use crate::schema::types::ColumnPath;
@@ -169,6 +171,8 @@ pub struct WriterProperties {
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
     coerce_types: bool,
+    #[cfg(feature = "encryption")]
+    pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
 }
 
 impl Default for WriterProperties {
@@ -370,6 +374,12 @@ impl WriterProperties {
             .and_then(|c| c.bloom_filter_properties())
             .or_else(|| self.default_column_properties.bloom_filter_properties())
     }
+
+    /// Return file encryption properties
+    #[cfg(feature = "encryption")]
+    pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
+        self.file_encryption_properties.as_ref()
+    }
 }
 
 /// Builder for [`WriterProperties`] parquet writer configuration.
@@ -392,6 +402,8 @@ pub struct WriterPropertiesBuilder {
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
     coerce_types: bool,
+    #[cfg(feature = "encryption")]
+    file_encryption_properties: Option<FileEncryptionProperties>,
 }
 
 impl WriterPropertiesBuilder {
@@ -414,6 +426,8 @@ impl WriterPropertiesBuilder {
             column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
             statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
             coerce_types: DEFAULT_COERCE_TYPES,
+            #[cfg(feature = "encryption")]
+            file_encryption_properties: None,
         }
     }
 
@@ -436,6 +450,8 @@ impl WriterPropertiesBuilder {
             column_index_truncate_length: self.column_index_truncate_length,
             statistics_truncate_length: self.statistics_truncate_length,
             coerce_types: self.coerce_types,
+            #[cfg(feature = "encryption")]
+            file_encryption_properties: self.file_encryption_properties,
         }
     }
 
@@ -808,6 +824,16 @@ impl WriterPropertiesBuilder {
         self.coerce_types = coerce_types;
         self
     }
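The builder method added just below is the single entry point for enabling encryption on write. As exercised by the round-trip tests later in this diff, a writer is configured roughly like so (key material shortened to the 16-byte test keys):

```rust
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::file::properties::WriterProperties;

fn encrypted_writer_props() -> WriterProperties {
    // 16-byte (AES-128) footer key, as used by the tests in this diff.
    let footer_key = b"0123456789012345".to_vec();
    let encryption = FileEncryptionProperties::builder(footer_key)
        .with_column_key("double_field", b"1234567890123450".to_vec())
        .build()
        .unwrap();
    WriterProperties::builder()
        .with_file_encryption_properties(encryption)
        .build()
}
```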
+
+    /// Sets the [`FileEncryptionProperties`] to use when writing the file.
+    #[cfg(feature = "encryption")]
+    pub fn with_file_encryption_properties(
+        mut self,
+        file_encryption_properties: FileEncryptionProperties,
+    ) -> Self {
+        self.file_encryption_properties = Some(file_encryption_properties);
+        self
+    }
 }
 
 /// Controls the level of statistics to be computed by the writer and stored in
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6a02055d0dda..ec2cd38c1389 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -352,7 +352,12 @@ pub(crate) fn read_encrypted_page_header(
     let data_decryptor = crypto_context.data_decryptor();
     let aad = crypto_context.create_page_header_aad()?;
 
-    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref())?;
+    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
+        ParquetError::General(format!(
+            "Error decrypting column {}, decryptor may be wrong or missing",
+            crypto_context.column_ordinal
+        ))
+    })?;
 
     let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
     Ok(PageHeader::read_from_in_protocol(&mut prot)?)
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 6b7707f03cd9..18e357ebc2b9 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -27,15 +27,22 @@ use std::io::{BufWriter, IoSlice, Read};
 use std::{io::Write, sync::Arc};
 
 use thrift::protocol::TCompactOutputProtocol;
 
+use crate::column::page_encryption::PageEncryptor;
 use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
 use crate::column::{
     page::{CompressedPage, PageWriteSpec, PageWriter},
     writer::{get_column_writer, ColumnWriter},
 };
 use crate::data_type::DataType;
+#[cfg(feature = "encryption")]
+use crate::encryption::encrypt::{
+    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
+};
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
 use crate::file::reader::ChunkReader;
+#[cfg(feature = "encryption")]
+use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
 use crate::file::{metadata::*, PARQUET_MAGIC};
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
 
@@ -153,6 +160,8 @@ pub struct SerializedFileWriter<W: Write> {
     // kv_metadatas will be appended to `props` when `write_metadata`
     kv_metadatas: Vec<KeyValue>,
     finished: bool,
+    #[cfg(feature = "encryption")]
+    file_encryptor: Option<Arc<FileEncryptor>>,
 }
 
 impl<W: Write> Debug for SerializedFileWriter<W> {
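The `serialized_reader.rs` change above replaces a bare decryption failure with an error that names the column ordinal, which makes key mix-ups much easier to diagnose. A sketch of what a caller sees with a wrong per-column key (the path, keys, and exact ordinal are illustrative):

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::encryption::decrypt::FileDecryptionProperties;

fn demo_wrong_column_key(path: &str) {
    // Correct footer key, wrong column key: the footer loads fine, but
    // decrypting that column's page headers fails during batch reading.
    let props = FileDecryptionProperties::builder(b"0123456789012345".to_vec())
        .with_column_key("double_field", b"0000000000000000".to_vec())
        .build()
        .unwrap();
    let options = ArrowReaderOptions::default().with_file_decryption_properties(props);
    let file = File::open(path).unwrap();
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
        .unwrap()
        .build()
        .unwrap();
    let err = reader.collect::<Result<Vec<_>, _>>().unwrap_err();
    // e.g. "Error decrypting column 1, decryptor may be wrong or missing"
    assert!(err.to_string().contains("decryptor may be wrong or missing"));
}
```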
@@ -171,11 +180,17 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     /// Creates new file writer.
     pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
         let mut buf = TrackedWrite::new(buf);
-        Self::start_file(&mut buf)?;
+
+        let schema_descriptor = SchemaDescriptor::new(schema.clone());
+
+        #[cfg(feature = "encryption")]
+        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
+
+        Self::start_file(&properties, &mut buf)?;
         Ok(Self {
             buf,
-            schema: schema.clone(),
-            descr: Arc::new(SchemaDescriptor::new(schema)),
+            schema,
+            descr: Arc::new(schema_descriptor),
             props: properties,
             row_groups: vec![],
             bloom_filters: vec![],
@@ -184,9 +199,33 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             row_group_index: 0,
             kv_metadatas: Vec::new(),
             finished: false,
+            #[cfg(feature = "encryption")]
+            file_encryptor,
         })
     }
 
+    #[cfg(feature = "encryption")]
+    fn get_file_encryptor(
+        properties: &WriterPropertiesPtr,
+        schema_descriptor: &SchemaDescriptor,
+    ) -> Result<Option<Arc<FileEncryptor>>> {
+        if let Some(file_encryption_properties) = &properties.file_encryption_properties {
+            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
+
+            if !file_encryption_properties.encrypt_footer() {
+                return Err(general_err!(
+                    "Writing encrypted files with plaintext footers is not supported yet"
+                ));
+            }
+
+            Ok(Some(Arc::new(FileEncryptor::new(
+                file_encryption_properties.clone(),
+            )?)))
+        } else {
+            Ok(None)
+        }
+    }
+
     /// Creates new row group from this file writer.
     /// In case of IO error or Thrift error, returns `Err`.
     ///
@@ -241,6 +280,9 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             ordinal,
             Some(Box::new(on_close)),
         );
+        #[cfg(feature = "encryption")]
+        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
+
         Ok(row_group_writer)
     }
 
@@ -267,8 +309,18 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     }
 
     /// Writes magic bytes at the beginning of the file.
-    fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
-        buf.write_all(&PARQUET_MAGIC)?;
+    #[cfg(not(feature = "encryption"))]
+    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
+        buf.write_all(get_file_magic())?;
+        Ok(())
+    }
+
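Both `start_file` variants funnel through `get_file_magic` (defined at the end of this file's changes): plaintext files keep the usual `PAR1` magic, while encrypted-footer files are marked with `PARE` per the Parquet encryption spec. A tiny illustrative check, assuming the constant values match the spec:

```rust
// Illustrative values for the two crate constants used by get_file_magic().
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E'];

fn main() {
    assert_eq!(&PARQUET_MAGIC, b"PAR1"); // plaintext and plaintext-footer files
    assert_eq!(&PARQUET_MAGIC_ENCR_FOOTER, b"PARE"); // encrypted-footer files
}
```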
+    /// Writes magic bytes at the beginning of the file.
+    #[cfg(feature = "encryption")]
+    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
+        let magic = get_file_magic(properties.file_encryption_properties.as_ref());
+
+        buf.write_all(magic)?;
         Ok(())
     }
 
@@ -301,6 +353,12 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             Some(self.props.created_by().to_string()),
             self.props.writer_version().as_num(),
         );
+
+        #[cfg(feature = "encryption")]
+        {
+            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
+        }
+
         if let Some(key_value_metadata) = key_value_metadata {
             encoder = encoder.with_key_value_metadata(key_value_metadata)
         }
@@ -361,6 +419,12 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     pub fn bytes_written(&self) -> usize {
         self.buf.bytes_written()
     }
+
+    /// Get the file encryptor used by this instance to encrypt data
+    #[cfg(feature = "encryption")]
+    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
+        self.file_encryptor.clone()
+    }
 }
 
 /// Serialize all the bloom filters of the given row group to the given buffer,
@@ -429,6 +493,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     row_group_index: i16,
     file_offset: i64,
     on_close: Option<OnCloseRowGroup<'a, W>>,
+    #[cfg(feature = "encryption")]
+    file_encryptor: Option<Arc<FileEncryptor>>,
 }
 
 impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
@@ -465,9 +531,21 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
             total_uncompressed_bytes: 0,
+            #[cfg(feature = "encryption")]
+            file_encryptor: None,
         }
     }
 
+    #[cfg(feature = "encryption")]
+    /// Set the file encryptor to use for encrypting row group data and metadata
+    pub(crate) fn with_file_encryptor(
+        mut self,
+        file_encryptor: Option<Arc<FileEncryptor>>,
+    ) -> Self {
+        self.file_encryptor = file_encryptor;
+        self
+    }
+
     /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
     fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
         let ret = self.descr.columns().get(self.column_index)?.clone();
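With the encryptor threaded from `SerializedFileWriter` into each row group writer (next hunk), the low-level write API needs no other changes to produce encrypted files. A condensed sketch, following the `test_write_encrypted_column` test later in this diff, whose schema and key it reuses:

```rust
use std::sync::Arc;
use parquet::data_type::{ByteArray, ByteArrayType};
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

fn write_encrypted() -> parquet::errors::Result<()> {
    let schema = Arc::new(parse_message_type(
        "message test_schema { OPTIONAL BYTE_ARRAY a (UTF8); }",
    )?);
    let encryption = FileEncryptionProperties::builder(b"0123456789012345".to_vec()).build()?;
    let props = Arc::new(
        WriterProperties::builder()
            .with_file_encryption_properties(encryption)
            .build(),
    );
    let file = tempfile::tempfile().unwrap();
    let mut writer = SerializedFileWriter::new(&file, schema, props)?;

    // Pages written through this column writer are encrypted transparently.
    let mut row_group = writer.next_row_group()?;
    let mut column = row_group.next_column()?.unwrap();
    column
        .typed::<ByteArrayType>()
        .write_batch(&[ByteArray::from(b"parquet".to_vec())], Some(&[1]), None)?;
    column.close()?;
    row_group.close()?;
    writer.close()?;
    Ok(())
}
```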
@@ -523,12 +601,24 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     ) -> Result<C>,
     {
         self.assert_previous_writer_closed()?;
+
+        let encryptor_context = self.get_page_encryptor_context();
+
         Ok(match self.next_column_desc() {
             Some(column) => {
                 let props = self.props.clone();
                 let (buf, on_close) = self.get_on_close();
-                let page_writer = Box::new(SerializedPageWriter::new(buf));
-                Some(factory(column, props, page_writer, Box::new(on_close))?)
+
+                let page_writer = SerializedPageWriter::new(buf);
+                let page_writer =
+                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
+
+                Some(factory(
+                    column,
+                    props,
+                    Box::new(page_writer),
+                    Box::new(on_close),
+                )?)
             }
             None => None,
         })
     }
@@ -605,6 +695,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             if let Some(statistics) = metadata.statistics() {
                 builder = builder.set_statistics(statistics.clone())
             }
+            builder = self.set_column_crypto_metadata(builder, &metadata);
             close.metadata = builder.build()?;
 
             if let Some(offsets) = close.offset_index.as_mut() {
@@ -649,6 +740,75 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         Ok(metadata)
     }
 
+    /// Set the column crypto metadata for a column chunk
+    #[cfg(feature = "encryption")]
+    fn set_column_crypto_metadata(
+        &self,
+        builder: ColumnChunkMetaDataBuilder,
+        metadata: &ColumnChunkMetaData,
+    ) -> ColumnChunkMetaDataBuilder {
+        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
+            builder.set_column_crypto_metadata(get_column_crypto_metadata(
+                file_encryptor.properties(),
+                &metadata.column_descr_ptr(),
+            ))
+        } else {
+            builder
+        }
+    }
+
+    /// Get context required to create a [`PageEncryptor`] for a column
+    #[cfg(feature = "encryption")]
+    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
+        PageEncryptorContext {
+            file_encryptor: self.file_encryptor.clone(),
+            row_group_index: self.row_group_index as usize,
+            column_index: self.column_index,
+        }
+    }
+
+    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
+    #[cfg(feature = "encryption")]
+    fn set_page_writer_encryptor<'b>(
+        column: &ColumnDescPtr,
+        context: PageEncryptorContext,
+        page_writer: SerializedPageWriter<'b, W>,
+    ) -> Result<SerializedPageWriter<'b, W>> {
+        let page_encryptor = PageEncryptor::create_if_column_encrypted(
+            &context.file_encryptor,
+            context.row_group_index,
+            context.column_index,
+            &column.path().string(),
+        )?;
+
+        Ok(page_writer.with_page_encryptor(page_encryptor))
+    }
+
+    /// No-op implementation of setting the column crypto metadata for a column chunk
+    #[cfg(not(feature = "encryption"))]
+    fn set_column_crypto_metadata(
+        &self,
+        builder: ColumnChunkMetaDataBuilder,
+        _metadata: &ColumnChunkMetaData,
+    ) -> ColumnChunkMetaDataBuilder {
+        builder
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
+        PageEncryptorContext {}
+    }
+
+    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
+    #[cfg(not(feature = "encryption"))]
+    fn set_page_writer_encryptor<'b>(
+        _column: &ColumnDescPtr,
+        _context: PageEncryptorContext,
+        page_writer: SerializedPageWriter<'b, W>,
+    ) -> Result<SerializedPageWriter<'b, W>> {
+        Ok(page_writer)
+    }
+
     #[inline]
     fn assert_previous_writer_closed(&self) -> Result<()> {
         if self.column_index != self.column_chunks.len() {
@@ -659,6 +819,17 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     }
 }
 
+/// Context required to create a [`PageEncryptor`] for a column
+#[cfg(feature = "encryption")]
+struct PageEncryptorContext {
+    file_encryptor: Option<Arc<FileEncryptor>>,
+    row_group_index: usize,
+    column_index: usize,
+}
+
+#[cfg(not(feature = "encryption"))]
+struct PageEncryptorContext {}
+
 /// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
 pub struct SerializedColumnWriter<'a> {
     inner: ColumnWriter<'a>,
@@ -699,12 +870,18 @@ impl<'a> SerializedColumnWriter<'a> {
 /// `SerializedPageWriter` should not be used after calling `close()`.
 pub struct SerializedPageWriter<'a, W: Write> {
     sink: &'a mut TrackedWrite<W>,
+    #[cfg(feature = "encryption")]
+    page_encryptor: Option<PageEncryptor>,
 }
 
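Each encrypted module is bound to its position in the file through its AAD, and data pages additionally carry a page ordinal, which is why `SerializedPageWriter` below calls `PageEncryptor::increment_page` after every data page but not after dictionary pages. A sketch of the AAD suffix construction from the Parquet modular encryption spec (the helper is illustrative, not the crate's internal API):

```rust
// Illustrative AAD construction per the Parquet modular encryption spec:
// file AAD ++ module type ++ row group ordinal ++ column ordinal ++ page ordinal.
fn data_page_aad(file_aad: &[u8], module_type: u8, rg: i16, col: i16, page: i16) -> Vec<u8> {
    let mut aad = file_aad.to_vec();
    aad.push(module_type); // e.g. 2 for a data page in the spec's numbering
    aad.extend_from_slice(&rg.to_le_bytes());
    aad.extend_from_slice(&col.to_le_bytes());
    aad.extend_from_slice(&page.to_le_bytes()); // omitted for non-paged modules
    aad
}
```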
 impl<'a, W: Write> SerializedPageWriter<'a, W> {
     /// Creates new page writer.
     pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
-        Self { sink }
+        Self {
+            sink,
+            #[cfg(feature = "encryption")]
+            page_encryptor: None,
+        }
     }
 
     /// Serializes page header into Thrift.
@@ -712,21 +889,64 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     #[inline]
     fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
         let start_pos = self.sink.bytes_written();
-        {
-            let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
-            header.write_to_out_protocol(&mut protocol)?;
+        match self.page_encryptor_and_sink_mut() {
+            Some((page_encryptor, sink)) => {
+                page_encryptor.encrypt_page_header(&header, sink)?;
+            }
+            None => {
+                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
+                header.write_to_out_protocol(&mut protocol)?;
+            }
         }
         Ok(self.sink.bytes_written() - start_pos)
     }
 }
 
+#[cfg(feature = "encryption")]
+impl<'a, W: Write> SerializedPageWriter<'a, W> {
+    /// Set the encryptor to use to encrypt page data
+    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
+        self.page_encryptor = page_encryptor;
+        self
+    }
+
+    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
+        self.page_encryptor.as_mut()
+    }
+
+    fn page_encryptor_and_sink_mut(
+        &mut self,
+    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
+        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
+    }
+}
+
+#[cfg(not(feature = "encryption"))]
+impl<'a, W: Write> SerializedPageWriter<'a, W> {
+    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
+        None
+    }
+
+    fn page_encryptor_and_sink_mut(
+        &mut self,
+    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
+        None
+    }
+}
+
 impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
+        let page = match self.page_encryptor_mut() {
+            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
+            None => page,
+        };
+
         let page_type = page.page_type();
         let start_pos = self.sink.bytes_written() as u64;
 
         let page_header = page.to_thrift_header();
         let header_size = self.serialize_page_header(page_header)?;
+
         self.sink.write_all(page.data())?;
 
         let mut spec = PageWriteSpec::new();
@@ -737,6 +957,11 @@ impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
         spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
         spec.num_values = page.num_values();
 
+        if let Some(page_encryptor) = self.page_encryptor_mut() {
+            if page.compressed_page().is_data_page() {
+                page_encryptor.increment_page();
+            }
+        }
         Ok(spec)
     }
 
@@ -746,6 +971,25 @@ impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
     }
 }
 
+/// Get the magic bytes at the start and end of the file that identify this
+/// as a Parquet file.
+#[cfg(feature = "encryption")]
+pub(crate) fn get_file_magic(
+    file_encryption_properties: Option<&FileEncryptionProperties>,
+) -> &'static [u8; 4] {
+    match file_encryption_properties.as_ref() {
+        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
+            &PARQUET_MAGIC_ENCR_FOOTER
+        }
+        _ => &PARQUET_MAGIC,
+    }
+}
+
+#[cfg(not(feature = "encryption"))]
+pub(crate) fn get_file_magic() -> &'static [u8; 4] {
+    &PARQUET_MAGIC
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/tests/arrow_reader/encryption.rs b/parquet/tests/arrow_reader/encryption.rs
deleted file mode 100644
index 362a58772ac6..000000000000
--- a/parquet/tests/arrow_reader/encryption.rs
+++ /dev/null
@@ -1,259 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module contains tests for reading encrypted Parquet files with the Arrow API - -use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever}; -use arrow_array::RecordBatch; -use parquet::arrow::arrow_reader::{ - ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, -}; -use parquet::encryption::decrypt::FileDecryptionProperties; -use std::fs::File; -use std::sync::Arc; - -#[test] -fn test_non_uniform_encryption_plaintext_footer() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted"); - let file = File::open(path).unwrap(); - - // There is always a footer key even with a plaintext footer, - // but this is used for signing the footer. - let footer_key = "0123456789012345".as_bytes(); // 128bit/16 - let column_1_key = "1234567890123450".as_bytes(); - let column_2_key = "1234567890123451".as_bytes(); - - let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) - .with_column_key("double_field", column_1_key.to_vec()) - .with_column_key("float_field", column_2_key.to_vec()) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); -} - -#[test] -fn test_non_uniform_encryption_disabled_aad_storage() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = - format!("{test_data}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted"); - let file = File::open(path.clone()).unwrap(); - - let footer_key = "0123456789012345".as_bytes(); // 128bit/16 - let column_1_key = "1234567890123450".as_bytes(); - let column_2_key = "1234567890123451".as_bytes(); - - // Can read successfully when providing the correct AAD prefix - let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) - .with_column_key("double_field", column_1_key.to_vec()) - .with_column_key("float_field", column_2_key.to_vec()) - .with_aad_prefix("tester".as_bytes().to_vec()) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); - - // Using wrong AAD prefix should fail - let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) - .with_column_key("double_field", column_1_key.to_vec()) - .with_column_key("float_field", column_2_key.to_vec()) - .with_aad_prefix("wrong_aad_prefix".as_bytes().to_vec()) - .build() - .unwrap(); - - let file = File::open(path.clone()).unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); - let result = ArrowReaderMetadata::load(&file, options.clone()); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer" - ); - - // Not providing 
any AAD prefix should fail as it isn't stored in the file - let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) - .with_column_key("double_field", column_1_key.to_vec()) - .with_column_key("float_field", column_2_key.to_vec()) - .build() - .unwrap(); - - let file = File::open(path).unwrap(); - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); - let result = ArrowReaderMetadata::load(&file, options.clone()); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer" - ); -} - -#[test] -#[cfg(feature = "snap")] -fn test_plaintext_footer_read_without_decryption() { - crate::encryption_agnostic::read_plaintext_footer_file_without_decryption_properties(); -} - -#[test] -fn test_non_uniform_encryption() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let footer_key = "0123456789012345".as_bytes(); // 128bit/16 - let column_1_key = "1234567890123450".as_bytes(); - let column_2_key = "1234567890123451".as_bytes(); - - let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) - .with_column_key("double_field", column_1_key.to_vec()) - .with_column_key("float_field", column_2_key.to_vec()) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); -} - -#[test] -fn test_uniform_encryption() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/uniform_encryption.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let key_code: &[u8] = "0123456789012345".as_bytes(); - let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec()) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); -} - -#[test] -fn test_decrypting_without_decryption_properties_fails() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/uniform_encryption.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let options = ArrowReaderOptions::default(); - let result = ArrowReaderMetadata::load(&file, options.clone()); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" - ); -} - -#[test] -fn test_aes_ctr_encryption() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/encrypt_columns_and_footer_ctr.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let footer_key = "0123456789012345".as_bytes(); - let column_1_key = "1234567890123450".as_bytes(); - let column_2_key = "1234567890123451".as_bytes(); - - let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) - .with_column_key("double_field", column_1_key.to_vec()) - .with_column_key("float_field", column_2_key.to_vec()) - .build() - .unwrap(); - - let options = - ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); - let metadata = ArrowReaderMetadata::load(&file, options); - - match metadata { - Err(parquet::errors::ParquetError::NYI(s)) => { - assert!(s.contains("AES_GCM_CTR_V1")); - } - _ => { - panic!("Expected ParquetError::NYI"); - } - }; -} - -#[test] -fn 
test_non_uniform_encryption_plaintext_footer_with_key_retriever() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let key_retriever = TestKeyRetriever::new() - .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec()) - .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec()) - .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec()); - - let decryption_properties = - FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever)) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); -} - -#[test] -fn test_non_uniform_encryption_with_key_retriever() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let key_retriever = TestKeyRetriever::new() - .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec()) - .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec()) - .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec()); - - let decryption_properties = - FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever)) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); -} - -#[test] -fn test_uniform_encryption_with_key_retriever() { - let test_data = arrow::util::test_util::parquet_test_data(); - let path = format!("{test_data}/uniform_encryption.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let key_retriever = - TestKeyRetriever::new().with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec()); - - let decryption_properties = - FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever)) - .build() - .unwrap(); - - verify_encryption_test_file_read(file, decryption_properties); -} - -fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) { - let options = - ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); - let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); - let metadata = reader_metadata.metadata(); - - let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); - let record_reader = builder.build().unwrap(); - let record_batches = record_reader - .map(|x| x.unwrap()) - .collect::>(); - - verify_encryption_test_data(record_batches, metadata); -} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 93b63921d333..0e6783583cd5 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -38,15 +38,6 @@ use tempfile::NamedTempFile; mod bad_data; #[cfg(feature = "crc")] mod checksum; -#[cfg(feature = "encryption")] -mod encryption; -mod encryption_agnostic; -#[cfg(all(feature = "encryption", feature = "async"))] -mod encryption_async; -#[cfg(not(feature = "encryption"))] -mod encryption_disabled; -#[cfg(feature = "encryption")] -mod encryption_util; mod statistics; // returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs new file mode 100644 index 000000000000..86a148be2bd8 --- /dev/null +++ b/parquet/tests/encryption/encryption.rs @@ -0,0 +1,767 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains tests for reading encrypted Parquet files with the Arrow API + +use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever}; +use arrow::array::*; +use arrow::error::Result as ArrowResult; +use arrow_array::{Int32Array, RecordBatch}; +use arrow_schema::{DataType as ArrowDataType, DataType, Field, Schema}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; +use parquet::arrow::ArrowWriter; +use parquet::data_type::{ByteArray, ByteArrayType}; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; +use parquet::file::writer::SerializedFileWriter; +use parquet::schema::parser::parse_message_type; +use std::fs::File; +use std::sync::Arc; + +#[test] +fn test_non_uniform_encryption_plaintext_footer() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + // There is always a footer key even with a plaintext footer, + // but this is used for signing the footer. 
+ let footer_key = "0123456789012345".as_bytes(); // 128bit/16 + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) + .with_column_key("double_field", column_1_key.to_vec()) + .with_column_key("float_field", column_2_key.to_vec()) + .build() + .unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); +} + +#[test] +fn test_non_uniform_encryption_disabled_aad_storage() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = + format!("{test_data}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted"); + let file = File::open(path.clone()).unwrap(); + + let footer_key = b"0123456789012345".to_vec(); // 128bit/16 + let column_1_key = b"1234567890123450".to_vec(); + let column_2_key = b"1234567890123451".to_vec(); + + // Can read successfully when providing the correct AAD prefix + let decryption_properties = FileDecryptionProperties::builder(footer_key.clone()) + .with_column_key("double_field", column_1_key.clone()) + .with_column_key("float_field", column_2_key.clone()) + .with_aad_prefix(b"tester".to_vec()) + .build() + .unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); + + // Using wrong AAD prefix should fail + let decryption_properties = FileDecryptionProperties::builder(footer_key.clone()) + .with_column_key("double_field", column_1_key.clone()) + .with_column_key("float_field", column_2_key.clone()) + .with_aad_prefix(b"wrong_aad_prefix".to_vec()) + .build() + .unwrap(); + + let file = File::open(path.clone()).unwrap(); + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let result = ArrowReaderMetadata::load(&file, options.clone()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer" + ); + + // Not providing any AAD prefix should fail as it isn't stored in the file + let decryption_properties = FileDecryptionProperties::builder(footer_key) + .with_column_key("double_field", column_1_key) + .with_column_key("float_field", column_2_key) + .build() + .unwrap(); + + let file = File::open(path).unwrap(); + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let result = ArrowReaderMetadata::load(&file, options.clone()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file was encrypted with an AAD prefix that is not stored in the file, \ + but no AAD prefix was provided in the file decryption properties" + ); +} + +#[test] +#[cfg(feature = "snap")] +fn test_plaintext_footer_read_without_decryption() { + crate::encryption_agnostic::read_plaintext_footer_file_without_decryption_properties(); +} + +#[test] +fn test_non_uniform_encryption() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let footer_key = b"0123456789012345".to_vec(); // 128bit/16 + let column_1_key = b"1234567890123450".to_vec(); + let column_2_key = b"1234567890123451".to_vec(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key) + .with_column_key("double_field", column_1_key) + .with_column_key("float_field", column_2_key) + .build() + .unwrap(); + + 
verify_encryption_test_file_read(file, decryption_properties); +} + +#[test] +fn test_uniform_encryption() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/uniform_encryption.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let key_code = b"0123456789012345".to_vec(); + let decryption_properties = FileDecryptionProperties::builder(key_code).build().unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); +} + +#[test] +fn test_decrypting_without_decryption_properties_fails() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/uniform_encryption.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let options = ArrowReaderOptions::default(); + let result = ArrowReaderMetadata::load(&file, options.clone()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" + ); +} + +#[test] +fn test_aes_ctr_encryption() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/encrypt_columns_and_footer_ctr.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let footer_key = b"0123456789012345".to_vec(); + let column_1_key = b"1234567890123450".to_vec(); + let column_2_key = b"1234567890123451".to_vec(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key) + .with_column_key("double_field", column_1_key) + .with_column_key("float_field", column_2_key) + .build() + .unwrap(); + + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); + let metadata = ArrowReaderMetadata::load(&file, options); + + match metadata { + Err(parquet::errors::ParquetError::NYI(s)) => { + assert!(s.contains("AES_GCM_CTR_V1")); + } + _ => { + panic!("Expected ParquetError::NYI"); + } + }; +} + +#[test] +fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let key_retriever = TestKeyRetriever::new() + .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec()) + .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec()) + .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec()); + + let decryption_properties = + FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever)) + .build() + .unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); +} + +#[test] +fn test_non_uniform_encryption_with_key_retriever() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let key_retriever = TestKeyRetriever::new() + .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec()) + .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec()) + .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec()); + + let decryption_properties = + FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever)) + .build() + .unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); +} + +#[test] +fn test_uniform_encryption_with_key_retriever() { + let test_data = arrow::util::test_util::parquet_test_data(); + let path = 
format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever =
+        TestKeyRetriever::new().with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
+    let options =
+        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+    let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let metadata = reader_metadata.metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+    let record_batches = record_reader
+        .map(|x| x.unwrap())
+        .collect::<Vec<RecordBatch>>();
+
+    verify_encryption_test_data(record_batches, metadata);
+}
+
+fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
+    metadata.row_groups().iter().map(|x| x.num_rows()).collect()
+}
+
+#[test]
+fn test_uniform_encryption_roundtrip() {
+    let x0_arrays = [
+        Int32Array::from((0..100).collect::<Vec<_>>()),
+        Int32Array::from((100..150).collect::<Vec<_>>()),
+    ];
+    let x1_arrays = [
+        Int32Array::from((100..200).collect::<Vec<_>>()),
+        Int32Array::from((200..250).collect::<Vec<_>>()),
+    ];
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("x0", ArrowDataType::Int32, false),
+        Field::new("x1", ArrowDataType::Int32, false),
+    ]));
+
+    let file = tempfile::tempfile().unwrap();
+
+    let footer_key = b"0123456789012345";
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let props = WriterProperties::builder()
+        // Ensure multiple row groups
+        .set_max_row_group_size(50)
+        // Ensure multiple pages per row group
+        .set_write_batch_size(20)
+        .set_data_page_row_count_limit(20)
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+
+    let mut writer =
+        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+    for (x0, x1) in x0_arrays.into_iter().zip(x1_arrays.into_iter()) {
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(x0), Arc::new(x1)]).unwrap();
+        writer.write(&batch).unwrap();
+    }
+
+    writer.close().unwrap();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    assert_eq!(&row_group_sizes(builder.metadata()), &[50, 50, 50]);
+
+    let batches = builder
+        .with_batch_size(100)
+        .build()
+        .unwrap()
+        .collect::<ArrowResult<Vec<RecordBatch>>>()
+        .unwrap();
+
+    assert_eq!(batches.len(), 2);
+    assert!(batches.iter().all(|x| x.num_columns() == 2));
+
+    let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+
+    assert_eq!(&batch_sizes, &[100, 50]);
+
+    let x0_values: Vec<_> = batches
+        .iter()
+        .flat_map(|x| {
+            x.column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values()
+                .iter()
+                .cloned()
+        })
+        .collect();
+
+    let x1_values: Vec<_> = batches
+        .iter()
+        .flat_map(|x| {
+            x.column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values()
+                .iter()
+                .cloned()
+        })
+        .collect();
+
+    let expected_x0_values: Vec<_> = [0..100, 100..150].into_iter().flatten().collect();
+
assert_eq!(&x0_values, &expected_x0_values);
+
+    let expected_x1_values: Vec<_> = [100..200, 200..250].into_iter().flatten().collect();
+    assert_eq!(&x1_values, &expected_x1_values);
+}
+
+#[test]
+fn test_write_non_uniform_encryption() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_names = vec!["double_field", "float_field"];
+    let column_keys = vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_keys(column_names.clone(), column_keys.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .with_column_keys(column_names, column_keys)
+        .unwrap()
+        .build()
+        .unwrap();
+
+    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+}
+
+// todo: currently we raise if writing with plaintext footer, but we should support it
+// for uniform and non-uniform encryption (see https://github.com/apache/arrow-rs/issues/7320)
+#[test]
+fn test_write_uniform_encryption_plaintext_footer() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .with_plaintext_footer(true)
+        .build()
+        .unwrap();
+
+    let file = File::open(path).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+    let temp_file = tempfile::tempfile().unwrap();
+
+    let writer = ArrowWriter::try_new(
+        temp_file.try_clone().unwrap(),
+        metadata.schema().clone(),
+        Some(props),
+    );
+    assert!(writer.is_err());
+    assert_eq!(
+        writer.unwrap_err().to_string(),
+        "Parquet error: Writing encrypted files with plaintext footers is not supported yet"
+    )
+}
+
+#[test]
+fn test_write_uniform_encryption() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .build()
+        .unwrap();
+
+    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+}
+
+#[test]
+fn test_write_non_uniform_encryption_column_mismatch() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("double_field", column_1_key.clone())
+        .with_column_key("float_field", column_2_key.clone())
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key.clone())
+        .with_column_key("other_field", column_1_key)
+        .with_column_key("yet_another_field", column_2_key)
+        .build()
+        .unwrap();
+
+    let temp_file = tempfile::tempfile().unwrap();
+
+    // read example data
+    let file = File::open(path).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+
+    let result = ArrowWriter::try_new(
+        temp_file.try_clone().unwrap(),
+        metadata.schema().clone(),
+        Some(props),
+    );
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: The following columns with encryption keys specified were not found in the schema: other_field, yet_another_field"
+    );
+}
+
+#[test]
+fn test_write_encrypted_column() {
+    let message_type = "
+        message test_schema {
+            OPTIONAL BYTE_ARRAY a (UTF8);
+        }
+    ";
+    let schema = Arc::new(parse_message_type(message_type).unwrap());
+    let data = vec![ByteArray::from(b"parquet".to_vec()); 7];
+    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+
+    let num_row_groups = 3;
+    let num_batches = 3;
+    let rows_per_batch = def_levels.len();
+    let valid_rows_per_batch = def_levels.iter().filter(|&level| *level > 0).count();
+
+    let file: File = tempfile::tempfile().unwrap();
+
+    let builder = WriterProperties::builder();
+    let footer_key: &[u8] = "0123456789012345".as_bytes();
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let props = Arc::new(
+        builder
+            .with_file_encryption_properties(file_encryption_properties)
+            .set_data_page_row_count_limit(rows_per_batch)
+            .build(),
+    );
+    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+    for _ in 0..num_row_groups {
+        let mut row_group_writer = writer.next_row_group().unwrap();
+        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+
+        for _ in 0..num_batches {
+            col_writer
+                .typed::<ByteArrayType>()
+                .write_batch(&data, Some(&def_levels), None)
+                .unwrap();
+        }
+
+        col_writer.close().unwrap();
+        row_group_writer.close().unwrap();
+    }
+
+    let _file_metadata = writer.close().unwrap();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let file_metadata = metadata.metadata().file_metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+
+    assert_eq!(
+        file_metadata.num_rows(),
+        (num_row_groups * num_batches * rows_per_batch) as i64
+    );
+    assert_eq!(file_metadata.schema_descr().num_columns(), 1);
+
+    assert_eq!(metadata.metadata().num_row_groups(), num_row_groups);
+    metadata.metadata().row_groups().iter().for_each(|rg| {
+        assert_eq!(rg.num_columns(), 1);
+        assert_eq!(rg.num_rows(), (num_batches * rows_per_batch) as i64);
+    });
+
+    let mut row_count = 0;
+    for batch in record_reader {
+        let batch = batch.unwrap();
+        row_count += batch.num_rows();
+
+        let string_col = batch.column(0).as_string_opt::<i32>().unwrap();
+
+        let mut valid_count = 0;
+        for x in string_col.iter().flatten() {
+            valid_count += 1;
+            assert_eq!(x, "parquet");
+        }
+        assert_eq!(
+            valid_count,
+            valid_rows_per_batch * num_batches * num_row_groups
+        );
+    }
+
+    assert_eq!(row_count, file_metadata.num_rows() as usize);
+}
+
+#[test]
+fn test_write_encrypted_struct_field() {
+    let int_32: Int32Array = [Some(1), Some(6)].iter().collect();
+    let float_64: Float64Array = [None, Some(8.5)].iter().collect();
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("int64_col", DataType::Int32, true)),
+            Arc::new(int_32) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("float64_col", DataType::Float64, true)),
+            Arc::new(float_64) as ArrayRef,
+        ),
+    ]);
+    let struct_array_data = Arc::new(struct_array);
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "struct_col",
+        struct_array_data.data_type().clone(),
+        true,
+    )]));
+    let record_batches =
+        vec![RecordBatch::try_new(schema.clone(), vec![struct_array_data]).unwrap()];
+
+    let temp_file = tempfile::tempfile().unwrap();
+
+    // When configuring encryption keys for struct columns,
+    // keys need to be specified for each leaf-level Parquet column using the full "." separated
+    // column path.
+    let builder = WriterProperties::builder();
+    let footer_key = b"0123456789012345".to_vec();
+    let column_key_1 = b"1234567890123450".to_vec();
+    let column_key_2 = b"1234567890123451".to_vec();
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
+        .with_column_key("struct_col.int64_col", column_key_1.clone())
+        .with_column_key("struct_col.float64_col", column_key_2.clone())
+        .build()
+        .unwrap();
+
+    let props = builder
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+    let mut writer =
+        ArrowWriter::try_new(temp_file.try_clone().unwrap(), schema, Some(props)).unwrap();
+    for batch in record_batches.clone() {
+        writer.write(&batch).unwrap();
+    }
+    writer.close().unwrap();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("struct_col.int64_col", column_key_1)
+        .with_column_key("struct_col.float64_col", column_key_2)
+        .build()
+        .unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+
+    let builder =
+        ParquetRecordBatchReaderBuilder::try_new_with_options(temp_file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+
+    let read_record_reader = record_reader
+        .map(|x| x.unwrap())
+        .collect::<Vec<RecordBatch>>();
+
+    // show read batches are equal to written batches
+    assert_eq!(read_record_reader.len(), record_batches.len());
+    for (read_batch, written_batch) in read_record_reader.iter().zip(record_batches.iter()) {
+        assert_eq!(read_batch.num_columns(), written_batch.num_columns());
+        assert_eq!(read_batch.num_rows(), written_batch.num_rows());
+        for (read_column, written_column) in read_batch
+            .columns()
+            .iter()
+            .zip(written_batch.columns().iter())
+        {
+            assert_eq!(read_column, written_column);
+        }
+    }
+}
+
+/// Test that when per-column encryption is used,
+/// unencrypted row group metadata are returned when the writer is closed
+/// and statistics can be used.
+#[test]
+pub fn test_retrieve_row_group_statistics_after_encrypted_write() {
+    let values = Int32Array::from(vec![8, 3, 4, 19, 5]);
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "x",
+        values.data_type().clone(),
+        true,
+    )]));
+    let values = Arc::new(values);
+    let record_batches = vec![RecordBatch::try_new(schema.clone(), vec![values]).unwrap()];
+
+    let temp_file = tempfile::tempfile().unwrap();
+
+    let footer_key = b"0123456789012345".to_vec();
+    let column_key = b"1234567890123450".to_vec();
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
+        .with_column_key("x", column_key.clone())
+        .build()
+        .unwrap();
+
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+    let mut writer = ArrowWriter::try_new(temp_file, schema, Some(props)).unwrap();
+
+    for batch in record_batches.clone() {
+        writer.write(&batch).unwrap();
+    }
+    let file_metadata = writer.close().unwrap();
+
+    assert_eq!(file_metadata.row_groups.len(), 1);
+    let row_group = &file_metadata.row_groups[0];
+    assert_eq!(row_group.columns.len(), 1);
+    let column = &row_group.columns[0];
+    let column_stats = column
+        .meta_data
+        .as_ref()
+        .unwrap()
+        .statistics
+        .as_ref()
+        .unwrap();
+    assert_eq!(
+        column_stats.min_value.as_deref(),
+        Some(3i32.to_le_bytes().as_slice())
+    );
+    assert_eq!(
+        column_stats.max_value.as_deref(),
+        Some(19i32.to_le_bytes().as_slice())
+    );
+}
+
+fn read_and_roundtrip_to_encrypted_file(
+    path: &str,
+    decryption_properties: FileDecryptionProperties,
+    encryption_properties: FileEncryptionProperties,
+) {
+    let temp_file = tempfile::tempfile().unwrap();
+
+    // read example data
+    let file = File::open(path).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let batch_reader = builder.build().unwrap();
+    let batches = batch_reader
+        .collect::<Result<Vec<_>, _>>()
+        .unwrap();
+
+    // write example data
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(encryption_properties)
+        .build();
+
+    let mut writer = ArrowWriter::try_new(
+        temp_file.try_clone().unwrap(),
+        metadata.schema().clone(),
+        Some(props),
+    )
+    .unwrap();
+    for batch in batches {
+        writer.write(&batch).unwrap();
+    }
+
+    writer.close().unwrap();
+
+    // check re-written example data
+    verify_encryption_test_file_read(temp_file, decryption_properties);
+}
diff --git a/parquet/tests/arrow_reader/encryption_agnostic.rs b/parquet/tests/encryption/encryption_agnostic.rs
similarity index 100%
rename from parquet/tests/arrow_reader/encryption_agnostic.rs
rename to parquet/tests/encryption/encryption_agnostic.rs
diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
similarity index 82%
rename from parquet/tests/arrow_reader/encryption_async.rs
rename to parquet/tests/encryption/encryption_async.rs
index 60eb97363dae..11448207c6fc 100644
--- a/parquet/tests/arrow_reader/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -20,9 +20,13 @@
 use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
 use futures::TryStreamExt;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+use parquet::arrow::arrow_writer::ArrowWriterOptions;
+use parquet::arrow::AsyncArrowWriter;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::encryption::decrypt::FileDecryptionProperties;
+use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
+use parquet::file::properties::WriterProperties;
 use std::sync::Arc;
 use tokio::fs::File;
 
@@ -116,7 +120,7 @@ async fn test_misspecified_encryption_keys() {
 
     // Missing column key
     check_for_error(
-        "Parquet error: No column decryption key set for column 'double_field'",
+        "Parquet error: No column decryption key set for encrypted column 'double_field'",
        &path,
         footer_key,
         "".as_bytes(),
@@ -243,6 +247,36 @@ async fn test_decrypting_without_decryption_properties_fails() {
     );
 }
 
+#[tokio::test]
+async fn test_write_non_uniform_encryption() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_names = vec!["double_field", "float_field"];
+    let column_keys = vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_keys(column_names.clone(), column_keys.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .with_column_keys(column_names, column_keys)
+        .unwrap()
+        .build()
+        .unwrap();
+
+    read_and_roundtrip_to_encrypted_file_async(
+        &path,
+        decryption_properties,
+        file_encryption_properties,
+    )
+    .await
+    .unwrap();
+}
+
 #[cfg(feature = "object_store")]
 async fn get_encrypted_meta_store() -> (
     object_store::ObjectMeta,
@@ -367,3 +401,39 @@ async fn verify_encryption_test_file_read_async(
     verify_encryption_test_data(record_batches, metadata);
     Ok(())
 }
+
+async fn read_and_roundtrip_to_encrypted_file_async(
+    path: &str,
+    decryption_properties: FileDecryptionProperties,
+    encryption_properties: FileEncryptionProperties,
+) -> Result<(), ParquetError> {
+    let temp_file = tempfile::tempfile().unwrap();
+    let mut file = File::open(&path).await.unwrap();
+
+    let options =
+        ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone());
+    let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;
+    let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
+        file.try_clone().await?,
+        arrow_metadata.clone(),
+    )
+    .build()?;
+    let record_batches = record_reader.try_collect::<Vec<_>>().await?;
+
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(encryption_properties)
+        .build();
+    let options = ArrowWriterOptions::new().with_properties(props);
+
+    let file = tokio::fs::File::from_std(temp_file.try_clone().unwrap());
+    let mut writer =
+        AsyncArrowWriter::try_new_with_options(file, arrow_metadata.schema().clone(), options)
+            .unwrap();
+    for batch in record_batches {
+        writer.write(&batch).await.unwrap();
+    }
+    writer.close().await.unwrap();
+
+    let mut file = tokio::fs::File::from_std(temp_file.try_clone().unwrap());
+    verify_encryption_test_file_read_async(&mut file, decryption_properties).await
+}
diff --git a/parquet/tests/arrow_reader/encryption_disabled.rs b/parquet/tests/encryption/encryption_disabled.rs
similarity index 100%
rename from parquet/tests/arrow_reader/encryption_disabled.rs
rename to parquet/tests/encryption/encryption_disabled.rs
diff --git a/parquet/tests/arrow_reader/encryption_util.rs
b/parquet/tests/encryption/encryption_util.rs similarity index 100% rename from parquet/tests/arrow_reader/encryption_util.rs rename to parquet/tests/encryption/encryption_util.rs diff --git a/parquet/tests/encryption/mod.rs b/parquet/tests/encryption/mod.rs new file mode 100644 index 000000000000..7ade1f573233 --- /dev/null +++ b/parquet/tests/encryption/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(feature = "encryption")] +mod encryption; +mod encryption_agnostic; +#[cfg(all(feature = "encryption", feature = "async"))] +mod encryption_async; +#[cfg(not(feature = "encryption"))] +mod encryption_disabled; +#[cfg(feature = "encryption")] +mod encryption_util;
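Taken together, the changes above make encrypted Parquet a symmetric capability: everything the reader could already decrypt, the writer can now produce (with the plaintext-footer mode still outstanding, per the todo above). A condensed end-to-end round trip distilled from the tests in this diff, requiring the `encryption` feature:

```rust
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::encryption::{decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties};
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let key = b"0123456789012345".to_vec(); // AES-128 footer key
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // Write with uniform (footer key) encryption.
    let file = tempfile::tempfile()?;
    let props = WriterProperties::builder()
        .with_file_encryption_properties(FileEncryptionProperties::builder(key.clone()).build()?)
        .build();
    let mut writer = ArrowWriter::try_new(file.try_clone()?, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // Read it back with matching decryption properties.
    let options = ArrowReaderOptions::new()
        .with_file_decryption_properties(FileDecryptionProperties::builder(key).build()?);
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;
    let read: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;
    assert_eq!(read[0].num_rows(), 3);
    Ok(())
}
```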