|
23 | 23 | import zmq |
24 | 24 | from tornado import ioloop |
25 | 25 | from tornado import gen |
| 26 | +from tornado.locks import Semaphore |
26 | 27 | from zmq.eventloop.zmqstream import ZMQStream |
27 | 28 |
|
28 | 29 | from traitlets.config.configurable import SingletonConfigurable |
|
38 | 39 |
|
39 | 40 | from ._version import kernel_protocol_version |
40 | 41 |
|
| 42 | +from collections import defaultdict |
| 43 | + |
| 44 | +# shell handlers are now coroutine (for async await code), |
| 45 | +# so we may not want to consume all the message and block while it yields |
| 46 | +_msg_locks = defaultdict(Semaphore) |
| 47 | + |
| 48 | + |
41 | 49 | class Kernel(SingletonConfigurable): |
42 | 50 |
|
43 | 51 | #--------------------------------------------------------------------------- |
@@ -233,7 +241,9 @@ def dispatch_shell(self, stream, msg): |
233 | 241 | self.log.debug("%s: %s", msg_type, msg) |
234 | 242 | self.pre_handler_hook() |
235 | 243 | try: |
236 | | - yield gen.maybe_future(handler(stream, idents, msg)) |
| 244 | + lock = _msg_locks[msg_type] |
| 245 | + with (yield lock.acquire()): |
| 246 | + yield gen.maybe_future(handler(stream, idents, msg)) |
237 | 247 | except Exception: |
238 | 248 | self.log.error("Exception in message handler:", exc_info=True) |
239 | 249 | finally: |
@@ -376,7 +386,6 @@ def finish_metadata(self, parent, metadata, reply_content): |
376 | 386 | @gen.coroutine |
377 | 387 | def execute_request(self, stream, ident, parent): |
378 | 388 | """handle an execute_request""" |
379 | | - |
380 | 389 | try: |
381 | 390 | content = parent[u'content'] |
382 | 391 | code = py3compat.cast_unicode_py2(content[u'code']) |
|
0 commit comments