diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 05557069aa7d..1b23b78b53f7 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -110,6 +110,8 @@ test_common = ["arrow/test_utils"] experimental = [] # Enable async APIs async = ["futures", "tokio"] +# Enable `!Send` with async APIs +async-no-send = ["async"] # Enable object_store integration object_store = ["dep:object_store", "async"] # Group Zstd dependencies diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs index 33427a37b59a..554efe6096f2 100644 --- a/parquet/benches/arrow_reader_row_filter.rs +++ b/parquet/benches/arrow_reader_row_filter.rs @@ -62,7 +62,6 @@ use arrow_array::StringViewArray; use arrow_cast::pretty::pretty_format_batches; use bytes::Bytes; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use futures::future::BoxFuture; use futures::{FutureExt, StreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, @@ -76,6 +75,11 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; use std::ops::Range; use std::sync::Arc; +#[cfg(feature = "async-no-send")] +type MaybeLocalBoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>; +#[cfg(not(feature = "async-no-send"))] +type MaybeLocalBoxFuture<'a, T> = futures::future::BoxFuture<'a, T>; + /// Generates a random string. Has a 50% chance to generate a short string (3–11 characters) /// or a long string (13–20 characters). fn random_string(rng: &mut StdRng) -> String { @@ -571,7 +575,10 @@ impl InMemoryReader { } impl AsyncFileReader for InMemoryReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + fn get_bytes( + &mut self, + range: Range, + ) -> MaybeLocalBoxFuture<'_, parquet::errors::Result> { let data = self.inner.slice(range.start as usize..range.end as usize); async move { Ok(data) }.boxed() } @@ -579,7 +586,7 @@ impl AsyncFileReader for InMemoryReader { fn get_metadata<'a>( &'a mut self, _options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, parquet::errors::Result>> { + ) -> MaybeLocalBoxFuture<'a, parquet::errors::Result>> { let metadata = Arc::clone(&self.metadata); async move { Ok(metadata) }.boxed() } diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 0ab6a621fca0..4e0010ebe637 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -17,8 +17,8 @@ use crate::arrow::async_reader::AsyncFileReader; use crate::errors::Result; +use crate::util::async_util::MaybeLocalBoxFuture; use bytes::Bytes; -use futures::future::BoxFuture; use std::ops::Range; /// A data source that can be used with [`ParquetMetaDataReader`] to load [`ParquetMetaData`] @@ -33,9 +33,12 @@ use std::ops::Range; /// # use bytes::Bytes; /// # use std::ops::Range; /// # use std::io::SeekFrom; -/// # use futures::future::BoxFuture; /// # use futures::FutureExt; /// # use tokio::io::{AsyncReadExt, AsyncSeekExt}; +/// # #[cfg(feature = "async-no-send")] +/// # type BoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>; +/// # #[cfg(not(feature = "async-no-send"))] +/// # type BoxFuture<'a, T> = futures::future::BoxFuture<'a, T>; /// // Adapter that implements the API for reading bytes from an async source (in /// // this case a tokio::fs::File) /// struct TokioFileMetadata { @@ -52,7 +55,7 @@ use std::ops::Range; /// self.file.read_exact(&mut buf).await?; /// Ok(Bytes::from(buf)) // convert to Bytes /// } -/// .boxed() // turn into BoxedFuture, using FutureExt::boxed +/// .boxed() // turn into BoxedFuture, using FutureExt::boxed /// } /// } ///``` @@ -65,12 +68,12 @@ pub trait MetadataFetch { /// Note the returned type is a boxed future, often created by /// [`FutureExt::boxed`]. See the trait documentation for an example /// - /// [`FutureExt::boxed`]: futures::FutureExt::boxed - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; + /// [`FutureExt::boxed`]: futures::future::FutureExt::boxed + fn fetch(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result>; } impl MetadataFetch for &mut T { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { self.get_bytes(range) } } @@ -86,6 +89,6 @@ pub trait MetadataSuffixFetch: MetadataFetch { /// Note the returned type is a boxed future, often created by /// [`FutureExt::boxed`]. See the trait documentation for an example /// - /// [`FutureExt::boxed`]: futures::FutureExt::boxed - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result>; + /// [`FutureExt::boxed`]: futures::future::FutureExt::boxed + fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result>; } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 611d6999e07e..25af5e884c71 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -30,7 +30,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; -use futures::future::{BoxFuture, FutureExt}; +use futures::future::FutureExt; use futures::ready; use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; @@ -54,6 +54,7 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +use crate::util::async_util::{MaybeLocalBoxFuture, MaybeLocalFutureExt, MaybeSend}; mod metadata; pub use metadata::*; @@ -79,12 +80,15 @@ pub use store::*; /// [`ObjectStore`]: object_store::ObjectStore /// /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html -pub trait AsyncFileReader: Send { +pub trait AsyncFileReader: MaybeSend { /// Retrieve the bytes in `range` - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn get_bytes(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result>; /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> MaybeLocalBoxFuture<'_, Result>> { async move { let mut result = Vec::with_capacity(ranges.len()); @@ -95,7 +99,7 @@ pub trait AsyncFileReader: Send { Ok(result) } - .boxed() + .boxed_maybe_local() } /// Return a future which results in the [`ParquetMetaData`] for this Parquet file. @@ -117,41 +121,44 @@ pub trait AsyncFileReader: Send { fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>>; + ) -> MaybeLocalBoxFuture<'a, Result>>; } /// This allows Box to be used as an AsyncFileReader, impl AsyncFileReader for Box { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { self.as_mut().get_bytes(range) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> MaybeLocalBoxFuture<'_, Result>> { self.as_mut().get_byte_ranges(ranges) } fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> MaybeLocalBoxFuture<'a, Result>> { self.as_mut().get_metadata(options) } } impl MetadataSuffixFetch for T { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result> { async move { self.seek(SeekFrom::End(-(suffix as i64))).await?; let mut buf = Vec::with_capacity(suffix); self.take(suffix as _).read_to_end(&mut buf).await?; Ok(buf.into()) } - .boxed() + .boxed_maybe_local() } } -impl AsyncFileReader for T { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { +impl AsyncFileReader for T { + fn get_bytes(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { async move { self.seek(SeekFrom::Start(range.start)).await?; @@ -164,13 +171,13 @@ impl AsyncFileReader for T { Ok(buffer.into()) } - .boxed() + .boxed_maybe_local() } fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> MaybeLocalBoxFuture<'a, Result>> { async move { let metadata_reader = ParquetMetaDataReader::new() .with_page_indexes(options.is_some_and(|o| o.page_index)); @@ -183,7 +190,7 @@ impl AsyncFileReader for T { let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?; Ok(Arc::new(parquet_metadata)) } - .boxed() + .boxed_maybe_local() } } @@ -221,7 +228,7 @@ pub struct AsyncReader(T); /// See [`ArrowReaderBuilder`] for additional member functions pub type ParquetRecordBatchStreamBuilder = ArrowReaderBuilder>; -impl ParquetRecordBatchStreamBuilder { +impl ParquetRecordBatchStreamBuilder { /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the /// specified source. /// @@ -564,7 +571,7 @@ struct ReaderFactory { impl ReaderFactory where - T: AsyncFileReader + Send, + T: AsyncFileReader + MaybeSend, { /// Reads the next row group with the provided `selection`, `projection` and `batch_size` /// @@ -676,7 +683,7 @@ enum StreamState { /// Decoding a batch Decoding(ParquetRecordBatchReader), /// Reading data from input - Reading(BoxFuture<'static, ReadResult>), + Reading(MaybeLocalBoxFuture<'static, ReadResult>), /// Error Error, } @@ -752,7 +759,7 @@ impl ParquetRecordBatchStream { impl ParquetRecordBatchStream where - T: AsyncFileReader + Unpin + Send + 'static, + T: AsyncFileReader + Unpin + MaybeSend + 'static, { /// Fetches the next row group from the stream. /// @@ -815,7 +822,7 @@ where impl Stream for ParquetRecordBatchStream where - T: AsyncFileReader + Unpin + Send + 'static, + T: AsyncFileReader + Unpin + MaybeSend + 'static, { type Item = Result; @@ -851,7 +858,7 @@ where self.projection.clone(), self.batch_size, ) - .boxed(); + .boxed_maybe_local(); self.state = StreamState::Reading(fut) } @@ -892,7 +899,7 @@ impl InMemoryRowGroup<'_> { /// /// If `selection` is provided, only the pages required for the selection /// are fetched. Otherwise, all pages are fetched. - async fn fetch( + async fn fetch( &mut self, input: &mut T, projection: &ProjectionMask, @@ -1148,7 +1155,7 @@ mod tests { } impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { let range = range.clone(); self.requests .lock() @@ -1163,7 +1170,7 @@ mod tests { fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> MaybeLocalBoxFuture<'a, Result>> { let metadata_reader = ParquetMetaDataReader::new() .with_page_indexes(options.is_some_and(|o| o.page_index)); self.metadata = Some(Arc::new( diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 51dc368bc9ea..4a6622c5055b 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -21,6 +21,7 @@ use crate::arrow::arrow_reader::ArrowReaderOptions; use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use crate::util::async_util::MaybeLocalBoxFuture; use bytes::Bytes; use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use object_store::{path::Path, ObjectStore}; @@ -161,7 +162,7 @@ impl ParquetObjectReader { } impl MetadataSuffixFetch for &mut ParquetObjectReader { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result> { let options = GetOptions { range: Some(GetRange::Suffix(suffix as u64)), ..Default::default() @@ -177,14 +178,14 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader { } impl AsyncFileReader for ParquetObjectReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { self.spawn(|store, path| store.get_range(path, range)) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> - where - Self: Send, - { + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> MaybeLocalBoxFuture<'_, Result>> { self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) } @@ -197,7 +198,7 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata<'a>( &'a mut self, options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, Result>> { + ) -> MaybeLocalBoxFuture<'a, Result>> { Box::pin(async move { let mut metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index faec427907a7..90a58f327de7 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -61,62 +61,60 @@ mod store; pub use store::*; use crate::{ - arrow::arrow_writer::ArrowWriterOptions, - arrow::ArrowWriter, + arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter}, errors::{ParquetError, Result}, file::{metadata::RowGroupMetaData, properties::WriterProperties}, format::{FileMetaData, KeyValue}, + util::async_util::{MaybeLocalBoxFuture, MaybeLocalFutureExt, MaybeSend}, }; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use bytes::Bytes; -use futures::future::BoxFuture; -use futures::FutureExt; use std::mem; use tokio::io::{AsyncWrite, AsyncWriteExt}; /// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files. -pub trait AsyncFileWriter: Send { +pub trait AsyncFileWriter: MaybeSend { /// Write the provided bytes to the underlying writer /// /// The underlying writer CAN decide to buffer the data or write it immediately. /// This design allows the writer implementer to control the buffering and I/O scheduling. /// /// The underlying writer MAY implement retry logic to prevent breaking users write process. - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>; + fn write(&mut self, bs: Bytes) -> MaybeLocalBoxFuture<'_, Result<()>>; /// Flush any buffered data to the underlying writer and finish writing process. /// /// After `complete` returns `Ok(())`, caller SHOULD not call write again. - fn complete(&mut self) -> BoxFuture<'_, Result<()>>; + fn complete(&mut self) -> MaybeLocalBoxFuture<'_, Result<()>>; } impl AsyncFileWriter for Box { - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + fn write(&mut self, bs: Bytes) -> MaybeLocalBoxFuture<'_, Result<()>> { self.as_mut().write(bs) } - fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + fn complete(&mut self) -> MaybeLocalBoxFuture<'_, Result<()>> { self.as_mut().complete() } } -impl AsyncFileWriter for T { - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { +impl AsyncFileWriter for T { + fn write(&mut self, bs: Bytes) -> MaybeLocalBoxFuture<'_, Result<()>> { async move { self.write_all(&bs).await?; Ok(()) } - .boxed() + .boxed_maybe_local() } - fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + fn complete(&mut self) -> MaybeLocalBoxFuture<'_, Result<()>> { async move { self.flush().await?; self.shutdown().await?; Ok(()) } - .boxed() + .boxed_maybe_local() } } diff --git a/parquet/src/arrow/async_writer/store.rs b/parquet/src/arrow/async_writer/store.rs index ad09eae4996f..cd023cc84b3b 100644 --- a/parquet/src/arrow/async_writer/store.rs +++ b/parquet/src/arrow/async_writer/store.rs @@ -16,11 +16,11 @@ // under the License. use bytes::Bytes; -use futures::future::BoxFuture; use std::sync::Arc; use crate::arrow::async_writer::AsyncFileWriter; use crate::errors::{ParquetError, Result}; +use crate::util::async_util::MaybeLocalBoxFuture; use object_store::buffered::BufWriter; use object_store::path::Path; use object_store::ObjectStore; @@ -93,7 +93,7 @@ impl ParquetObjectWriter { } impl AsyncFileWriter for ParquetObjectWriter { - fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + fn write(&mut self, bs: Bytes) -> MaybeLocalBoxFuture<'_, Result<()>> { Box::pin(async { self.w .put(bs) @@ -102,7 +102,7 @@ impl AsyncFileWriter for ParquetObjectWriter { }) } - fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + fn complete(&mut self) -> MaybeLocalBoxFuture<'_, Result<()>> { Box::pin(async { self.w .shutdown() diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 356713837530..073bc7768276 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -1303,7 +1303,6 @@ mod tests { mod async_tests { use super::*; use bytes::Bytes; - use futures::future::BoxFuture; use futures::FutureExt; use std::fs::File; use std::future::Future; @@ -1312,6 +1311,7 @@ mod async_tests { use std::sync::atomic::{AtomicUsize, Ordering}; use crate::file::reader::Length; + use crate::util::async_util::MaybeLocalBoxFuture; use crate::util::test_common::file_util::get_test_file; struct MetadataFetchFn(F); @@ -1321,7 +1321,7 @@ mod async_tests { F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { async move { self.0(range).await }.boxed() } } @@ -1334,7 +1334,7 @@ mod async_tests { Fut: Future> + Send, F2: Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: Range) -> MaybeLocalBoxFuture<'_, Result> { async move { self.0(range).await }.boxed() } } @@ -1345,7 +1345,7 @@ mod async_tests { F2: FnMut(usize) -> Fut + Send, Fut: Future> + Send, { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result> { async move { self.1(suffix).await }.boxed() } } diff --git a/parquet/src/util/async_util.rs b/parquet/src/util/async_util.rs new file mode 100644 index 000000000000..4f4c0dfa773b --- /dev/null +++ b/parquet/src/util/async_util.rs @@ -0,0 +1,25 @@ +#[cfg(feature = "async-no-send")] +mod send_impl { + pub trait MaybeSend {} + impl MaybeSend for T {} + pub type MaybeLocalBoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>; +} + +#[cfg(not(feature = "async-no-send"))] +mod send_impl { + pub use std::marker::Send as MaybeSend; + pub type MaybeLocalBoxFuture<'a, T> = futures::future::BoxFuture<'a, T>; +} + +pub use send_impl::*; + +pub trait MaybeLocalFutureExt: std::future::Future { + fn boxed_maybe_local<'a>(self) -> MaybeLocalBoxFuture<'a, Self::Output> + where + Self: Sized + MaybeSend + 'a, + { + Box::pin(self) + } +} + +impl MaybeLocalFutureExt for T where T: std::future::Future {} diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs index 1431132473e9..02f4f96f7c56 100644 --- a/parquet/src/util/mod.rs +++ b/parquet/src/util/mod.rs @@ -24,6 +24,9 @@ pub(crate) mod interner; pub(crate) mod test_common; pub mod utf8; +#[cfg(feature = "async")] +pub(crate) mod async_util; + #[cfg(any(test, feature = "test_common"))] pub use self::test_common::page_util::{ DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,