Skip to content

Commit 8679e42

Browse files
devlib: Protect against too many BackgroundCommand
Since some servers (SSH) limit the number of active commands, ensure that we will not go over the limit for both the user-API Target.background() and using Target.execute.asyn() either. Achieve that by partitionning the slots in two bins: * User calls to Target.background(). Going over the limit will raise a TooManyBackgroundCommandsError exception. * Internal uses in Target.execute.asyn(). The number of background commands is tracked using a semaphore, and it will fallback to the blocking path if necessary.
1 parent f6ca4b5 commit 8679e42

File tree

3 files changed

+205
-53
lines changed

3 files changed

+205
-53
lines changed

devlib/connection.py

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
import logging
3030
import select
3131
import fcntl
32+
import asyncio
3233

33-
from devlib.utils.misc import InitCheckpoint
34+
from devlib.utils.misc import InitCheckpoint, memoized
3435

3536
_KILL_TIMEOUT = 3
3637

@@ -61,17 +62,127 @@ class ConnectionBase(InitCheckpoint):
6162
"""
6263
Base class for all connections.
6364
"""
65+
66+
_MAX_BACKGROUND_CMD = 50
67+
"""
68+
Check for up to this amount of available background commands.
69+
"""
70+
6471
def __init__(self):
6572
self._current_bg_cmds = WeakSet()
6673
self._closed = False
6774
self._close_lock = threading.Lock()
6875
self.busybox = None
76+
self._bg_spawn_lock = threading.RLock()
6977

7078
def cancel_running_command(self):
7179
bg_cmds = set(self._current_bg_cmds)
7280
for bg_cmd in bg_cmds:
7381
bg_cmd.cancel()
7482

83+
@property
84+
def _bg_async_sem(self):
85+
# If we have not installed busybox yet, we won't be able to poll for
86+
# the max number of background commands so we return a dummy semaphore.
87+
if self.busybox is None:
88+
return asyncio.BoundedSemaphore(1)
89+
else:
90+
return self._get_bg_async_sem()
91+
92+
# Memoization ensures we will get the same semaphore back all the time.
93+
@memoized
94+
def _get_bg_async_sem(self):
95+
# Ensure we have a running asyncio loop, otherwise the semaphore will
96+
# be useless
97+
asyncio.get_running_loop()
98+
return asyncio.BoundedSemaphore(self._max_async_bg)
99+
100+
@property
101+
@memoized
102+
def _max_bg(self):
103+
"""
104+
Find the maximum number of background command that can be spawned at
105+
once on this connection.
106+
107+
.. note:: This is done as a cached property so that it will only be
108+
done on first use, leaving the opportunity to the caller to check
109+
that the connection is in working order before we get here.
110+
"""
111+
# We need busybox in order to kill the commands we spawn
112+
assert self.busybox is not None
113+
114+
kill_list = []
115+
116+
# Keep a command alive for the whole duration of the probing so that we
117+
# don't need to spawn a new command to kill the sleeps. Otherwise, we
118+
# would end up not being able to since we reach the server's limit
119+
killer_bg = self.background(
120+
'{} xargs kill -9'.format(self.busybox),
121+
stdout=subprocess.PIPE,
122+
stderr=subprocess.PIPE,
123+
as_root=False,
124+
)
125+
126+
try:
127+
with killer_bg:
128+
for i in range(self._MAX_BACKGROUND_CMD - 1):
129+
try:
130+
# Do not pollute the log with misleading connection
131+
# failures.
132+
logging.disable()
133+
bg = self.background(
134+
# Sleep for a year
135+
'{} sleep 31536000'.format(self.busybox),
136+
stdout=subprocess.PIPE,
137+
stderr=subprocess.PIPE,
138+
as_root=False,
139+
)
140+
# We reached the limit of commands to spawn
141+
except Exception:
142+
break
143+
else:
144+
kill_list.append(bg)
145+
finally:
146+
logging.disable(logging.NOTSET)
147+
148+
to_kill = ' '.join(
149+
# Prefix with "-" to kill the process group
150+
'-{}'.format(bg.pid)
151+
for bg in kill_list
152+
)
153+
killer_bg.communicate(input=to_kill.encode())
154+
155+
finally:
156+
for bg in kill_list:
157+
bg.__exit__(None, None, None)
158+
159+
# avoid off-by-one since range() starts from 0
160+
max_ = i + 1
161+
162+
# Add the killer_bg command
163+
max_ = i + 1
164+
165+
# Ensure we can always do side things requiring a channel on some
166+
# connections like connecting to SFTP or running synchronously a
167+
# command.
168+
max_ -= 1
169+
return max_
170+
171+
@property
172+
@memoized
173+
def _max_user_bg(self):
174+
# Allocate half of the available background commands to the user, half
175+
# for the exclusive use of _execute_async(). Partitioning is necessary
176+
# to avoid deadlocks where the user would spawn a number of
177+
# BackgroundCommand manually and then try to use an execute() which
178+
# would require another command as well.
179+
return int(self._max_bg / 2)
180+
181+
@property
182+
@memoized
183+
def _max_async_bg(self):
184+
return self._max_bg - self._max_user_bg
185+
75186
@abstractmethod
76187
def _close(self):
77188
"""

devlib/exception.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ class KernelConfigKeyError(KeyError, IndexError, DevlibError):
164164
pass
165165

166166

167+
class TooManyBackgroundCommandsError(DevlibError):
168+
"""
169+
Exception raised when too many background commands are started at once.
170+
"""
171+
pass
172+
173+
167174
def get_traceback(exc=None):
168175
"""
169176
Returns the string with the traceback for the specifiec exc

