Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
I. Overview
1.1 Existing Architecture
Apache Paimon already provides a comprehensive Python API that supports multiple output formats:
- Arrow Format: `to_arrow()`, `to_arrow_batch_reader()` - provides stream-based batch processing
- Pandas: `to_pandas()` - converts to a DataFrame
- Ray: `to_ray()` - distributed computing integration
- DuckDB: `to_duckdb()` - SQL analytics
- Iterator: `to_iterator()` - native Python row-level iteration
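For reference, a minimal sketch of the existing read path that the new integrations would build on (the `new_scan()` / `new_read()` builder calls and the `process()` placeholder are assumptions based on the snippets later in this proposal):

```python
from pypaimon import CatalogFactory

catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.training_data')

# Column pruning via projection; the streaming reader yields pyarrow.RecordBatch objects
read_builder = table.new_read_builder().with_projection(['feature1', 'feature2', 'label'])
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

for batch in table_read.to_arrow_batch_reader(splits):
    process(batch)  # placeholder for downstream processing
```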
1.2 Core Capabilities
- Sharded Reading: Split-based data sharding mechanism
- Predicate Pushdown: Filtering condition pushdown optimization
- Column Pruning: Projection pushdown reduces data transfer
- Streaming Processing: Arrow RecordBatch streaming reads
- Incremental Reading: Supports timestamp range incremental queries
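These capabilities are exposed through the same read builder; continuing the sketch above, a sharded read with a pushed-down filter could look like this (`with_shard` is used here the same way as in the worker-sharding snippet in section 2.1.3):

```python
# Predicate pushdown: filter rows at the source rather than in Python
predicate_builder = read_builder.new_predicate_builder()
filtered_builder = read_builder.with_filter(
    predicate_builder.greater_than('timestamp', '2024-01-01'))

# Sharded reading: this process plans only shard 0 out of 4
splits = filtered_builder.new_scan().with_shard(0, 4).plan().splits()
```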
1.3 Technical Gap
A code search confirms that there is currently no PyTorch or TensorFlow integration code; it needs to be implemented from scratch.
II. Technical Solution Design
2.1 PyTorch Dataset Interface Implementation Plan
2.1.1 Architecture Design
┌─────────────────────────────────────────────────────────────┐
│ PyTorch Training Loop │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ PaimonIterableDataset │
│ - Inherits torch.utils.data.IterableDataset │
│ - Supports worker sharding │
│ - Provides __iter__ method │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ PaimonMapDataset │
│ - Inherits torch.utils.data.Dataset │
│ - Supports random access via __getitem__ │
│ - Caches splits metadata │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ TableRead (Existing Paimon API) │
│ - to_arrow_batch_reader() for streaming │
│ - to_arrow() for full table │
│ - Split-based parallel reading │
└─────────────────────────────────────────────────────────────┘
2.1.2 Implementation Details
File Structure:
paimon-python/pypaimon/ml/
├── __init__.py
├── pytorch/
│ ├── __init__.py
│ ├── dataset.py # PaimonIterableDataset, PaimonMapDataset
│ ├── transforms.py # Data transformation utilities
│ └── collate.py # Batch collation functions
└── tensorflow/
├── __init__.py
└── dataset.py # TensorFlow Dataset implementation
Core Class Design - PaimonIterableDataset:
Based on the existing `to_arrow_batch_reader()` method, this class implements streaming iteration:

```python
class PaimonIterableDataset(torch.utils.data.IterableDataset):
    """
    Streaming dataset, suitable for large-scale data training
    - Supports PyTorch DataLoader multiprocessing
    - Automatically shards data to different workers
    - Memory-efficient streaming reads
    """
```

Core Class Design - PaimonMapDataset:
Based on the `to_arrow()` method, this class implements random access:

```python
class PaimonMapDataset(torch.utils.data.Dataset):
    """
    Map-style dataset, supports random access
    - Implements __len__ and __getitem__
    - Suitable for scenarios requiring random sampling
    - Can be used with a PyTorch Sampler
    """
```
2.1.3 Key Technical Points
1. Worker Sharding Support
Leveraging Paimon's shard read functionality, splits are allocated to DataLoader workers automatically in `__iter__`:

```python
def __iter__(self):
    worker_info = torch.utils.data.get_worker_info()
    if worker_info is None:
        # Single process: read all splits
        splits = self.scan.plan().splits()
    else:
        # Multi-process: uses with_shard so each worker reads a disjoint shard
        shard_index = worker_info.id
        total_shards = worker_info.num_workers
        splits = self.scan.with_shard(shard_index, total_shards).plan().splits()
    # Stream this worker's shard; self.read (a TableRead) and the per-batch
    # conversion helper are created elsewhere in the class
    for batch in self.read.to_arrow_batch_reader(splits):
        yield from self._convert_batch(batch)
```

2. Data Transformation Pipeline
Supports flexible data transformation:
```python
class PaimonDatasetConfig:
    transform: Optional[Callable]    # Custom transformation function
    target_column: Optional[str]     # Label column
    feature_columns: List[str]       # Feature columns
    tensor_type: torch.dtype         # Output tensor type
```

3. Batch Processing Optimization
Leverages Arrow's efficient batch processing to reduce memory copies.
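For example, converting each pyarrow RecordBatch to tensors column by column avoids building intermediate Python rows; the helper below is illustrative (null handling and non-numeric columns are omitted):

```python
import pyarrow as pa
import torch

def batch_to_tensors(batch: pa.RecordBatch, feature_columns, target_column):
    # Numeric Arrow columns convert to NumPy cheaply; nullable or non-contiguous
    # columns fall back to a copy (zero_copy_only=False)
    features = torch.stack([
        torch.from_numpy(batch.column(name).to_numpy(zero_copy_only=False))
        for name in feature_columns
    ], dim=1)
    labels = torch.from_numpy(batch.column(target_column).to_numpy(zero_copy_only=False))
    return features, labels
```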
2.2 TensorFlow Dataset Interface Implementation Plan
2.2.1 Architecture Design
┌─────────────────────────────────────────────────────────────┐
│ TensorFlow Training Loop │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ PaimonTensorFlowDataset │
│ - Uses tf.data.Dataset.from_generator() │
│ - Supports prefetching and parallelism │
│ - TFRecord-compatible output │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ TableRead (Existing Paimon API) │
│ - to_arrow_batch_reader() for streaming │
│ - Automatic type conversion to TF tensors │
└─────────────────────────────────────────────────────────────┘
2.2.2 Implementation Strategy
Core Method: Using tf.data.Dataset.from_generator()
```python
class PaimonTensorFlowDataset:
    """
    TensorFlow Dataset wrapper
    - Based on a from_generator implementation
    - Supports all tf.data API features
    - Automatic type inference and conversion
    """

    @staticmethod
    def from_paimon(table, read_builder, splits):
        def generator():
            # Use to_arrow_batch_reader for streaming generation
            table_read = read_builder.new_read()
            for batch in table_read.to_arrow_batch_reader(splits):
                # Convert each Arrow batch to a TF-compatible format
                yield convert_arrow_to_tf(batch)

        # Create tf.data.Dataset with a signature inferred from the Paimon schema
        return tf.data.Dataset.from_generator(
            generator,
            output_signature=infer_tf_signature(read_builder.read_type())
        )
```

2.2.3 Key Technical Points
1. Type Mapping
Establishes a mapping from Paimon data types to TensorFlow types:

| Paimon Type | TF Type |
| --- | --- |
| INT / BIGINT | tf.int32 / tf.int64 |
| FLOAT / DOUBLE | tf.float32 / tf.float64 |
| STRING | tf.string |
| BOOLEAN | tf.bool |
| TIMESTAMP | tf.int64 (milliseconds) |
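A sketch of how this mapping could be expressed, keyed by Arrow types since batches arrive as Arrow data (the dictionary name is illustrative):

```python
import pyarrow as pa
import tensorflow as tf

ARROW_TO_TF_DTYPE = {
    pa.int32(): tf.int32,
    pa.int64(): tf.int64,
    pa.float32(): tf.float32,
    pa.float64(): tf.float64,
    pa.string(): tf.string,
    pa.bool_(): tf.bool,
    pa.timestamp('ms'): tf.int64,  # timestamps carried as epoch milliseconds
}
```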
2. Performance Optimization
- Uses `.prefetch()` to pre-fetch data
- Uses `.batch()` for dynamic batch processing
- Uses `.map()` for parallel data transformation
- Supports `tf.distribute.Strategy` for distributed training
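Put together, these knobs would be applied on top of the generator-backed dataset roughly as follows (`preprocess` is a placeholder transformation):

```python
dataset = PaimonTensorFlowDataset.from_paimon(table, read_builder, splits)
dataset = (dataset
           .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)  # parallel transformation
           .batch(256)                                            # dynamic batching
           .prefetch(tf.data.AUTOTUNE))                           # overlap I/O with training
```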
III. Implementation Plan
3.1 Phase One: PyTorch Dataset (High Priority)
Phase 1.1: Basic Implementation (2 weeks)
- Create `pypaimon/ml/pytorch/` module structure
- Implement `PaimonIterableDataset` base class
- Implement worker sharding logic
- Add basic type conversion (Arrow → PyTorch Tensor)
- Write unit tests
Phase 1.2: Feature Enhancement (1 week)
- Implement `PaimonMapDataset` for random access support
- Add data transformation pipeline (`transforms.py`)
- Implement custom `collate_fn`
- Support common ML scenarios (classification, regression, sequence)
Phase 1.3: Testing and Documentation (1 week)
- End-to-end integration tests
- Performance benchmark tests
- Write user documentation and example code
- Add to official documentation
3.2 Phase Two: TensorFlow Dataset (2 weeks)
Phase 2.1: Core Implementation
- Create `pypaimon/ml/tensorflow/` module
- Implement `PaimonTensorFlowDataset` class
- Establish type mapping system
- Implement streaming generator
Phase 2.2: Optimization and Testing
- Performance tuning (prefetch, cache, parallel map)
- Support `tf.distribute.Strategy`
- Unit and integration tests
- Documentation and examples
3.3 Phase Three: Production Optimization (1-2 weeks)
Performance Optimization:
- Zero-copy conversion (Arrow → Tensor)
- Memory pool management
- Adaptive batch sizing
Feature Completeness:
- Support online feature engineering
- Add caching mechanism
- Support data augmentation
Monitoring and Diagnostics:
- Add performance metrics
- Data sampling preview
- Error handling and retry logic
IV. Dependency Management
4.1 Update requirements.txt
Add optional dependencies:
```
# ML/AI framework support (optional)
torch>=1.9.0; extra == "pytorch"
tensorflow>=2.8.0; extra == "tensorflow"
```

4.2 Update setup.py
```python
extras_require={
    'pytorch': ['torch>=1.9.0'],
    'tensorflow': ['tensorflow>=2.8.0'],
    'ml': ['torch>=1.9.0', 'tensorflow>=2.8.0'],
}
```
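With these extras declared, users would opt in per framework, for example `pip install "pypaimon[pytorch]"` or `pip install "pypaimon[ml]"` (assuming the package keeps its current PyPI name).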
V. API Usage Examples
5.1 PyTorch Usage Example

```python
from pypaimon import CatalogFactory
from pypaimon.ml.pytorch import PaimonIterableDataset
import torch
from torch.utils.data import DataLoader
# 1. Create Catalog and get table
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.training_data')
# 2. Configure reading
#    (the predicate builder is obtained from the read builder before filtering)
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
read_builder = read_builder \
    .with_projection(['feature1', 'feature2', 'label']) \
    .with_filter(predicate_builder.greater_than('timestamp', '2024-01-01'))
# 3. Create PyTorch Dataset
dataset = PaimonIterableDataset(
    read_builder=read_builder,
    feature_columns=['feature1', 'feature2'],
    target_column='label',
    transform=lambda x: torch.tensor(x, dtype=torch.float32)
)

# 4. Create DataLoader (automatically shards to multiple workers)
dataloader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4  # Automatically uses Paimon shard read
)
# 5. Training loop
for epoch in range(10):
    for batch in dataloader:
        features, labels = batch
        # Train model...
```
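If rows need custom batching, a `collate_fn` from the proposed `collate.py` module could be passed to the DataLoader; a hypothetical sketch:

```python
def paimon_collate_fn(samples):
    # Each sample is a (features, label) pair produced by the dataset
    features = torch.stack([f for f, _ in samples])
    labels = torch.stack([l for _, l in samples])
    return features, labels

dataloader = DataLoader(dataset, batch_size=32, num_workers=4, collate_fn=paimon_collate_fn)
```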
5.2 TensorFlow Usage Example

```python
from pypaimon import CatalogFactory
from pypaimon.ml.tensorflow import PaimonTensorFlowDataset
import tensorflow as tf
# 1. Get table and configure reading
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.training_data')
read_builder = table.new_read_builder()
# 2. Create TensorFlow Dataset
tf_dataset = PaimonTensorFlowDataset.from_paimon(
    table=table,
    read_builder=read_builder,
    feature_columns=['feature1', 'feature2'],
    label_column='label'
)
# 3. Apply TF transformations
tf_dataset = tf_dataset \
    .batch(32) \
    .cache() \
    .prefetch(tf.data.AUTOTUNE)  # prefetch last so the input pipeline overlaps training
# 4. Train model
model = create_model()
model.fit(tf_dataset, epochs=10)
```
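For multi-GPU training, the same dataset plugs into `tf.distribute` by building the model under a strategy scope (a sketch; `create_model()` is the same placeholder as above):

```python
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    model = create_model()
model.fit(tf_dataset, epochs=10)
```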
VI. Quality Assurance
6.1 Testing Strategy
Unit Tests: based on the existing test framework
tests/ml/
├── pytorch/
│ ├── test_iterable_dataset.py
│ ├── test_map_dataset.py
│ ├── test_transforms.py
│ └── test_integration.py
└── tensorflow/
├── test_tf_dataset.py
└── test_integration.py
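For illustration, a unit test for the iterable dataset might check that worker sharding preserves all rows (`create_test_table` is a hypothetical fixture):

```python
import torch
from torch.utils.data import DataLoader
from pypaimon.ml.pytorch import PaimonIterableDataset

def test_iterable_dataset_reads_all_rows():
    table = create_test_table(num_rows=100)  # hypothetical helper that writes 100 rows
    dataset = PaimonIterableDataset(
        read_builder=table.new_read_builder(),
        feature_columns=['feature1'],
        target_column='label',
    )
    loader = DataLoader(dataset, batch_size=10, num_workers=2)
    total_rows = sum(features.shape[0] for features, _ in loader)
    assert total_rows == 100
```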
Solution
No response
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!