|
29 | 29 | import sys |
30 | 30 | import warnings |
31 | 31 | import weakref |
32 | | -from contextlib import contextmanager |
33 | | -from heapq import heappop |
34 | 32 |
|
35 | 33 | try: |
36 | 34 | import ssl |
@@ -388,10 +386,7 @@ async def wait_closed(self): |
388 | 386 |
|
389 | 387 | class BaseEventLoop(events.AbstractEventLoop): |
390 | 388 |
|
391 | | - _is_proactorloop = False |
392 | | - |
393 | 389 | def __init__(self): |
394 | | - self._num_runs_pending = 0 |
395 | 390 | self._timer_cancelled_count = 0 |
396 | 391 | self._closed = False |
397 | 392 | self._stopping = False |
@@ -586,75 +581,75 @@ def _do_shutdown(self, future): |
586 | 581 | except Exception as ex: |
587 | 582 | self.call_soon_threadsafe(future.set_exception, ex) |
588 | 583 |
|
589 | | - def _check_running(self): |
590 | | - pass |
591 | | - |
592 | | - @contextmanager |
593 | | - def manage_run(self): |
594 | | - """Set up the loop for running.""" |
| 584 | + def _check_running(self, running_ok=False): |
| 585 | + if self.is_running(): |
| 586 | + raise RuntimeError('This event loop is already running') |
| 587 | + if not running_ok and events._get_running_loop() is not None: |
| 588 | + raise RuntimeError( |
| 589 | + 'Cannot run the event loop while another loop is running') |
| 590 | + |
| 591 | + def run_forever(self, running_ok=False): |
| 592 | + """Run until stop() is called.""" |
595 | 593 | self._check_closed() |
| 594 | + self._check_running(running_ok=running_ok) |
| 595 | + self._set_coroutine_origin_tracking(self._debug) |
596 | 596 | old_thread_id = self._thread_id |
597 | 597 | old_running_loop = events._get_running_loop() |
| 598 | + self._thread_id = threading.get_ident() |
| 599 | + |
| 600 | + old_agen_hooks = sys.get_asyncgen_hooks() |
| 601 | + sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, |
| 602 | + finalizer=self._asyncgen_finalizer_hook) |
598 | 603 | try: |
599 | | - self._thread_id = threading.get_ident() |
600 | 604 | events._set_running_loop(self) |
601 | | - self._num_runs_pending += 1 |
602 | | - if self._is_proactorloop: |
603 | | - if self._self_reading_future is None: |
604 | | - self.call_soon(self._loop_self_reading) |
605 | | - yield |
| 605 | + while True: |
| 606 | + self._run_once() |
| 607 | + if self._stopping: |
| 608 | + break |
606 | 609 | finally: |
| 610 | + self._stopping = False |
607 | 611 | self._thread_id = old_thread_id |
608 | 612 | events._set_running_loop(old_running_loop) |
609 | | - self._num_runs_pending -= 1 |
610 | | - if self._is_proactorloop: |
611 | | - if (self._num_runs_pending == 0 |
612 | | - and self._self_reading_future is not None): |
613 | | - ov = self._self_reading_future._ov |
614 | | - self._self_reading_future.cancel() |
615 | | - if ov is not None: |
616 | | - self._proactor._unregister(ov) |
617 | | - self._self_reading_future = None |
618 | | - |
619 | | - @contextmanager |
620 | | - def manage_asyncgens(self): |
621 | | - if not hasattr(sys, 'get_asyncgen_hooks'): |
622 | | - # Python version is too old. |
623 | | - return |
624 | | - old_agen_hooks = sys.get_asyncgen_hooks() |
625 | | - try: |
626 | | - self._set_coroutine_origin_tracking(self._debug) |
627 | | - if self._asyncgens is not None: |
628 | | - sys.set_asyncgen_hooks( |
629 | | - firstiter=self._asyncgen_firstiter_hook, |
630 | | - finalizer=self._asyncgen_finalizer_hook) |
631 | | - yield |
632 | | - finally: |
633 | 613 | self._set_coroutine_origin_tracking(False) |
634 | | - if self._asyncgens is not None: |
635 | | - sys.set_asyncgen_hooks(*old_agen_hooks) |
| 614 | + sys.set_asyncgen_hooks(*old_agen_hooks) |
636 | 615 |
|
637 | | - def run_forever(self): |
638 | | - with self.manage_run(), self.manage_asyncgens(): |
639 | | - while True: |
640 | | - self._run_once() |
641 | | - if self._stopping: |
642 | | - break |
643 | | - self._stopping = False |
| 616 | + def run_until_complete(self, future, running_ok=False): |
| 617 | + """Run until the Future is done. |
644 | 618 |
|
645 | | - def run_until_complete(self, future): |
646 | | - with self.manage_run(): |
647 | | - f = tasks.ensure_future(future, loop=self) |
648 | | - if f is not future: |
649 | | - f._log_destroy_pending = False |
650 | | - while not f.done(): |
651 | | - self._run_once() |
652 | | - if self._stopping: |
653 | | - break |
654 | | - if not f.done(): |
655 | | - raise RuntimeError( |
656 | | - 'Event loop stopped before Future completed.') |
657 | | - return f.result() |
| 619 | + If the argument is a coroutine, it is wrapped in a Task. |
| 620 | +
|
| 621 | + WARNING: It would be disastrous to call run_until_complete() |
| 622 | + with the same coroutine twice -- it would wrap it in two |
| 623 | + different Tasks and that can't be good. |
| 624 | +
|
| 625 | + Return the Future's result, or raise its exception. |
| 626 | + """ |
| 627 | + self._check_closed() |
| 628 | + self._check_running(running_ok=running_ok) |
| 629 | + |
| 630 | + new_task = not futures.isfuture(future) |
| 631 | + future = tasks.ensure_future(future, loop=self) |
| 632 | + if new_task: |
| 633 | + # An exception is raised if the future didn't complete, so there |
| 634 | + # is no need to log the "destroy pending task" message |
| 635 | + future._log_destroy_pending = False |
| 636 | + |
| 637 | + future.add_done_callback(_run_until_complete_cb) |
| 638 | + try: |
| 639 | + self.run_forever(running_ok=running_ok) |
| 640 | + except: |
| 641 | + if new_task and future.done() and not future.cancelled(): |
| 642 | + # The coroutine raised a BaseException. Consume the exception |
| 643 | + # to not log a warning, the caller doesn't have access to the |
| 644 | + # local task. |
| 645 | + future.exception() |
| 646 | + raise |
| 647 | + finally: |
| 648 | + future.remove_done_callback(_run_until_complete_cb) |
| 649 | + if not future.done(): |
| 650 | + raise RuntimeError('Event loop stopped before Future completed.') |
| 651 | + |
| 652 | + return future.result() |
658 | 653 |
|
659 | 654 | def stop(self): |
660 | 655 | """Stop running the event loop. |
@@ -1839,35 +1834,82 @@ def _timer_handle_cancelled(self, handle): |
1839 | 1834 | self._timer_cancelled_count += 1 |
1840 | 1835 |
|
1841 | 1836 | def _run_once(self): |
| 1837 | + """Run one full iteration of the event loop. |
| 1838 | +
|
| 1839 | + This calls all currently ready callbacks, polls for I/O, |
| 1840 | + schedules the resulting callbacks, and finally schedules |
| 1841 | + 'call_later' callbacks. |
1842 | 1842 | """ |
1843 | | - Simplified re-implementation of asyncio's _run_once that |
1844 | | - runs handles as they become ready. |
1845 | | - """ |
1846 | | - ready = self._ready |
1847 | | - scheduled = self._scheduled |
1848 | | - while scheduled and scheduled[0]._cancelled: |
1849 | | - heappop(scheduled) |
1850 | | - |
1851 | | - timeout = ( |
1852 | | - 0 if ready or self._stopping |
1853 | | - else min(max( |
1854 | | - scheduled[0]._when - self.time(), 0), 86400) if scheduled |
1855 | | - else None) |
| 1843 | + |
| 1844 | + sched_count = len(self._scheduled) |
| 1845 | + if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and |
| 1846 | + self._timer_cancelled_count / sched_count > |
| 1847 | + _MIN_CANCELLED_TIMER_HANDLES_FRACTION): |
| 1848 | + # Remove delayed calls that were cancelled if their number |
| 1849 | + # is too high |
| 1850 | + new_scheduled = [] |
| 1851 | + for handle in self._scheduled: |
| 1852 | + if handle._cancelled: |
| 1853 | + handle._scheduled = False |
| 1854 | + else: |
| 1855 | + new_scheduled.append(handle) |
| 1856 | + |
| 1857 | + heapq.heapify(new_scheduled) |
| 1858 | + self._scheduled = new_scheduled |
| 1859 | + self._timer_cancelled_count = 0 |
| 1860 | + else: |
| 1861 | + # Remove delayed calls that were cancelled from head of queue. |
| 1862 | + while self._scheduled and self._scheduled[0]._cancelled: |
| 1863 | + self._timer_cancelled_count -= 1 |
| 1864 | + handle = heapq.heappop(self._scheduled) |
| 1865 | + handle._scheduled = False |
| 1866 | + |
| 1867 | + timeout = None |
| 1868 | + if self._ready or self._stopping: |
| 1869 | + timeout = 0 |
| 1870 | + elif self._scheduled: |
| 1871 | + # Compute the desired timeout. |
| 1872 | + when = self._scheduled[0]._when |
| 1873 | + timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) |
| 1874 | + |
1856 | 1875 | event_list = self._selector.select(timeout) |
1857 | 1876 | self._process_events(event_list) |
1858 | 1877 |
|
| 1878 | + # Handle 'later' callbacks that are ready. |
1859 | 1879 | end_time = self.time() + self._clock_resolution |
1860 | | - while scheduled and scheduled[0]._when < end_time: |
1861 | | - handle = heappop(scheduled) |
1862 | | - ready.append(handle) |
1863 | | - |
1864 | | - for _ in range(len(ready)): |
1865 | | - if not ready: |
| 1880 | + while self._scheduled: |
| 1881 | + handle = self._scheduled[0] |
| 1882 | + if handle._when >= end_time: |
1866 | 1883 | break |
1867 | | - handle = ready.popleft() |
1868 | | - if not handle._cancelled: |
| 1884 | + handle = heapq.heappop(self._scheduled) |
| 1885 | + handle._scheduled = False |
| 1886 | + self._ready.append(handle) |
| 1887 | + |
| 1888 | + # This is the only place where callbacks are actually *called*. |
| 1889 | + # All other places just add them to ready. |
| 1890 | + # Note: We run all currently scheduled callbacks, but not any |
| 1891 | + # callbacks scheduled by callbacks run this time around -- |
| 1892 | + # they will be run the next time (after another I/O poll). |
| 1893 | + # Use an idiom that is thread-safe without using locks. |
| 1894 | + ntodo = len(self._ready) |
| 1895 | + for i in range(ntodo): |
| 1896 | + handle = self._ready.popleft() |
| 1897 | + if handle._cancelled: |
| 1898 | + continue |
| 1899 | + if self._debug: |
| 1900 | + try: |
| 1901 | + self._current_handle = handle |
| 1902 | + t0 = self.time() |
| 1903 | + handle._run() |
| 1904 | + dt = self.time() - t0 |
| 1905 | + if dt >= self.slow_callback_duration: |
| 1906 | + logger.warning('Executing %s took %.3f seconds', |
| 1907 | + _format_handle(handle), dt) |
| 1908 | + finally: |
| 1909 | + self._current_handle = None |
| 1910 | + else: |
1869 | 1911 | handle._run() |
1870 | | - handle = None |
| 1912 | + handle = None # Needed to break cycles when an exception occurs. |
1871 | 1913 |
|
1872 | 1914 | def _set_coroutine_origin_tracking(self, enabled): |
1873 | 1915 | if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): |
|
0 commit comments