devlib/target.py

Lines changed: 86 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151
from devlib.exception import (DevlibTransientError, TargetStableError,
5252
TargetNotRespondingError, TimeoutError,
5353
TargetTransientError, KernelConfigKeyError,
54-
TargetError, HostError, TargetCalledProcessError) # pylint: disable=redefined-builtin
54+
TargetError, HostError, TargetCalledProcessError,
55+
TooManyBackgroundCommandsError) # pylint: disable=redefined-builtin
5556
from devlib.utils.ssh import SshConnection
5657
from devlib.utils.android import AdbConnection, AndroidProperties, LogcatMonitor, adb_command, adb_disconnect, INTENT_FLAGS
5758
from devlib.utils.misc import memoized, isiterable, convert_new_lines, groupby_value
@@ -370,6 +371,9 @@ def connect(self, timeout=None, check_boot_completed=True):
370371
self.execute('mkdir -p {}'.format(quote(self.executables_directory)))
371372
self.busybox = self.install(os.path.join(PACKAGE_BIN_DIRECTORY, self.abi, 'busybox'), timeout=30)
372373
self.conn.busybox = self.busybox
374+
# Hit the cached property early but after having checked the connection
375+
# works, and after having set self.busybox
376+
self.conn._max_bg
373377
self.platform.update_from_target(self)
374378
self._update_modules('connected')
375379
if self.platform.big_core and self.load_default_modules:
@@ -755,53 +759,70 @@ def _prepare_cmd(self, command, force_locale):
755759
@call_conn
756760
@asyn.asyncf
757761
async def _execute_async(self, command, timeout=None, as_root=False, strip_colors=True, will_succeed=False, check_exit_code=True, force_locale='C'):
758-
bg = self.background(
759-
command=command,
760-
as_root=as_root,
761-
force_locale=force_locale,
762-
)
763-
764-
def process(streams):
765-
# Make sure we don't accidentally end up with "\n" if both streams
766-
# are empty
767-
res = b'\n'.join(x for x in streams if x).decode()
768-
if strip_colors:
769-
res = strip_bash_colors(res)
770-
return res
771-
772-
def thread_f():
773-
streams = (None, None)
774-
excep = None
775-
try:
776-
with bg as _bg:
777-
streams = _bg.communicate(timeout=timeout)
778-
except BaseException as e:
779-
excep = e
780-
781-
if isinstance(excep, subprocess.CalledProcessError):
782-
if check_exit_code:
783-
excep = TargetStableError(excep)
784-
else:
785-
streams = (excep.output, excep.stderr)
786-
excep = None
762+
sem = self.conn._bg_async_sem
763+
# If there is no BackgroundCommand slot available, fall back on the
764+
# blocking path. This ensures complete separation between the internal
765+
# use of background() to provide the async API and the external uses of
766+
# background()
767+
if sem.locked():
768+
return self._execute(
769+
command,
770+
timeout=timeout,
771+
as_root=as_root,
772+
strip_colors=strip_colors,
773+
will_succeed=will_succeed,
774+
check_exit_code=check_exit_code,
775+
force_locale='C'
776+
)
777+
else:
778+
async with sem:
779+
bg = self.background(
780+
command=command,
781+
as_root=as_root,
782+
force_locale=force_locale,
783+
)
787784

