diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index d51df550622d..108926b59f6f 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -20,7 +20,7 @@ use arrow::array::{Array, ArrayData, ArrayRef, AsArray, BinaryViewArray, StructArray}; use arrow::buffer::NullBuffer; use arrow::datatypes::Int32Type; -use arrow_schema::{ArrowError, DataType}; +use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields}; use parquet_variant::Variant; use std::any::Any; use std::sync::Arc; @@ -48,6 +48,9 @@ pub struct VariantArray { /// Reference to the underlying StructArray inner: StructArray, + /// The metadata column of this variant + metadata: BinaryViewArray, + /// how is this variant array shredded? shredding_state: ShreddingState, } @@ -102,31 +105,42 @@ impl VariantArray { ))); }; - // Find the value field, if present - let value = inner - .column_by_name("value") - .map(|v| { - v.as_binary_view_opt().ok_or_else(|| { - ArrowError::NotYetImplemented(format!( - "VariantArray 'value' field must be BinaryView, got {}", - v.data_type() - )) - }) - }) - .transpose()?; - - // Find the typed_value field, if present - let typed_value = inner.column_by_name("typed_value"); - // Note these clones are cheap, they just bump the ref count - let inner = inner.clone(); - let shredding_state = - ShreddingState::try_new(metadata.clone(), value.cloned(), typed_value.cloned())?; - Ok(Self { + inner: inner.clone(), + metadata: metadata.clone(), + shredding_state: ShreddingState::try_new(inner)?, + }) + } + + #[allow(unused)] + pub(crate) fn from_parts( + metadata: BinaryViewArray, + value: Option, + typed_value: Option, + nulls: Option, + ) -> Self { + let mut builder = + StructArrayBuilder::new().with_field("metadata", Arc::new(metadata.clone())); + if let Some(value) = value.clone() { + builder = builder.with_field("value", Arc::new(value)); + } + if let Some(typed_value) = 
typed_value.clone() { + builder = builder.with_field("typed_value", typed_value); + } + if let Some(nulls) = nulls { + builder = builder.with_nulls(nulls); + } + + // This would be a lot simpler if ShreddingState were just a pair of Option... we already + // have everything we need. + let inner = builder.build(); + let shredding_state = ShreddingState::try_new(&inner).unwrap(); // valid by construction + Self { inner, + metadata, shredding_state, - }) + } } /// Returns a reference to the underlying [`StructArray`]. @@ -166,23 +180,19 @@ impl VariantArray { /// caller to ensure that the metadata and value were constructed correctly. pub fn value(&self, index: usize) -> Variant<'_, '_> { match &self.shredding_state { - ShreddingState::Unshredded { metadata, value } => { - Variant::new(metadata.value(index), value.value(index)) + ShreddingState::Unshredded { value } => { + Variant::new(self.metadata.value(index), value.value(index)) } - ShreddingState::Typed { typed_value, .. } => { + ShreddingState::PerfectlyShredded { typed_value, .. 
} => { if typed_value.is_null(index) { Variant::Null } else { typed_value_to_variant(typed_value, index) } } - ShreddingState::PartiallyShredded { - metadata, - value, - typed_value, - } => { + ShreddingState::ImperfectlyShredded { value, typed_value } => { if typed_value.is_null(index) { - Variant::new(metadata.value(index), value.value(index)) + Variant::new(self.metadata.value(index), value.value(index)) } else { typed_value_to_variant(typed_value, index) } @@ -192,7 +202,96 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &BinaryViewArray { - self.shredding_state.metadata_field() + &self.metadata + } + + /// Return a reference to the value field of the `StructArray` + pub fn value_field(&self) -> Option<&BinaryViewArray> { + self.shredding_state.value_field() + } + + /// Return a reference to the typed_value field of the `StructArray`, if present + pub fn typed_value_field(&self) -> Option<&ArrayRef> { + self.shredding_state.typed_value_field() + } +} + +/// One shredded field of a partially or perfectly shredded variant. For example, suppose the +/// shredding schema for variant `v` treats it as an object with a single field `a`, where `a` is +/// itself a struct with the single field `b` of type INT. Then the physical layout of the column +/// is: +/// +/// ```text +/// v: VARIANT { +/// metadata: BINARY, +/// value: BINARY, +/// typed_value: STRUCT { +/// a: SHREDDED_VARIANT_FIELD { +/// value: BINARY, +/// typed_value: STRUCT { +/// b: SHREDDED_VARIANT_FIELD { +/// value: BINARY, +/// typed_value: INT, +/// }, +/// }, +/// }, +/// }, +/// } +/// ``` +/// +/// In the above, each row of `v.value` is either a variant value (shredding failed, `v` was not an +/// object at all) or a variant object (partial shredding, `v` was an object but included unexpected +/// fields other than `a`), or is NULL (perfect shredding, `v` was an object containing only the +/// single expected field `a`).
+/// +/// A similar story unfolds for each `v.typed_value.a.value` -- a variant value if shredding failed +/// (`v:a` was not an object at all), or a variant object (`v:a` was an object with unexpected +/// additional fields), or NULL (`v:a` was an object containing only the single expected field `b`). +/// +/// Finally, `v.typed_value.a.typed_value.b.value` is either NULL (`v:a.b` was an integer) or else a +/// variant value. +pub struct ShreddedVariantFieldArray { + shredding_state: ShreddingState, +} + +#[allow(unused)] +impl ShreddedVariantFieldArray { + /// Creates a new `ShreddedVariantFieldArray` from a [`StructArray`]. + /// + /// # Arguments + /// - `inner` - The underlying [`StructArray`] that contains the variant data. + /// + /// # Returns + /// - A new instance of `ShreddedVariantFieldArray`. + /// + /// # Errors: + /// - If the `StructArray` does not contain the required fields + /// + /// # Requirements of the `StructArray` + /// + /// 1. An optional field named `value` that is binary, large_binary, or + /// binary_view + /// + /// 2. An optional field named `typed_value` which can be any primitive type + /// or be a list, large_list, list_view or struct + /// + /// Currently, only `value` columns of type [`BinaryViewArray`] are supported. 
+ pub fn try_new(inner: ArrayRef) -> Result { + let Some(inner) = inner.as_struct_opt() else { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: requires StructArray as input".to_string(), + )); + }; + + // Note this clone is cheap, it just bumps the ref count + Ok(Self { + shredding_state: ShreddingState::try_new(inner)?, + }) + } + + /// Return the shredding state of this `ShreddedVariantFieldArray` + pub fn shredding_state(&self) -> &ShreddingState { + &self.shredding_state } /// Return a reference to the value field of the `StructArray` @@ -229,24 +328,21 @@ pub enum ShreddingState { // TODO: add missing state where there is neither value nor typed_value // Missing { metadata: BinaryViewArray }, /// This variant has no typed_value field - Unshredded { - metadata: BinaryViewArray, - value: BinaryViewArray, - }, + Unshredded { value: BinaryViewArray }, /// This variant has a typed_value field and no value field /// meaning it is the shredded type - Typed { - metadata: BinaryViewArray, - typed_value: ArrayRef, - }, - /// Partially shredded: - /// * value is an object - /// * typed_value is a shredded object. + PerfectlyShredded { typed_value: ArrayRef }, + /// Imperfectly shredded: Shredded values reside in `typed_value` while those that failed to + /// shred reside in `value`. Missing field values are NULL in both columns, while NULL primitive + /// values have NULL `typed_value` and `Variant::Null` in `value`. /// - /// Note the spec says "Writers must not produce data where both value and - /// typed_value are non-null, unless the Variant value is an object." - PartiallyShredded { - metadata: BinaryViewArray, + /// NOTE: A partially shredded struct is a special kind of imperfect shredding, where + /// `typed_value` and `value` are both non-NULL.
The `typed_value` is a struct containing the + /// subset of fields for which shredding was attempted (each field will then have its own value + /// and/or typed_value sub-fields that indicate how shredding actually turned out). Meanwhile, + /// the `value` is a variant object containing the subset of fields for which shredding was + /// not even attempted. + ImperfectlyShredded { value: BinaryViewArray, typed_value: ArrayRef, }, @@ -254,43 +350,44 @@ pub enum ShreddingState { impl ShreddingState { /// try to create a new `ShreddingState` from the given fields - pub fn try_new( - metadata: BinaryViewArray, - value: Option, - typed_value: Option, - ) -> Result { - match (metadata, value, typed_value) { - (metadata, Some(value), Some(typed_value)) => Ok(Self::PartiallyShredded { - metadata, - value, - typed_value, - }), - (metadata, Some(value), None) => Ok(Self::Unshredded { metadata, value }), - (metadata, None, Some(typed_value)) => Ok(Self::Typed { - metadata, - typed_value, - }), - (_metadata_field, None, None) => Err(ArrowError::InvalidArgumentError(String::from( + pub fn try_new(inner: &StructArray) -> Result { + // Note the specification allows for any order so we must search by name + + // Find the value field, if present + let value = inner + .column_by_name("value") + .map(|v| { + v.as_binary_view_opt().ok_or_else(|| { + ArrowError::NotYetImplemented(format!( + "VariantArray 'value' field must be BinaryView, got {}", + v.data_type() + )) + }) + }) + .transpose()? 
+ .cloned(); + + // Find the typed_value field, if present + let typed_value = inner.column_by_name("typed_value").cloned(); + + match (value, typed_value) { + (Some(value), Some(typed_value)) => { + Ok(Self::ImperfectlyShredded { value, typed_value }) + } + (Some(value), None) => Ok(Self::Unshredded { value }), + (None, Some(typed_value)) => Ok(Self::PerfectlyShredded { typed_value }), + (None, None) => Err(ArrowError::InvalidArgumentError(String::from( "VariantArray has neither value nor typed_value field", ))), } } - /// Return a reference to the metadata field - pub fn metadata_field(&self) -> &BinaryViewArray { - match self { - ShreddingState::Unshredded { metadata, .. } => metadata, - ShreddingState::Typed { metadata, .. } => metadata, - ShreddingState::PartiallyShredded { metadata, .. } => metadata, - } - } - /// Return a reference to the value field, if present pub fn value_field(&self) -> Option<&BinaryViewArray> { match self { ShreddingState::Unshredded { value, .. } => Some(value), - ShreddingState::Typed { .. } => None, - ShreddingState::PartiallyShredded { value, .. } => Some(value), + ShreddingState::PerfectlyShredded { .. } => None, + ShreddingState::ImperfectlyShredded { value, .. } => Some(value), } } @@ -298,38 +395,71 @@ impl ShreddingState { pub fn typed_value_field(&self) -> Option<&ArrayRef> { match self { ShreddingState::Unshredded { .. } => None, - ShreddingState::Typed { typed_value, .. } => Some(typed_value), - ShreddingState::PartiallyShredded { typed_value, .. } => Some(typed_value), + ShreddingState::PerfectlyShredded { typed_value, .. } => Some(typed_value), + ShreddingState::ImperfectlyShredded { typed_value, .. 
} => Some(typed_value), } } /// Slice all the underlying arrays pub fn slice(&self, offset: usize, length: usize) -> Self { match self { - ShreddingState::Unshredded { metadata, value } => ShreddingState::Unshredded { - metadata: metadata.slice(offset, length), - value: value.slice(offset, length), - }, - ShreddingState::Typed { - metadata, - typed_value, - } => ShreddingState::Typed { - metadata: metadata.slice(offset, length), - typed_value: typed_value.slice(offset, length), - }, - ShreddingState::PartiallyShredded { - metadata, - value, - typed_value, - } => ShreddingState::PartiallyShredded { - metadata: metadata.slice(offset, length), + ShreddingState::Unshredded { value } => ShreddingState::Unshredded { value: value.slice(offset, length), - typed_value: typed_value.slice(offset, length), }, + ShreddingState::PerfectlyShredded { typed_value } => { + ShreddingState::PerfectlyShredded { + typed_value: typed_value.slice(offset, length), + } + } + ShreddingState::ImperfectlyShredded { value, typed_value } => { + ShreddingState::ImperfectlyShredded { + value: value.slice(offset, length), + typed_value: typed_value.slice(offset, length), + } + } } } } +/// Builds struct arrays from component fields +/// +/// TODO: move to arrow crate +#[derive(Debug, Default, Clone)] +pub struct StructArrayBuilder { + fields: Vec, + arrays: Vec, + nulls: Option, +} + +impl StructArrayBuilder { + pub fn new() -> Self { + Default::default() + } + + /// Add an array to this struct array as a field with the specified name. + pub fn with_field(mut self, field_name: &str, array: ArrayRef) -> Self { + let field = Field::new(field_name, array.data_type().clone(), true); + self.fields.push(Arc::new(field)); + self.arrays.push(array); + self + } + + /// Set the null buffer for this struct array. 
+ pub fn with_nulls(mut self, nulls: NullBuffer) -> Self { + self.nulls = Some(nulls); + self + } + + pub fn build(self) -> StructArray { + let Self { + fields, + arrays, + nulls, + } = self; + StructArray::new(Fields::from(fields), arrays, nulls) + } +} + /// returns the non-null element at index as a Variant fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, '_> { match typed_value.data_type() { @@ -372,9 +502,11 @@ impl Array for VariantArray { fn slice(&self, offset: usize, length: usize) -> ArrayRef { let inner = self.inner.slice(offset, length); + let metadata = self.metadata.slice(offset, length); let shredding_state = self.shredding_state.slice(offset, length); Arc::new(Self { inner, + metadata, shredding_state, }) } diff --git a/parquet-variant-compute/src/variant_get/mod.rs b/parquet-variant-compute/src/variant_get/mod.rs index cc852bbc32a2..c494b384c045 100644 --- a/parquet-variant-compute/src/variant_get/mod.rs +++ b/parquet-variant-compute/src/variant_get/mod.rs @@ -15,25 +15,197 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we know it does not exist. It, and all paths under it, are all-NULL.
+ Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result> { + // If the requested path element is not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. + let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle?
+ ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. +fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option, typed_value: Option| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. 
+ let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. + let target = make_target_variant( + shredding_state.value_field().cloned(), + shredding_state.typed_value_field().cloned(), + ); + + // If our caller did not request any specific type, we can just return whatever we landed on. + let Some(data_type) = as_type else { + return Ok(Arc::new(target)); + }; + + // Structs are special. Recurse into each field separately, hoping to follow the shredding even + // further, and build up the final struct from those individually shredded results. + if let DataType::Struct(fields) = data_type { + let children = fields + .iter() + .map(|field| { + shredded_get_path( + &target, + &[VariantPathElement::from(field.name().as_str())], + Some(field.data_type()), + ) + }) + .collect::>>()?; + + return Ok(Arc::new(StructArray::try_new( + fields.clone(), + children, + target.nulls().cloned(), + )?)); + } + + // Not a struct, so directly shred the variant as the requested type + shred_basic_variant(target, VariantPath::default(), as_type) +} + /// Returns an array with the specified path extracted from the variant values. /// /// The return array type depends on the `as_type` field of the options parameter /// 1. 
`as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. `as_type: Some()`: an array of the specified type is returned. +/// +/// TODO: How would a caller request a struct or list type where the fields/elements can be any +/// variant? Caller can pass None as the requested type to fetch a specific path, but it would +/// quickly become annoying (and inefficient) to call `variant_get` for each leaf value in a struct or +/// list and then try to assemble the results. pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { ArrowError::InvalidArgumentError( @@ -41,24 +213,10 @@ pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { ) })?; - // Create the output writer based on the specified output options - let output_builder = instantiate_output_builder(options.clone())?; + let GetOptions { as_type, path, .. } = &options; - // Dispatch based on the shredding state of the input variant array - match variant_array.shredding_state() { - ShreddingState::PartiallyShredded { - metadata, - value, - typed_value, - } => output_builder.partially_shredded(variant_array, metadata, value, typed_value), - ShreddingState::Typed { - metadata, - typed_value, - } => output_builder.typed(variant_array, metadata, typed_value), - ShreddingState::Unshredded { metadata, value } => { - output_builder.unshredded(variant_array, metadata, value) - } - } + let as_type = as_type.as_ref().map(|f| f.data_type()); + shredded_get_path(variant_array, path, as_type) } /// Controls the action of the variant_get kernel. 
@@ -106,10 +264,10 @@ impl<'a> GetOptions<'a> { mod test { use std::sync::Arc; - use arrow::array::{Array, ArrayRef, BinaryViewArray, Int32Array, StringArray, StructArray}; + use arrow::array::{Array, ArrayRef, BinaryViewArray, Int32Array, StringArray}; use arrow::buffer::NullBuffer; use arrow::compute::CastOptions; - use arrow_schema::{DataType, Field, FieldRef, Fields}; + use arrow_schema::{DataType, Field, FieldRef}; use parquet_variant::{Variant, VariantPath}; use crate::batch_json_string_to_variant; @@ -309,7 +467,7 @@ mod test { let metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(&metadata, 3)); let typed_value = Int32Array::from(vec![Some(1), Some(2), Some(3)]); - let struct_array = StructArrayBuilder::new() + let struct_array = crate::variant_array::StructArrayBuilder::new() .with_field("metadata", Arc::new(metadata)) .with_field("typed_value", Arc::new(typed_value)) .build(); @@ -377,7 +535,7 @@ mod test { Some(100), // row 3 is shredded, so it has a value ]); - let struct_array = StructArrayBuilder::new() + let struct_array = crate::variant_array::StructArrayBuilder::new() .with_field("metadata", Arc::new(metadata)) .with_field("typed_value", Arc::new(typed_value)) .with_field("value", Arc::new(values)) @@ -388,43 +546,4 @@ mod test { VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), ) } - - /// Builds struct arrays from component fields - /// - /// TODO: move to arrow crate - #[derive(Debug, Default, Clone)] - struct StructArrayBuilder { - fields: Vec, - arrays: Vec, - nulls: Option, - } - - impl StructArrayBuilder { - fn new() -> Self { - Default::default() - } - - /// Add an array to this struct array as a field with the specified name. - fn with_field(mut self, field_name: &str, array: ArrayRef) -> Self { - let field = Field::new(field_name, array.data_type().clone(), true); - self.fields.push(Arc::new(field)); - self.arrays.push(array); - self - } - - /// Set the null buffer for this struct array. 
- fn with_nulls(mut self, nulls: NullBuffer) -> Self { - self.nulls = Some(nulls); - self - } - - pub fn build(self) -> StructArray { - let Self { - fields, - arrays, - nulls, - } = self; - StructArray::new(Fields::from(fields), arrays, nulls) - } - } } diff --git a/parquet-variant-compute/src/variant_get/output/mod.rs b/parquet-variant-compute/src/variant_get/output/mod.rs index 245d73cce8db..81c177aae2df 100644 --- a/parquet-variant-compute/src/variant_get/output/mod.rs +++ b/parquet-variant-compute/src/variant_get/output/mod.rs @@ -16,6 +16,7 @@ // under the License. mod primitive; +pub(crate) mod struct_output; mod variant; use crate::variant_get::output::primitive::PrimitiveOutputBuilder; @@ -33,6 +34,7 @@ use arrow_schema::{ArrowError, DataType}; /// or as a specific type (e.g. Int32Array). /// /// See [`instantiate_output_builder`] to create an instance of this trait. +#[allow(unused)] pub(crate) trait OutputBuilder { /// create output for a shredded variant array fn partially_shredded( @@ -60,6 +62,7 @@ pub(crate) trait OutputBuilder { ) -> Result; } +#[allow(unused)] pub(crate) fn instantiate_output_builder<'a>( options: GetOptions<'a>, ) -> Result> { diff --git a/parquet-variant-compute/src/variant_get/output/primitive.rs b/parquet-variant-compute/src/variant_get/output/primitive.rs index 36e4221e3242..132d0dcf42f9 100644 --- a/parquet-variant-compute/src/variant_get/output/primitive.rs +++ b/parquet-variant-compute/src/variant_get/output/primitive.rs @@ -33,6 +33,7 @@ use std::sync::Arc; /// Trait for Arrow primitive types that can be used in the output builder /// /// This just exists to add a generic way to convert from Variant to the primitive type +#[allow(unused)] pub(super) trait ArrowPrimitiveVariant: ArrowPrimitiveType { /// Try to extract the primitive value from a Variant, returning None if it /// cannot be converted @@ -42,6 +43,7 @@ pub(super) trait ArrowPrimitiveVariant: ArrowPrimitiveType { } /// Outputs Primitive arrays +#[allow(unused)] 
pub(super) struct PrimitiveOutputBuilder<'a, T: ArrowPrimitiveVariant> { /// What path to extract path: VariantPath<'a>, diff --git a/parquet-variant-compute/src/variant_get/output/struct_output.rs b/parquet-variant-compute/src/variant_get/output/struct_output.rs new file mode 100644 index 000000000000..77d76e6317ba --- /dev/null +++ b/parquet-variant-compute/src/variant_get/output/struct_output.rs @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. 
+#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result; + + fn finish(&mut self) -> Result; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. +#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. +#[allow(unused)] +trait VariantAsPrimitive { + fn as_primitive(&self) -> Option; +} +impl VariantAsPrimitive for Variant<'_, '_> { + fn as_primitive(&self) -> Option { + self.as_int32() + } +} +impl VariantAsPrimitive for Variant<'_, '_> { + fn as_primitive(&self) -> Option { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder { + builder: arrow::array::PrimitiveBuilder, +} + +impl VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result { +
Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. +#[allow(unused)] +struct BinaryVariantRowBuilder { + nulls: NullBufferBuilder, +} + +impl VariantShreddingRowBuilder for BinaryVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + Ok(()) + } + fn append_value(&mut self, _value: &Variant<'_, '_>) -> Result { + // We need a way to convert a Variant directly to bytes. In particular, we want to just copy + // across the underlying value byte slice of a `Variant::Object` or `Variant::List`, without + // any interaction with a `VariantMetadata` (because we will just reuse the existing one). + // + // One could _probably_ emulate this with parquet_variant::VariantBuilder, but it would do a + // lot of unnecessary work and would also create a new metadata column we don't need. + todo!() + } + + fn finish(&mut self) -> Result { + // What `finish` does will depend strongly on how `append_value` ends up working. But + // ultimately we'll create and return a `VariantArray` instance. + todo!() + } +} + +/// Builder that extracts a struct. Casting failures produce NULL or error according to options. +#[allow(unused)] +struct StructVariantShreddingRowBuilder { + nulls: NullBufferBuilder, + field_builders: Vec<(FieldRef, Box)>, +} + +impl VariantShreddingRowBuilder for StructVariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()> { + for (_, builder) in &mut self.field_builders { + builder.append_null()?; + } + self.nulls.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + // Casting failure if it's not even an object. + let Variant::Object(value) = value else { + // TODO: handle casting failure + self.append_null()?; + return Ok(false); + }; + + // Process each field. If the field is missing, it becomes NULL.
If the field is present, + // the child builder handles it from there, and a failed cast could produce NULL or error. + // + // TODO: This loop costs `O(m lg n)` where `m` is the number of fields in this builder and + // `n` is the number of fields in the variant object we're probing. Given that `m` and `n` + // could both be large -- independently of each other -- we should consider doing something + // more clever that bounds the cost to O(m + n). + for (field, builder) in &mut self.field_builders { + match value.get(field.name()) { + None => builder.append_null()?, + Some(v) => { + builder.append_value(&v)?; + } + } + } + self.nulls.append_non_null(); + Ok(true) + } + + fn finish(&mut self) -> Result { + let mut fields = Vec::with_capacity(self.field_builders.len()); + let mut arrays = Vec::with_capacity(self.field_builders.len()); + for (field, mut builder) in std::mem::take(&mut self.field_builders) { + fields.push(field); + arrays.push(builder.finish()?); + } + Ok(Arc::new(arrow::array::StructArray::try_new( + fields.into(), + arrays, + self.nulls.finish(), + )?)) + } +} + +/// Used for actual shredding of binary variant values into shredded variant values +#[allow(unused)] +struct ShreddedVariantRowBuilder { + metadata: arrow::array::BinaryViewArray, + nulls: NullBufferBuilder, + value_builder: BinaryVariantRowBuilder, + typed_value_builder: Box, +} + +impl VariantShreddingRowBuilder for ShreddedVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + self.value_builder.append_value(&Variant::Null)?; + self.typed_value_builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + self.nulls.append_non_null(); + if self.typed_value_builder.append_value(value)?
{ + // spec: (value: NULL, typed_value: non-NULL => value is present and shredded) + self.value_builder.append_null()?; + } else { + // spec: (value: non-NULL, typed_value: NULL => value is present and unshredded) + self.value_builder.append_value(value)?; + } + Ok(true) + } + + fn finish(&mut self) -> Result { + let value = self.value_builder.finish()?; + let Some(value) = value.as_byte_view_opt() else { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: value builder must produce a BinaryViewArray".to_string(), + )); + }; + Ok(Arc::new(crate::VariantArray::from_parts( + self.metadata.clone(), + Some(value.clone()), // TODO: How to consume an ArrayRef directly? + Some(self.typed_value_builder.finish()?), + self.nulls.finish(), + ))) + } +} + +/// Like VariantShreddingRowBuilder, but for (partially shredded) structs which need special +/// handling on a per-field basis. +#[allow(unused)] +struct VariantShreddingStructRowBuilder { + metadata: arrow::array::BinaryViewArray, + nulls: NullBufferBuilder, + value_builder: BinaryVariantRowBuilder, + typed_value_field_builders: Vec<(FieldRef, Box)>, + typed_value_nulls: NullBufferBuilder, +} + +#[allow(unused)] +impl VariantShreddingStructRowBuilder { + fn append_null_typed_value(&mut self) -> Result<()> { + for (_, builder) in &mut self.typed_value_field_builders { + builder.append_null()?; + } + self.typed_value_nulls.append_null(); + Ok(()) + } + + // Co-iterate over all fields of both the input value object and the target `typed_value` + // schema, effectively performing full outer merge join by field name. + // + // NOTE: At most one of the two options can be empty.
+ fn merge_join_fields<'a>( + &'a mut self, + _value: &'a VariantObject<'a, 'a>, + ) -> impl Iterator< + Item = ( + Option<&'a Variant<'a, 'a>>, + &'a str, + Option<&'a mut dyn VariantShreddingRowBuilder>, + ), + > { + std::iter::empty() + } +} + +impl VariantShreddingRowBuilder for VariantShreddingStructRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.append_null_typed_value()?; + self.value_builder.append_null()?; + self.nulls.append_null(); + Ok(()) + } + + #[allow(unused_assignments)] + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + // If it's not even an object, just append the whole thing to the child value builder. + let Variant::Object(value) = value else { + self.append_null_typed_value()?; + self.value_builder.append_value(value)?; + self.nulls.append_non_null(); + return Ok(false); + }; + + let mut found_value_field = false; + for (input_value_field, _field_name, typed_value_builder) in self.merge_join_fields(value) { + match (input_value_field, typed_value_builder) { + (Some(input_value_field), Some(typed_value_builder)) => { + // The field is part of the shredding schema, so the output `value` object must + // NOT include it. The child builder handles any field shredding failure. + typed_value_builder.append_value(input_value_field)?; + } + (Some(_input_value_field), None) => { + // The field is not part of the shredding schema, so copy the field's value + // bytes over unchanged to the output `value` object. + found_value_field = true; + todo!() + } + (None, Some(typed_value_builder)) => { + // The field is part of the shredding schema, but the input does not have it. + typed_value_builder.append_null()?; + } + // NOTE: Every row of an outer join must include at least one of left or right side. + (None, None) => unreachable!(), + } + } + + // Finish the value builder, if non-empty.
+ if found_value_field { + #[allow(unreachable_code)] + self.value_builder.append_value(todo!())?; + } else { + self.value_builder.append_null()?; + } + + // The typed_value row is valid even if all its fields are NULL. + self.typed_value_nulls.append_non_null(); + Ok(true) + } + + fn finish(&mut self) -> Result { + let mut fields = Vec::with_capacity(self.typed_value_field_builders.len()); + let mut arrays = Vec::with_capacity(self.typed_value_field_builders.len()); + for (field, mut builder) in std::mem::take(&mut self.typed_value_field_builders) { + fields.push(field); + arrays.push(builder.finish()?); + } + let typed_value = Arc::new(arrow::array::StructArray::try_new( + fields.into(), + arrays, + self.typed_value_nulls.finish(), + )?); + + let value = self.value_builder.finish()?; + let Some(value) = value.as_byte_view_opt() else { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: value builder must produce a BinaryViewArray".to_string(), + )); + }; + Ok(Arc::new(crate::VariantArray::from_parts( + self.metadata.clone(), + Some(value.clone()), // TODO: How to consume an ArrayRef directly?
+ Some(typed_value), + self.nulls.finish(), + ))) + } +} diff --git a/parquet-variant-compute/src/variant_get/output/variant.rs b/parquet-variant-compute/src/variant_get/output/variant.rs index 2c04111a5306..d268ee40e9cc 100644 --- a/parquet-variant-compute/src/variant_get/output/variant.rs +++ b/parquet-variant-compute/src/variant_get/output/variant.rs @@ -24,6 +24,7 @@ use parquet_variant::{Variant, VariantPath}; use std::sync::Arc; /// Outputs VariantArrays +#[allow(unused)] pub(super) struct VariantOutputBuilder<'a> { /// What path to extract path: VariantPath<'a>, diff --git a/parquet-variant/src/path.rs b/parquet-variant/src/path.rs index 3ba50da3285e..192a7b54667a 100644 --- a/parquet-variant/src/path.rs +++ b/parquet-variant/src/path.rs @@ -109,6 +109,12 @@ impl<'a> From for VariantPath<'a> { } } +impl<'a> From<&[VariantPathElement<'a>]> for VariantPath<'a> { + fn from(elements: &[VariantPathElement<'a>]) -> Self { + VariantPath::new(elements.to_vec()) + } +} + /// Create from iter impl<'a> FromIterator> for VariantPath<'a> { fn from_iter>>(iter: T) -> Self {