Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/bindings_python_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ jobs:
shell: bash
run: |
set -e
pip install dist/pyiceberg_core-*.whl --force-reinstall
pip install pytest
pytest -v
pip install hatch==1.12.0
hatch run dev:pip install dist/pyiceberg_core-*.whl --force-reinstall
hatch run dev:test
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ dist/*
**/venv
*.so
*.pyc
*.whl
*.tar.gz
2 changes: 2 additions & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ header:
- '**/DEPENDENCIES.*.tsv'
# Release distributions
- 'dist/*'
# Generated content by poetry
- poetry.lock
comment: on-failure
2 changes: 2 additions & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ crate-type = ["cdylib"]
[dependencies]
iceberg = { path = "../../crates/iceberg" }
pyo3 = { version = "0.22", features = ["extension-module"] }
arrow = { version = "52.2.0", features = ["ffi"] }
libc = "0.2"
10 changes: 3 additions & 7 deletions bindings/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,17 @@ This project is used to build an iceberg-rust powered core for pyiceberg.
## Setup

```shell
python -m venv venv
source ./venv/bin/activate

pip install maturin
pip install hatch==1.12.0
```

## Build

```shell
maturin develop
hatch run dev:develop
```

## Test

```shell
maturin develop -E test
pytest -v
hatch run dev:test
```
15 changes: 12 additions & 3 deletions bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,22 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]

[project.optional-dependencies]
test = ["pytest"]

[tool.maturin]
features = ["pyo3/extension-module"]
python-source = "python"
module-name = "pyiceberg_core.pyiceberg_core_rust"

[tool.ruff.lint]
ignore = ["F403", "F405"]

[tool.hatch.envs.dev]
dependencies = [
"maturin>=1.0,<2.0",
"pytest>=8.3.2",
"pyarrow>=17.0.0",
]

[tool.hatch.envs.dev.scripts]
develop = "maturin develop"
build = "maturin build --out dist --sdist"
test = "pytest"
5 changes: 5 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

use iceberg::io::FileIOBuilder;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;

mod transform;

#[pyfunction]
fn hello_world() -> PyResult<String> {
Expand All @@ -26,6 +29,8 @@ fn hello_world() -> PyResult<String> {

#[pymodule]
fn pyiceberg_core_rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
use transform::bucket_transform;
m.add_function(wrap_pyfunction!(hello_world, m)?)?;
m.add_function(wrap_pyfunction!(bucket_transform, m)?)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pyo3 supports modules, how about adding a transform module like this?

use pyo3::prelude::*;

#[pymodule]
fn pyiceberg_core_rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
    let transform_module = PyModule::new_bound(parent_module.py(), "transform")?;
    child_module.add_function(wrap_pyfunction!(bucket_transform, &transform_module)?)?;
    parent_module.add_submodule(&transform_module)
}

Ok(())
}
142 changes: 142 additions & 0 deletions bindings/python/src/transform.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{error, fmt, sync::Arc};
use iceberg::spec::Transform;
use iceberg::transform::create_transform_function;
use iceberg::Error;

use arrow::{
array::{make_array, Array, ArrayData, ArrayRef},
error::ArrowError,
ffi::{from_ffi, to_ffi},
};
use libc::uintptr_t;
use pyo3::{exceptions::PyOSError, exceptions::PyValueError, prelude::*};

#[derive(Debug)]
enum PyO3ArrowError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe most ffi related to arrow has been handled in arrow-rs/pyarrow. Would you like to take a look? Maybe we can reuse them.

ArrowError(ArrowError),
}

impl fmt::Display for PyO3ArrowError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PyO3ArrowError::ArrowError(ref e) => e.fmt(f),
}
}
}

impl error::Error for PyO3ArrowError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
// The cause is the underlying implementation error type. Is implicitly
// cast to the trait object `&error::Error`. This works because the
// underlying type already implements the `Error` trait.
PyO3ArrowError::ArrowError(ref e) => Some(e),
}
}
}

