Merged
42 commits
55e32d2
identify a few spots
paleolimbot Oct 1, 2025
9a88f7e
add a dyn accumulator
paleolimbot Oct 1, 2025
6a34eaf
document and set
paleolimbot Oct 1, 2025
d59c644
maybe pipe into the encoder
paleolimbot Oct 1, 2025
2f268f3
try
paleolimbot Oct 1, 2025
5c9547e
remove some previous work
paleolimbot Oct 1, 2025
6516544
plausibly working
paleolimbot Oct 1, 2025
629afc7
test
paleolimbot Oct 1, 2025
7ae4f17
failing test but a bit better
paleolimbot Oct 2, 2025
523575d
passing test
paleolimbot Oct 2, 2025
203ea9c
test!
paleolimbot Oct 2, 2025
5305e4e
ensure size stats are written for geometry/geography from generic enc…
paleolimbot Oct 6, 2025
3089b69
remove tests that will start failing when new thift footer merges
paleolimbot Oct 6, 2025
9e12b57
more flexible testers
paleolimbot Oct 6, 2025
a3b729b
more tests
paleolimbot Oct 7, 2025
f8b58c6
add roundtrip tests, fix accumulator for the all empty case
paleolimbot Oct 7, 2025
7472ba6
more test files
paleolimbot Oct 7, 2025
be7b522
use code-compatible stats accessor
paleolimbot Oct 7, 2025
e60cd98
fix test for new accessor
paleolimbot Oct 7, 2025
a34e5c4
add documentation for new trait member
paleolimbot Oct 7, 2025
15bbe3c
explicit schema test
paleolimbot Oct 7, 2025
0fca11f
document the accumulator
paleolimbot Oct 7, 2025
cd0f609
tests
paleolimbot Oct 7, 2025
92d0d73
rename
paleolimbot Oct 7, 2025
182776e
once lock thinger
paleolimbot Oct 7, 2025
4c7c52a
remove uneeded todo
paleolimbot Oct 7, 2025
0e600b4
remove copied comment
paleolimbot Oct 7, 2025
59a00ed
add better docstring
paleolimbot Oct 7, 2025
a024793
Apply suggestions from code review
paleolimbot Oct 8, 2025
3798609
more monospace
paleolimbot Oct 8, 2025
85ebb72
more monospace
paleolimbot Oct 8, 2025
2bc7bbe
more compact updater
paleolimbot Oct 8, 2025
431da25
try_new_geo_stats_accumulator()
paleolimbot Oct 8, 2025
6156112
fix link
paleolimbot Oct 8, 2025
ed85f90
Merge branch 'main' into spatial-stats-write
paleolimbot Oct 8, 2025
9d6d6c3
fix build
paleolimbot Oct 8, 2025
392d949
remove duplicate test
paleolimbot Oct 8, 2025
f9112f7
maybe merge tests better
paleolimbot Oct 8, 2025
ec31096
Apply suggestions from code review
paleolimbot Oct 9, 2025
eac356e
document feature flag
paleolimbot Oct 9, 2025
1bb2cd8
verify stats/null count
paleolimbot Oct 9, 2025
d5ba2f2
test column index
paleolimbot Oct 9, 2025
2 changes: 1 addition & 1 deletion parquet-geospatial/Cargo.toml
@@ -33,7 +33,7 @@ rust-version = { workspace = true }
[dependencies]
arrow-schema = { workspace = true }
geo-traits = { version = "0.3" }
wkb = { version = "0.9" }
wkb = { version = "0.9.1" }

[dev-dependencies]
wkt = { version = "0.14" }
1 change: 1 addition & 0 deletions parquet-geospatial/src/lib.rs
@@ -29,3 +29,4 @@

pub mod bounding;
pub mod interval;
pub mod testing;
66 changes: 66 additions & 0 deletions parquet-geospatial/src/testing.rs
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Testing utilities for geospatial Parquet types
/// Build well-known binary representing a point with the given XY coordinate
pub fn wkb_point_xy(x: f64, y: f64) -> Vec<u8> {
let mut item: [u8; 21] = [0; 21];
item[0] = 0x01;
item[1] = 0x01;
item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
item.to_vec()
}
Comment on lines +21 to +28
Member
It's not a huge deal for XY and XYZM points, but if we want more complex helpers for more complex geometries, I think it would be more maintainable and more understandable for future people to use an existing crate to generate the WKB buffers. (In my own projects I use wkt::types as simple extended-dimension geometry types that I pass into wkb APIs)

Member Author
Definitely! The GeometryBounder is tested with those here (where we have wkt as a dev dependency). I don't mind how these are implemented (I just needed something for the parquet/geospatial tests).


/// Build well-known binary representing a point with the given XYZM coordinate
pub fn wkb_point_xyzm(x: f64, y: f64, z: f64, m: f64) -> Vec<u8> {
let mut item: [u8; 37] = [0; 37];
item[0] = 0x01;
item[1..5].copy_from_slice(3001_u32.to_le_bytes().as_slice());
item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
item[21..29].copy_from_slice(z.to_le_bytes().as_slice());
item[29..37].copy_from_slice(m.to_le_bytes().as_slice());
item.to_vec()
}

#[cfg(test)]
mod test {

use wkb::reader::Wkb;

use super::*;

#[test]
fn test_wkb_item() {
let bytes = wkb_point_xy(1.0, 2.0);
let geometry = Wkb::try_new(&bytes).unwrap();
let mut wkt = String::new();
wkt::to_wkt::write_geometry(&mut wkt, &geometry).unwrap();
assert_eq!(wkt, "POINT(1 2)");
}

#[test]
fn test_wkb_point_xyzm() {
let bytes = wkb_point_xyzm(1.0, 2.0, 3.0, 4.0);
let geometry = Wkb::try_new(&bytes).unwrap();
let mut wkt = String::new();
wkt::to_wkt::write_geometry(&mut wkt, &geometry).unwrap();
assert_eq!(wkt, "POINT ZM(1 2 3 4)");
}
}
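
For reference, the helpers above hand-roll the little-endian ISO WKB layout: one endianness byte (0x01), a 4-byte geometry type code (1 for an XY point, 3001 for an XYZM point), then one f64 per coordinate. As a hedged sketch of how the same pattern would extend, here is a hypothetical XYZ variant (not part of this PR), assuming the ISO convention in which adding a Z dimension adds 1000 to the type code:

/// Hypothetical helper (illustration only): build well-known binary representing
/// a point with the given XYZ coordinate. Layout assumed from the helpers above:
/// byte 0 = 0x01 (little endian), bytes 1..5 = geometry type (1001 = point with Z),
/// then one f64 per coordinate.
pub fn wkb_point_xyz(x: f64, y: f64, z: f64) -> Vec<u8> {
    let mut item: [u8; 29] = [0; 29];
    item[0] = 0x01;
    item[1..5].copy_from_slice(1001_u32.to_le_bytes().as_slice());
    item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
    item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
    item[21..29].copy_from_slice(z.to_le_bytes().as_slice());
    item.to_vec()
}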
3 changes: 3 additions & 0 deletions parquet/Cargo.toml
@@ -45,6 +45,7 @@ arrow-data = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
parquet-geospatial = { workspace = true, optional = true }
parquet-variant = { workspace = true, optional = true }
parquet-variant-json = { workspace = true, optional = true }
parquet-variant-compute = { workspace = true, optional = true }
@@ -131,6 +132,8 @@ flate2-rust_backened = ["flate2/rust_backend"]
flate2-zlib-rs = ["flate2/zlib-rs"]
# Enable parquet variant support
variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
# Enable geospatial support
geospatial = ["parquet-geospatial"]
Contributor
Could you please also add the new feature flag to the main crate readme as well?

https://github.com/apache/arrow-rs/blob/main/parquet/README.md#feature-flags



[[example]]
1 change: 1 addition & 0 deletions parquet/README.md
@@ -65,6 +65,7 @@ The `parquet` crate provides the following features which may be enabled in your
- `simdutf8` (default) - Use the [`simdutf8`] crate for SIMD-accelerated UTF-8 validation
- `encryption` - support for reading / writing encrypted Parquet files
- `variant_experimental` - ⚠️ Experimental [Parquet Variant] support, which may change, even between minor releases.
- `geospatial` - ⚠️ Experimental geospatial support, which may change, even between minor releases.

[`arrow`]: https://crates.io/crates/arrow
[`simdutf8`]: https://crates.io/crates/simdutf8
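
For downstream crates, the new statistics support is gated behind the `geospatial` feature, so it has to be enabled on the `parquet` dependency. A minimal sketch of the Cargo.toml entry (the version number is a placeholder, not taken from this PR):

[dependencies]
parquet = { version = "x.y", features = ["geospatial"] }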
31 changes: 30 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
@@ -23,6 +23,8 @@ use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::geospatial::accumulator::{try_new_geo_stats_accumulator, GeoStatsAccumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
@@ -421,6 +423,7 @@ pub struct ByteArrayEncoder {
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
@@ -447,13 +450,16 @@ impl ColumnValueEncoder for ByteArrayEncoder {

let statistics_enabled = props.statistics_enabled(descr.path());

let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

Ok(Self {
fallback,
statistics_enabled,
bloom_filter,
dict_encoder: dictionary,
min_value: None,
max_value: None,
geo_stats_accumulator,
})
}

@@ -536,6 +542,10 @@ impl ColumnValueEncoder for ByteArrayEncoder {
_ => self.fallback.flush_data_page(min_value, max_value),
}
}

fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
}
}

/// Encodes the provided `values` and `indices` to `encoder`
@@ -547,7 +557,9 @@ where
T::Item: Copy + Ord + AsRef<[u8]>,
{
if encoder.statistics_enabled != EnabledStatistics::None {
if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
} else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
encoder.min_value = Some(min);
}
@@ -595,3 +607,20 @@
}
Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
}

/// Updates geospatial statistics for the provided array and indices
fn update_geo_stats_accumulator<T>(
bounder: &mut dyn GeoStatsAccumulator,
array: T,
valid: impl Iterator<Item = usize>,
) where
T: ArrayAccessor,
T::Item: Copy + Ord + AsRef<[u8]>,
{
if bounder.is_valid() {
for idx in valid {
let val = array.value(idx);
bounder.update_wkb(val.as_ref());
}
}
}
95 changes: 89 additions & 6 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -228,11 +228,18 @@ impl<W: Write + Send> ArrowWriter<W> {
options: ArrowWriterOptions,
) -> Result<Self> {
let mut props = options.properties;
let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
if let Some(schema_root) = &options.schema_root {
converter = converter.schema_root(schema_root);
}
let schema = converter.convert(&arrow_schema)?;

let schema = if let Some(parquet_schema) = options.schema_descr {
parquet_schema.clone()
} else {
let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
if let Some(schema_root) = &options.schema_root {
converter = converter.schema_root(schema_root);
}

converter.convert(&arrow_schema)?
};

if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -457,6 +464,7 @@ pub struct ArrowWriterOptions {
properties: WriterProperties,
skip_arrow_metadata: bool,
schema_root: Option<String>,
schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
@@ -490,6 +498,18 @@ impl ArrowWriterOptions {
..self
}
}

/// Explicitly specify the Parquet schema to be used
Contributor
this is a nice API addition I think

///
/// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
/// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
/// already known or must be calculated using custom logic.
pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
Self {
schema_descr: Some(schema_descr),
..self
}
}
}
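
A minimal usage sketch of the new option (error handling elided; `parquet_schema_descr`, `batch_schema`, `buf`, and `batch` are assumed to be constructed as in the `test_arrow_writer_explicit_schema` test added below):

// Write with an explicitly provided Parquet schema instead of one derived
// by ArrowSchemaConverter from the Arrow schema.
let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
let mut writer = ArrowWriter::try_new_with_options(&mut buf, batch_schema.clone(), options)?;
writer.write(&batch)?;
writer.close()?;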

/// A single column chunk produced by [`ArrowColumnWriter`]
@@ -1513,7 +1533,7 @@ mod tests {
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::reader::SerializedPageReader;
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
use crate::schema::types::ColumnPath;
use crate::schema::types::{ColumnPath, Type};
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
@@ -4135,6 +4155,69 @@ mod tests {
}
}

#[test]
fn test_arrow_writer_explicit_schema() {
// Write an int32 array using explicit int64 storage
let batch_schema = Arc::new(Schema::new(vec![Field::new(
"integers",
DataType::Int32,
true,
)]));
let parquet_schema = Type::group_type_builder("root")
.with_fields(vec![Type::primitive_type_builder(
"integers",
crate::basic::Type::INT64,
)
.build()
.unwrap()
.into()])
.build()
.unwrap();
let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());

let batch = RecordBatch::try_new(
batch_schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();

let explicit_schema_options =
ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new_with_options(
&mut buf,
batch_schema.clone(),
explicit_schema_options,
)
.unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let bytes = Bytes::from(buf);
let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();

let expected_schema = Arc::new(Schema::new(vec![Field::new(
"integers",
DataType::Int64,
true,
)]));
assert_eq!(reader_builder.schema(), &expected_schema);

let batches = reader_builder
.build()
.unwrap()
.collect::<Result<Vec<_>, ArrowError>>()
.unwrap();
assert_eq!(batches.len(), 1);

let expected_batch = RecordBatch::try_new(
expected_schema.clone(),
vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();
assert_eq!(batches[0], expected_batch);
}

#[test]
fn mismatched_schemas() {
let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
32 changes: 30 additions & 2 deletions parquet/src/column/writer/encoder.rs
@@ -28,6 +28,8 @@ use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::geospatial::accumulator::{try_new_geo_stats_accumulator, GeoStatsAccumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
@@ -121,6 +123,10 @@ pub trait ColumnValueEncoder {
/// will *not* be tracked by the bloom filter as it is empty since. This should be called once
/// near the end of encoding.
fn flush_bloom_filter(&mut self) -> Option<Sbbf>;

/// Computes [`GeospatialStatistics`], if any, and resets internal state such that any internal
/// accumulator is prepared to accumulate statistics for the next column chunk.
fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
}
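
The reset semantics described above imply one call per column chunk. A simplified caller-side sketch (field names assumed for illustration; not the actual arrow-rs column writer code) of how the statistics flow out of the encoder when a chunk is flushed:

// Simplified sketch: when a column chunk is closed, take any geospatial
// statistics from the value encoder.
let geo_stats: Option<Box<GeospatialStatistics>> = self.encoder.flush_geospatial_statistics();
// `geo_stats`, when present, is attached to the column chunk metadata (details
// omitted); the encoder's accumulator is now reset for the next chunk.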

pub struct ColumnValueEncoderImpl<T: DataType> {
Expand All @@ -133,6 +139,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
variable_length_bytes: Option<i64>,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -145,10 +152,12 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {

fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
if self.statistics_enabled != EnabledStatistics::None
// INTERVAL has undefined sort order, so don't write min/max stats for it
// INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
if let Some((min, max)) = self.min_max(slice, None) {
if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
update_geo_stats_accumulator(accumulator, slice.iter());
} else if let Some((min, max)) = self.min_max(slice, None) {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
}
@@ -201,6 +210,8 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);

Ok(Self {
encoder,
dict_encoder,
@@ -211,6 +222,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
min_value: None,
max_value: None,
variable_length_bytes: None,
geo_stats_accumulator,
})
}

@@ -307,6 +319,10 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
variable_length_bytes: self.variable_length_bytes.take(),
})
}

fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
}
}

fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
Expand Down Expand Up @@ -367,3 +383,15 @@ fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace:
_ => val.clone(),
}
}

fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
where
T: ParquetValueType + 'a,
I: Iterator<Item = &'a T>,
{
if bounder.is_valid() {
for val in iter {
bounder.update_wkb(val.as_bytes());
}
}
}