Skip to content

Commit a8429fa

Browse files
connections: Unify BackgroundCommand API and use paramiko for SSH
* Unify the behavior of background commands in connections.BackgroundCommand(). This implements a subset of subprocess.Popen class, with a unified behavior across all connection types * Implement the SSH connection using paramiko rather than pxssh.
1 parent c95b00a commit a8429fa

File tree

5 files changed

+918
-65
lines changed

5 files changed

+918
-65
lines changed

devlib/connections.py

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
# Copyright 2019 ARM Limited
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
import os
17+
import time
18+
import subprocess
19+
import signal
20+
import threading
21+
from weakref import WeakSet
22+
from abc import ABC, abstractmethod
23+
24+
_KILL_TIMEOUT = 3
25+
26+
27+
def _kill_pgid_cmd(pgid, sig):
28+
return 'kill -{} -{}'.format(sig.name, pgid)
29+
30+
31+
class ConnectionBase:
32+
"""
33+
Base class for all connections.
34+
"""
35+
def __init__(self):
36+
self._current_bg_cmds = WeakSet()
37+
self._closed = False
38+
self._close_lock = threading.Lock()
39+
40+
def cancel_running_command(self):
41+
bg_cmds = set(self._current_bg_cmds)
42+
for bg_cmd in bg_cmds:
43+
bg_cmd.cancel()
44+
45+
@abstractmethod
46+
def _close(self):
47+
"""
48+
Close the connection.
49+
50+
The public :meth:`close` method makes sure that :meth:`_close` will
51+
only be called once, and will serialize accesses to it if it happens to
52+
be called from multiple threads at once.
53+
"""
54+
55+
def close(self):
56+
# Locking the closing allows any thread to safely call close() as long
57+
# as the connection can be closed from a thread that is not the one it
58+
# started its life in.
59+
with self._close_lock:
60+
if not self._closed:
61+
self._close()
62+
self._closed = True
63+
64+
# Ideally, that should not be relied upon but that will improve the chances
65+
# of the connection being properly cleaned up when it's not in use anymore.
66+
def __del__(self):
67+
self.close()
68+
69+
70+
class BackgroundCommand(ABC):
71+
"""
72+
Allows managing a running background command using a subset of the
73+
:class:`subprocess.Popen` API.
74+
75+
Instances of this class can be used as context managers, with the same
76+
semantic as :class:`subprocess.Popen`.
77+
"""
78+
@abstractmethod
79+
def send_signal(self, sig):
80+
"""
81+
Send a POSIX signal to the background command's process group ID
82+
(PGID).
83+
84+
:param signal: Signal to send.
85+
:type signal: signal.Signals
86+
"""
87+
88+
def kill(self):
89+
"""
90+
Send SIGKILL to the background command.
91+
"""
92+
self.send_signal(signal.SIGKILL)
93+
94+
@abstractmethod
95+
def cancel(self, kill_timeout=_KILL_TIMEOUT):
96+
"""
97+
Try to gracefully terminate the process by sending ``SIGTERM``, then
98+
waiting for ``kill_timeout`` to send ``SIGKILL``.
99+
"""
100+
101+
@abstractmethod
102+
def wait(self):
103+
"""
104+
Block until the background command completes, and return its exit code.
105+
"""
106+
107+
@abstractmethod
108+
def poll(self):
109+
"""
110+
Return exit code if the command has exited, None otherwise.
111+
"""
112+
113+
@property
114+
@abstractmethod
115+
def stdin(self):
116+
"""
117+
File-like object connected to the background's command stdin.
118+
"""
119+
120+
@property
121+
@abstractmethod
122+
def stdout(self):
123+
"""
124+
File-like object connected to the background's command stdout.
125+
"""
126+
127+
@property
128+
@abstractmethod
129+
def stderr(self):
130+
"""
131+
File-like object connected to the background's command stderr.
132+
"""
133+
134+
@abstractmethod
135+
def close(self):
136+
"""
137+
Close all opened streams and then wait for command completion.
138+
139+
:returns: Exit code of the command.
140+
141+
.. note:: If the command is writing to its stdout/stderr, it might be
142+
blocked on that and die when the streams are closed.
143+
"""
144+
145+
def __enter__(self):
146+
return self
147+
148+
def __exit__(self, *args, **kwargs):
149+
self.close()
150+
151+
152+
class PopenBackgroundCommand(BackgroundCommand):
153+
"""
154+
:class:`subprocess.Popen`-based background command.
155+
"""
156+
157+
def __init__(self, popen):
158+
self.popen = popen
159+
160+
def send_signal(self, sig):
161+
return os.killpg(self.popen.pid, sig)
162+
163+
@property
164+
def stdin(self):
165+
return self.popen.stdin
166+
167+
@property
168+
def stdout(self):
169+
return self.popen.stdout
170+
171+
@property
172+
def stderr(self):
173+
return self.popen.stderr
174+
175+
def wait(self):
176+
return self.popen.wait()
177+
178+
def poll(self):
179+
return self.popen.poll()
180+
181+
def cancel(self, kill_timeout=_KILL_TIMEOUT):
182+
popen = self.popen
183+
popen.send_signal(signal.SIGTERM)
184+
try:
185+
popen.wait(timeout=_KILL_TIMEOUT)
186+
except subprocess.TimeoutExpired:
187+
popen.kill()
188+
189+
def close(self):
190+
self.popen.__exit__(None, None, None)
191+
return self.popen.returncode
192+
193+
def __enter__(self):
194+
self.popen.__enter__()
195+
return self
196+
197+
def __exit__(self, *args, **kwargs):
198+
self.popen.__exit__(*args, **kwargs)
199+
200+
class ParamikoBackgroundCommand(BackgroundCommand):
201+
"""
202+
:mod:`paramiko`-based background command.
203+
"""
204+
def __init__(self, conn, chan, pid, as_root, stdin, stdout, stderr, redirect_thread):
205+
self.chan = chan
206+
self.as_root = as_root
207+
self.conn = conn
208+
self.pid = pid
209+
self._stdin = stdin
210+
self._stdout = stdout
211+
self._stderr = stderr
212+
self.redirect_thread = redirect_thread
213+
214+
def send_signal(self, sig):
215+
# If the command has already completed, we don't want to send a signal
216+
# to another process that might have gotten that PID in the meantime.
217+
if self.poll() is not None:
218+
return
219+
# Use -PGID to target a process group rather than just the process
220+
# itself
221+
cmd = _kill_pgid_cmd(self.pid, sig)
222+
self.conn.execute(cmd, as_root=self.as_root)
223+
224+
def wait(self):
225+
return self.chan.recv_exit_status()
226+
227+
def poll(self):
228+
if self.chan.exit_status_ready():
229+
return self.wait()
230+
else:
231+
return None
232+
233+
def cancel(self, kill_timeout=_KILL_TIMEOUT):
234+
self.send_signal(signal.SIGTERM)
235+
# Check if the command terminated quickly
236+
time.sleep(10e-3)
237+
# Otherwise wait for the full timeout and kill it
238+
if self.poll() is None:
239+
time.sleep(kill_timeout)
240+
self.send_signal(signal.SIGKILL)
241+
self.wait()
242+
243+
@property
244+
def stdin(self):
245+
return self._stdin
246+
247+
@property
248+
def stdout(self):
249+
return self._stdout
250+
251+
@property
252+
def stderr(self):
253+
return self._stderr
254+
255+
def close(self):
256+
for x in (self.stdin, self.stdout, self.stderr):
257+
if x is not None:
258+
x.close()
259+
260+
exit_code = self.wait()
261+
thread = self.redirect_thread
262+
if thread:
263+
thread.join()
264+
265+
return exit_code
266+
267+
268+
class AdbBackgroundCommand(BackgroundCommand):
269+
"""
270+
``adb``-based background command.
271+
"""
272+
273+
def __init__(self, conn, adb_popen, pid, as_root):
274+
self.conn = conn
275+
self.as_root = as_root
276+
self.adb_popen = adb_popen
277+
self.pid = pid
278+
279+
def send_signal(self, sig):
280+
self.conn.execute(
281+
_kill_pgid_cmd(self.pid, sig),
282+
as_root=self.as_root,
283+
)
284+
285+
@property
286+
def stdin(self):
287+
return self.adb_popen.stdin
288+
289+
@property
290+
def stdout(self):
291+
return self.adb_popen.stdout
292+
293+
@property
294+
def stderr(self):
295+
return self.adb_popen.stderr
296+
297+
def wait(self):
298+
return self.adb_popen.wait()
299+
300+
def poll(self):
301+
return self.adb_popen.poll()
302+
303+
def cancel(self, kill_timeout=_KILL_TIMEOUT):
304+
self.send_signal(signal.SIGTERM)
305+
try:
306+
self.adb_popen.wait(timeout=_KILL_TIMEOUT)
307+
except subprocess.TimeoutExpired:
308+
self.send_signal(signal.SIGKILL)
309+
self.adb_popen.kill()
310+
311+
def close(self):
312+
self.adb_popen.__exit__(None, None, None)
313+
return self.adb_popen.returncode
314+
315+
def __enter__(self):
316+
self.adb_popen.__enter__()
317+
return self
318+
319+
def __exit__(self, *args, **kwargs):
320+
self.adb_popen.__exit__(*args, **kwargs)

