diff --git a/Cargo.lock b/Cargo.lock index 8301e301a3..14e73dd851 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,6 +568,7 @@ dependencies = [ "fehler", "iai", "itertools", + "lazy_static", "libc", "log", "ndarray", diff --git a/Justfile b/Justfile index a973abb0df..3c94048114 100644 --- a/Justfile +++ b/Justfile @@ -1,3 +1,5 @@ +set dotenv-load := true + build-release: cargo build --release diff --git a/connectorx-python/Cargo.toml b/connectorx-python/Cargo.toml index 253ae56fcd..ea5e581f67 100644 --- a/connectorx-python/Cargo.toml +++ b/connectorx-python/Cargo.toml @@ -37,6 +37,7 @@ tokio = {version = "1", features = ["rt", "rt-multi-thread", "net"]} tokio-util = "0.6" url = "2" uuid = "0.8" +lazy_static = "1.4.0" [build-dependencies] built = {version = "0.5", features = ["chrono"]} diff --git a/connectorx-python/connectorx/tests/test_postgres.py b/connectorx-python/connectorx/tests/test_postgres.py index 98b3ae15d1..e23b06bda7 100644 --- a/connectorx-python/connectorx/tests/test_postgres.py +++ b/connectorx-python/connectorx/tests/test_postgres.py @@ -318,8 +318,9 @@ def test_read_sql_on_utf8(postgres_url: str) -> None: def test_types_binary(postgres_url: str) -> None: - query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum FROM test_types" - df = read_sql(postgres_url, query) + query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_farray, test_iarray FROM test_types" + df = read_sql(postgres_url, query, + partition_on="test_int16", partition_num=3) expected = pd.DataFrame( index=range(4), data={ @@ -367,14 +368,18 @@ def test_types_binary(postgres_url: str) -> None: "test_enum": pd.Series( ["happy", "very happy", "ecstatic", "ecstatic"], dtype="object" ), + "test_farray": pd.Series([[], None, [0.0123], [0.000234, -12.987654321]], dtype="object"), + "test_iarray": pd.Series([[-1, 0, 1123], [], [-324324], None], dtype="object"), }, ) + print(df) assert_frame_equal(df, expected, check_names=True) def test_types_csv(postgres_url: str) -> None: - query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text FROM test_types" - df = read_sql(postgres_url, query, protocol="csv") + query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_farray, test_iarray FROM test_types" + df = read_sql(postgres_url, query, protocol="csv", + partition_on="test_int16", partition_num=2) expected = pd.DataFrame( index=range(4), data={ @@ -422,14 +427,17 @@ def test_types_csv(postgres_url: str) -> None: "test_enum": pd.Series( ["happy", "very happy", "ecstatic", "ecstatic"], dtype="object" ), + "test_farray": pd.Series([[], None, [0.0123], [0.000234, -12.987654321]], dtype="object"), + "test_iarray": pd.Series([[-1, 0, 1123], [], [-324324], None], dtype="object"), }, ) assert_frame_equal(df, expected, check_names=True) def test_types_cursor(postgres_url: str) -> None: - query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text FROM test_types" - df = read_sql(postgres_url, query, protocol="cursor") + query = "SELECT test_int16, test_char, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum::text, test_farray, test_iarray FROM test_types" + df = read_sql(postgres_url, query, protocol="cursor", + partition_on="test_int16", partition_num=4) expected = pd.DataFrame( index=range(4), data={ @@ -477,6 +485,8 @@ def test_types_cursor(postgres_url: str) -> None: "test_enum": pd.Series( ["happy", "very happy", "ecstatic", "ecstatic"], dtype="object" ), + "test_farray": pd.Series([[], None, [0.0123], [0.000234, -12.987654321]], dtype="object"), + "test_iarray": pd.Series([[-1, 0, 1123], [], [-324324], None], dtype="object"), }, ) assert_frame_equal(df, expected, check_names=True) diff --git a/connectorx-python/src/lib.rs b/connectorx-python/src/lib.rs index cd8472f4a4..6bce953d0e 100644 --- a/connectorx-python/src/lib.rs +++ b/connectorx-python/src/lib.rs @@ -11,6 +11,9 @@ use pyo3::prelude::*; use pyo3::{wrap_pyfunction, PyResult}; use std::sync::Once; +#[macro_use] +extern crate lazy_static; + static START: Once = Once::new(); // https://github.com/PyO3/pyo3-built/issues/21 diff --git a/connectorx-python/src/pandas/destination.rs b/connectorx-python/src/pandas/destination.rs index 07f21e94e1..b0c056396d 100644 --- a/connectorx-python/src/pandas/destination.rs +++ b/connectorx-python/src/pandas/destination.rs @@ -1,7 +1,7 @@ use super::{ pandas_columns::{ - BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn, Int64Block, - PandasColumn, PandasColumnObject, PyBytes, StringBlock, + ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn, + Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock, }, pystring::PyString, typesystem::{PandasArrayType, PandasBlockType, PandasTypeSystem}, @@ -158,6 +158,12 @@ impl<'a> Destination for PandasDestination<'a> { PandasBlockType::Float64 => { self.allocate_array::(dt, placement)?; } + PandasBlockType::Float64Array => { + self.allocate_array::(dt, placement)?; + } + PandasBlockType::Int64Array => { + self.allocate_array::(dt, placement)?; + } PandasBlockType::String => { self.allocate_array::(dt, placement)?; } @@ -210,6 +216,28 @@ impl<'a> Destination for PandasDestination<'a> { .collect() } } + PandasBlockType::Float64Array => { + let fblock = ArrayBlock::::extract(buf)?; + let fcols = fblock.split()?; + for (&cid, fcol) in block.cids.iter().zip_eq(fcols) { + partitioned_columns[cid] = fcol + .partition(&counts) + .into_iter() + .map(|c| Box::new(c) as _) + .collect() + } + } + PandasBlockType::Int64Array => { + let fblock = ArrayBlock::::extract(buf)?; + let fcols = fblock.split()?; + for (&cid, fcol) in block.cids.iter().zip_eq(fcols) { + partitioned_columns[cid] = fcol + .partition(&counts) + .into_iter() + .map(|c| Box::new(c) as _) + .collect() + } + } PandasBlockType::Int64(_) => { let ublock = Int64Block::extract(buf)?; let ucols = ublock.split()?; diff --git a/connectorx-python/src/pandas/pandas_columns/array.rs b/connectorx-python/src/pandas/pandas_columns/array.rs new file mode 100644 index 0000000000..f4b5680ad0 --- /dev/null +++ b/connectorx-python/src/pandas/pandas_columns/array.rs @@ -0,0 +1,227 @@ +use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX}; +use crate::errors::ConnectorXPythonError; +use anyhow::anyhow; +use fehler::throws; +use ndarray::{ArrayViewMut2, Axis, Ix2}; +use numpy::{npyffi::NPY_TYPES, Element, PyArray, PyArrayDescr}; +use pyo3::{FromPyObject, Py, PyAny, PyResult, Python, ToPyObject}; +use std::any::TypeId; +use std::marker::PhantomData; + +#[derive(Clone)] +#[repr(transparent)] +pub struct PyList(Py); + +// In order to put it into a numpy array +impl Element for PyList { + const DATA_TYPE: numpy::DataType = numpy::DataType::Object; + fn is_same_type(dtype: &PyArrayDescr) -> bool { + unsafe { *dtype.as_dtype_ptr() }.type_num == NPY_TYPES::NPY_OBJECT as i32 + } +} + +pub struct ArrayBlock<'a, V> { + data: ArrayViewMut2<'a, PyList>, + buf_size_mb: usize, + _value_type: PhantomData, +} + +impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> { + fn extract(ob: &'a PyAny) -> PyResult { + check_dtype(ob, "object")?; + let array = ob.downcast::>()?; + let data = unsafe { array.as_array_mut() }; + Ok(ArrayBlock:: { + data, + buf_size_mb: 16, // in MB + _value_type: PhantomData, + }) + } +} + +impl<'a, V> ArrayBlock<'a, V> { + #[throws(ConnectorXPythonError)] + pub fn split(self) -> Vec> { + let mut ret = vec![]; + let mut view = self.data; + + let nrows = view.ncols(); + while view.nrows() > 0 { + let (col, rest) = view.split_at(Axis(0), 1); + view = rest; + ret.push(ArrayColumn:: { + data: col + .into_shape(nrows)? + .into_slice() + .ok_or_else(|| anyhow!("get None for splitted FloatArray data"))?, + next_write: 0, + lengths: vec![], + buffer: Vec::with_capacity(self.buf_size_mb * (1 << 17) * 11 / 10), // allocate a little bit more memory to avoid Vec growth + buf_size: self.buf_size_mb * (1 << 17), + }) + } + ret + } +} + +pub struct ArrayColumn<'a, V> { + data: &'a mut [PyList], + next_write: usize, + buffer: Vec, + lengths: Vec, // usize::MAX if the string is None + buf_size: usize, +} + +impl<'a, V> PandasColumnObject for ArrayColumn<'a, V> +where + V: Send + ToPyObject, +{ + fn typecheck(&self, id: TypeId) -> bool { + id == TypeId::of::() || id == TypeId::of::>() + } + fn len(&self) -> usize { + self.data.len() + } + fn typename(&self) -> &'static str { + std::any::type_name::() + } + + #[throws(ConnectorXPythonError)] + fn finalize(&mut self) { + self.flush()?; + } +} + +impl<'a> PandasColumn> for ArrayColumn<'a, f64> { + #[throws(ConnectorXPythonError)] + fn write(&mut self, val: Vec) { + self.lengths.push(val.len()); + self.buffer.extend_from_slice(&val[..]); + self.try_flush()?; + } +} + +impl<'a> PandasColumn>> for ArrayColumn<'a, f64> { + #[throws(ConnectorXPythonError)] + fn write(&mut self, val: Option>) { + match val { + Some(v) => { + self.lengths.push(v.len()); + self.buffer.extend_from_slice(&v[..]); + self.try_flush()?; + } + None => { + self.lengths.push(usize::MAX); + } + } + } +} + +impl<'a> PandasColumn> for ArrayColumn<'a, i64> { + #[throws(ConnectorXPythonError)] + fn write(&mut self, val: Vec) { + self.lengths.push(val.len()); + self.buffer.extend_from_slice(&val[..]); + self.try_flush()?; + } +} + +impl<'a> PandasColumn>> for ArrayColumn<'a, i64> { + #[throws(ConnectorXPythonError)] + fn write(&mut self, val: Option>) { + match val { + Some(v) => { + self.lengths.push(v.len()); + self.buffer.extend_from_slice(&v[..]); + self.try_flush()?; + } + None => { + self.lengths.push(usize::MAX); + } + } + } +} + +impl HasPandasColumn for Vec { + type PandasColumn<'a> = ArrayColumn<'a, f64>; +} + +impl HasPandasColumn for Option> { + type PandasColumn<'a> = ArrayColumn<'a, f64>; +} + +impl HasPandasColumn for Vec { + type PandasColumn<'a> = ArrayColumn<'a, i64>; +} + +impl HasPandasColumn for Option> { + type PandasColumn<'a> = ArrayColumn<'a, i64>; +} +impl<'a, V> ArrayColumn<'a, V> +where + V: Send + ToPyObject, +{ + pub fn partition(self, counts: &[usize]) -> Vec> { + let mut partitions = vec![]; + let mut data = self.data; + + for &c in counts { + let (splitted, rest) = data.split_at_mut(c); + data = rest; + partitions.push(ArrayColumn { + data: splitted, + next_write: 0, + lengths: vec![], + buffer: Vec::with_capacity(self.buf_size), + buf_size: self.buf_size, + }); + } + partitions + } + + #[throws(ConnectorXPythonError)] + pub fn flush(&mut self) { + let nvecs = self.lengths.len(); + + if nvecs > 0 { + let py = unsafe { Python::assume_gil_acquired() }; + + { + // allocation in python is not thread safe + let _guard = GIL_MUTEX + .lock() + .map_err(|e| anyhow!("mutex poisoned {}", e))?; + let mut start = 0; + for (i, &len) in self.lengths.iter().enumerate() { + if len != usize::MAX { + let end = start + len; + unsafe { + // allocate and write in the same time + *self.data.get_unchecked_mut(self.next_write + i) = PyList( + pyo3::types::PyList::new(py, &self.buffer[start..end]).into(), + ); + }; + start = end; + } else { + unsafe { + let n: &pyo3::types::PyList = + py.from_borrowed_ptr(pyo3::ffi::Py_None()); + *self.data.get_unchecked_mut(self.next_write + i) = PyList(n.into()); + } + } + } + } + + self.buffer.truncate(0); + self.lengths.truncate(0); + self.next_write += nvecs; + } + } + + #[throws(ConnectorXPythonError)] + pub fn try_flush(&mut self) { + if self.buffer.len() >= self.buf_size { + self.flush()?; + } + } +} diff --git a/connectorx-python/src/pandas/pandas_columns/bytes.rs b/connectorx-python/src/pandas/pandas_columns/bytes.rs index 7546581a11..c6e0619dc4 100644 --- a/connectorx-python/src/pandas/pandas_columns/bytes.rs +++ b/connectorx-python/src/pandas/pandas_columns/bytes.rs @@ -1,4 +1,4 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; @@ -6,7 +6,6 @@ use ndarray::{ArrayViewMut2, Axis, Ix2}; use numpy::{npyffi::NPY_TYPES, Element, PyArray, PyArrayDescr}; use pyo3::{FromPyObject, Py, PyAny, PyResult, Python}; use std::any::TypeId; -use std::sync::{Arc, Mutex}; #[derive(Clone)] #[repr(transparent)] @@ -22,7 +21,6 @@ impl Element for PyBytes { pub struct BytesBlock<'a> { data: ArrayViewMut2<'a, PyBytes>, - mutex: Arc>, buf_size_mb: usize, } @@ -33,8 +31,7 @@ impl<'a> FromPyObject<'a> for BytesBlock<'a> { let data = unsafe { array.as_array_mut() }; Ok(BytesBlock { data, - mutex: Arc::new(Mutex::new(())), // allocate the lock here since only BytesBlock needs to aquire the GIL for now - buf_size_mb: 16, // in MB + buf_size_mb: 16, // in MB }) } } @@ -58,7 +55,6 @@ impl<'a> BytesBlock<'a> { bytes_lengths: vec![], bytes_buf: Vec::with_capacity(self.buf_size_mb * (1 << 20) * 11 / 10), // allocate a little bit more memory to avoid Vec growth buf_size: self.buf_size_mb * (1 << 20), - mutex: self.mutex.clone(), }) } ret @@ -71,7 +67,6 @@ pub struct BytesColumn<'a> { bytes_buf: Vec, bytes_lengths: Vec, // usize::MAX if the string is None buf_size: usize, - mutex: Arc>, } impl<'a> PandasColumnObject for BytesColumn<'a> { @@ -171,7 +166,6 @@ impl<'a> BytesColumn<'a> { bytes_lengths: vec![], bytes_buf: Vec::with_capacity(self.buf_size), buf_size: self.buf_size, - mutex: self.mutex.clone(), }); } @@ -187,8 +181,7 @@ impl<'a> BytesColumn<'a> { { // allocation in python is not thread safe - let _guard = self - .mutex + let _guard = GIL_MUTEX .lock() .map_err(|e| anyhow!("mutex poisoned {}", e))?; let mut start = 0; @@ -214,6 +207,7 @@ impl<'a> BytesColumn<'a> { } self.bytes_buf.truncate(0); + self.bytes_lengths.truncate(0); self.next_write += nstrings; } } diff --git a/connectorx-python/src/pandas/pandas_columns/mod.rs b/connectorx-python/src/pandas/pandas_columns/mod.rs index 8ff80d1696..6203081651 100644 --- a/connectorx-python/src/pandas/pandas_columns/mod.rs +++ b/connectorx-python/src/pandas/pandas_columns/mod.rs @@ -1,3 +1,4 @@ +mod array; mod boolean; mod bytes; mod datetime; @@ -7,6 +8,7 @@ mod string; // TODO: use macro for integers use crate::errors::Result; +pub use crate::pandas::pandas_columns::array::{ArrayBlock, ArrayColumn, PyList}; pub use crate::pandas::pandas_columns::bytes::{BytesBlock, BytesColumn, PyBytes}; pub use boolean::{BooleanBlock, BooleanColumn}; pub use datetime::{DateTimeBlock, DateTimeColumn}; @@ -15,8 +17,14 @@ pub use float64::{Float64Block, Float64Column}; pub use int64::{Int64Block, Int64Column}; use pyo3::{exceptions::PyRuntimeError, PyAny, PyResult}; use std::any::TypeId; +use std::sync::Mutex; pub use string::{StringBlock, StringColumn}; +// A global GIL lock for Python object allocations like string, bytes and list +lazy_static! { + static ref GIL_MUTEX: Mutex<()> = Mutex::new(()); +} + pub trait PandasColumnObject: Send { fn typecheck(&self, _: TypeId) -> bool; fn typename(&self) -> &'static str; diff --git a/connectorx-python/src/pandas/pandas_columns/string.rs b/connectorx-python/src/pandas/pandas_columns/string.rs index 5835ed84a5..8dc2c03195 100644 --- a/connectorx-python/src/pandas/pandas_columns/string.rs +++ b/connectorx-python/src/pandas/pandas_columns/string.rs @@ -1,5 +1,5 @@ use super::super::pystring::{PyString, StringInfo}; -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; @@ -8,11 +8,9 @@ use ndarray::{ArrayViewMut2, Axis, Ix2}; use numpy::PyArray; use pyo3::{FromPyObject, PyAny, PyResult, Python}; use std::any::TypeId; -use std::sync::{Arc, Mutex}; pub struct StringBlock<'a> { data: ArrayViewMut2<'a, PyString>, - mutex: Arc>, buf_size_mb: usize, } @@ -23,8 +21,7 @@ impl<'a> FromPyObject<'a> for StringBlock<'a> { let data = unsafe { array.as_array_mut() }; Ok(StringBlock { data, - mutex: Arc::new(Mutex::new(())), // allocate the lock here since only StringBlock needs to aquire the GIL for now - buf_size_mb: 4, // in MB + buf_size_mb: 4, // in MB }) } } @@ -48,7 +45,6 @@ impl<'a> StringBlock<'a> { string_lengths: vec![], string_buf: Vec::with_capacity(self.buf_size_mb * (1 << 20) * 11 / 10), // allocate a little bit more memory to avoid Vec growth buf_size: self.buf_size_mb * (1 << 20), - mutex: self.mutex.clone(), }) } ret @@ -61,7 +57,6 @@ pub struct StringColumn<'a> { string_buf: Vec, string_lengths: Vec, // usize::MAX for empty string buf_size: usize, - mutex: Arc>, } impl<'a> PandasColumnObject for StringColumn<'a> { @@ -236,7 +231,6 @@ impl<'a> StringColumn<'a> { string_lengths: vec![], string_buf: Vec::with_capacity(self.buf_size), buf_size: self.buf_size, - mutex: self.mutex.clone(), }); } @@ -252,11 +246,11 @@ impl<'a> StringColumn<'a> { let py = unsafe { Python::assume_gil_acquired() }; let _guard = if force { - self.mutex + GIL_MUTEX .lock() .map_err(|e| anyhow!("mutex poisoned {}", e))? } else { - match self.mutex.try_lock() { + match GIL_MUTEX.try_lock() { Ok(guard) => guard, Err(_) => return, } diff --git a/connectorx-python/src/pandas/transports/postgres.rs b/connectorx-python/src/pandas/transports/postgres.rs index 54f8f32699..0194bb2043 100644 --- a/connectorx-python/src/pandas/transports/postgres.rs +++ b/connectorx-python/src/pandas/transports/postgres.rs @@ -31,6 +31,13 @@ macro_rules! impl_postgres_transport { { Int2[i16] => I64[i64] | conversion auto } { Int4[i32] => I64[i64] | conversion auto } { Int8[i64] => I64[i64] | conversion auto } + { CharArray[Vec] => I64Array[Vec] | conversion auto_vec } + { Int2Array[Vec] => I64Array[Vec] | conversion auto_vec } + { Int4Array[Vec] => I64Array[Vec] | conversion auto_vec } + { Int8Array[Vec] => I64Array[Vec] | conversion auto } + { Float4Array[Vec] => F64Array[Vec] | conversion auto_vec } + { Float8Array[Vec] => F64Array[Vec] | conversion auto } + { NumericArray[Vec] => F64Array[Vec] | conversion option } { Bool[bool] => Bool[bool] | conversion auto } { Char[i8] => Char[char] | conversion option } { Text[&'r str] => Str[&'r str] | conversion auto } @@ -57,6 +64,17 @@ impl_postgres_transport!(CSVProtocol, MakeTlsConnector); impl_postgres_transport!(CursorProtocol, NoTls); impl_postgres_transport!(CursorProtocol, MakeTlsConnector); +impl<'py, P, C> TypeConversion, Vec> for PostgresPandasTransport<'py, P, C> { + fn convert(val: Vec) -> Vec { + val.into_iter() + .map(|v| { + v.to_f64() + .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", v)) + }) + .collect() + } +} + impl<'py, P, C> TypeConversion for PostgresPandasTransport<'py, P, C> { fn convert(val: Decimal) -> f64 { val.to_f64() diff --git a/connectorx-python/src/pandas/typesystem.rs b/connectorx-python/src/pandas/typesystem.rs index 7981eeb278..1cbffd559d 100644 --- a/connectorx-python/src/pandas/typesystem.rs +++ b/connectorx-python/src/pandas/typesystem.rs @@ -6,6 +6,8 @@ use connectorx::impl_typesystem; pub enum PandasTypeSystem { F64(bool), I64(bool), + F64Array(bool), + I64Array(bool), Bool(bool), Char(bool), Str(bool), @@ -21,6 +23,8 @@ pub enum PandasBlockType { Boolean(bool), // bool indicates nullablity Int64(bool), Float64, + Int64Array, + Float64Array, String, DateTime, Bytes, @@ -50,6 +54,8 @@ impl From for PandasBlockType { PandasTypeSystem::Bool(nullable) => PandasBlockType::Boolean(nullable), PandasTypeSystem::I64(nullable) => PandasBlockType::Int64(nullable), PandasTypeSystem::F64(_) => PandasBlockType::Float64, + PandasTypeSystem::F64Array(_) => PandasBlockType::Float64Array, + PandasTypeSystem::I64Array(_) => PandasBlockType::Int64Array, PandasTypeSystem::String(_) | PandasTypeSystem::BoxStr(_) | PandasTypeSystem::Str(_) @@ -65,6 +71,8 @@ impl_typesystem! { mappings = { { F64 => f64 } { I64 => i64 } + { F64Array => Vec } + { I64Array => Vec } { Bool => bool } { Char => char } { Str => &'r str } diff --git a/connectorx-python/src/source_router.rs b/connectorx-python/src/source_router.rs index dd29a9b5b6..df44a5ea76 100644 --- a/connectorx-python/src/source_router.rs +++ b/connectorx-python/src/source_router.rs @@ -122,6 +122,11 @@ fn pg_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) { let col_type = PostgresTypeSystem::from(row.columns()[0].type_()); let (min_v, max_v) = match col_type { + PostgresTypeSystem::Int2(_) => { + let min_v: Option = row.get(0); + let max_v: Option = row.get(1); + (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64) + } PostgresTypeSystem::Int4(_) => { let min_v: Option = row.get(0); let max_v: Option = row.get(1); diff --git a/connectorx/src/macros.rs b/connectorx/src/macros.rs index 9368047fe7..059428a01e 100644 --- a/connectorx/src/macros.rs +++ b/connectorx/src/macros.rs @@ -292,6 +292,15 @@ macro_rules! impl_transport { impl_transport!(@cvt option $TP, $T1, $T2); }; + (@cvt auto_vec $TP:ty, $T1:ty, $T2:ty) => { + impl<'tp, 'r> $crate::typesystem::TypeConversion<$T1, $T2> for $TP { + fn convert(val: $T1) -> $T2 { + val.into_iter().map(|v| v as _).collect() + } + } + + impl_transport!(@cvt option $TP, $T1, $T2); + }; (@cvt owned $TP:ty, $T1:ty, $T2:ty) => { impl<'tp, 'r> $crate::typesystem::TypeConversion<$T1, $T2> for $TP { fn convert(val: $T1) -> $T2 { diff --git a/connectorx/src/sources/postgres/mod.rs b/connectorx/src/sources/postgres/mod.rs index ef777ae67f..e9977b5dbc 100644 --- a/connectorx/src/sources/postgres/mod.rs +++ b/connectorx/src/sources/postgres/mod.rs @@ -451,6 +451,13 @@ impl_produce!( f32, f64, Decimal, + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, bool, &'r str, Vec, @@ -556,6 +563,59 @@ macro_rules! impl_csv_produce { impl_csv_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid,); +macro_rules! impl_csv_vec_produce { + ($($t: ty,)+) => { + $( + impl<'r, 'a> Produce<'r, Vec<$t>> for PostgresCSVSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&mut self) -> Vec<$t> { + let (ridx, cidx) = self.next_loc()?; + let s = &self.rowbuf[ridx][cidx][..]; + match s { + "{}" => vec![], + _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))), + s => s[1..s.len() - 1] + .split(",") + .map(|v| { + v.parse() + .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into()))) + }) + .collect::, ConnectorXError>>()?, + } + } + } + + impl<'r, 'a> Produce<'r, Option>> for PostgresCSVSourceParser<'a> { + type Error = PostgresSourceError; + + #[throws(PostgresSourceError)] + fn produce(&mut self) -> Option> { + let (ridx, cidx) = self.next_loc()?; + let s = &self.rowbuf[ridx][cidx][..]; + match s { + "" => None, + "{}" => Some(vec![]), + _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))), + s => Some( + s[1..s.len() - 1] + .split(",") + .map(|v| { + v.parse() + .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into()))) + }) + .collect::, ConnectorXError>>()?, + ), + } + } + } + )+ + }; +} + +impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal,); + impl<'r, 'a> Produce<'r, bool> for PostgresCSVSourceParser<'a> { type Error = PostgresSourceError; @@ -876,6 +936,13 @@ impl_produce!( f32, f64, Decimal, + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, bool, &'r str, Vec, diff --git a/connectorx/src/sources/postgres/typesystem.rs b/connectorx/src/sources/postgres/typesystem.rs index 9eaa92b5ca..3218af1693 100644 --- a/connectorx/src/sources/postgres/typesystem.rs +++ b/connectorx/src/sources/postgres/typesystem.rs @@ -13,6 +13,13 @@ pub enum PostgresTypeSystem { Int2(bool), Int4(bool), Int8(bool), + Float4Array(bool), + Float8Array(bool), + NumericArray(bool), + CharArray(bool), + Int2Array(bool), + Int4Array(bool), + Int8Array(bool), Date(bool), Char(bool), BpChar(bool), @@ -37,6 +44,13 @@ impl_typesystem! { { Float4 => f32 } { Float8 => f64 } { Numeric => Decimal } + { CharArray => Vec } + { Int2Array => Vec } + { Int4Array => Vec } + { Int8Array => Vec } + { Float4Array => Vec } + { Float8Array => Vec } + { NumericArray => Vec } { Bool => bool } { Char => i8 } { Text | BpChar | VarChar | Enum => &'r str } @@ -60,6 +74,13 @@ impl<'a> From<&'a Type> for PostgresTypeSystem { "float4" => Float4(true), "float8" => Float8(true), "numeric" => Numeric(true), + "_char" => CharArray(true), + "_int2" => Int2Array(true), + "_int4" => Int4Array(true), + "_int8" => Int8Array(true), + "_float4" => Float4Array(true), + "_float8" => Float8Array(true), + "_numeric" => NumericArray(true), "bool" => Bool(true), "char" => Char(true), "text" => Text(true), @@ -92,6 +113,13 @@ impl<'a> From for Type { Float4(_) => Type::FLOAT4, Float8(_) => Type::FLOAT8, Numeric(_) => Type::NUMERIC, + CharArray(_) => Type::CHAR_ARRAY, + Int2Array(_) => Type::INT2_ARRAY, + Int4Array(_) => Type::INT4_ARRAY, + Int8Array(_) => Type::INT8_ARRAY, + Float4Array(_) => Type::FLOAT4_ARRAY, + Float8Array(_) => Type::FLOAT8_ARRAY, + NumericArray(_) => Type::NUMERIC_ARRAY, Bool(_) => Type::BOOL, Text(_) => Type::TEXT, BpChar(_) => Type::BPCHAR, diff --git a/scripts/postgres.sql b/scripts/postgres.sql index 19ac43c508..c554a24289 100644 --- a/scripts/postgres.sql +++ b/scripts/postgres.sql @@ -44,13 +44,15 @@ CREATE TABLE IF NOT EXISTS test_types( test_json JSON, test_jsonb JSONB, test_bytea BYTEA, - test_enum happiness + test_enum happiness, + test_farray DOUBLE PRECISION[], + test_iarray Integer[] ); -INSERT INTO test_types VALUES (0, 'a', '86b494cc-96b2-11eb-9298-3e22fbb9fe9d', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer","qty": 6}}', '{"product": "Beer","qty": 6}', NULL, 'happy'); -INSERT INTO test_types VALUES (1, 'b', '86b49b84-96b2-11eb-9298-3e22fbb9fe9d', '10:03:00', '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper","qty": 24}}', '{"product": "Diaper","qty": 24}', 'Здра́вствуйте', 'very happy'); -INSERT INTO test_types VALUES (2, NULL, '86b49c42-96b2-11eb-9298-3e22fbb9fe9d', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car","qty": 1}}', '{"product": "Toy Car","qty": 1}', '', 'ecstatic'); -INSERT INTO test_types VALUES (3, 'd', '86b49cce-96b2-11eb-9298-3e22fbb9fe9d', '18:30:00', '3 year', '{"customer": "Mary Clark", "items": {"product": "Toy Train","qty": 2}}', '{"product": "Toy Train","qty": 2}', '😜', 'ecstatic'); +INSERT INTO test_types VALUES (0, 'a', '86b494cc-96b2-11eb-9298-3e22fbb9fe9d', '08:12:40', '1 year 2 months 3 days', '{"customer": "John Doe", "items": {"product": "Beer","qty": 6}}', '{"product": "Beer","qty": 6}', NULL, 'happy', '{}', '{-1, 0, 1123}'); +INSERT INTO test_types VALUES (1, 'b', '86b49b84-96b2-11eb-9298-3e22fbb9fe9d', '10:03:00', '2 weeks ago', '{"customer": "Lily Bush", "items": {"product": "Diaper","qty": 24}}', '{"product": "Diaper","qty": 24}', 'Здра́вствуйте', 'very happy', NULL, '{}'); +INSERT INTO test_types VALUES (2, NULL, '86b49c42-96b2-11eb-9298-3e22fbb9fe9d', '23:00:10', '3 months 2 days ago', '{"customer": "Josh William", "items": {"product": "Toy Car","qty": 1}}', '{"product": "Toy Car","qty": 1}', '', 'ecstatic', '{0.0123}', '{-324324}'); +INSERT INTO test_types VALUES (3, 'd', '86b49cce-96b2-11eb-9298-3e22fbb9fe9d', '18:30:00', '3 year', '{"customer": "Mary Clark", "items": {"product": "Toy Train","qty": 2}}', '{"product": "Toy Train","qty": 2}', '😜', 'ecstatic', '{0.000234, -12.987654321}', NULL); CREATE OR REPLACE FUNCTION increment(i integer) RETURNS integer AS $$ BEGIN