Skip to content

Commit a4ab0dd

Browse files
committed
use global mutex for gil
1 parent aeb9a41 commit a4ab0dd

File tree

8 files changed

+23
-30
lines changed

8 files changed

+23
-30
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Justfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
set dotenv-load := true
2+
13
build-release:
24
cargo build --release
35

connectorx-python/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ tokio = {version = "1", features = ["rt", "rt-multi-thread", "net"]}
3737
tokio-util = "0.6"
3838
url = "2"
3939
uuid = "0.8"
40+
lazy_static = "1.4.0"
4041

4142
[build-dependencies]
4243
built = {version = "0.5", features = ["chrono"]}

connectorx-python/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ use pyo3::prelude::*;
1111
use pyo3::{wrap_pyfunction, PyResult};
1212
use std::sync::Once;
1313

14+
#[macro_use]
15+
extern crate lazy_static;
16+
1417
static START: Once = Once::new();
1518

1619
// https://github.com/PyO3/pyo3-built/issues/21

connectorx-python/src/pandas/pandas_columns/array.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
1+
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
22
use crate::errors::ConnectorXPythonError;
33
use anyhow::anyhow;
44
use fehler::throws;
@@ -7,7 +7,6 @@ use numpy::{npyffi::NPY_TYPES, Element, PyArray, PyArrayDescr};
77
use pyo3::{FromPyObject, Py, PyAny, PyResult, Python, ToPyObject};
88
use std::any::TypeId;
99
use std::marker::PhantomData;
10-
use std::sync::{Arc, Mutex};
1110

1211
#[derive(Clone)]
1312
#[repr(transparent)]
@@ -23,7 +22,6 @@ impl Element for PyList {
2322

2423
pub struct ArrayBlock<'a, V> {
2524
data: ArrayViewMut2<'a, PyList>,
26-
mutex: Arc<Mutex<()>>,
2725
buf_size_mb: usize,
2826
_value_type: PhantomData<V>,
2927
}
@@ -35,8 +33,7 @@ impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> {
3533
let data = unsafe { array.as_array_mut() };
3634
Ok(ArrayBlock::<V> {
3735
data,
38-
mutex: Arc::new(Mutex::new(())), // allocate the lock here since only a few blocks needs to aquire the GIL for now
39-
buf_size_mb: 16, // in MB
36+
buf_size_mb: 16, // in MB
4037
_value_type: PhantomData,
4138
})
4239
}
@@ -61,7 +58,6 @@ impl<'a, V> ArrayBlock<'a, V> {
6158
lengths: vec![],
6259
buffer: Vec::with_capacity(self.buf_size_mb * (1 << 17) * 11 / 10), // allocate a little bit more memory to avoid Vec growth
6360
buf_size: self.buf_size_mb * (1 << 17),
64-
mutex: self.mutex.clone(),
6561
})
6662
}
6763
ret
@@ -74,7 +70,6 @@ pub struct ArrayColumn<'a, V> {
7470
buffer: Vec<V>,
7571
lengths: Vec<usize>, // usize::MAX if the string is None
7672
buf_size: usize,
77-
mutex: Arc<Mutex<()>>,
7873
}
7974

8075
impl<'a, V> PandasColumnObject for ArrayColumn<'a, V>
@@ -179,7 +174,6 @@ where
179174
lengths: vec![],
180175
buffer: Vec::with_capacity(self.buf_size),
181176
buf_size: self.buf_size,
182-
mutex: self.mutex.clone(),
183177
});
184178
}
185179
partitions
@@ -194,8 +188,7 @@ where
194188

195189
{
196190
// allocation in python is not thread safe
197-
let _guard = self
198-
.mutex
191+
let _guard = GIL_MUTEX
199192
.lock()
200193
.map_err(|e| anyhow!("mutex poisoned {}", e))?;
201194
let mut start = 0;

connectorx-python/src/pandas/pandas_columns/bytes.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
1+
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
22
use crate::errors::ConnectorXPythonError;
33
use anyhow::anyhow;
44
use fehler::throws;
55
use ndarray::{ArrayViewMut2, Axis, Ix2};
66
use numpy::{npyffi::NPY_TYPES, Element, PyArray, PyArrayDescr};
77
use pyo3::{FromPyObject, Py, PyAny, PyResult, Python};
88
use std::any::TypeId;
9-
use std::sync::{Arc, Mutex};
109

1110
#[derive(Clone)]
1211
#[repr(transparent)]
@@ -22,7 +21,6 @@ impl Element for PyBytes {
2221

2322
pub struct BytesBlock<'a> {
2423
data: ArrayViewMut2<'a, PyBytes>,
25-
mutex: Arc<Mutex<()>>,
2624
buf_size_mb: usize,
2725
}
2826

@@ -33,8 +31,7 @@ impl<'a> FromPyObject<'a> for BytesBlock<'a> {
3331
let data = unsafe { array.as_array_mut() };
3432
Ok(BytesBlock {
3533
data,
36-
mutex: Arc::new(Mutex::new(())), // allocate the lock here since only a few blocks needs to aquire the GIL for now
37-
buf_size_mb: 16, // in MB
34+
buf_size_mb: 16, // in MB
3835
})
3936
}
4037
}
@@ -58,7 +55,6 @@ impl<'a> BytesBlock<'a> {
5855
bytes_lengths: vec![],
5956
bytes_buf: Vec::with_capacity(self.buf_size_mb * (1 << 20) * 11 / 10), // allocate a little bit more memory to avoid Vec growth
6057
buf_size: self.buf_size_mb * (1 << 20),
61-
mutex: self.mutex.clone(),
6258
})
6359
}
6460
ret
@@ -71,7 +67,6 @@ pub struct BytesColumn<'a> {
7167
bytes_buf: Vec<u8>,
7268
bytes_lengths: Vec<usize>, // usize::MAX if the string is None
7369
buf_size: usize,
74-
mutex: Arc<Mutex<()>>,
7570
}
7671