devlib/host.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from devlib.exception import TargetTransientError, TargetStableError
2626
from devlib.utils.misc import check_output
27+
from devlib.connections import ConnectionBase, PopenBackgroundCommand
2728

2829

2930
PACKAGE_BIN_DIRECTORY = os.path.join(os.path.dirname(__file__), 'bin')
@@ -37,7 +38,7 @@ def kill_children(pid, signal=signal.SIGKILL):
3738
os.kill(cpid, signal)
3839

3940

40-
class LocalConnection(object):
41+
class LocalConnection(ConnectionBase):
4142

4243
name = 'local'
4344
host = 'localhost'
@@ -56,6 +57,7 @@ def connected_as_root(self, state):
5657
# pylint: disable=unused-argument
5758
def __init__(self, platform=None, keep_password=True, unrooted=False,
5859
password=None, timeout=None):
60+
super().__init__()
5961
self._connected_as_root = None
6062
self.logger = logging.getLogger('local_connection')
6163
self.keep_password = keep_password
@@ -105,9 +107,24 @@ def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as
105107
raise TargetStableError('unrooted')
106108
password = self._get_password()
107109
command = 'echo {} | sudo -S '.format(quote(password)) + command
108-
return subprocess.Popen(command, stdout=stdout, stderr=stderr, shell=True)
109110

110-
def close(self):
111+
# Make sure to get a new PGID so PopenBackgroundCommand() can kill
112+
# all sub processes that could be started without troubles.
113+
def preexec_fn():
114+
os.setpgrp()
115+
116+
popen = subprocess.Popen(
117+
command,
118+
stdout=stdout,
119+
stderr=stderr,
120+
shell=True,
121+
preexec_fn=preexec_fn,
122+
)
123+
bg_cmd = PopenBackgroundCommand(popen)
124+
self._current_bg_cmds.add(bg_cmd)
125+
return bg_cmd
126+
127+
def _close(self):
111128
pass
112129

113130
def cancel_running_command(self):

0 commit comments

Comments
 (0)