Skip to content

fix: enhance process close procedure #1213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion playwright/_impl/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(
self.playwright_future: asyncio.Future["Playwright"] = loop.create_future()
self._error: Optional[BaseException] = None
self.is_remote = False
self._init_task: Optional[asyncio.Task] = None

def mark_as_remote(self) -> None:
self.is_remote = True
Expand All @@ -196,12 +197,13 @@ async def init() -> None:
self.playwright_future.set_result(await self._root_object.initialize())

await self._transport.connect()
self._loop.create_task(init())
self._init_task = self._loop.create_task(init())
await self._transport.run()

def stop_sync(self) -> None:
self._transport.request_stop()
self._dispatcher_fiber.switch()
self._loop.run_until_complete(self._transport.wait_until_stopped())
self.cleanup()

async def stop_async(self) -> None:
Expand All @@ -210,6 +212,8 @@ async def stop_async(self) -> None:
self.cleanup()

def cleanup(self) -> None:
if self._init_task and not self._init_task.done():
self._init_task.cancel()
for ws_connection in self._child_ws_connections:
ws_connection._transport.dispose()
self.emit("close")
Expand Down
9 changes: 8 additions & 1 deletion playwright/_impl/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def request_stop(self) -> None:

async def wait_until_stopped(self) -> None:
await self._stopped_future
await self._proc.wait()

async def connect(self) -> None:
self._stopped_future: asyncio.Future = asyncio.Future()
Expand Down Expand Up @@ -147,22 +146,30 @@ async def run(self) -> None:
while not self._stopped:
try:
buffer = await self._proc.stdout.readexactly(4)
if self._stopped:
break
length = int.from_bytes(buffer, byteorder="little", signed=False)
buffer = bytes(0)
while length:
to_read = min(length, 32768)
data = await self._proc.stdout.readexactly(to_read)
if self._stopped:
break
length -= to_read
if len(buffer):
buffer = buffer + data
else:
buffer = data
if self._stopped:
break

obj = self.deserialize_message(buffer)
self.on_message(obj)
except asyncio.IncompleteReadError:
break
await asyncio.sleep(0)

await self._proc.wait()
self._stopped_future.set_result(None)

def send(self, message: Dict) -> None:
Expand Down
2 changes: 1 addition & 1 deletion playwright/async_api/_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def __aenter__(self) -> AsyncPlaywright:
loop.create_task(self._connection.run())
playwright_future = self._connection.playwright_future