7772
impl<'a> PandasColumnObject for BytesColumn<'a> {
@@ -171,7 +166,6 @@ impl<'a> BytesColumn<'a> {
171166
bytes_lengths: vec![],
172167
bytes_buf: Vec::with_capacity(self.buf_size),
173168
buf_size: self.buf_size,
174-
mutex: self.mutex.clone(),
175169
});
176170
}
177171

@@ -187,8 +181,7 @@ impl<'a> BytesColumn<'a> {
187181

188182
{
189183
// allocation in python is not thread safe
190-
let _guard = self
191-
.mutex
184+
let _guard = GIL_MUTEX
192185
.lock()
193186
.map_err(|e| anyhow!("mutex poisoned {}", e))?;
194187
let mut start = 0;

connectorx-python/src/pandas/pandas_columns/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@ pub use float64::{Float64Block, Float64Column};
1717
pub use int64::{Int64Block, Int64Column};
1818
use pyo3::{exceptions::PyRuntimeError, PyAny, PyResult};
1919
use std::any::TypeId;
20+
use std::sync::Mutex;
2021
pub use string::{StringBlock, StringColumn};
2122

23+
// A global GIL lock for Python object allocations like string, bytes and list
24+
lazy_static! {
25+
static ref GIL_MUTEX: Mutex<()> = Mutex::new(());
26+
}
27+
2228
pub trait PandasColumnObject: Send {
2329
fn typecheck(&self, _: TypeId) -> bool;
2430
fn typename(&self) -> &'static str;

connectorx-python/src/pandas/pandas_columns/string.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::super::pystring::{PyString, StringInfo};
2-
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
2+
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
33
use crate::errors::ConnectorXPythonError;
44
use anyhow::anyhow;
55
use fehler::throws;
@@ -8,11 +8,9 @@ use ndarray::{ArrayViewMut2, Axis, Ix2};
88
use numpy::PyArray;
99
use pyo3::{FromPyObject, PyAny, PyResult, Python};
1010
use std::any::TypeId;
11-
use std::sync::{Arc, Mutex};
1211

1312
pub struct StringBlock<'a> {
1413
data: ArrayViewMut2<'a, PyString>,
15-
mutex: Arc<Mutex<()>>,
1614
buf_size_mb: usize,
1715
}
1816

@@ -23,8 +21,7 @@ impl<'a> FromPyObject<'a> for StringBlock<'a> {
2321
let data = unsafe { array.as_array_mut() };
2422
Ok(StringBlock {
2523
data,
26-
mutex: Arc::new(Mutex::new(())), // allocate the lock here since only a few blocks needs to aquire the GIL for now
27-
buf_size_mb: 4, // in MB
24+
buf_size_mb: 4, // in MB
2825
})
2926
}
3027
}
@@ -48,7 +45,6 @@ impl<'a> StringBlock<'a> {
4845
string_lengths: vec![],
4946
string_buf: Vec::with_capacity(self.buf_size_mb * (1 << 20) * 11 / 10), // allocate a little bit more memory to avoid Vec growth
5047
buf_size: self.buf_size_mb * (1 << 20),
51-
mutex: self.mutex.clone(),
5248
})
5349
}
5450
ret
@@ -61,7 +57,6 @@ pub struct StringColumn<'a> {
6157
string_buf: Vec<u8>,
6258
string_lengths: Vec<usize>, // usize::MAX for empty string
6359
buf_size: usize,
64-
mutex: Arc<Mutex<()>>,
6560
}
6661

6762
impl<'a> PandasColumnObject for StringColumn<'a> {
@@ -236,7 +231,6 @@ impl<'a> StringColumn<'a> {
236231
string_lengths: vec![],
237232
string_buf: Vec::with_capacity(self.buf_size),
238233
buf_size: self.buf_size,
239-
mutex: self.mutex.clone(),
240234
});
241235
}
242236

@@ -252,11 +246,11 @@ impl<'a> StringColumn<'a> {
252246

253247
let py = unsafe { Python::assume_gil_acquired() };
254248
let _guard = if force {
255-
self.mutex
249+
GIL_MUTEX
256250
.lock()
257251
.map_err(|e| anyhow!("mutex poisoned {}", e))?
258252
} else {
259-
match self.mutex.try_lock() {
253+
match GIL_MUTEX.try_lock() {
260254
Ok(guard) => guard,
261255
Err(_) => return,
262256
}

0 commit comments

Comments
 (0)