Skip to content

gh-84570: Factor Out _channel_send_wait() #110949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 43 additions & 45 deletions Modules/_xxinterpchannelsmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,34 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj,
return 0;
}

static int
_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
PyErr_NoMemory();
return -1;
}
PyThread_acquire_lock(mutex, NOWAIT_LOCK);

/* Queue up the object. */
int res = _channel_send(channels, cid, obj, mutex);
if (res < 0) {
PyThread_release_lock(mutex);
goto finally;
}

/* Wait until the object is received. */
wait_for_lock(mutex);

/* success! */
res = 0;

finally:
// XXX Delete the lock.
return res;
}

static int
_channel_recv(_channels *channels, int64_t id, PyObject **res)
{
Expand Down Expand Up @@ -2526,30 +2554,16 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds)
}
cid = cid_data.cid;

/* Queue up the object. */
int err = 0;
if (blocking) {
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
PyErr_NoMemory();
return NULL;
}
PyThread_acquire_lock(mutex, WAIT_LOCK);

/* Queue up the object. */
int err = _channel_send(&_globals.channels, cid, obj, mutex);
if (handle_channel_error(err, self, cid)) {
PyThread_release_lock(mutex);
return NULL;
}

/* Wait until the object is received. */
wait_for_lock(mutex);
err = _channel_send_wait(&_globals.channels, cid, obj);
}
else {
/* Queue up the object. */
int err = _channel_send(&_globals.channels, cid, obj, NULL);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
err = _channel_send(&_globals.channels, cid, obj, NULL);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
}

Py_RETURN_NONE;
Expand Down Expand Up @@ -2584,33 +2598,17 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
return NULL;
}

/* Queue up the object. */
int err = 0;
if (blocking) {
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
Py_DECREF(tempobj);
PyErr_NoMemory();
return NULL;
}
PyThread_acquire_lock(mutex, WAIT_LOCK);

/* Queue up the buffer. */
int err = _channel_send(&_globals.channels, cid, tempobj, mutex);
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
PyThread_acquire_lock(mutex, WAIT_LOCK);
return NULL;
}

/* Wait until the buffer is received. */
wait_for_lock(mutex);
err = _channel_send_wait(&_globals.channels, cid, tempobj);
}
else {
/* Queue up the buffer. */
int err = _channel_send(&_globals.channels, cid, tempobj, NULL);
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
err = _channel_send(&_globals.channels, cid, tempobj, NULL);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
return NULL;
}

Py_RETURN_NONE;
Expand Down