788-
if will_succeed and isinstance(excep, TargetStableError):
789-
excep = TargetTransientError(excep)
785+
def process(streams):
786+
# Make sure we don't accidentally end up with "\n" if both streams
787+
# are empty
788+
res = b'\n'.join(x for x in streams if x).decode()
789+
if strip_colors:
790+
res = strip_bash_colors(res)
791+
return res
790792

791-
if excep is None:
792-
res = process(streams)
793-
loop.call_soon_threadsafe(future.set_result, res)
794-
else:
795-
loop.call_soon_threadsafe(future.set_exception, excep)
793+
def thread_f(loop, future):
794+
streams = (None, None)
795+
excep = None
796+
try:
797+
with bg as _bg:
798+
streams = _bg.communicate(timeout=timeout)
799+
except BaseException as e:
800+
excep = e
801+
802+
if isinstance(excep, subprocess.CalledProcessError):
803+
if check_exit_code:
804+
excep = TargetStableError(excep)
805+
else:
806+
streams = (excep.output, excep.stderr)
807+
excep = None
808+
809+
if will_succeed and isinstance(excep, TargetStableError):
810+
excep = TargetTransientError(excep)
811+
812+
if excep is None:
813+
res = process(streams)
814+
loop.call_soon_threadsafe(future.set_result, res)
815+
else:
816+
loop.call_soon_threadsafe(future.set_exception, excep)
796817

797-
loop = asyncio.get_running_loop()
798-
future = asyncio.Future()
799-
thread = threading.Thread(
800-
target=thread_f,
801-
daemon=True,
802-
)
803-
thread.start()
804-
return await future
818+
future = asyncio.Future()
819+
thread = threading.Thread(
820+
target=thread_f,
821+
args=(asyncio.get_running_loop(), future),
822+
daemon=True,
823+
)
824+
thread.start()
825+
return await future
805826

806827
@call_conn
807828
def _execute(self, command, timeout=None, check_exit_code=True,
@@ -821,13 +842,26 @@ def _execute(self, command, timeout=None, check_exit_code=True,
821842
@call_conn
822843
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False,
823844
force_locale='C', timeout=None):
824-
command = self._prepare_cmd(command, force_locale)
825-
bg_cmd = self.conn.background(command, stdout, stderr, as_root)
826-
if timeout is not None:
827-
timer = threading.Timer(timeout, function=bg_cmd.cancel)
828-
timer.daemon = True
829-
timer.start()
830-
return bg_cmd
845+
conn = self.conn
846+
# Make sure only one thread tries to spawn a background command at the
847+
# same time, so the count of _current_bg_cmds is accurate.
848+
with conn._bg_spawn_lock:
849+
alive = list(conn._current_bg_cmds)
850+
alive = [bg for bg in alive if bg.poll() is None]
851+
# Since the async path self-regulates using a
852+
# asyncio.BoundedSemaphore(), going over the combined max means the
853+
# culprit is the user spawning too many BackgroundCommand.
854+
if len(alive) >= conn._max_bg:
855+
raise TooManyBackgroundCommandsError(
856+
'{} sessions allowed for one connection on this server. Modify MaxSessions parameter for OpenSSH to allow more.'.format(conn._max_bg))
857+
858+
command = self._prepare_cmd(command, force_locale)
859+
bg_cmd = self.conn.background(command, stdout, stderr, as_root)
860+
if timeout is not None:
861+
timer = threading.Timer(timeout, function=bg_cmd.cancel)
862+
timer.daemon = True
863+
timer.start()
864+
return bg_cmd
831865

832866
def invoke(self, binary, args=None, in_directory=None, on_cpus=None,
833867
redirect_stderr=False, as_root=False, timeout=30):

0 commit comments

Comments
 (0)