Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ test_common = ["arrow/test_utils"]
experimental = []
# Enable async APIs
async = ["futures", "tokio"]
# Enable `!Send` futures and readers in the async APIs (implies `async`)
async-no-send = ["async"]
# Enable object_store integration
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
Expand Down
13 changes: 10 additions & 3 deletions parquet/benches/arrow_reader_row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ use arrow_array::StringViewArray;
use arrow_cast::pretty::pretty_format_batches;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
Expand All @@ -76,6 +75,11 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use std::ops::Range;
use std::sync::Arc;

/// Boxed future type used as the return type of the `AsyncFileReader` methods
/// implemented in this benchmark.
///
/// With the `async-no-send` feature enabled this aliases `LocalBoxFuture`.
#[cfg(feature = "async-no-send")]
type MaybeLocalBoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;
/// Boxed future type used as the return type of the `AsyncFileReader` methods
/// implemented in this benchmark.
///
/// Without the `async-no-send` feature this aliases `BoxFuture`.
#[cfg(not(feature = "async-no-send"))]
type MaybeLocalBoxFuture<'a, T> = futures::future::BoxFuture<'a, T>;

/// Generates a random string. Has a 50% chance to generate a short string (3–11 characters)
/// or a long string (13–20 characters).
fn random_string(rng: &mut StdRng) -> String {
Expand Down Expand Up @@ -571,15 +575,18 @@ impl InMemoryReader {
}

impl AsyncFileReader for InMemoryReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
fn get_bytes(
&mut self,
range: Range<u64>,
) -> MaybeLocalBoxFuture<'_, parquet::errors::Result<Bytes>> {
let data = self.inner.slice(range.start as usize..range.end as usize);
async move { Ok(data) }.boxed()
}

fn get_metadata<'a>(
&'a mut self,
_options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
) -> MaybeLocalBoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
let metadata = Arc::clone(&self.metadata);
async move { Ok(metadata) }.boxed()
}
Expand Down
19 changes: 11 additions & 8 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
use crate::util::async_util::MaybeLocalBoxFuture;
use bytes::Bytes;
use futures::future::BoxFuture;
use std::ops::Range;

/// A data source that can be used with [`ParquetMetaDataReader`] to load [`ParquetMetaData`]
Expand All @@ -33,9 +33,12 @@ use std::ops::Range;
/// # use bytes::Bytes;
/// # use std::ops::Range;
/// # use std::io::SeekFrom;
/// # use futures::future::BoxFuture;
/// # use futures::FutureExt;
/// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
/// # #[cfg(feature = "async-no-send")]
/// # type BoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;
/// # #[cfg(not(feature = "async-no-send"))]
/// # type BoxFuture<'a, T> = futures::future::BoxFuture<'a, T>;
/// // Adapter that implements the API for reading bytes from an async source (in
/// // this case a tokio::fs::File)
/// struct TokioFileMetadata {
Expand All @@ -52,7 +55,7 @@ use std::ops::Range;
/// self.file.read_exact(&mut buf).await?;
/// Ok(Bytes::from(buf)) // convert to Bytes
/// }
/// .boxed() // turn into a BoxFuture, using FutureExt::boxed
/// .boxed() // turn into a BoxFuture, using FutureExt::boxed
/// }
/// }
///```
Expand All @@ -65,12 +68,12 @@ pub trait MetadataFetch {
/// Note the returned type is a boxed future, often created by
/// [`FutureExt::boxed`]. See the trait documentation for an example
///
/// [`FutureExt::boxed`]: futures::FutureExt::boxed
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
/// [`FutureExt::boxed`]: futures::future::FutureExt::boxed
fn fetch(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>>;
}

impl<T: AsyncFileReader> MetadataFetch for &mut T {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
fn fetch(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
self.get_bytes(range)
}
}
Expand All @@ -86,6 +89,6 @@ pub trait MetadataSuffixFetch: MetadataFetch {
/// Note the returned type is a boxed future, often created by
/// [`FutureExt::boxed`]. See the trait documentation for an example
///
/// [`FutureExt::boxed`]: futures::FutureExt::boxed
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
/// [`FutureExt::boxed`]: futures::future::FutureExt::boxed
fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result<Bytes>>;
}
57 changes: 32 additions & 25 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::future::FutureExt;
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
Expand All @@ -54,6 +54,7 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::util::async_util::{MaybeLocalBoxFuture, MaybeLocalFutureExt, MaybeSend};

mod metadata;
pub use metadata::*;
Expand All @@ -79,12 +80,15 @@ pub use store::*;
/// [`ObjectStore`]: object_store::ObjectStore
///
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
pub trait AsyncFileReader: Send {
pub trait AsyncFileReader: MaybeSend {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
fn get_bytes(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>>;

/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially.
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> MaybeLocalBoxFuture<'_, Result<Vec<Bytes>>> {
async move {
let mut result = Vec::with_capacity(ranges.len());

Expand All @@ -95,7 +99,7 @@ pub trait AsyncFileReader: Send {

Ok(result)
}
.boxed()
.boxed_maybe_local()
}

/// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
Expand All @@ -117,41 +121,44 @@ pub trait AsyncFileReader: Send {
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
) -> MaybeLocalBoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an [`AsyncFileReader`].
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
fn get_bytes(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
self.as_mut().get_bytes(range)
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> MaybeLocalBoxFuture<'_, Result<Vec<Bytes>>> {
self.as_mut().get_byte_ranges(ranges)
}

fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
) -> MaybeLocalBoxFuture<'a, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata(options)
}
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
async move {
self.seek(SeekFrom::End(-(suffix as i64))).await?;
let mut buf = Vec::with_capacity(suffix);
self.take(suffix as _).read_to_end(&mut buf).await?;
Ok(buf.into())
}
.boxed()
.boxed_maybe_local()
}
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
impl<T: AsyncRead + AsyncSeek + Unpin + MaybeSend> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
async move {
self.seek(SeekFrom::Start(range.start)).await?;

Expand All @@ -164,13 +171,13 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {

Ok(buffer.into())
}
.boxed()
.boxed_maybe_local()
}

fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
) -> MaybeLocalBoxFuture<'a, Result<Arc<ParquetMetaData>>> {
async move {
let metadata_reader = ParquetMetaDataReader::new()
.with_page_indexes(options.is_some_and(|o| o.page_index));
Expand All @@ -183,7 +190,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
Ok(Arc::new(parquet_metadata))
}
.boxed()
.boxed_maybe_local()
}
}

