makeParallel 🚀

The easiest way to speed up your Python code using all your CPU cores.


Got a slow, CPU-heavy task in Python? makeParallel lets you run it on a separate core with a single line of code, so you can get results up to 4x, 8x, or even 16x faster (depending on how many cores you have) without blocking your main program.

It's powered by Rust to safely bypass Python's Global Interpreter Lock (GIL), giving you true parallelism without the complexity of multiprocessing.


📋 Table of Contents

  • 🤔 What's the "GIL"?
  • ✨ Why You'll Love makeParallel
  • 📦 Installation
  • 🚀 Quick Start
  • 🤔 When Should I Use This?
  • 📚 Complete Feature Guide
  • 🎯 Choosing the Right Decorator
  • 🗝️ How It Works
  • 🔧 Best Practices
  • 🆚 Comparison with Alternatives
  • 📖 Real-World Examples
  • 🐛 Troubleshooting
  • 🤝 Contributing
  • 📝 License
  • 🙏 Acknowledgments

🤔 What's the "GIL"?

Python has a rule called the Global Interpreter Lock (GIL) that lets only one thread execute Python bytecode at a time, even on a multi-core CPU. For tasks that mostly wait on networks or disks (I/O-bound), this is fine. But for heavy calculations (CPU-bound), it means Python can't use all the power your computer has. makeParallel fixes this.
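
You can watch the GIL in action with nothing but the standard library. In this quick sketch, splitting the same CPU-bound loop across four threads barely improves the wall-clock time, because only one thread executes Python bytecode at a time:

import time
from threading import Thread

def count(n):
    # A pure-Python, CPU-bound loop
    total = 0
    for i in range(n):
        total += i * i
    return total

# One thread doing all the work
start = time.time()
count(20_000_000)
print(f"Single thread: {time.time() - start:.2f}s")

# Four threads splitting the same work: roughly the same total time,
# because the GIL serializes Python bytecode execution
start = time.time()
threads = [Thread(target=count, args=(5_000_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Four threads: {time.time() - start:.2f}s")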


✨ Why You'll Love makeParallel

  • So Simple: Just add the @parallel decorator to any function. That's it!
  • True Speed-Up: Uses Rust threads to run your code on all available CPU cores.
  • Doesn't Block: Your main application stays responsive while the work happens in the background.
  • Smart Callbacks: Monitor progress, handle completion, catch errors - all with simple callbacks.
  • Task Dependencies: Build complex pipelines where tasks automatically wait for their dependencies.
  • Auto Progress Tracking: Report progress from within tasks without managing task IDs.
  • No multiprocessing Headaches: Avoids the complexity, memory overhead, and data-sharing issues of multiprocessing.
  • Production Ready: Built-in error handling, timeouts, cancellation, and graceful shutdown.
  • Works with Your Code: Decorate any function, even class methods (see the sketch below).
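
As a minimal sketch of that last point: decorating a method should look just like decorating a free function, extrapolating from the Quick Start below (the ReportBuilder class here is hypothetical):

from makeparallel import parallel

class ReportBuilder:
    @parallel  # decorate the method just like a free function
    def build(self, rows):
        return sum(len(r) for r in rows)

builder = ReportBuilder()
handle = builder.build([[1, 2], [3, 4, 5]])  # returns an AsyncHandle
print(handle.get())  # 5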

📦 Installation

Installing is as simple as:

pip install makeparallel

Or, to build it from the source:

# Clone the repository
git clone https://github.com/amiyamandal-dev/makeParallel.git
cd makeParallel

# Build and install locally
pip install .

🚀 Quick Start

Let's say you have a function that does a lot of math and slows down your program.

Before: Your code waits...

import time

def cpu_intensive_task(n):
    # A slow calculation
    return sum(i * i for i in range(n))

start = time.time()
result = cpu_intensive_task(20_000_000) # This blocks everything!
print(f"Got result: {result} in {time.time() - start:.2f}s")

After: Instant and non-blocking!

import time
from makeparallel import parallel

@parallel # Just add this decorator!
def cpu_intensive_task(n):
    # The same slow calculation
    return sum(i * i for i in range(n))

start = time.time()
# The function returns instantly with a "handle"
handle = cpu_intensive_task(20_000_000)

print("The task is running in the background, my app is still responsive!")
# You can do other work here...

# Now, get the result (this will wait until it's ready)
result = handle.get()
print(f"Got result: {result} in {time.time() - start:.2f}s")

In the example above, handle.get() blocks until the result is ready. You can also check if it's done without waiting:

if handle.is_ready():
    print("It's done!")
else:
    print("Still working...")

🤔 When Should I Use This?

makeParallel is for CPU-bound tasks. These are operations that require a lot of computation, like:

  • Heavy data processing or scientific computing.
  • Image or video processing.
  • Complex simulations.

For I/O-bound tasks (like waiting for a web request or reading a file), Python's built-in threading or asyncio are usually a better fit.
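
To make the distinction concrete, here is a tiny, self-contained sketch of an I/O-bound workload where asyncio is the better fit (the sleep stands in for waiting on a network response):

import asyncio

async def fetch(url):
    await asyncio.sleep(0.1)  # stand-in for a network round trip
    return f"body of {url}"

async def fetch_all(urls):
    # A single event-loop thread handles all the waiting concurrently
    return await asyncio.gather(*(fetch(u) for u in urls))

results = asyncio.run(fetch_all(["page1", "page2", "page3"]))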

📚 Complete Feature Guide

makeParallel comes with many powerful decorators and utilities.

🔥 Parallel Execution Decorators

@parallel - Full-featured parallel execution with callbacks and advanced control

from makeparallel import parallel, report_progress

@parallel
def cpu_intensive_task(n):
    for i in range(0, n, n//10):
        # Report progress automatically (no task_id needed!)
        report_progress(i / n)
        # Do work...
    return sum(i * i for i in range(n))

# Returns immediately with an AsyncHandle
handle = cpu_intensive_task(20_000_000, timeout=5.0)

# Set up callbacks (execute automatically when task completes)
handle.on_progress(lambda p: print(f"Progress: {p*100:.0f}%"))
handle.on_complete(lambda result: print(f"Success! Result: {result}"))
handle.on_error(lambda error: print(f"Error occurred: {error}"))

# Check status
if handle.is_ready():
    result = handle.get()  # Callbacks fire here

# Try to get result without blocking
result = handle.try_get()  # Returns None if not ready

# Cancel a running task
handle.cancel()
if handle.is_cancelled():
    print("Task was cancelled")

# Get task info
print(f"Task ID: {handle.get_task_id()}")
print(f"Elapsed: {handle.elapsed_time()}s")
print(f"Progress: {handle.get_progress()}")

# Add metadata
handle.set_metadata("user_id", "user-123")
metadata = handle.get_all_metadata()

@parallel_fast - Optimized with lock-free channels (crossbeam)

from makeparallel import parallel_fast

@parallel_fast
def fast_task(x):
    return x ** 2

handle = fast_task(10)
result = handle.get()  # Faster channel communication

@parallel_pool - Uses Rayon thread pool (best for many small tasks)

from makeparallel import parallel_pool

@parallel_pool
def small_task(x):
    return x * 2

# Efficiently handles many concurrent tasks
handles = [small_task(i) for i in range(1000)]
results = [h.get() for h in handles]

@parallel_priority - Priority-based execution

from makeparallel import parallel_priority, start_priority_worker, stop_priority_worker

# Start the priority worker before using priority tasks
start_priority_worker()

@parallel_priority
def task(data):
    return process(data)

# High priority tasks execute first (higher number = higher priority)
low = task(data1, priority=1)
high = task(data2, priority=10)  # Executes first

# Get results
low_result = low.get()
high_result = high.get()

# Stop the worker when done
stop_priority_worker()

@parallel_with_deps - Task dependencies and pipelines

from makeparallel import parallel_with_deps

@parallel_with_deps
def step1():
    return "data from step 1"

@parallel_with_deps
def step2(deps):
    # deps is a tuple of all dependency results
    data = deps[0]  # Result from step1
    return f"processed {data}"

@parallel_with_deps
def step3(deps):
    result = deps[0]  # Result from step2
    return f"final: {result}"

# Build dependency chain
h1 = step1()
h2 = step2(depends_on=[h1])  # Automatically waits for h1
h3 = step3(depends_on=[h2])  # Automatically waits for h2

# Execute entire pipeline
final = h3.get()  # Returns: "final: processed data from step 1"

🎯 Callbacks and Event Handling

makeParallel provides a powerful callback system for monitoring task execution:

from makeparallel import parallel, report_progress

@parallel
def download_file(url):
    # Simulate download with progress
    for i in range(100):
        download_chunk(url, i)
        # Report progress (task_id is automatic!)
        report_progress(i / 100.0)
    return f"Downloaded {url}"

handle = download_file("https://example.com/large_file.zip")

# Set up callbacks
handle.on_progress(lambda p: print(f"Downloaded: {p*100:.1f}%"))
handle.on_complete(lambda result: notify_user(result))
handle.on_error(lambda error: log_error(error))

# Callbacks fire automatically when you get the result
result = handle.get()

Callback Types:

  • on_progress(callback) - Called whenever report_progress() is invoked inside the task
  • on_complete(callback) - Called when the task succeeds (receives the result)
  • on_error(callback) - Called when the task fails (receives the error string)

Key Features:

  • ✅ Automatic task_id tracking (no need to pass task_id!)
  • ✅ Thread-safe callback execution
  • ✅ Error isolation (callback failures don't crash tasks; see the sketch below)
  • ✅ Progress validation (NaN/Infinity rejected)
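
Here is a small sketch of the error-isolation point above, assuming callback failures are contained exactly as described: a buggy callback should not poison the task's result.

from makeparallel import parallel

@parallel
def add(a, b):
    return a + b

handle = add(2, 3)
# This callback raises, but per the error-isolation guarantee the
# exception is contained and the task's result survives
handle.on_complete(lambda result: 1 / 0)
print(handle.get())  # still prints 5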

πŸ—ΊοΈ Batch Processing

parallel_map - Process lists in parallel

from makeparallel import parallel_map

def process_data(item):
    return item * 2

my_large_list = list(range(10000))
results = parallel_map(process_data, my_large_list)

gather - Collect results from multiple handles

from makeparallel import parallel, gather

@parallel
def task(x):
    return x ** 2

handles = [task(i) for i in range(10)]

# Wait for all and collect results
results = gather(handles, on_error="raise")  # or "skip" or "none"

ParallelContext - Context manager for parallel tasks

from makeparallel import ParallelContext, parallel

@parallel
def task(x):
    return x * 2

with ParallelContext(timeout=10.0) as ctx:
    handle1 = ctx.submit(task, (5,))
    handle2 = ctx.submit(task, (10,))
    # All tasks complete when exiting context

💾 Caching Decorators

@memoize - Cache function results

from makeparallel import memoize

@memoize
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

fibonacci(35)  # Slow first time
fibonacci(35)  # Instant second time

@memoize_fast - Lock-free concurrent cache (DashMap)

from makeparallel import memoize_fast

@memoize_fast
def expensive_computation(x, y):
    return x ** y

# Safe for concurrent access from multiple threads

πŸ” Retry Logic

@retry - Simple retry with fixed delays

from makeparallel import retry

@retry(max_retries=3)
def flaky_api_call():
    # Will retry up to 3 times on failure
    return make_request()

@retry_backoff - Retry with exponential backoff

from makeparallel import retry_backoff

@retry_backoff(
    max_attempts=5,
    backoff="exponential",  # or "linear"
    initial_delay=1.0,
    max_delay=60.0
)
def unreliable_task():
    return do_something()

📊 Performance Monitoring

@profiled - Automatic performance tracking

from makeparallel import profiled, get_metrics, get_all_metrics

@profiled
def tracked_function(x):
    return x ** 2

for i in range(100):
    tracked_function(i)

# Get metrics for specific function
metrics = get_metrics("tracked_function")
print(f"Total tasks: {metrics.total_tasks}")
print(f"Completed: {metrics.completed_tasks}")
print(f"Failed: {metrics.failed_tasks}")
print(f"Avg time: {metrics.average_execution_time_ms}ms")

# Get all metrics
all_metrics = get_all_metrics()

@timer - Simple execution timing

from makeparallel import timer

@timer
def my_function():
    # Do work...
    pass

my_function()  # Prints execution time

@CallCounter - Count function invocations

from makeparallel import CallCounter

@CallCounter
def counted_function():
    return "result"

counted_function()
counted_function()
print(counted_function.call_count)  # 2
counted_function.reset()

βš™οΈ Advanced Configuration

Thread Pool Configuration

from makeparallel import configure_thread_pool, get_thread_pool_info

# Configure global thread pool
configure_thread_pool(num_threads=8, stack_size=2*1024*1024)

# Get current info
info = get_thread_pool_info()
print(info["current_num_threads"])

Backpressure and Resource Management

from makeparallel import set_max_concurrent_tasks, configure_memory_limit

# Limit concurrent tasks to prevent overload
set_max_concurrent_tasks(100)

# Set memory limit (percentage)
configure_memory_limit(max_memory_percent=80.0)

Progress Reporting and Callbacks

from makeparallel import parallel, report_progress

@parallel
def long_task():
    for i in range(100):
        # Report progress from within task (task_id is automatic!)
        report_progress(i / 100.0)
        # Do work...
    return "done"

handle = long_task()

# Set up callbacks
handle.on_progress(lambda p: print(f"Progress: {p*100:.1f}%"))
handle.on_complete(lambda result: print(f"Finished: {result}"))
handle.on_error(lambda error: print(f"Error: {error}"))

# Get result (callbacks fire automatically)
result = handle.get()

Task Dependencies

from makeparallel import parallel_with_deps

@parallel_with_deps
def fetch_data():
    return {"users": 100, "orders": 500}

@parallel_with_deps
def process_data(deps):
    # deps[0] contains result from fetch_data
    data = deps[0]
    return f"Processed {data['users']} users"

@parallel_with_deps
def save_results(deps):
    # deps[0] contains result from process_data
    processed = deps[0]
    return f"Saved: {processed}"

# Build a dependency pipeline
h1 = fetch_data()
h2 = process_data(depends_on=[h1])  # Waits for h1
h3 = save_results(depends_on=[h2])  # Waits for h2

# Execute the entire pipeline
final_result = h3.get()  # Returns: "Saved: Processed 100 users"

Graceful Shutdown

from makeparallel import shutdown, get_active_task_count, reset_shutdown

# Get active task count
print(f"Active tasks: {get_active_task_count()}")

# Graceful shutdown with timeout
success = shutdown(timeout_secs=30.0, cancel_pending=True)

# Reset after shutdown (for testing)
reset_shutdown()

🎯 Choosing the Right Decorator

| Decorator | Best For | Performance | Features |
| --- | --- | --- | --- |
| @parallel | Most use cases | Good | Full control: timeout, cancel, metadata, progress |
| @parallel_fast | High-throughput tasks | Better | Lock-free channels (crossbeam) |
| @parallel_pool | Many small tasks | Best | Rayon thread pool, efficient resource usage |
| @parallel_priority | Priority-based scheduling | Good | Priority queue execution |
| parallel_map | Batch processing lists | Best | Automatic parallelization across items |

Quick Decision Guide:

  • Single long task with monitoring? → @parallel
  • Thousands of small tasks? → @parallel_pool
  • Processing a large list? → parallel_map
  • Need priority scheduling? → @parallel_priority
  • Maximum throughput? → @parallel_fast

πŸ—οΈ How It Works

Here's a simple breakdown of what happens when you call a @parallel function:

  1. Python Side: Your main program calls the function but doesn't run it directly. Instead, it sends the function and its arguments to the Rust backend.
  2. Rust Backend:
    • It immediately returns the AsyncHandle object to your Python code so it doesn't have to wait.
    • It releases Python's Global Interpreter Lock (GIL).
    • It spawns a new Rust OS thread (a real parallel thread).
    • Inside the new thread, it re-acquires the GIL to safely execute your Python function.
  3. Result: The result is sent back to the AsyncHandle, which your main program can access with .get().

This GIL-release-and-reacquire step is the key to unlocking true parallelism for CPU-bound Python code.
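
If you already know the standard library, the AsyncHandle pattern is conceptually close to a concurrent.futures.Future. This rough stdlib analogy only illustrates the handle workflow; the real library backs the handle with a Rust OS thread and releases the GIL, which this sketch cannot do:

from concurrent.futures import ThreadPoolExecutor

def work():
    return sum(i * i for i in range(1_000_000))

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(work)  # like calling a @parallel function
    print(future.done())            # like handle.is_ready()
    print(future.result())          # like handle.get()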

🔧 Best Practices

✅ Do's

  • Use for CPU-bound tasks: Heavy computation, data processing, mathematical operations
  • Combine with caching: Use @memoize or @memoize_fast to avoid redundant calculations
  • Monitor with profiling: Use @profiled to track performance and identify bottlenecks
  • Set resource limits: Use set_max_concurrent_tasks() to prevent overload
  • Handle errors gracefully: Use gather() with appropriate on_error strategy
  • Use timeouts for long tasks: Add timeout parameter to prevent hanging

❌ Don'ts

  • Don't use for I/O-bound tasks: Use asyncio or threading instead
  • Don't pass large objects: Minimize data transfer between threads
  • Don't ignore error handling: Always check results or use try/except
  • Don't spawn unlimited tasks: Use set_max_concurrent_tasks() for backpressure
  • Don't forget cleanup: Use shutdown() for graceful termination

💡 Performance Tips

# ✅ Good: Process large batches efficiently
results = parallel_map(heavy_computation, large_list)

# ❌ Bad: Creating too many individual parallel tasks
handles = [parallel_task(x) for x in range(10000)]  # Overhead!

# ✅ Good: Use memoization for repeated calls
@memoize_fast
@parallel_pool
def cached_parallel_task(x):
    return expensive_operation(x)

# ✅ Good: Configure thread pool for your workload
configure_thread_pool(num_threads=8)  # Match your CPU cores

🆚 Comparison with Alternatives

| Feature | makeParallel | multiprocessing | threading | asyncio |
| --- | --- | --- | --- | --- |
| True Parallelism | ✅ Yes | ✅ Yes | ❌ No (GIL) | ❌ No (GIL) |
| CPU-bound Tasks | ✅ Excellent | ✅ Good | ❌ Poor | ❌ Poor |
| I/O-bound Tasks | ⚠️ Okay | ⚠️ Okay | ✅ Good | ✅ Excellent |
| Memory Overhead | ✅ Low | ❌ High | ✅ Low | ✅ Low |
| Easy to Use | ✅ Very Easy | ⚠️ Complex | ✅ Easy | ⚠️ Moderate |
| Data Sharing | ✅ Simple | ❌ Complex | ✅ Simple | ✅ Simple |
| Performance | ✅✅ Fast (Rust) | ✅ Good | ⚠️ Limited | ✅ Good |
| Cancellation | ✅ Built-in | ⚠️ Manual | ⚠️ Manual | ✅ Built-in |
| Progress Tracking | ✅ Built-in | ❌ Manual | ❌ Manual | ⚠️ Manual |

📖 Real-World Examples

Example 1: Image Processing Pipeline

from makeparallel import parallel_map, profiled
from PIL import Image
import os

@profiled
def process_image(image_path):
    img = Image.open(image_path)
    # Resize, apply filters, etc.
    img_resized = img.resize((800, 600))
    # Save processed image
    output_path = f"processed_{os.path.basename(image_path)}"
    img_resized.save(output_path)
    return output_path

# Process 1000 images in parallel
image_files = [f"image_{i}.jpg" for i in range(1000)]
processed = parallel_map(process_image, image_files)

print(f"Processed {len(processed)} images")

Example 2: Web Scraping with Retry Logic

from makeparallel import parallel_pool, retry_backoff
import requests

@retry_backoff(max_attempts=3, backoff="exponential")
@parallel_pool
def fetch_url(url):
    response = requests.get(url, timeout=10)
    return response.text

urls = ["https://example.com/page1", "https://example.com/page2", ...]
handles = [fetch_url(url) for url in urls]
results = [h.get() for h in handles]

Example 3: Data Analysis with Progress Tracking and Callbacks

from makeparallel import parallel, report_progress
import pandas as pd

@parallel
def analyze_dataset(file_path):
    df = pd.read_csv(file_path)
    total_rows = len(df)

    results = []
    for i, row in df.iterrows():
        # Report progress (task_id is automatic!)
        report_progress(i / total_rows)

        # Perform analysis
        result = complex_analysis(row)
        results.append(result)

    return results

handle = analyze_dataset("large_dataset.csv")

# Set up callbacks for monitoring
handle.on_progress(lambda p: print(f"Analyzed: {p*100:.1f}%"))
handle.on_complete(lambda results: print(f"Analysis complete! {len(results)} rows"))
handle.on_error(lambda e: print(f"Analysis failed: {e}"))

# Get results (callbacks fire automatically)
final_results = handle.get()

Example 4: ETL Pipeline with Task Dependencies

from makeparallel import parallel_with_deps

@parallel_with_deps
def extract_data(source):
    # Fetch data from database/API
    print(f"Extracting from {source}...")
    return fetch_raw_data(source)

@parallel_with_deps
def transform_data(deps):
    # deps[0] contains result from extract_data
    raw_data = deps[0]
    print("Transforming data...")
    return clean_and_transform(raw_data)

@parallel_with_deps
def validate_data(deps):
    # deps[0] contains result from transform_data
    transformed = deps[0]
    print("Validating data...")
    return run_validation_checks(transformed)

@parallel_with_deps
def load_data(deps):
    # deps[0] contains result from validate_data
    validated = deps[0]
    print("Loading to warehouse...")
    return insert_into_warehouse(validated)

# Build ETL pipeline with dependencies
h1 = extract_data("production_db")
h2 = transform_data(depends_on=[h1])   # Waits for extract
h3 = validate_data(depends_on=[h2])    # Waits for transform
h4 = load_data(depends_on=[h3])        # Waits for validate

# Execute entire pipeline
result = h4.get()  # Blocks until all dependencies complete
print(f"Pipeline complete: {result}")

Example 5: Machine Learning Model Training

from makeparallel import parallel, gather, configure_thread_pool
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Configure thread pool for ML workload
configure_thread_pool(num_threads=4)

@parallel
def train_model(params):
    # Assumes X_train, X_test, y_train, y_test are already defined,
    # e.g. produced by train_test_split on your dataset
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    score = model.score(X_test, y_test)
    return {"params": params, "score": score}

# Train multiple models with different hyperparameters
param_grid = [
    {"n_estimators": 100, "max_depth": 10},
    {"n_estimators": 200, "max_depth": 15},
    {"n_estimators": 300, "max_depth": 20},
]

handles = [train_model(params) for params in param_grid]
results = gather(handles)

# Find best model
best = max(results, key=lambda x: x["score"])
print(f"Best params: {best['params']}, Score: {best['score']}")

πŸ› Troubleshooting

My tasks are running slowly

  • Check if your task is CPU-bound (use @profiled to measure)
  • For I/O-bound tasks, use asyncio instead
  • Ensure you're not creating too many tasks (use parallel_map for batch processing)
  • Configure thread pool: configure_thread_pool(num_threads=<cpu_cores>)

Tasks are hanging

  • Add timeouts: call the decorated function with a timeout, e.g. task(timeout=10.0)
  • Use cancel() to stop stuck tasks
  • Check for deadlocks in your Python code

Memory usage is too high

  • Limit concurrent tasks: set_max_concurrent_tasks(100)
  • Set memory limit: configure_memory_limit(max_memory_percent=80.0)
  • Use @parallel_pool instead of spawning individual threads
  • Process data in smaller batches

Errors are being swallowed

  • Always wrap handle.get() in a try/except block (see the sketch after this list)
  • Use gather() with on_error="raise" to see all errors
  • Enable profiling to see failed task counts: @profiled
  • Use on_error callbacks to capture errors: handle.on_error(lambda e: print(e))
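
A minimal sketch of that try/except pattern, assuming task exceptions surface when you call get() (which is what the advice above implies):

from makeparallel import parallel

@parallel
def risky():
    raise ValueError("boom")

handle = risky()
try:
    result = handle.get()  # the task's error should surface here
except Exception as e:
    print(f"Task failed: {e}")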

Callbacks not firing

  • Make sure you call handle.get() or handle.wait() to trigger callbacks
  • Callbacks execute during result retrieval
  • Check callback syntax: handle.on_progress(lambda p: print(p))

Dependencies hanging

  • Check for circular dependencies (task A depends on B, B depends on A)
  • Verify all dependencies complete successfully
  • Use timeouts: task(depends_on=[h1], timeout=60.0)
  • Enable logging to see dependency errors: RUST_LOG=makeparallel=debug

🤝 Contributing

Contributions are welcome! If you want to help improve makeParallel, please feel free to open an issue or submit a pull request.

Development Setup

# Clone the repository
git clone https://github.com/amiyamandal-dev/makeParallel.git
cd makeParallel

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install development dependencies
pip install maturin

# Build and install in development mode
maturin develop

# Run tests
python tests/test_all.py

Running Tests

# Run all tests
python tests/test_all.py

# The test suite includes:
# - 37 core tests covering all decorators and features
# - 3 callback tests (on_progress, on_complete, on_error)
# - 5 progress tracking tests
# - Performance benchmarks
# - Edge case validation
# - Error handling verification

# Run specific test suites
python test_simple_callbacks.py      # Callback functionality
python test_progress_fix.py          # Progress tracking

Code Quality

# Format Rust code
cargo fmt

# Lint Rust code
cargo clippy

# Format Python code (if you have ruff)
ruff format .

# Check Python code
ruff check .

πŸ“ License

This project is licensed under the MIT License - see the LICENSE file for details.

πŸ™ Acknowledgments

  • Built with PyO3 for Python-Rust interop
  • Uses Rayon for efficient thread pool management
  • Uses Crossbeam for lock-free channels
  • Uses DashMap for concurrent caching

📬 Contact & Support

Questions, bug reports, and feature requests are welcome on the project's GitHub issue tracker.

Made with ❤️ and Rust 🦀