done, pending = await asyncio.wait(
done, _ = await asyncio.wait(
{self._connection._transport.on_error_future, playwright_future},
return_when=asyncio.FIRST_COMPLETED,
)
Expand Down
45 changes: 31 additions & 14 deletions playwright/sync_api/_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.

import asyncio
from typing import Any
import sys
from typing import Any, Optional

from greenlet import greenlet

Expand All @@ -29,34 +30,45 @@
class PlaywrightContextManager:
def __init__(self) -> None:
self._playwright: SyncPlaywright
self._loop: asyncio.AbstractEventLoop
self._own_loop = False
self._watcher: Optional[asyncio.AbstractChildWatcher] = None

def __enter__(self) -> SyncPlaywright:
loop: asyncio.AbstractEventLoop
own_loop = None
try:
loop = asyncio.get_running_loop()
self._loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
own_loop = loop
if loop.is_running():
self._loop = asyncio.new_event_loop()
self._own_loop = True
if self._loop.is_running():
raise Error(
"""It looks like you are using Playwright Sync API inside the asyncio loop.
Please use the Async API instead."""
)

def greenlet_main() -> None:
loop.run_until_complete(self._connection.run_as_sync())
# In Python 3.7, asyncio.subprocess.Process.wait() hangs because the default child
# watcher is not the ThreadedChildWatcher that Python 3.8+ uses. This is Unix-specific;
# the threaded watcher also takes care of cleaning up zombie processes.
# See https://bugs.python.org/issue35621
if (
sys.version_info[0] == 3
and sys.version_info[1] == 7
and sys.platform != "win32"
and isinstance(asyncio.get_child_watcher(), asyncio.SafeChildWatcher)
):
from ._py37ThreadedChildWatcher import ThreadedChildWatcher # type: ignore

self._watcher = ThreadedChildWatcher()
asyncio.set_child_watcher(self._watcher) # type: ignore

if own_loop:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
def greenlet_main() -> None:
self._loop.run_until_complete(self._connection.run_as_sync())

dispatcher_fiber = greenlet(greenlet_main)
self._connection = Connection(
dispatcher_fiber,
create_remote_object,
PipeTransport(loop, compute_driver_executable()),
loop,
PipeTransport(self._loop, compute_driver_executable()),
self._loop,
)

g_self = greenlet.getcurrent()
Expand All @@ -77,3 +89,8 @@ def start(self) -> SyncPlaywright:

def __exit__(self, *args: Any) -> None:
self._connection.stop_sync()
if self._watcher:
self._watcher.close()
if self._own_loop:
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
self._loop.close()
166 changes: 166 additions & 0 deletions playwright/sync_api/_py37ThreadedChildWatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
# --------------------------------------------
#
# 1. This LICENSE AGREEMENT is between the Python Software Foundation
# ("PSF"), and the Individual or Organization ("Licensee") accessing and
# otherwise using this software ("Python") in source or binary form and
# its associated documentation.
#
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
# analyze, test, perform and/or display publicly, prepare derivative works,
# distribute, and otherwise use Python alone or in any derivative version,
# provided, however, that PSF's License Agreement and PSF's notice of copyright,
# i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation;
# All Rights Reserved" are retained in Python alone or in any derivative version
# prepared by Licensee.
#
# 3. In the event Licensee prepares a derivative work that is based on
# or incorporates Python or any part thereof, and wants to make
# the derivative work available to others as provided herein, then
# Licensee hereby agrees to include in any such work a brief summary of
# the changes made to Python.
#
# 4. PSF is making Python available to Licensee on an "AS IS"
# basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
# IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
# DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
# FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
# INFRINGE ANY THIRD PARTY RIGHTS.
#
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
# A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
# OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
#
# 6. This License Agreement will automatically terminate upon a material
# breach of its terms and conditions.
#
# 7. Nothing in this License Agreement shall be deemed to create any
# relationship of agency, partnership, or joint venture between PSF and
# Licensee. This License Agreement does not grant permission to use PSF
# trademarks or trade name in a trademark sense to endorse or promote
# products or services of Licensee, or any third party.
#
# 8. By copying, installing or otherwise using Python, Licensee
# agrees to be bound by the terms and conditions of this License
# Agreement.
#
# type: ignore

import itertools
import os
import threading
import warnings
from asyncio import AbstractChildWatcher, events
from asyncio.log import logger


class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    The watcher uses one thread per process to wait for that process
    to finish.

    It doesn't require subscription to a POSIX signal,
    but thread creation is not free.

    The watcher has O(1) complexity; its performance doesn't depend
    on the number of spawned processes.
    """

    def __init__(self):
        # Monotonic counter used only to give waiter threads unique names.
        self._pid_counter = itertools.count(0)
        # Maps a watched pid to the thread currently blocked in waitpid() for it.
        self._threads = {}

    def is_active(self):
        """Report the watcher as always ready to use; no loop attachment is needed."""
        return True

    def close(self):
        """Join the watcher's outstanding non-daemon waiter threads."""
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        # NOTE: add_child_handler() starts its threads with daemon=True, so
        # threads created by this class are skipped by the filter below.
        # Snapshot the values first: waiter threads remove their own entries
        # from self._threads while we iterate.
        threads = [
            thread
            for thread in list(self._threads.values())
            if thread.is_alive() and not thread.daemon
        ]
        for thread in threads:
            thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release on context exit; close() must be called explicitly.
        pass

    def __del__(self, _warn=warnings.warn):
        # `_warn` is bound as a default argument so it is still reachable if
        # module globals are torn down at interpreter shutdown.
        threads = [
            thread for thread in list(self._threads.values()) if thread.is_alive()
        ]
        if threads:
            _warn(
                f"{self.__class__} has registered but not finished child processes",
                ResourceWarning,
                source=self,
            )

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for `pid` and then schedules `callback`.

        Must be called while an event loop is running: the running loop is
        captured here and `callback(pid, returncode, *args)` is later scheduled
        on it via `call_soon_threadsafe`.
        """
        loop = events.get_running_loop()
        thread = threading.Thread(
            target=self._do_waitpid,
            name=f"waitpid-{next(self._pid_counter)}",
            args=(loop, pid, callback, args),
            daemon=True,
        )
        self._threads[pid] = thread
        thread.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op but is implemented because
        # the abstract base class requires it.
        return True

    def attach_loop(self, loop):
        # The loop is captured per-child in add_child_handler(); nothing to attach.
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block in waitpid() for `expected_pid`, then notify `loop`.

        If the child was already reaped elsewhere, a return code of 255 is
        reported. The callback is skipped (with a warning) when `loop` is
        already closed.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255", pid
            )
        else:
            returncode = _compute_returncode(status)
            if loop.get_debug():
                logger.debug(
                    "process %s exited with returncode %s", expected_pid, returncode
                )

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
        else:
            loop.call_soon_threadsafe(callback, pid, returncode, *args)

        # Tolerate a missing entry: with pid reuse another waiter may already
        # have removed the mapping for this pid, and a KeyError here would kill
        # this thread with an unhandled exception.
        self._threads.pop(expected_pid, None)


def _compute_returncode(status):
    """Convert a raw `os.waitpid()` status into a subprocess-style return code.

    Returns the negated signal number when the child was terminated by a
    signal, the exit status when it exited normally, and the unmodified
    `status` value for anything else.
    """
    # Killed by a signal: report it as a negative number.
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)

    # Regular exit (e.g. sys.exit()): report the exit status.
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)

    # Unrecognised status. This should not happen for a finished child,
    # but handing the raw value back may help with debugging.
    return status