Expand Down Expand Up @@ -221,7 +228,7 @@ pub struct AsyncReader<T>(T);
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;

impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
impl<T: AsyncFileReader + MaybeSend + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
/// specified source.
///
Expand Down Expand Up @@ -564,7 +571,7 @@ struct ReaderFactory<T> {

impl<T> ReaderFactory<T>
where
T: AsyncFileReader + Send,
T: AsyncFileReader + MaybeSend,
{
/// Reads the next row group with the provided `selection`, `projection` and `batch_size`
///
Expand Down Expand Up @@ -676,7 +683,7 @@ enum StreamState<T> {
/// Decoding a batch
Decoding(ParquetRecordBatchReader),
/// Reading data from input
Reading(BoxFuture<'static, ReadResult<T>>),
Reading(MaybeLocalBoxFuture<'static, ReadResult<T>>),
/// Error
Error,
}
Expand Down Expand Up @@ -752,7 +759,7 @@ impl<T> ParquetRecordBatchStream<T> {

impl<T> ParquetRecordBatchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
T: AsyncFileReader + Unpin + MaybeSend + 'static,
{
/// Fetches the next row group from the stream.
///
Expand Down Expand Up @@ -815,7 +822,7 @@ where

impl<T> Stream for ParquetRecordBatchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
T: AsyncFileReader + Unpin + MaybeSend + 'static,
{
type Item = Result<RecordBatch>;

Expand Down Expand Up @@ -851,7 +858,7 @@ where
self.projection.clone(),
self.batch_size,
)
.boxed();
.boxed_maybe_local();

self.state = StreamState::Reading(fut)
}
Expand Down Expand Up @@ -892,7 +899,7 @@ impl InMemoryRowGroup<'_> {
///
/// If `selection` is provided, only the pages required for the selection
/// are fetched. Otherwise, all pages are fetched.
async fn fetch<T: AsyncFileReader + Send>(
async fn fetch<T: AsyncFileReader + MaybeSend>(
&mut self,
input: &mut T,
projection: &ProjectionMask,
Expand Down Expand Up @@ -1148,7 +1155,7 @@ mod tests {
}

impl AsyncFileReader for TestReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
fn get_bytes(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
let range = range.clone();
self.requests
.lock()
Expand All @@ -1163,7 +1170,7 @@ mod tests {
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
) -> MaybeLocalBoxFuture<'a, Result<Arc<ParquetMetaData>>> {
let metadata_reader = ParquetMetaDataReader::new()
.with_page_indexes(options.is_some_and(|o| o.page_index));
self.metadata = Some(Arc::new(
Expand Down
15 changes: 8 additions & 7 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::util::async_util::MaybeLocalBoxFuture;
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectStore};
Expand Down Expand Up @@ -161,7 +162,7 @@ impl ParquetObjectReader {
}

impl MetadataSuffixFetch for &mut ParquetObjectReader {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
fn fetch_suffix(&mut self, suffix: usize) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
let options = GetOptions {
range: Some(GetRange::Suffix(suffix as u64)),
..Default::default()
Expand All @@ -177,14 +178,14 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader {
}

impl AsyncFileReader for ParquetObjectReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
fn get_bytes(&mut self, range: Range<u64>) -> MaybeLocalBoxFuture<'_, Result<Bytes>> {
self.spawn(|store, path| store.get_range(path, range))
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
Self: Send,
{
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> MaybeLocalBoxFuture<'_, Result<Vec<Bytes>>> {
self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
}

Expand All @@ -197,7 +198,7 @@ impl AsyncFileReader for ParquetObjectReader {
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
) -> MaybeLocalBoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let mut metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
Expand Down
Loading
Loading