bpo-40390: Implement channel_send_wait for subinterpreters #19715


Closed

Conversation

@benedwards14 (Contributor) commented Apr 25, 2020

@ericsnowcurrently (Member) left a comment

Thanks for working on this! Overall the implementation seems good. I've left some comments on things to address.

Regarding the approach to providing a blocking "send()"... After having seen the block-in-the-function approach here, I still have reservations about it. There's a lot of value to keeping the "send" part separate from the "wait" part.

Here are some alternatives that offer that separation:

  • return an acquired threading.Lock which the receiving interpreter releases
  • return a wait(timeout=None) function which the receiving interpreter will un-block
  • caller passes in an acquired lock that the receiving interpreter releases
  • caller passes in a callback function that the receiving interpreter triggers (via _Py_AddPendingCall())

My preference is the first one (return a lock). Since we are crossing the interpreter boundary we need to be extra careful; a user-defined callback is too risky. Of the remaining options, the first is the simplest and the most conceptually consistent.

FWIW, I expect that most of the code in this PR should still work mostly as-is.
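The preferred alternative above (return an acquired lock that the receiving interpreter releases) can be sketched in pure Python. The `Channel` class and its method names here are hypothetical, for illustration only; a real implementation would cross the interpreter boundary in C.

```python
import queue
import threading

class Channel:
    """Toy stand-in for an interpreter channel (hypothetical API).

    send() returns an *already-acquired* lock; recv() releases it once
    the item has been taken, which un-blocks anyone waiting on it.
    """

    def __init__(self):
        self._items = queue.Queue()

    def send(self, obj):
        lock = threading.Lock()
        lock.acquire()                  # handed out already-acquired
        self._items.put((obj, lock))
        return lock

    def recv(self):
        obj, lock = self._items.get()
        lock.release()                  # un-blocks the sender's wait
        return obj

ch = Channel()
lock = ch.send(b"data")

# The receiving side (another thread here, standing in for another
# interpreter) pops the item, releasing the lock:
t = threading.Thread(target=ch.recv)
t.start()

# "Send and wait" is then just send() plus a wait on the returned lock:
received_in_time = lock.acquire(timeout=10)
t.join()
```

This keeps the "send" and "wait" halves separate: a caller that does not care about delivery simply discards the returned lock.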

@@ -2366,6 +2531,50 @@ PyDoc_STRVAR(channel_recv_doc,
\n\
Return a new object from the data at the front of the channel's queue.");

static PyObject *
channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds)
Member:

Please move this to right after _channel_send() (right before channel_recv()).

Also, how is channel_send(cid, obj) any different than channel_send_wait(cid, obj, 0)? It might make sense to have them be the same function.

Contributor:

At least the function name passed to PyArg_ParseTupleAndKeywords for error messages would be different?
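The point about `channel_send(cid, obj)` matching `channel_send_wait(cid, obj, 0)` can be illustrated with one merged function whose default timeout means "don't wait". This is a hypothetical sketch using plain threading objects, not the actual `_xxsubinterpreters` API.

```python
import queue
import threading

_items = queue.Queue()  # toy channel storage (illustration only)

def channel_send(obj, timeout=0):
    """Hypothetical merged API: timeout=0 sends without waiting,
    timeout>0 waits up to `timeout` seconds for the receiver, and
    timeout<0 waits forever. Returns False only if the wait timed out.
    """
    if timeout == 0:
        _items.put((obj, None))         # plain send; no lock allocated
        return True
    lock = threading.Lock()
    lock.acquire()
    _items.put((obj, lock))
    # acquire(timeout=-1) blocks indefinitely, matching the
    # microseconds = -1 convention in the C code under review.
    return lock.acquire(timeout=timeout if timeout > 0 else -1)

def channel_recv():
    obj, lock = _items.get()
    if lock is not None:
        lock.release()                  # un-block a waiting sender
    return obj

ok = channel_send(b"ping")              # timeout=0: non-blocking send
assert channel_recv() == b"ping"
ok2 = channel_send(b"x", timeout=0.1)   # no receiver, so the wait times out
```

With this shape, the zero-timeout path never allocates a wait lock at all, which is the short-circuit suggested further down.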

Comment on lines +2547 to +2566
    void *wait_lock = _channelitem_wait_lock_new();
    if (_channel_send(&_globals.channels, cid, obj, wait_lock) != 0) {
        return NULL;
    }

    long long microseconds;
    if (timeout >= 0) {
        microseconds = timeout * 1000000;
    }
    else {
        microseconds = -1;
    }
    PyLockStatus lock_rc = _channelitem_wait_lock_wait(wait_lock, microseconds);

    if (lock_rc == PY_LOCK_ACQUIRED) {
        Py_RETURN_TRUE;
    }
    else {
        Py_RETURN_FALSE;
    }
Member:

We've tried to avoid putting lower-level code like this in the actual extension module functions. Either move it up into _channel_send() (my preference) or into a new _channel_send_wait(). Then this code would look more like the following:

Suggested change (replace the quoted block above with):

    long long microseconds = timeout * 1000000;
    if (_channel_send(&_globals.channels, cid, obj, timeout) != 0) {
        if (PyErr_Occurred()) {
            return NULL;
        }
        Py_RETURN_FALSE;
    }
    Py_RETURN_TRUE;

return NULL;
}

void *wait_lock = _channelitem_wait_lock_new();
Member:

Why void * instead of _channelitem_wait_lock *?

Comment on lines +614 to +616
    if (wait_lock != NULL) {
        _channelitem_wait_lock_sent(item->wait_lock);
    }
Member:

Why do this here? I'd expect it to happen right next to where _channelitem_wait_lock_() was called.

@@ -393,6 +546,9 @@ _channelitem_free_all(_channelitem *item)
    while (item != NULL) {
        _channelitem *last = item;
        item = item->next;
        if (last->wait_lock != NULL) {
            _channelitem_wait_lock_recv(last->wait_lock, 0);
Member:

Presumably this is so that the sending interpreter stops blocking when the channel is closed.

Does this cause ChannelEmptyError or ChannelClosedError to get raised in those interpreters? Do we have tests for that?

What about in the channel_close(cid, send=True) case? I would expect no failure in that case. Are there tests for that?
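The close-while-blocked behavior being probed here can be sketched with plain threading objects (all names hypothetical): closing the channel releases every pending wait lock, and a flag tells the unblocked sender whether the item was actually received or the channel was closed underneath it.

```python
import threading

class ChannelClosedError(Exception):
    pass

class WaitLock:
    """Toy version of the per-item wait lock (illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._lock.acquire()            # held until recv or close
        self.received = False           # set by recv, not by close

    def mark_received(self):
        self.received = True
        self._lock.release()

    def mark_closed(self):
        self._lock.release()            # un-block without "received"

    def wait(self, timeout=-1):
        if not self._lock.acquire(timeout=timeout):
            return False                # timed out
        if not self.received:
            raise ChannelClosedError    # channel closed under us
        return True

wl = WaitLock()
# Simulate the channel being closed 50 ms after the sender starts waiting:
threading.Timer(0.05, wl.mark_closed).start()
try:
    wl.wait()
except ChannelClosedError:
    outcome = "closed"
else:
    outcome = "received"
```

For `channel_close(cid, send=True)` the close path could call `mark_received()` instead, so the drained senders see a normal completion rather than an error; tests would need to cover both paths.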

Comment on lines +2547 to +2550
    void *wait_lock = _channelitem_wait_lock_new();
    if (_channel_send(&_globals.channels, cid, obj, wait_lock) != 0) {
        return NULL;
    }
Member:

If timeout is 0 then isn't this the same behavior as channel_send()? In that case you can add a short-circuit:

Suggested change (add a short-circuit before the quoted block):

    if (timeout == 0) {
        if (_channel_send(&_globals.channels, cid, obj, NULL) != 0) {
            return NULL;
        }
        Py_RETURN_NONE;
    }
    void *wait_lock = _channelitem_wait_lock_new();
    if (_channel_send(&_globals.channels, cid, obj, wait_lock) != 0) {
        return NULL;
    }

import math

start = time.time()
rc = _interpreters.channel_send_wait({cid}, b"send", timeout=1)
Member:

It would be worth making it clear (e.g. in a comment) that we expect this to time out (since it is happening in the same thread where we expect recv() to be called later).

import _xxsubinterpreters as _interpreters
import time

rc = _interpreters.channel_send_wait({cid}, b"send", timeout=10)
Member:

The test function says "no_timeout". Did you mean "with_timeout"? "timeout_not_hit"?

obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b"send")
t.join()
assert thread_exc is None, f"{thread_exc}"
Member:

Suggested change:

    -    assert thread_exc is None, f"{thread_exc}"
    +    self.assertIsNone(thread_exc, f"{thread_exc}")

obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b"send")
t.join()
assert thread_exc is None, f"{thread_exc}"
Member:

Suggested change:

    -    assert thread_exc is None, f"{thread_exc}"
    +    self.assertIsNone(thread_exc, f"{thread_exc}")

@bedevere-bot:

A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests, along with any other requests in other reviews from core developers, that would be appreciated.

Once you have made the requested changes, please leave a comment on this pull request containing the phrase I have made the requested changes; please review again. I will then notify any core developers who have left a review that you're ready for them to take another look at this pull request.

@ericsnowcurrently (Member):

FYI, there is a decent chance that we will make threading.Lock shareable (via channels). So keep that in mind.
