diff --git a/Cargo.lock b/Cargo.lock index 8301e301a3..50274188f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "arrow2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d88028fc4bd0aff3be713e067d0f271d3019122dde59de453059a3818b1bee08" +dependencies = [ + "chrono", + "hash_hasher", + "num-traits", +] + [[package]] name = "async-channel" version = "1.6.1" @@ -511,6 +522,7 @@ version = "0.2.0" dependencies = [ "anyhow", "arrow", + "arrow2", "bb8", "bb8-tiberius", "chrono", @@ -1254,6 +1266,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3" +[[package]] +name = "hash_hasher" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" + [[package]] name = "hashbrown" version = "0.6.3" diff --git a/connectorx/Cargo.toml b/connectorx/Cargo.toml index a3a02db3e4..00fe0a47e1 100644 --- a/connectorx/Cargo.toml +++ b/connectorx/Cargo.toml @@ -19,6 +19,7 @@ sqlparser = "0.9" thiserror = "1" arrow = {version = "4", optional = true} +arrow2 = {version = "0.5", optional = true, no-default-features = true } bb8 = {version = "0.7", optional = true} bb8-tiberius = {version = "0.5", optional = true} chrono = {version = "0.4", optional = true} @@ -63,6 +64,7 @@ all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "sr branch = [] default = ["fptr"] dst_arrow = ["arrow", "chrono"] +dst_arrow2 = ["arrow2", "chrono"] dst_polars = ["polars", "dst_arrow"] fptr = [] src_csv = ["csv", "regex", "chrono"] diff --git a/connectorx/src/destinations/arrow2/arrow_assoc.rs b/connectorx/src/destinations/arrow2/arrow_assoc.rs new file mode 100644 index 0000000000..b51aa685f3 --- /dev/null +++ b/connectorx/src/destinations/arrow2/arrow_assoc.rs @@ -0,0 +1,381 @@ +use arrow2::{ + array::*, + datatypes::{DataType as ArrowDataType, Field, TimeUnit}, +}; +use chrono::{Date, DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc}; + +use crate::constants::SECONDS_IN_DAY; + +/// Associate arrow builder with native type +pub trait ArrowAssoc { + type Builder: MutableArray + 'static + Send; + + fn builder(nrows: usize) -> Self::Builder; + fn push(builder: &mut Self::Builder, value: Self); + fn field(header: &str) -> Field; +} + +macro_rules! impl_arrow_assoc { + ($T:ty, $AT:expr, $B:ty) => { + impl ArrowAssoc for $T { + type Builder = $B; + + fn builder(nrows: usize) -> Self::Builder { + Self::Builder::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(Some(value)); + } + + fn field(header: &str) -> Field { + Field::new(header, $AT, false) + } + } + + impl ArrowAssoc for Option<$T> { + type Builder = $B; + + fn builder(nrows: usize) -> Self::Builder { + Self::Builder::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(value); + } + + fn field(header: &str) -> Field { + Field::new(header, $AT, true) + } + } + }; +} + +impl_arrow_assoc!(u32, ArrowDataType::UInt32, MutablePrimitiveArray); +impl_arrow_assoc!(u64, ArrowDataType::UInt64, MutablePrimitiveArray); +impl_arrow_assoc!(i32, ArrowDataType::Int32, MutablePrimitiveArray); +impl_arrow_assoc!(i64, ArrowDataType::Int64, MutablePrimitiveArray); +impl_arrow_assoc!(f32, ArrowDataType::Float32, MutablePrimitiveArray); +impl_arrow_assoc!(f64, ArrowDataType::Float64, MutablePrimitiveArray); +impl_arrow_assoc!(bool, ArrowDataType::Boolean, MutableBooleanArray); + +impl ArrowAssoc for &str { + type Builder = MutableUtf8Array; + + fn builder(nrows: usize) -> Self::Builder { + MutableUtf8Array::::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(Some(value)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::LargeUtf8, false) + } +} + +impl ArrowAssoc for Option<&str> { + type Builder = MutableUtf8Array; + + fn builder(nrows: usize) -> Self::Builder { + MutableUtf8Array::::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(value); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::LargeUtf8, true) + } +} + +impl ArrowAssoc for String { + type Builder = MutableUtf8Array; + + fn builder(nrows: usize) -> Self::Builder { + MutableUtf8Array::::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: String) { + builder.push(Some(value)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::LargeUtf8, false) + } +} + +impl ArrowAssoc for Option { + type Builder = MutableUtf8Array; + + fn builder(nrows: usize) -> Self::Builder { + MutableUtf8Array::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(value); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::LargeUtf8, true) + } +} + +impl ArrowAssoc for DateTime { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp( + TimeUnit::Nanosecond, + Some("UTC".to_string()), + )) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: DateTime) { + builder.push(Some(value).map(|x| x.timestamp_nanos())); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())), + true, + ) + } +} + +impl ArrowAssoc for Option> { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp( + TimeUnit::Nanosecond, + Some("UTC".to_string()), + )) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Option>) { + builder.push(value.map(|x| x.timestamp_nanos())); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())), + false, + ) + } +} + +impl ArrowAssoc for Date { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp( + TimeUnit::Second, + Some("UTC".to_string()), + )) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Date) { + builder.push(Some(value).map(|x| x.and_hms(0, 0, 0).timestamp())) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Second, Some("UTC".to_string())), + false, + ) + } +} + +impl ArrowAssoc for Option> { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp( + TimeUnit::Second, + Some("UTC".to_string()), + )) + } + + fn push(builder: &mut Self::Builder, value: Option>) { + // time-aware are stored as naive (i64) + offset (on the datatype) + builder.push(value.map(|x| x.and_hms(0, 0, 0).timestamp())) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Second, Some("UTC".to_string())), + false, + ) + } +} + +fn naive_date_to_date32(nd: NaiveDate) -> i32 { + (nd.and_hms(0, 0, 0).timestamp() / SECONDS_IN_DAY) as i32 +} + +fn naive_time_to_time64_nanos(nd: NaiveTime) -> i64 { + nd.num_seconds_from_midnight() as i64 * 1_000_000_000 + nd.nanosecond() as i64 +} + +impl ArrowAssoc for Option { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Date32) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Option) { + builder.push(value.map(naive_date_to_date32)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Date32, true) + } +} + +impl ArrowAssoc for NaiveDate { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Date32) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: NaiveDate) { + builder.push(Some(naive_date_to_date32(value))); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Date32, false) + } +} + +impl ArrowAssoc for Option { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + // naive => None + MutablePrimitiveArray::with_capacity(nrows) + .to(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Option) { + builder.push(value.map(|x| x.timestamp_nanos())); + } + + fn field(header: &str) -> Field { + // naive => None + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ) + } +} + +impl ArrowAssoc for NaiveDateTime { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + // naive => None + MutablePrimitiveArray::with_capacity(nrows) + .to(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)) + } + + fn push(builder: &mut Self::Builder, value: NaiveDateTime) { + builder.push(Some(value).map(|x| x.timestamp_nanos())); + } + + fn field(header: &str) -> Field { + // naive => None + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ) + } +} + +impl ArrowAssoc for Option { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Time64(TimeUnit::Nanosecond)) + } + + fn push(builder: &mut Self::Builder, value: Option) { + builder.push(value.map(naive_time_to_time64_nanos)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), true) + } +} + +impl ArrowAssoc for NaiveTime { + type Builder = MutablePrimitiveArray; + + fn builder(nrows: usize) -> Self::Builder { + MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Time64(TimeUnit::Nanosecond)) + } + + fn push(builder: &mut Self::Builder, value: NaiveTime) { + builder.push(Some(value).map(naive_time_to_time64_nanos)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), false) + } +} + +impl ArrowAssoc for Option> { + type Builder = MutableBinaryArray; + + fn builder(nrows: usize) -> Self::Builder { + MutableBinaryArray::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(value); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::LargeBinary, true) + } +} + +impl ArrowAssoc for Vec { + type Builder = MutableBinaryArray; + + fn builder(nrows: usize) -> Self::Builder { + MutableBinaryArray::with_capacity(nrows) + } + + #[inline] + fn push(builder: &mut Self::Builder, value: Self) { + builder.push(Some(value)); + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::LargeBinary, false) + } +} diff --git a/connectorx/src/destinations/arrow2/errors.rs b/connectorx/src/destinations/arrow2/errors.rs new file mode 100644 index 0000000000..5c7d04b4bc --- /dev/null +++ b/connectorx/src/destinations/arrow2/errors.rs @@ -0,0 +1,16 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Error, Debug)] +pub enum ArrowDestinationError { + #[error(transparent)] + ArrowError(#[from] arrow2::error::ArrowError), + + #[error(transparent)] + ConnectorXError(#[from] crate::errors::ConnectorXError), + + /// Any other errors that are too trivial to be put here explicitly. + #[error(transparent)] + Other(#[from] anyhow::Error), +} diff --git a/connectorx/src/destinations/arrow2/funcs.rs b/connectorx/src/destinations/arrow2/funcs.rs new file mode 100644 index 0000000000..ab09bfcc54 --- /dev/null +++ b/connectorx/src/destinations/arrow2/funcs.rs @@ -0,0 +1,75 @@ +use super::arrow_assoc::ArrowAssoc; +use super::Builder; +use crate::errors::Result; +use crate::typesystem::{ParameterizedFunc, ParameterizedOn}; +use anyhow::anyhow; +use arrow2::array::*; +use arrow2::datatypes::Field; + +pub struct FNewBuilder; + +impl ParameterizedFunc for FNewBuilder { + type Function = fn(nrows: usize) -> Builder; +} + +impl ParameterizedOn for FNewBuilder +where + T: ArrowAssoc, +{ + fn parameterize() -> Self::Function { + fn imp(nrows: usize) -> Builder + where + T: ArrowAssoc, + { + Box::new(T::builder(nrows)) as Builder + } + imp:: + } +} + +pub struct FFinishBuilder; + +impl ParameterizedFunc for FFinishBuilder { + type Function = fn(Builder) -> Result; +} + +impl ParameterizedOn for FFinishBuilder +where + T: ArrowAssoc, +{ + fn parameterize() -> Self::Function { + fn imp(mut builder: Builder) -> Result + where + T: ArrowAssoc, + { + Ok(MutableArray::as_arc( + builder + .as_mut_any() + .downcast_mut::() + .ok_or_else(|| anyhow!("cannot cast arrow builder for finish"))?, + )) + } + imp:: + } +} + +pub struct FNewField; + +impl ParameterizedFunc for FNewField { + type Function = fn(header: &str) -> Field; +} + +impl ParameterizedOn for FNewField +where + T: ArrowAssoc, +{ + fn parameterize() -> Self::Function { + fn imp(header: &str) -> Field + where + T: ArrowAssoc, + { + T::field(header) + } + imp:: + } +} diff --git a/connectorx/src/destinations/arrow2/mod.rs b/connectorx/src/destinations/arrow2/mod.rs new file mode 100644 index 0000000000..bce3c603b3 --- /dev/null +++ b/connectorx/src/destinations/arrow2/mod.rs @@ -0,0 +1,188 @@ +//! Destination implementation for Arrow2. + +use std::sync::Arc; + +mod arrow_assoc; +mod errors; +mod funcs; +pub mod typesystem; + +use anyhow::anyhow; +use fehler::throw; +use fehler::throws; + +use arrow2::array::MutableArray; +use arrow2::datatypes::Schema; +use arrow2::record_batch::RecordBatch; + +use crate::data_order::DataOrder; +use crate::typesystem::{Realize, TypeAssoc, TypeSystem}; + +use super::{Consume, Destination, DestinationPartition}; + +use arrow_assoc::ArrowAssoc; +pub use errors::{ArrowDestinationError, Result}; +use funcs::{FFinishBuilder, FNewBuilder, FNewField}; +pub use typesystem::ArrowTypeSystem; + +type Builder = Box; +type Builders = Vec; + +pub struct ArrowDestination { + nrows: usize, + schema: Vec, + names: Vec, + builders: Vec, +} + +impl Default for ArrowDestination { + fn default() -> Self { + ArrowDestination { + nrows: 0, + schema: vec![], + builders: vec![], + names: vec![], + } + } +} + +impl ArrowDestination { + pub fn new() -> Self { + Self::default() + } +} + +impl Destination for ArrowDestination { + const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::ColumnMajor, DataOrder::RowMajor]; + type TypeSystem = ArrowTypeSystem; + type Partition<'a> = ArrowPartitionWriter<'a>; + type Error = ArrowDestinationError; + + #[throws(ArrowDestinationError)] + fn allocate>( + &mut self, + nrows: usize, + names: &[S], + schema: &[ArrowTypeSystem], + data_order: DataOrder, + ) { + // todo: support colmajor + if !matches!(data_order, DataOrder::RowMajor) { + throw!(crate::errors::ConnectorXError::UnsupportedDataOrder( + data_order + )) + } + // cannot really create the builders since do not know each partition size here + self.nrows = nrows; + self.schema = schema.to_vec(); + self.names = names.iter().map(|n| n.as_ref().to_string()).collect(); + } + + #[throws(ArrowDestinationError)] + fn partition(&mut self, counts: &[usize]) -> Vec> { + assert_eq!(counts.iter().sum::(), self.nrows); + assert_eq!(self.builders.len(), 0); + + for &c in counts { + let builders = self + .schema + .iter() + .map(|&dt| Ok(Realize::::realize(dt)?(c))) + .collect::>>()?; + + self.builders.push(builders); + } + + let schema = self.schema.clone(); + self.builders + .iter_mut() + .zip(counts) + .map(|(builders, &c)| ArrowPartitionWriter::new(schema.clone(), builders, c)) + .collect() + } + + fn schema(&self) -> &[ArrowTypeSystem] { + self.schema.as_slice() + } +} + +impl ArrowDestination { + #[throws(ArrowDestinationError)] + pub fn arrow(self) -> Vec { + assert_eq!(self.schema.len(), self.names.len()); + let fields = self + .schema + .iter() + .zip(self.names) + .map(|(&dt, h)| Ok(Realize::::realize(dt)?(h.as_str()))) + .collect::>>()?; + + let arrow_schema = Arc::new(Schema::new(fields)); + let schema = self.schema.clone(); + self.builders + .into_iter() + .map(|pbuilder| { + let columns = pbuilder + .into_iter() + .zip(schema.iter()) + .map(|(builder, &dt)| Realize::::realize(dt)?(builder)) + .collect::, crate::errors::ConnectorXError>>()?; + Ok(RecordBatch::try_new(Arc::clone(&arrow_schema), columns)?) + }) + .collect::>>()? + } +} + +pub struct ArrowPartitionWriter<'a> { + nrows: usize, + schema: Vec, + builders: &'a mut Builders, + current_col: usize, +} + +impl<'a> ArrowPartitionWriter<'a> { + fn new(schema: Vec, builders: &'a mut Builders, nrows: usize) -> Self { + ArrowPartitionWriter { + nrows, + schema, + builders, + current_col: 0, + } + } +} + +impl<'a> DestinationPartition<'a> for ArrowPartitionWriter<'a> { + type TypeSystem = ArrowTypeSystem; + type Error = ArrowDestinationError; + + fn nrows(&self) -> usize { + self.nrows + } + + fn ncols(&self) -> usize { + self.schema.len() + } +} + +impl<'a, T> Consume for ArrowPartitionWriter<'a> +where + T: TypeAssoc<>::TypeSystem> + ArrowAssoc + 'static, +{ + type Error = ArrowDestinationError; + + #[throws(ArrowDestinationError)] + fn consume(&mut self, value: T) { + let col = self.current_col; + self.current_col = (self.current_col + 1) % self.ncols(); + + self.schema[col].check::()?; + + ::push( + self.builders[col] + .as_mut_any() + .downcast_mut::() + .ok_or_else(|| anyhow!("cannot cast arrow builder for append"))?, + value, + ); + } +} diff --git a/connectorx/src/destinations/arrow2/typesystem.rs b/connectorx/src/destinations/arrow2/typesystem.rs new file mode 100644 index 0000000000..a6997a2ba2 --- /dev/null +++ b/connectorx/src/destinations/arrow2/typesystem.rs @@ -0,0 +1,38 @@ +use crate::impl_typesystem; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum ArrowTypeSystem { + Int32(bool), + Int64(bool), + UInt32(bool), + UInt64(bool), + Float32(bool), + Float64(bool), + Boolean(bool), + LargeUtf8(bool), + LargeBinary(bool), + Date32(bool), + Date64(bool), + Time64(bool), + DateTimeTz(bool), +} + +impl_typesystem! { + system = ArrowTypeSystem, + mappings = { + { Int32 => i32 } + { Int64 => i64 } + { UInt32 => u32 } + { UInt64 => u64 } + { Float64 => f64 } + { Float32 => f32 } + { Boolean => bool } + { LargeUtf8 => String } + { LargeBinary => Vec } + { Date32 => NaiveDate } + { Date64 => NaiveDateTime } + { Time64 => NaiveTime } + { DateTimeTz => DateTime } + } +} diff --git a/connectorx/src/destinations/mod.rs b/connectorx/src/destinations/mod.rs index 064e19da1c..4369ad8268 100644 --- a/connectorx/src/destinations/mod.rs +++ b/connectorx/src/destinations/mod.rs @@ -4,6 +4,9 @@ #[cfg(feature = "dst_arrow")] pub mod arrow; +#[cfg(feature = "dst_arrow2")] +pub mod arrow2; + use crate::data_order::DataOrder; use crate::errors::ConnectorXError; use crate::typesystem::{TypeAssoc, TypeSystem};