Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2527,8 +2527,8 @@ mod tests {
let stats = statistics_roundtrip::<Int96Type>(&input);
assert!(!stats.is_min_max_backwards_compatible());
if let Statistics::Int96(stats) = stats {
assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
} else {
panic!("expecting Statistics::Int96, got {stats:?}");
}
Expand Down
38 changes: 34 additions & 4 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::util::bit_util::FromBytes;

/// Rust representation for logical type INT96, value is backed by an array of `u32`.
/// The type only takes 12 bytes, without extra padding.
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Int96 {
value: [u32; 3],
}
Expand Down Expand Up @@ -118,14 +118,44 @@ impl Int96 {
.wrapping_add(nanos)
}

#[inline]
fn get_days(&self) -> i32 {
self.data()[2] as i32
}

#[inline]
fn get_nanos(&self) -> i64 {
((self.data()[1] as i64) << 32) + self.data()[0] as i64
}

#[inline]
fn data_as_days_and_nanos(&self) -> (i32, i64) {
let day = self.data()[2] as i32;
let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
(day, nanos)
(self.get_days(), self.get_nanos())
}
}

impl PartialOrd for Int96 {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Int96 {
/// Order `Int96` correctly for (deprecated) timestamp types.
///
/// Note: this is done even though the Int96 type is deprecated and the
/// [spec does not define the sort order]
/// because some engines, notably Spark and Databricks Photon still write
/// Int96 timestamps and rely on their order for optimization.
///
/// [spec does not define the sort order]: https://github.com/apache/parquet-format/blob/cf943c197f4fad826b14ba0c40eb0ffdab585285/src/main/thrift/parquet.thrift#L1079
fn cmp(&self, other: &Self) -> Ordering {
match self.get_days().cmp(&other.get_days()) {
Ordering::Equal => self.get_nanos().cmp(&other.get_nanos()),
ord => ord,
}
}
}
impl From<Vec<u32>> for Int96 {
fn from(buf: Vec<u32>) -> Self {
assert_eq!(buf.len(), 3);
Expand Down
3 changes: 0 additions & 3 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,6 @@ pub fn from_thrift(
old_format,
),
Type::INT96 => {
// INT96 statistics may not be correct, because comparison is signed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should remove this until we have a filter on known good statistics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// INT96 statistics may not be correct, because comparison is signed
// INT96 statistics may not be correct, because comparison is signed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to merge this one. so I made a follow on PR:

// byte-wise, not actual timestamps. It is recommended to ignore
// min/max statistics for INT96 columns.
let min = if let Some(data) = min {
assert_eq!(data.len(), 12);
Some(Int96::try_from_le_slice(&data)?)
Expand Down
151 changes: 151 additions & 0 deletions parquet/tests/int96_stats_roundtrip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use chrono::{DateTime, NaiveDateTime, Utc};
use parquet::basic::Type;
use parquet::data_type::{Int96, Int96Type};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::statistics::Statistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use rand::seq::SliceRandom;
use std::fs::File;
use std::sync::Arc;
use tempfile::Builder;

fn datetime_to_int96(dt: &str) -> Int96 {
let naive = NaiveDateTime::parse_from_str(dt, "%Y-%m-%d %H:%M:%S%.f").unwrap();
let datetime: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive, Utc);
let nanos = datetime.timestamp_nanos_opt().unwrap();
let mut int96 = Int96::new();
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const NANOSECONDS_IN_DAY: i64 = 86_400_000_000_000;
let days = nanos / NANOSECONDS_IN_DAY;
let remaining_nanos = nanos % NANOSECONDS_IN_DAY;
let julian_day = (days + JULIAN_DAY_OF_EPOCH) as i32;
let julian_day_u32 = julian_day as u32;
let nanos_low = (remaining_nanos & 0xFFFFFFFF) as u32;
let nanos_high = ((remaining_nanos >> 32) & 0xFFFFFFFF) as u32;
int96.set_data(nanos_low, nanos_high, julian_day_u32);
int96
}

fn verify_ordering(data: Vec<Int96>) {
// Create a temporary file
let tmp = Builder::new()
.prefix("test_int96_stats")
.tempfile()
.unwrap();
let file_path = tmp.path().to_owned();

// Create schema with INT96 field
let message_type = "
message test {
REQUIRED INT96 timestamp;
}
";
let schema = parse_message_type(message_type).unwrap();

// Configure writer properties to enable statistics
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build();

let expected_min = data[0];
let expected_max = data[data.len() - 1];

{
let file = File::create(&file_path).unwrap();
let mut writer = SerializedFileWriter::new(file, schema.into(), Arc::new(props)).unwrap();
let mut row_group = writer.next_row_group().unwrap();
let mut col_writer = row_group.next_column().unwrap().unwrap();

{
let writer = col_writer.typed::<Int96Type>();
let mut shuffled_data = data.clone();
shuffled_data.shuffle(&mut rand::rng());
writer.write_batch(&shuffled_data, None, None).unwrap();
}
col_writer.close().unwrap();
row_group.close().unwrap();
writer.close().unwrap();
}

let file = File::open(&file_path).unwrap();
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
let row_group = metadata.row_group(0);
let column = row_group.column(0);

let stats = column.statistics().unwrap();
assert_eq!(stats.physical_type(), Type::INT96);

if let Statistics::Int96(stats) = stats {
let min = stats.min_opt().unwrap();
let max = stats.max_opt().unwrap();

assert_eq!(
*min, expected_min,
"Min value should be {expected_min} but was {min}"
);
assert_eq!(
*max, expected_max,
"Max value should be {expected_max} but was {max}"
);
assert_eq!(stats.null_count_opt(), Some(0));
} else {
panic!("Expected Int96 statistics");
}
}

#[test]
fn test_multiple_dates() {
let data = vec![
datetime_to_int96("2020-01-01 00:00:00.000"),
datetime_to_int96("2020-02-29 23:59:59.000"),
datetime_to_int96("2020-12-31 23:59:59.000"),
datetime_to_int96("2021-01-01 00:00:00.000"),
datetime_to_int96("2023-06-15 12:30:45.000"),
datetime_to_int96("2024-02-29 15:45:30.000"),
datetime_to_int96("2024-12-25 07:00:00.000"),
datetime_to_int96("2025-01-01 00:00:00.000"),
datetime_to_int96("2025-07-04 20:00:00.000"),
datetime_to_int96("2025-12-31 23:59:59.000"),
];
verify_ordering(data);
}

#[test]
fn test_same_day_different_time() {
let data = vec![
datetime_to_int96("2020-01-01 00:01:00.000"),
datetime_to_int96("2020-01-01 00:02:00.000"),
datetime_to_int96("2020-01-01 00:03:00.000"),
];
verify_ordering(data);
}

#[test]
fn test_increasing_day_decreasing_time() {
let data = vec![
datetime_to_int96("2020-01-01 12:00:00.000"),
datetime_to_int96("2020-02-01 11:00:00.000"),
datetime_to_int96("2020-03-01 10:00:00.000"),
];
verify_ordering(data);
}
Loading