impl From<ArrowError> for PyO3ArrowError {
fn from(err: ArrowError) -> PyO3ArrowError {
PyO3ArrowError::ArrowError(err)
}
}

impl From<PyO3ArrowError> for PyErr {
fn from(err: PyO3ArrowError) -> PyErr {
PyOSError::new_err(err.to_string())
}
}

#[derive(Debug)]
enum PyO3IcebergError {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about implementing a simple fn to_py_err(err: iceberg::Error) -> PyError. So we can use:

let bucket = create_transform_function(&Transform::Bucket(num_buckets)).map_err(to_py_err)?;

But no need for implementing Display, From by hand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That worked like a charm! Thanks @Xuanwo

Error(Error),
}

impl fmt::Display for PyO3IcebergError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PyO3IcebergError::Error(ref e) => e.fmt(f),
}
}
}

impl error::Error for PyO3IcebergError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
// The cause is the underlying implementation error type. Is implicitly
// cast to the trait object `&error::Error`. This works because the
// underlying type already implements the `Error` trait.
PyO3IcebergError::Error(ref e) => Some(e),
}
}
}

impl From<Error> for PyO3IcebergError {
fn from(err: Error) -> PyO3IcebergError {
PyO3IcebergError::Error(err)
}
}

impl From<PyO3IcebergError> for PyErr {
fn from(err: PyO3IcebergError) -> PyErr {
PyValueError::new_err(err.to_string())
}
}

fn to_rust_arrow_array(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
let (array, schema) = to_ffi(&ArrayData::new_empty(&arrow::datatypes::DataType::Null))
.map_err(PyO3ArrowError::from)?;
let array_pointer = &array as *const _ as uintptr_t;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not comfortable using libc directly. How about using pyo3::ffi::Py_uintptr_t instead?

let schema_pointer = &schema as *const _ as uintptr_t;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(py, "_export_to_c", (array_pointer, schema_pointer))?;

let array = unsafe { from_ffi(array, &schema) }.map_err(PyO3ArrowError::from)?;
let array = make_array(array);
Ok(array)
}

fn to_pyarrow_array(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array, schema) = to_ffi(&array.to_data()).map_err(PyO3ArrowError::from)?;
let array_pointer = &array as *const _ as uintptr_t;
let schema_pointer = &schema as *const _ as uintptr_t;

let pa = py.import_bound("pyarrow")?;

let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;
Ok(array.to_object(py))
}

#[pyfunction]
pub fn bucket_transform(array: PyObject, num_buckets: u32, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust_arrow_array(array, py)?;
let bucket = create_transform_function(&Transform::Bucket(num_buckets)).map_err(PyO3IcebergError::from)?;
let array = bucket.transform(array).map_err(PyO3IcebergError::from)?;
let array = Arc::new(array);
// export
to_pyarrow_array(array, py)
}
49 changes: 49 additions & 0 deletions bindings/python/tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from pyiceberg_core import bucket_transform

import pytest
import pyarrow as pa


def test_bucket_pyarrow_array():
arr = pa.array([1, 2])
result = bucket_transform(arr, 10)
expected = pa.array([6, 2], type=pa.int32())
assert result == expected


def test_bucket_pyarrow_array_list_type_fails():
arr = pa.array([[1, 2], [3, 4]])
with pytest.raises(
ValueError,
match=r"FeatureUnsupported => Unsupported data type for bucket transform",
):
bucket_transform(arr, 10)


def test_bucket_chunked_array():
chunked = pa.chunked_array([pa.array([1, 2]), pa.array([3, 4])])
result_chunks = []
for arr in chunked.iterchunks():
result_chunks.append(bucket_transform(arr, 10))

expected = pa.chunked_array(
[pa.array([6, 2], type=pa.int32()), pa.array([5, 0], type=pa.int32())]
)
assert pa.chunked_array(result_chunks).equals(expected)