diff --git a/.travis.yml b/.travis.yml index 3b3c66c0a..794e2128b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,27 +1,51 @@ language: python python: - - "nightly" - - "3.7-dev" - - 3.6 - - 3.5 - - 3.4 - - 2.7 + - "nightly" + - "3.7-dev" + - 3.6 + - 3.5 + - 3.4 sudo: false install: - - | - pip install --upgrade setuptools pip - pip install --pre . - pip install ipykernel[test] codecov - - | - if [[ "$TRAVIS_PYTHON_VERSION" == "3.6" || "$TRAVIS_PYTHON_VERSION" == "2.7" ]]; then - pip install matplotlib + - | + # pip install + pip install --upgrade setuptools pip + pip install --pre . + pip install ipykernel[test] codecov + - | + # install matplotlib + if [[ "$TRAVIS_PYTHON_VERSION" == "3.6" ]]; then + pip install matplotlib curio trio + fi + - | + # pin tornado + if [[ ! -z "$TORNADO" ]]; then + pip install tornado=="$TORNADO" + fi + - | + # pin IPython + if [[ ! -z "$IPYTHON" ]]; then + if [[ "$IPYTHON" == "master" ]]; then + SPEC=git+https://github.com/ipython/ipython#egg=ipython + else + SPEC="ipython==$IPYTHON" fi - - pip freeze + pip install --upgrade --pre "$SPEC" + fi + - pip freeze script: - - jupyter kernelspec list - - pytest --cov ipykernel --durations 10 -v ipykernel + - jupyter kernelspec list + - pytest --cov ipykernel --durations 10 -v ipykernel after_success: - - codecov + - codecov matrix: - allow_failures: - - python: "nightly" + include: + - python: 3.5 + env: + - TORNADO="4.5.*" + - IPYTHON=master + - python: 3.6 + env: + - IPYTHON=master + allow_failures: + - python: "nightly" diff --git a/appveyor.yml b/appveyor.yml index 7cdcc34ef..e11ae0242 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -6,8 +6,6 @@ clone_depth: 1 environment: matrix: - - python: "C:/Python27-x64" - - python: "C:/Python27" - python: "C:/Python36-x64" - python: "C:/Python36" diff --git a/docs/changelog.rst b/docs/changelog.rst index 38e2048c8..b213928a5 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -196,7 +196,7 @@ Changes in IPython kernel - Publish all IO in a thread, via :class:`IOPubThread`. This solves the problem of requiring :meth:`sys.stdout.flush` to be called in the notebook to produce output promptly during long-running cells. -- Remove refrences to outdated IPython guiref in kernel banner. +- Remove references to outdated IPython guiref in kernel banner. - Patch faulthandler to use ``sys.__stderr__`` instead of forwarded ``sys.stderr``, which has no fileno when forwarded. - Deprecate some vestiges of the Big Split: diff --git a/ipykernel/_version.py b/ipykernel/_version.py index 4ffba6a93..21c09a70c 100644 --- a/ipykernel/_version.py +++ b/ipykernel/_version.py @@ -1,4 +1,4 @@ -version_info = (4, 9, 0) +version_info = (5, 0, 0, 'dev') __version__ = '.'.join(map(str, version_info)) kernel_protocol_version_info = (5, 1) diff --git a/ipykernel/connect.py b/ipykernel/connect.py index faaa402ec..3106c9983 100644 --- a/ipykernel/connect.py +++ b/ipykernel/connect.py @@ -40,7 +40,7 @@ def get_connection_file(app=None): def find_connection_file(filename='kernel-*.json', profile=None): """DEPRECATED: find a connection file, and return its absolute path. - THIS FUNCION IS DEPRECATED. Use juptyer_client.find_connection_file instead. + THIS FUNCTION IS DEPRECATED. Use juptyer_client.find_connection_file instead. Parameters ---------- diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index 68b0b5cfb..7d3d32365 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -4,6 +4,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +from functools import partial import os import sys import platform @@ -14,6 +15,7 @@ from traitlets.config.application import Application from IPython.utils import io + def _use_appnope(): """Should we use appnope for dealing with OS X app nap? @@ -21,6 +23,7 @@ def _use_appnope(): """ return sys.platform == 'darwin' and V(platform.mac_ver()[0]) >= V('10.9') + def _notify_stream_qt(kernel, stream): from IPython.external.qt_for_kernel import QtCore @@ -34,9 +37,14 @@ def context(): yield def process_stream_events(): - while stream.getsockopt(zmq.EVENTS) & zmq.POLLIN: - with context(): - kernel.do_one_iteration() + """fall back to main loop when there's a socket event""" + # call flush to ensure that the stream doesn't lose events + # due to our consuming of the edge-triggered FD + # flush returns the number of events consumed. + # if there were any, wake it up + if stream.flush(limit=1): + notifier.setEnabled(False) + kernel.app.quit() fd = stream.getsockopt(zmq.FD) notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read, kernel.app) @@ -88,6 +96,7 @@ def exit_decorator(exit_func): to register a function to be called on exit """ func.exit_hook = exit_func + return exit_func func.exit = exit_decorator return func @@ -122,11 +131,6 @@ def loop_qt4(kernel): _loop_qt(kernel.app) -@loop_qt4.exit -def loop_qt4_exit(kernel): - kernel.app.exit() - - @register_integration('qt', 'qt5') def loop_qt5(kernel): """Start a kernel with PyQt5 event loop integration.""" @@ -134,8 +138,10 @@ def loop_qt5(kernel): return loop_qt4(kernel) +# exit and watch are the same for qt 4 and 5 +@loop_qt4.exit @loop_qt5.exit -def loop_qt5_exit(kernel): +def loop_qt_exit(kernel): kernel.app.exit() @@ -163,9 +169,15 @@ def loop_wx(kernel): from appnope import nope nope() - doi = kernel.do_one_iteration # Wx uses milliseconds - poll_interval = int(1000*kernel._poll_interval) + poll_interval = int(1000 * kernel._poll_interval) + + def wake(): + """wake from wx""" + for stream in kernel.shell_streams: + if stream.flush(limit=1): + kernel.app.ExitMainLoop() + return # We have to put the wx.Timer in a wx.Frame for it to fire properly. # We make the Frame hidden when we create it in the main app below. @@ -182,16 +194,20 @@ def on_timer(self, event): self.func() # We need a custom wx.App to create our Frame subclass that has the - # wx.Timer to drive the ZMQ event loop. + # wx.Timer to defer back to the tornado event loop. class IPWxApp(wx.App): def OnInit(self): - self.frame = TimerFrame(doi) + self.frame = TimerFrame(wake) self.frame.Show(False) return True # The redirect=False here makes sure that wx doesn't replace # sys.stdout/stderr with its own classes. - kernel.app = IPWxApp(redirect=False) + if not ( + getattr(kernel, 'app', None) + and isinstance(kernel.app, wx.App) + ): + kernel.app = IPWxApp(redirect=False) # The import of wx on Linux sets the handler for signal.SIGINT # to 0. This is a bug in wx or gtk. We fix by just setting it @@ -213,35 +229,31 @@ def loop_wx_exit(kernel): def loop_tk(kernel): """Start a kernel with the Tk event loop.""" - try: - from tkinter import Tk # Py 3 - except ImportError: - from Tkinter import Tk # Py 2 - doi = kernel.do_one_iteration - # Tk uses milliseconds - poll_interval = int(1000*kernel._poll_interval) - # For Tkinter, we create a Tk object and call its withdraw method. - class Timer(object): - def __init__(self, func): - self.app = Tk() - self.app.withdraw() - self.func = func + from tkinter import Tk, READABLE - def on_timer(self): - self.func() - self.app.after(poll_interval, self.on_timer) + def process_stream_events(stream, *a, **kw): + """fall back to main loop when there's a socket event""" + if stream.flush(limit=1): + app.tk.deletefilehandler(stream.getsockopt(zmq.FD)) + app.quit() - def start(self): - self.on_timer() # Call it once to get things going. - self.app.mainloop() + # For Tkinter, we create a Tk object and call its withdraw method. + kernel.app = app = Tk() + kernel.app.withdraw() + for stream in kernel.shell_streams: + notifier = partial(process_stream_events, stream) + # seems to be needed for tk + notifier.__name__ = 'notifier' + app.tk.createfilehandler(stream.getsockopt(zmq.FD), READABLE, notifier) + # schedule initial call after start + app.after(0, notifier) - kernel.timer = Timer(doi) - kernel.timer.start() + app.mainloop() @loop_tk.exit def loop_tk_exit(kernel): - kernel.timer.app.destroy() + kernel.app.destroy() @register_integration('gtk') @@ -299,8 +311,10 @@ def handle_int(etype, value, tb): # don't let interrupts during mainloop invoke crash_handler: sys.excepthook = handle_int mainloop(kernel._poll_interval) - sys.excepthook = real_excepthook - kernel.do_one_iteration() + for stream in kernel.shell_streams: + if stream.flush(limit=1): + # events to process, return control to kernel + return except: raise except KeyboardInterrupt: @@ -326,11 +340,24 @@ def loop_asyncio(kernel): if loop.is_running(): return - def kernel_handler(): - loop.call_soon(kernel.do_one_iteration) - loop.call_later(kernel._poll_interval, kernel_handler) + if loop.is_closed(): + # main loop is closed, create a new one + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop._should_close = False + + # pause eventloop when there's an event on a zmq socket + def process_stream_events(stream): + """fall back to main loop when there's a socket event""" + if stream.flush(limit=1): + loop.stop() + + for stream in kernel.shell_streams: + fd = stream.getsockopt(zmq.FD) + notifier = partial(process_stream_events, stream) + loop.add_reader(fd, notifier) + loop.call_soon(notifier) - loop.call_soon(kernel_handler) while True: error = None try: @@ -339,9 +366,8 @@ def kernel_handler(): continue except Exception as e: error = e - if hasattr(loop, 'shutdown_asyncgens'): - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() + if loop._should_close: + loop.close() if error is not None: raise error break @@ -352,8 +378,20 @@ def loop_asyncio_exit(kernel): """Exit hook for asyncio""" import asyncio loop = asyncio.get_event_loop() + + @asyncio.coroutine + def close_loop(): + if hasattr(loop, 'shutdown_asyncgens'): + yield from loop.shutdown_asyncgens() + loop._should_close = True + loop.stop() + if loop.is_running(): - loop.call_soon(loop.stop) + close_loop() + + elif not loop.is_closed(): + loop.run_until_complete(close_loop) + loop.close() def enable_gui(gui, kernel=None): diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 6304131f9..5144a015d 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -1,17 +1,26 @@ """The IPython kernel implementation""" +import asyncio +from contextlib import contextmanager +from functools import partial import getpass +import signal import sys from IPython.core import release from ipython_genutils.py3compat import builtin_mod, PY3, unicode_type, safe_unicode from IPython.utils.tokenutil import token_at_cursor, line_at_cursor +from tornado import gen from traitlets import Instance, Type, Any, List, Bool from .comm import CommManager from .kernelbase import Kernel as KernelBase from .zmqshell import ZMQInteractiveShell +try: + from IPython.core.interactiveshell import _asyncio_runner +except ImportError: + _asyncio_runner = None try: from IPython.core.completer import rectify_completions as _rectify_completions, provisionalcompleter as _provisionalcompleter @@ -193,10 +202,58 @@ def execution_count(self): @execution_count.setter def execution_count(self, value): - # Ignore the incrememnting done by KernelBase, in favour of our shell's + # Ignore the incrementing done by KernelBase, in favour of our shell's # execution counter. pass + @contextmanager + def _cancel_on_sigint(self, future): + """ContextManager for capturing SIGINT and cancelling a future + + SIGINT raises in the event loop when running async code, + but we want it to halt a coroutine. + + Ideally, it would raise KeyboardInterrupt, + but this turns it into a CancelledError. + At least it gets a decent traceback to the user. + """ + sigint_future = asyncio.Future() + + # whichever future finishes first, + # cancel the other one + def cancel_unless_done(f, _ignored): + if f.cancelled() or f.done(): + return + f.cancel() + + # when sigint finishes, + # abort the coroutine with CancelledError + sigint_future.add_done_callback( + partial(cancel_unless_done, future) + ) + # when the main future finishes, + # stop watching for SIGINT events + future.add_done_callback( + partial(cancel_unless_done, sigint_future) + ) + + def handle_sigint(*args): + def set_sigint_result(): + if sigint_future.cancelled() or sigint_future.done(): + return + sigint_future.set_result(1) + # use add_callback for thread safety + self.io_loop.add_callback(set_sigint_result) + + # set the custom sigint hander during this context + save_sigint = signal.signal(signal.SIGINT, handle_sigint) + try: + yield + finally: + # restore the previous sigint handler + signal.signal(signal.SIGINT, save_sigint) + + @gen.coroutine def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): shell = self.shell # we'll need this a lot here @@ -204,8 +261,37 @@ def do_execute(self, code, silent, store_history=True, self._forward_input(allow_stdin) reply_content = {} + if hasattr(shell, 'run_cell_async') and hasattr(shell, 'should_run_async'): + run_cell = shell.run_cell_async + should_run_async = shell.should_run_async + else: + should_run_async = lambda cell: False + # older IPython, + # use blocking run_cell and wrap it in coroutine + @gen.coroutine + def run_cell(*args, **kwargs): + return shell.run_cell(*args, **kwargs) try: - res = shell.run_cell(code, store_history=store_history, silent=silent) + + # default case: runner is asyncio and asyncio is already running + # TODO: this should check every case for "are we inside the runner", + # not just asyncio + if ( + _asyncio_runner + and should_run_async(code) + and shell.loop_runner is _asyncio_runner + and asyncio.get_event_loop().is_running() + ): + coro = run_cell(code, store_history=store_history, silent=silent) + coro_future = asyncio.ensure_future(coro) + + with self._cancel_on_sigint(coro_future): + res = yield coro_future + else: + # runner isn't already running, + # make synchronous call, + # letting shell dispatch to loop runners + res = shell.run_cell(code, store_history=store_history, silent=silent) finally: self._restore_input() diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index e0d729e2b..f0ca68bfb 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -302,9 +302,9 @@ def log_connection_info(self): # also raw print to the terminal if no parent_handle (`ipython kernel`) # unless log-level is CRITICAL (--quiet) if not self.parent_handle and self.log_level < logging.CRITICAL: - io.rprint(_ctrl_c_message) + io.raw_print(_ctrl_c_message) for line in lines: - io.rprint(line) + io.raw_print(line) self.ports = dict(shell=self.shell_port, iopub=self.iopub_port, stdin=self.stdin_port, hb=self.hb_port, @@ -378,6 +378,7 @@ def init_kernel(self): kernel_factory = self.kernel_class.instance kernel = kernel_factory(parent=self, session=self.session, + control_stream=control_stream, shell_streams=[shell_stream, control_stream], iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 72547fa30..ef280e653 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -5,12 +5,14 @@ from __future__ import print_function +from datetime import datetime +from functools import partial +import logging +from signal import signal, default_int_handler, SIGINT import sys import time -import logging import uuid -from datetime import datetime try: # jupyter_client >= 5, use tz-aware now from jupyter_client.session import utcnow as now @@ -18,10 +20,10 @@ # jupyter_client < 5, use local now() now = datetime.now -from signal import signal, default_int_handler, SIGINT - -import zmq from tornado import ioloop +from tornado import gen +from tornado.queues import PriorityQueue, QueueEmpty +import zmq from zmq.eventloop.zmqstream import ZMQStream from traitlets.config.configurable import SingletonConfigurable @@ -30,13 +32,31 @@ from ipython_genutils.py3compat import unicode_type, string_types from ipykernel.jsonutil import json_clean from traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, observe, default + Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, + observe, default ) from jupyter_client.session import Session from ._version import kernel_protocol_version +CONTROL_PRIORITY = 1 +SHELL_PRIORITY = 10 +ABORT_PRIORITY = 20 + +class _MessageEvent(tuple): + """class for priority message events + + ensures that comparison only invokes the priority entry, + not comparing the contents of the messages + """ + def __eq__(self, other): + return self[:2] == other[:2] + + def __lt__(self, other): + return self[:2] < other[:2] + + class Kernel(SingletonConfigurable): #--------------------------------------------------------------------------- @@ -80,7 +100,7 @@ def _default_ident(self): # Private interface _darwin_app_nap = Bool(True, - help="""Whether to use appnope for compatiblity with OS X App Nap. + help="""Whether to use appnope for compatibility with OS X App Nap. Only affects OS X >= 10.9. """ @@ -102,7 +122,7 @@ def _default_ident(self): # Frequency of the kernel's event loop. # Units are in seconds, kernel subclasses for GUI toolkits may need to # adapt to milliseconds. - _poll_interval = Float(0.05).tag(config=True) + _poll_interval = Float(0.01).tag(config=True) # If the shutdown was requested over the network, we leave here the # necessary reply message so it can be sent by our registered atexit @@ -145,9 +165,10 @@ def __init__(self, **kwargs): for msg_type in self.control_msg_types: self.control_handlers[msg_type] = getattr(self, msg_type) + @gen.coroutine def dispatch_control(self, msg): """dispatch control requests""" - idents,msg = self.session.feed_identities(msg, copy=False) + idents, msg = self.session.feed_identities(msg, copy=False) try: msg = self.session.deserialize(msg, content=True, copy=False) except: @@ -159,6 +180,10 @@ def dispatch_control(self, msg): # Set the parent message for side effects. self.set_parent(idents, msg) self._publish_status(u'busy') + if self._aborting: + self._send_abort_reply(self.control_stream, msg, idents) + self._publish_status(u'idle') + return header = msg['header'] msg_type = header['msg_type'] @@ -168,7 +193,7 @@ def dispatch_control(self, msg): self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) else: try: - handler(self.control_stream, idents, msg) + yield gen.maybe_future(handler(self.control_stream, idents, msg)) except Exception: self.log.error("Exception in control handler:", exc_info=True) @@ -186,22 +211,18 @@ def should_handle(self, stream, msg, idents): msg_type = msg['header']['msg_type'] # is it safe to assume a msg_id will not be resubmitted? self.aborted.remove(msg_id) - reply_type = msg_type.split('_')[0] + '_reply' - status = {'status' : 'aborted'} - md = {'engine' : self.ident} - md.update(status) - self.session.send(stream, reply_type, metadata=md, - content=status, parent=msg, ident=idents) + self._send_abort_reply(stream, msg, idents) return False return True + @gen.coroutine def dispatch_shell(self, stream, msg): """dispatch shell requests""" # flush control requests first if self.control_stream: self.control_stream.flush() - idents,msg = self.session.feed_identities(msg, copy=False) + idents, msg = self.session.feed_identities(msg, copy=False) try: msg = self.session.deserialize(msg, content=True, copy=False) except: @@ -212,6 +233,11 @@ def dispatch_shell(self, stream, msg): self.set_parent(idents, msg) self._publish_status(u'busy') + if self._aborting: + self._send_abort_reply(stream, msg, idents) + self._publish_status(u'idle') + return + msg_type = msg['header']['msg_type'] # Print some info about this message and leave a '--->' marker, so it's @@ -230,7 +256,7 @@ def dispatch_shell(self, stream, msg): self.log.debug("%s: %s", msg_type, msg) self.pre_handler_hook() try: - handler(stream, idents, msg) + yield gen.maybe_future(handler(stream, idents, msg)) except Exception: self.log.error("Exception in message handler:", exc_info=True) finally: @@ -251,52 +277,137 @@ def post_handler_hook(self): def enter_eventloop(self): """enter eventloop""" - self.log.info("entering eventloop %s", self.eventloop) - for stream in self.shell_streams: - # flush any pending replies, - # which may be skipped by entering the eventloop - stream.flush(zmq.POLLOUT) - # restore default_int_handler - self.pre_handler_hook() - while self.eventloop is not None: + self.log.info("Entering eventloop %s", self.eventloop) + # record handle, so we can check when this changes + eventloop = self.eventloop + def advance_eventloop(): + # check if eventloop changed: + if self.eventloop is not eventloop: + self.log.info("exiting eventloop %s", eventloop) + return + if self.msg_queue.qsize(): + self.log.debug("Delaying eventloop due to waiting messages") + # still messages to process, make the eventloop wait + schedule_next() + return + self.log.debug("Advancing eventloop %s", eventloop) try: - self.eventloop(self) + eventloop(self) except KeyboardInterrupt: # Ctrl-C shouldn't crash the kernel self.log.error("KeyboardInterrupt caught in kernel") - continue - else: - # eventloop exited cleanly, this means we should stop (right?) - self.eventloop = None - break - self.post_handler_hook() - self.log.info("exiting eventloop") + pass + if self.eventloop is eventloop: + # schedule advance again + schedule_next() + + def schedule_next(): + """Schedule the next advance of the eventloop""" + # flush the eventloop every so often, + # giving us a chance to handle messages in the meantime + self.log.debug("Scheduling eventloop advance") + self.io_loop.call_later(1, advance_eventloop) + + # begin polling the eventloop + schedule_next() + + @gen.coroutine + def do_one_iteration(self): + """Process a single shell message + + Any pending control messages will be flushed as well + + .. versionchanged:: 5 + This is now a coroutine + """ + # flush messages off of shell streams into the message queue + for stream in self.shell_streams: + stream.flush() + # process all messages higher priority than shell (control), + # and at most one shell message per iteration + priority = 0 + while priority is not None and priority < SHELL_PRIORITY: + priority = yield self.process_one(wait=False) + + @gen.coroutine + def process_one(self, wait=True): + """Process one request + + Returns priority of the message handled. + Returns None if no message was handled. + """ + if wait: + priority, t, dispatch, args = yield self.msg_queue.get() + else: + try: + priority, t, dispatch, args = self.msg_queue.get_nowait() + except QueueEmpty: + return None + yield gen.maybe_future(dispatch(*args)) + + @gen.coroutine + def dispatch_queue(self): + """Coroutine to preserve order of message handling + + Ensures that only one message is processing at a time, + even when the handler is async + """ + + while True: + # receive the next message and handle it + try: + yield self.process_one() + except Exception: + self.log.exception("Error in message handler") + + def schedule_dispatch(self, priority, dispatch, *args): + """schedule a message for dispatch""" + # via loop.add_callback to ensure everything gets scheduled + # on the eventloop + self.io_loop.add_callback( + lambda: self.msg_queue.put( + _MessageEvent(( + priority, + self.io_loop.time(), + dispatch, + args, + )) + ) + ) def start(self): """register dispatchers for streams""" self.io_loop = ioloop.IOLoop.current() - if self.control_stream: - self.control_stream.on_recv(self.dispatch_control, copy=False) + self.msg_queue = PriorityQueue() + self.io_loop.add_callback(self.dispatch_queue) + - def make_dispatcher(stream): - def dispatcher(msg): - return self.dispatch_shell(stream, msg) - return dispatcher + if self.control_stream: + self.control_stream.on_recv( + partial( + self.schedule_dispatch, + CONTROL_PRIORITY, + self.dispatch_control, + ), + copy=False, + ) for s in self.shell_streams: - s.on_recv(make_dispatcher(s), copy=False) + if s is self.control_stream: + continue + s.on_recv( + partial( + self.schedule_dispatch, + SHELL_PRIORITY, + self.dispatch_shell, + s, + ), + copy=False, + ) # publish idle status self._publish_status('starting') - def do_one_iteration(self): - """step eventloop just once""" - if self.control_stream: - self.control_stream.flush() - for stream in self.shell_streams: - # handle at most one request per iteration - stream.flush(zmq.POLLIN, 1) - stream.flush(zmq.POLLOUT) def record_ports(self, ports): """Record the ports that this kernel is using. @@ -370,6 +481,7 @@ def finish_metadata(self, parent, metadata, reply_content): """ return metadata + @gen.coroutine def execute_request(self, stream, ident, parent): """handle an execute_request""" @@ -395,8 +507,12 @@ def execute_request(self, stream, ident, parent): self.execution_count += 1 self._publish_execute_input(code, parent, self.execution_count) - reply_content = self.do_execute(code, silent, store_history, - user_expressions, allow_stdin) + reply_content = yield gen.maybe_future( + self.do_execute( + code, silent, store_history, + user_expressions, allow_stdin, + ) + ) # Flush output before sending the reply. sys.stdout.flush() @@ -418,7 +534,7 @@ def execute_request(self, stream, ident, parent): self.log.debug("%s", reply_msg) if not silent and reply_msg['content']['status'] == u'error' and stop_on_error: - self._abort_queues() + yield self._abort_queues() def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): @@ -426,12 +542,13 @@ def do_execute(self, code, silent, store_history=True, """ raise NotImplementedError + @gen.coroutine def complete_request(self, stream, ident, parent): content = parent['content'] code = content['code'] cursor_pos = content['cursor_pos'] - matches = self.do_complete(code, cursor_pos) + matches = yield gen.maybe_future(self.do_complete(code, cursor_pos)) matches = json_clean(matches) completion_msg = self.session.send(stream, 'complete_reply', matches, parent, ident) @@ -445,11 +562,16 @@ def do_complete(self, code, cursor_pos): 'metadata' : {}, 'status' : 'ok'} + @gen.coroutine def inspect_request(self, stream, ident, parent): content = parent['content'] - reply_content = self.do_inspect(content['code'], content['cursor_pos'], - content.get('detail_level', 0)) + reply_content = yield gen.maybe_future( + self.do_inspect( + content['code'], content['cursor_pos'], + content.get('detail_level', 0), + ) + ) # Before we send this object over, we scrub it for JSON usage reply_content = json_clean(reply_content) msg = self.session.send(stream, 'inspect_reply', @@ -461,10 +583,11 @@ def do_inspect(self, code, cursor_pos, detail_level=0): """ return {'status': 'ok', 'data': {}, 'metadata': {}, 'found': False} + @gen.coroutine def history_request(self, stream, ident, parent): content = parent['content'] - reply_content = self.do_history(**content) + reply_content = yield gen.maybe_future(self.do_history(**content)) reply_content = json_clean(reply_content) msg = self.session.send(stream, 'history_reply', @@ -523,8 +646,9 @@ def comm_info_request(self, stream, ident, parent): reply_content, parent, ident) self.log.debug("%s", msg) + @gen.coroutine def shutdown_request(self, stream, ident, parent): - content = self.do_shutdown(parent['content']['restart']) + content = yield gen.maybe_future(self.do_shutdown(parent['content']['restart'])) self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) # same content, but different msg_id for broadcasting on IOPub self._shutdown_message = self.session.msg(u'shutdown_reply', @@ -542,14 +666,15 @@ def do_shutdown(self, restart): """ return {'status': 'ok', 'restart': restart} + @gen.coroutine def is_complete_request(self, stream, ident, parent): content = parent['content'] code = content['code'] - reply_content = self.do_is_complete(code) + reply_content = yield gen.maybe_future(self.do_is_complete(code)) reply_content = json_clean(reply_content) reply_msg = self.session.send(stream, 'is_complete_reply', - reply_content, parent, ident) + reply_content, parent, ident) self.log.debug("%s", reply_msg) def do_is_complete(self, code): @@ -595,7 +720,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata): def abort_request(self, stream, ident, parent): """abort a specific msg by id""" - self.log.warning("abort_request is deprecated in kernel_base. It os only part of IPython parallel") + self.log.warning("abort_request is deprecated in kernel_base. It is only part of IPython parallel") msg_ids = parent['content'].get('msg_ids', None) if isinstance(msg_ids, string_types): msg_ids = [msg_ids] @@ -611,13 +736,13 @@ def abort_request(self, stream, ident, parent): def clear_request(self, stream, idents, parent): """Clear our namespace.""" - self.log.warning("clear_request is deprecated in kernel_base. It os only part of IPython parallel") + self.log.warning("clear_request is deprecated in kernel_base. It is only part of IPython parallel") content = self.do_clear() self.session.send(stream, 'clear_reply', ident=idents, parent=parent, content = content) def do_clear(self): - """DEPRECATED""" + """DEPRECATED since 4.0.3""" raise NotImplementedError #--------------------------------------------------------------------------- @@ -630,35 +755,38 @@ def _topic(self, topic): return py3compat.cast_bytes("%s.%s" % (base, topic)) + _aborting = Bool(False) + + @gen.coroutine def _abort_queues(self): for stream in self.shell_streams: - if stream: - self._abort_queue(stream) + stream.flush() + self._aborting = True - def _abort_queue(self, stream): - poller = zmq.Poller() - poller.register(stream.socket, zmq.POLLIN) - while True: - idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) - if msg is None: - return + self.schedule_dispatch( + ABORT_PRIORITY, + self._dispatch_abort, + ) - self.log.info("Aborting:") - self.log.info("%s", msg) - msg_type = msg['header']['msg_type'] - reply_type = msg_type.split('_')[0] + '_reply' - - status = {'status' : 'aborted'} - md = {'engine' : self.ident} - md.update(status) - self._publish_status('busy', parent=msg) - reply_msg = self.session.send(stream, reply_type, metadata=md, - content=status, parent=msg, ident=idents) - self._publish_status('idle', parent=msg) - self.log.debug("%s", reply_msg) - # We need to wait a bit for requests to come in. This can probably - # be set shorter for true asynchronous clients. - poller.poll(50) + @gen.coroutine + def _dispatch_abort(self): + self.log.info("Finishing abort") + yield gen.sleep(0.05) + self._aborting = False + + @gen.coroutine + def _send_abort_reply(self, stream, msg, idents): + """Send a reply to an aborted request""" + self.log.info("Aborting:") + self.log.info("%s", msg) + reply_type = msg['header']['msg_type'].rsplit('_', 1)[0] + '_reply' + status = {'status': 'aborted'} + md = {'engine': self.ident} + md.update(status) + self.session.send( + stream, reply_type, metadata=md, + content=status, parent=msg, ident=idents, + ) def _no_raw_input(self): """Raise StdinNotImplentedError if active frontend doesn't support @@ -747,7 +875,6 @@ def _input_request(self, prompt, ident, parent, password=False): def _at_shutdown(self): """Actions taken at shutdown by the kernel, called by python's atexit. """ - # io.rprint("Kernel at_shutdown") # dbg if self._shutdown_message is not None: self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) self.log.debug("%s", self._shutdown_message) diff --git a/ipykernel/tests/test_async.py b/ipykernel/tests/test_async.py new file mode 100644 index 000000000..01b022369 --- /dev/null +++ b/ipykernel/tests/test_async.py @@ -0,0 +1,74 @@ +"""Test async/await integration""" + +from distutils.version import LooseVersion as V +import sys + +import pytest +import IPython + + +from .utils import execute, flush_channels, start_new_kernel, TIMEOUT +from .test_message_spec import validate_message + + +KC = KM = None + + +def setup(): + """start the global kernel (if it isn't running) and return its client""" + global KM, KC + KM, KC = start_new_kernel() + flush_channels(KC) + + +def teardown(): + KC.stop_channels() + KM.shutdown_kernel(now=True) + + +skip_without_async = pytest.mark.skipif( + sys.version_info < (3, 5) or V(IPython.__version__) < V("7.0"), + reason="IPython >=7 with async/await required", +) + + +@skip_without_async +def test_async_await(): + flush_channels(KC) + msg_id, content = execute("import asyncio; await asyncio.sleep(0.1)", KC) + assert content["status"] == "ok", content + + +@pytest.mark.parametrize("asynclib", ["asyncio", "trio", "curio"]) +@skip_without_async +def test_async_interrupt(asynclib, request): + try: + __import__(asynclib) + except ImportError: + pytest.skip("Requires %s" % asynclib) + request.addfinalizer(lambda: execute("%autoawait asyncio", KC)) + + flush_channels(KC) + msg_id, content = execute("%autoawait " + asynclib, KC) + assert content["status"] == "ok", content + + flush_channels(KC) + msg_id = KC.execute( + "print('begin'); import {0}; await {0}.sleep(5)".format(asynclib) + ) + busy = KC.get_iopub_msg(timeout=TIMEOUT) + validate_message(busy, "status", msg_id) + assert busy["content"]["execution_state"] == "busy" + echo = KC.get_iopub_msg(timeout=TIMEOUT) + validate_message(echo, "execute_input") + stream = KC.get_iopub_msg(timeout=TIMEOUT) + # wait for the stream output to be sure kernel is in the async block + validate_message(stream, "stream") + assert stream["content"]["text"] == "begin\n" + + KM.interrupt_kernel() + reply = KC.get_shell_msg()["content"] + assert reply["status"] == "error", reply + assert reply["ename"] in {"CancelledError", "KeyboardInterrupt"} + + flush_channels(KC) diff --git a/ipykernel/tests/test_eventloop.py b/ipykernel/tests/test_eventloop.py index c487ee50b..a44907f10 100644 --- a/ipykernel/tests/test_eventloop.py +++ b/ipykernel/tests/test_eventloop.py @@ -1,9 +1,10 @@ """Test eventloop integration""" import sys -import time -import IPython.testing.decorators as dec +import pytest +import tornado + from .utils import flush_channels, start_new_kernel, execute KC = KM = None @@ -27,7 +28,8 @@ def teardown(): """ -@dec.skipif(sys.version_info < (3, 5), "async/await syntax required") +@pytest.mark.skipif(sys.version_info < (3, 5), reason="async/await syntax required") +@pytest.mark.skipif(tornado.version_info < (5,), reason="only relevant on tornado 5") def test_asyncio_interrupt(): flush_channels(KC) msg_id, content = execute('%gui asyncio', KC) diff --git a/ipykernel/tests/test_message_spec.py b/ipykernel/tests/test_message_spec.py index ebf9d9237..e75e219c6 100644 --- a/ipykernel/tests/test_message_spec.py +++ b/ipykernel/tests/test_message_spec.py @@ -464,9 +464,19 @@ def test_comm_info_request(): def test_single_payload(): + """ + We want to test the set_next_input is not triggered several time per cell. + This is (was ?) mostly due to the fact that `?` in a loop would trigger + several set_next_input. + + I'm tempted to thing that we actually want to _allow_ multiple + set_next_input (that's users' choice). But that `?` itself (and ?'s + transform) should avoid setting multiple set_next_input). + """ flush_channels() - msg_id, reply = execute(code="for i in range(3):\n"+ - " x=range?\n") + msg_id, reply = execute(code="ip = get_ipython()\n" + "for i in range(3):\n" + " ip.set_next_input('Hello There')\n") payload = reply['payload'] next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"] assert len(next_input_pls) == 1 diff --git a/ipykernel/zmqshell.py b/ipykernel/zmqshell.py index 3cdf6e6e3..a76240f61 100644 --- a/ipykernel/zmqshell.py +++ b/ipykernel/zmqshell.py @@ -174,7 +174,7 @@ def register_hook(self, hook): The DisplayHook objects must return a message from the __call__ method if they still require the - `session.send` method to be called after tranformation. + `session.send` method to be called after transformation. Returning `None` will halt that execution path, and session.send will not be called. """ @@ -253,7 +253,7 @@ def edit(self, parameter_s='', last_call=['','']): Arguments: - If arguments are given, the following possibilites exist: + If arguments are given, the following possibilities exist: - The arguments are numbers or pairs of colon-separated numbers (like 1 4:8 9). These are interpreted as lines of previous input to be diff --git a/setup.py b/setup.py index 46a3c05ea..f76807d4d 100644 --- a/setup.py +++ b/setup.py @@ -16,13 +16,11 @@ import sys v = sys.version_info -if v[:2] < (2,7) or (v[0] >= 3 and v[:2] < (3,4)): - error = "ERROR: %s requires Python version 2.7 or 3.4 or above." % name +if v[:2] < (3, 4): + error = "ERROR: %s requires Python version 3.4 or above." % name print(error, file=sys.stderr) sys.exit(1) -PY3 = (sys.version_info[0] >= 3) - #----------------------------------------------------------------------------- # get on with it #----------------------------------------------------------------------------- @@ -81,12 +79,12 @@ def run(self): long_description="The IPython kernel for Jupyter", platforms="Linux, Mac OS X, Windows", keywords=['Interactive', 'Interpreter', 'Shell', 'Web'], - python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*', + python_requires='>=3.4', install_requires=[ 'ipython>=4.0.0', 'traitlets>=4.1.0', 'jupyter_client', - 'tornado>=4.0', + 'tornado>=4.2', ], extras_require={ 'test:python_version=="2.7"': ['mock'], @@ -107,6 +105,7 @@ def run(self): ], ) + if any(a.startswith(('bdist', 'build', 'install')) for a in sys.argv): from ipykernel.kernelspec import write_kernel_spec, make_ipkernel_cmd, KERNEL_NAME