diff --git a/playwright/_impl/_connection.py b/playwright/_impl/_connection.py index de6962e16..a89c6fb64 100644 --- a/playwright/_impl/_connection.py +++ b/playwright/_impl/_connection.py @@ -180,6 +180,7 @@ def __init__( self.playwright_future: asyncio.Future["Playwright"] = loop.create_future() self._error: Optional[BaseException] = None self.is_remote = False + self._init_task: Optional[asyncio.Task] = None def mark_as_remote(self) -> None: self.is_remote = True @@ -196,12 +197,13 @@ async def init() -> None: self.playwright_future.set_result(await self._root_object.initialize()) await self._transport.connect() - self._loop.create_task(init()) + self._init_task = self._loop.create_task(init()) await self._transport.run() def stop_sync(self) -> None: self._transport.request_stop() self._dispatcher_fiber.switch() + self._loop.run_until_complete(self._transport.wait_until_stopped()) self.cleanup() async def stop_async(self) -> None: @@ -210,6 +212,8 @@ async def stop_async(self) -> None: self.cleanup() def cleanup(self) -> None: + if self._init_task and not self._init_task.done(): + self._init_task.cancel() for ws_connection in self._child_ws_connections: ws_connection._transport.dispose() self.emit("close") diff --git a/playwright/_impl/_transport.py b/playwright/_impl/_transport.py index 9215a4087..54ad1dfdd 100644 --- a/playwright/_impl/_transport.py +++ b/playwright/_impl/_transport.py @@ -110,7 +110,6 @@ def request_stop(self) -> None: async def wait_until_stopped(self) -> None: await self._stopped_future - await self._proc.wait() async def connect(self) -> None: self._stopped_future: asyncio.Future = asyncio.Future() @@ -147,22 +146,30 @@ async def run(self) -> None: while not self._stopped: try: buffer = await self._proc.stdout.readexactly(4) + if self._stopped: + break length = int.from_bytes(buffer, byteorder="little", signed=False) buffer = bytes(0) while length: to_read = min(length, 32768) data = await self._proc.stdout.readexactly(to_read) + if 
self._stopped: + break length -= to_read if len(buffer): buffer = buffer + data else: buffer = data + if self._stopped: + break obj = self.deserialize_message(buffer) self.on_message(obj) except asyncio.IncompleteReadError: break await asyncio.sleep(0) + + await self._proc.wait() self._stopped_future.set_result(None) def send(self, message: Dict) -> None: diff --git a/playwright/async_api/_context_manager.py b/playwright/async_api/_context_manager.py index 1b40ad2f1..b5bdbbbb3 100644 --- a/playwright/async_api/_context_manager.py +++ b/playwright/async_api/_context_manager.py @@ -37,7 +37,7 @@ async def __aenter__(self) -> AsyncPlaywright: loop.create_task(self._connection.run()) playwright_future = self._connection.playwright_future - done, pending = await asyncio.wait( + done, _ = await asyncio.wait( {self._connection._transport.on_error_future, playwright_future}, return_when=asyncio.FIRST_COMPLETED, ) diff --git a/playwright/sync_api/_context_manager.py b/playwright/sync_api/_context_manager.py index 47a41794e..926639cf3 100644 --- a/playwright/sync_api/_context_manager.py +++ b/playwright/sync_api/_context_manager.py @@ -13,7 +13,8 @@ # limitations under the License. 
import asyncio -from typing import Any +import sys +from typing import Any, Optional from greenlet import greenlet @@ -29,34 +30,45 @@ class PlaywrightContextManager: def __init__(self) -> None: self._playwright: SyncPlaywright + self._loop: asyncio.AbstractEventLoop + self._own_loop = False + self._watcher: Optional[asyncio.AbstractChildWatcher] = None def __enter__(self) -> SyncPlaywright: - loop: asyncio.AbstractEventLoop - own_loop = None try: - loop = asyncio.get_running_loop() + self._loop = asyncio.get_running_loop() except RuntimeError: - loop = asyncio.new_event_loop() - own_loop = loop - if loop.is_running(): + self._loop = asyncio.new_event_loop() + self._own_loop = True + if self._loop.is_running(): raise Error( """It looks like you are using Playwright Sync API inside the asyncio loop. Please use the Async API instead.""" ) - def greenlet_main() -> None: - loop.run_until_complete(self._connection.run_as_sync()) + # In Python 3.7, asyncio.Process.wait() hangs because it does not use ThreadedChildWatcher + # which is used in Python 3.8+. This is unix specific and also takes care of + # cleaning up zombie processes.
See https://bugs.python.org/issue35621 + if ( + sys.version_info[0] == 3 + and sys.version_info[1] == 7 + and sys.platform != "win32" + and isinstance(asyncio.get_child_watcher(), asyncio.SafeChildWatcher) + ): + from ._py37ThreadedChildWatcher import ThreadedChildWatcher # type: ignore + + self._watcher = ThreadedChildWatcher() + asyncio.set_child_watcher(self._watcher) # type: ignore - if own_loop: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() + def greenlet_main() -> None: + self._loop.run_until_complete(self._connection.run_as_sync()) dispatcher_fiber = greenlet(greenlet_main) self._connection = Connection( dispatcher_fiber, create_remote_object, - PipeTransport(loop, compute_driver_executable()), - loop, + PipeTransport(self._loop, compute_driver_executable()), + self._loop, ) g_self = greenlet.getcurrent() @@ -77,3 +89,8 @@ def start(self) -> SyncPlaywright: def __exit__(self, *args: Any) -> None: self._connection.stop_sync() + if self._watcher: + self._watcher.close() + if self._own_loop: + self._loop.run_until_complete(self._loop.shutdown_asyncgens()) + self._loop.close() diff --git a/playwright/sync_api/_py37ThreadedChildWatcher.py b/playwright/sync_api/_py37ThreadedChildWatcher.py new file mode 100644 index 000000000..9fb57f33e --- /dev/null +++ b/playwright/sync_api/_py37ThreadedChildWatcher.py @@ -0,0 +1,166 @@ +# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2 +# -------------------------------------------- +# +# 1. This LICENSE AGREEMENT is between the Python Software Foundation +# ("PSF"), and the Individual or Organization ("Licensee") accessing and +# otherwise using this software ("Python") in source or binary form and +# its associated documentation. +# +# 2. 
Subject to the terms and conditions of this License Agreement, PSF hereby +# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, +# analyze, test, perform and/or display publicly, prepare derivative works, +# distribute, and otherwise use Python alone or in any derivative version, +# provided, however, that PSF's License Agreement and PSF's notice of copyright, +# i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation; +# All Rights Reserved" are retained in Python alone or in any derivative version +# prepared by Licensee. +# +# 3. In the event Licensee prepares a derivative work that is based on +# or incorporates Python or any part thereof, and wants to make +# the derivative work available to others as provided herein, then +# Licensee hereby agrees to include in any such work a brief summary of +# the changes made to Python. +# +# 4. PSF is making Python available to Licensee on an "AS IS" +# basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR +# IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND +# DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS +# FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT +# INFRINGE ANY THIRD PARTY RIGHTS. +# +# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON +# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS +# A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, +# OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. +# +# 6. This License Agreement will automatically terminate upon a material +# breach of its terms and conditions. +# +# 7. Nothing in this License Agreement shall be deemed to create any +# relationship of agency, partnership, or joint venture between PSF and +# Licensee. 
This License Agreement does not grant permission to use PSF +# trademarks or trade name in a trademark sense to endorse or promote +# products or services of Licensee, or any third party. +# +# 8. By copying, installing or otherwise using Python, Licensee +# agrees to be bound by the terms and conditions of this License +# Agreement. +# +# type: ignore + +import itertools +import os +import threading +import warnings +from asyncio import AbstractChildWatcher, events +from asyncio.log import logger + + +class ThreadedChildWatcher(AbstractChildWatcher): + """Threaded child watcher implementation. + The watcher uses a thread per process + for waiting for the process finish. + It doesn't require subscription on POSIX signal + but a thread creation is not free. + The watcher has O(1) complexity, its performance doesn't depend + on amount of spawn processes. + """ + + def __init__(self): + self._pid_counter = itertools.count(0) + self._threads = {} + + def is_active(self): + return True + + def close(self): + self._join_threads() + + def _join_threads(self): + """Internal: Join all non-daemon threads""" + threads = [ + thread + for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon + ] + for thread in threads: + thread.join() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __del__(self, _warn=warnings.warn): + threads = [ + thread for thread in list(self._threads.values()) if thread.is_alive() + ] + if threads: + _warn( + f"{self.__class__} has registered but not finished child processes", + ResourceWarning, + source=self, + ) + + def add_child_handler(self, pid, callback, *args): + loop = events.get_running_loop() + thread = threading.Thread( + target=self._do_waitpid, + name=f"waitpid-{next(self._pid_counter)}", + args=(loop, pid, callback, args), + daemon=True, + ) + self._threads[pid] = thread + thread.start() + + def remove_child_handler(self, pid): + # asyncio never calls 
remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base class requires it + return True + + def attach_loop(self, loop): + pass + + def _do_waitpid(self, loop, expected_pid, callback, args): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", pid + ) + else: + returncode = _compute_returncode(status) + if loop.get_debug(): + logger.debug( + "process %s exited with returncode %s", expected_pid, returncode + ) + + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + self._threads.pop(expected_pid) + + +def _compute_returncode(status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g. sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status