@@ -1640,6 +1640,34 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj,
1640
1640
return 0 ;
1641
1641
}
1642
1642
1643
+ static int
1644
+ _channel_send_wait (_channels * channels , int64_t cid , PyObject * obj )
1645
+ {
1646
+ PyThread_type_lock mutex = PyThread_allocate_lock ();
1647
+ if (mutex == NULL ) {
1648
+ PyErr_NoMemory ();
1649
+ return -1 ;
1650
+ }
1651
+ PyThread_acquire_lock (mutex , NOWAIT_LOCK );
1652
+
1653
+ /* Queue up the object. */
1654
+ int res = _channel_send (channels , cid , obj , mutex );
1655
+ if (res < 0 ) {
1656
+ PyThread_release_lock (mutex );
1657
+ goto finally ;
1658
+ }
1659
+
1660
+ /* Wait until the object is received. */
1661
+ wait_for_lock (mutex );
1662
+
1663
+ /* success! */
1664
+ res = 0 ;
1665
+
1666
+ finally :
1667
+ // XXX Delete the lock.
1668
+ return res ;
1669
+ }
1670
+
1643
1671
static int
1644
1672
_channel_recv (_channels * channels , int64_t id , PyObject * * res )
1645
1673
{
@@ -2526,30 +2554,16 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2526
2554
}
2527
2555
cid = cid_data .cid ;
2528
2556
2557
+ /* Queue up the object. */
2558
+ int err = 0 ;
2529
2559
if (blocking ) {
2530
- PyThread_type_lock mutex = PyThread_allocate_lock ();
2531
- if (mutex == NULL ) {
2532
- PyErr_NoMemory ();
2533
- return NULL ;
2534
- }
2535
- PyThread_acquire_lock (mutex , WAIT_LOCK );
2536
-
2537
- /* Queue up the object. */
2538
- int err = _channel_send (& _globals .channels , cid , obj , mutex );
2539
- if (handle_channel_error (err , self , cid )) {
2540
- PyThread_release_lock (mutex );
2541
- return NULL ;
2542
- }
2543
-
2544
- /* Wait until the object is received. */
2545
- wait_for_lock (mutex );
2560
+ err = _channel_send_wait (& _globals .channels , cid , obj );
2546
2561
}
2547
2562
else {
2548
- /* Queue up the object. */
2549
- int err = _channel_send (& _globals .channels , cid , obj , NULL );
2550
- if (handle_channel_error (err , self , cid )) {
2551
- return NULL ;
2552
- }
2563
+ err = _channel_send (& _globals .channels , cid , obj , NULL );
2564
+ }
2565
+ if (handle_channel_error (err , self , cid )) {
2566
+ return NULL ;
2553
2567
}
2554
2568
2555
2569
Py_RETURN_NONE ;
@@ -2584,33 +2598,17 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
2584
2598
return NULL ;
2585
2599
}
2586
2600
2601
+ /* Queue up the object. */
2602
+ int err = 0 ;
2587
2603
if (blocking ) {
2588
- PyThread_type_lock mutex = PyThread_allocate_lock ();
2589
- if (mutex == NULL ) {
2590
- Py_DECREF (tempobj );
2591
- PyErr_NoMemory ();
2592
- return NULL ;
2593
- }
2594
- PyThread_acquire_lock (mutex , WAIT_LOCK );
2595
-
2596
- /* Queue up the buffer. */
2597
- int err = _channel_send (& _globals .channels , cid , tempobj , mutex );
2598
- Py_DECREF (tempobj );
2599
- if (handle_channel_error (err , self , cid )) {
2600
- PyThread_acquire_lock (mutex , WAIT_LOCK );
2601
- return NULL ;
2602
- }
2603
-
2604
- /* Wait until the buffer is received. */
2605
- wait_for_lock (mutex );
2604
+ err = _channel_send_wait (& _globals .channels , cid , tempobj );
2606
2605
}
2607
2606
else {
2608
- /* Queue up the buffer. */
2609
- int err = _channel_send (& _globals .channels , cid , tempobj , NULL );
2610
- Py_DECREF (tempobj );
2611
- if (handle_channel_error (err , self , cid )) {
2612
- return NULL ;
2613
- }
2607
+ err = _channel_send (& _globals .channels , cid , tempobj , NULL );
2608
+ }
2609
+ Py_DECREF (tempobj );
2610
+ if (handle_channel_error (err , self , cid )) {
2611
+ return NULL ;
2614
2612
}
2615
2613
2616
2614
Py_RETURN_NONE ;
0 commit comments