Skip to content

Commit 14e0235

Browse files
connection: Rework TransferManager
* Split TransferManager and TransferHandle: * TransferManager deals with the generic monitoring. To abort a transfer, it simply cancels the transfer and raises an exception from manage(). * TransferHandle provides a way for the manager to query the state of the transfer and cancel it. It is backend-specific. * Remove most of the state in TransferManager, along with the associated background command leak etc * Use a daemonic monitor thread to behave as excpected on interpreter shutdown. * Ensure a transfer manager _always_ exists. When no management is desired, a noop object is used. This avoids using a None sentinel, which is invariably mishandled by some code leading to crashes. * Try to merge more paths in the code to uncover as many issues as possible in testing. * Fix percentage for SSHTransferHandle (transferred / (remaining + transferred) instead of transferred / remaining)
1 parent 3fd999d commit 14e0235

File tree

3 files changed

+205
-177
lines changed

3 files changed

+205
-177
lines changed

devlib/connection.py

Lines changed: 124 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
#
1515

1616
from abc import ABC, abstractmethod
17-
from contextlib import contextmanager
17+
from contextlib import contextmanager, nullcontext
1818
from datetime import datetime
1919
from functools import partial
2020
from weakref import WeakSet
2121
from shlex import quote
22-
from time import monotonic
2322
import os
2423
import signal
2524
import socket
@@ -61,12 +60,26 @@ class ConnectionBase(InitCheckpoint):
6160
"""
6261
Base class for all connections.
6362
"""
64-
def __init__(self):
63+
def __init__(
64+
self,
65+
poll_transfers=False,
66+
start_transfer_poll_delay=30,
67+
total_transfer_timeout=3600,
68+
transfer_poll_period=30,
69+
):
6570
self._current_bg_cmds = WeakSet()
6671
self._closed = False
6772
self._close_lock = threading.Lock()
6873
self.busybox = None
6974

75+
self.transfer_mgr = TransferManager(
76+
self,
77+
start_transfer_poll_delay=start_transfer_poll_delay,
78+
total_transfer_timeout=total_transfer_timeout,
79+
transfer_poll_period=transfer_poll_period,
80+
) if poll_transfers else NoopTransferManager()
81+
82+
7083
def cancel_running_command(self):
7184
bg_cmds = set(self._current_bg_cmds)
7285
for bg_cmd in bg_cmds:
@@ -500,161 +513,147 @@ def __exit__(self, *args, **kwargs):
500513
self.adb_popen.__exit__(*args, **kwargs)
501514

502515

503-
class TransferManagerBase(ABC):
504-
505-
def _pull_dest_size(self, dest):
506-
if os.path.isdir(dest):
507-
return sum(
508-
os.stat(os.path.join(dirpath, f)).st_size
509-
for dirpath, _, fnames in os.walk(dest)
510-
for f in fnames
511-
)
512-
else:
513-
return os.stat(dest).st_size
514-
515-
def _push_dest_size(self, dest):
516-
cmd = '{} du -s {}'.format(quote(self.conn.busybox), quote(dest))
517-
out = self.conn.execute(cmd)
518-
try:
519-
return int(out.split()[0])
520-
except ValueError:
521-
return 0
522-
523-
def __init__(self, conn, poll_period, start_transfer_poll_delay, total_timeout):
516+
class TransferManager:
517+
def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600):
524518
self.conn = conn
525519
self.poll_period = poll_period
526520
self.total_timeout = total_timeout
527521
self.start_transfer_poll_delay = start_transfer_poll_delay
528522

529523
self.logger = logging.getLogger('FileTransfer')
530-
self.managing = threading.Event()
531-
self.transfer_started = threading.Event()
532-
self.transfer_completed = threading.Event()
533-
self.transfer_aborted = threading.Event()
534524

535-
self.monitor_thread = None
536-
self.sources = None
537-
self.dest = None
538-
self.direction = None
525+
@contextmanager
526+
def manage(self, sources, dest, direction, handle):
527+
excep = None
528+
stop_thread = threading.Event()
529+
530+
def monitor():
531+
nonlocal excep
532+
533+
def cancel(reason):
534+
self.logger.warning(
535+
f'Cancelling file transfer {sources} -> {dest} due to: {reason}'
536+
)
537+
handle.cancel()
538+
539+
start_t = time.monotonic()
540+
stop_thread.wait(self.start_transfer_poll_delay)
541+
while not stop_thread.wait(self.poll_period):
542+
if not handle.isactive():
543+
cancel(reason='transfer inactive')
544+
elif monotonic() - start_t > self.total_timeout:
545+
cancel(reason='transfer timed out')
546+
excep = TimeoutError(f'{direction}: {sources} -> {dest}')
547+
548+
m_thread = threading.Thread(target=monitor, daemon=True)
549+
try:
550+
m_thread.start()
551+
yield self
552+
finally:
553+
stop_thread.set()
554+
m_thread.join()
555+
if excep is not None:
556+
raise excep
539557

540-
@abstractmethod
541-
def _cancel(self):
542-
pass
543558

544-
def cancel(self, reason=None):
545-
msg = 'Cancelling file transfer {} -> {}'.format(self.sources, self.dest)
546-
if reason is not None:
547-
msg += ' due to \'{}\''.format(reason)
548-
self.logger.warning(msg)
549-
self.transfer_aborted.set()
550-
self._cancel()
559+
class NoopTransferManager:
560+
def manage(self, *args, **kwargs):
561+
return nullcontext(self)
562+
563+
564+
class TransferHandleBase(ABC):
565+
def __init__(self, mgr):
566+
self.mgr = mgr
551567

552568
@abstractmethod
553569
def isactive(self):
554570
pass
555571

556-
@contextmanager
557-
def manage(self, sources, dest, direction):
558-
try:
559-
self.sources, self.dest, self.direction = sources, dest, direction
560-
m_thread = threading.Thread(target=self._monitor)
572+
@abstractmethod
573+
def cancel(self):
574+
pass
561575

562-
self.transfer_completed.clear()
563-
self.transfer_aborted.clear()
564-
self.transfer_started.set()
565576

566-
m_thread.start()
567-
yield self
568-
except BaseException:
569-
self.cancel(reason='exception during transfer')
570-
raise
571-
finally:
572-
self.transfer_completed.set()
573-
self.transfer_started.set()
574-
m_thread.join()
575-
self.transfer_started.clear()
576-
self.transfer_completed.clear()
577-
self.transfer_aborted.clear()
577+
class PopenTransferHandle(TransferHandleBase):
578+
def __init__(self, bg_cmd, dest, direction, *args, **kwargs):
579+
super().__init__(*args, **kwargs)
578580

579-
def _monitor(self):
580-
start_t = monotonic()
581-
self.transfer_completed.wait(self.start_transfer_poll_delay)
582-
while not self.transfer_completed.wait(self.poll_period):
583-
if not self.isactive():
584-
self.cancel(reason='transfer inactive')
585-
elif monotonic() - start_t > self.total_timeout:
586-
self.cancel(reason='transfer timed out')
581+
if direction == 'push':
582+
sample_size = self._push_dest_size
583+
elif direction == 'pull':
584+
sample_size = self._pull_dest_size
585+
else:
586+
raise ValueError(f'Unknown direction: {direction}')
587587

588+
self.sample_size = lambda: sample_size(dest)
588589

589-
class PopenTransferManager(TransferManagerBase):
590+
self.bg_cmd = bg_cmd
591+
self.last_sample = 0
590592

591-
def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600):
592-
super().__init__(conn, poll_period, start_transfer_poll_delay, total_timeout)
593-
self.transfer = None
594-
self.last_sample = None
593+
@staticmethod
594+
def _pull_dest_size(dest):
595+
if os.path.isdir(dest):
596+
return sum(
597+
os.stat(os.path.join(dirpath, f)).st_size
598+
for dirpath, _, fnames in os.walk(dest)
599+
for f in fnames
600+
)
601+
else:
602+
return os.stat(dest).st_size
595603

596-
def _cancel(self):
597-
if self.transfer:
598-
self.transfer.cancel()
599-
self.transfer = None
600-
self.last_sample = None
604+
def _push_dest_size(self, dest):
605+
conn = self.mgr.conn
606+
cmd = '{} du -s -- {}'.format(quote(conn.busybox), quote(dest))
607+
out = conn.execute(cmd)
608+
return int(out.split()[0])
609+
610+
def cancel(self):
611+
self.bg_cmd.cancel()
601612

602613
def isactive(self):
603-
size_fn = self._push_dest_size if self.direction == 'push' else self._pull_dest_size
604-
curr_size = size_fn(self.dest)
605-
self.logger.debug('Polled file transfer, destination size {}'.format(curr_size))
606-
active = True if self.last_sample is None else curr_size > self.last_sample
607-
self.last_sample = curr_size
608-
return active
614+
try:
615+
curr_size = self.sample_size()
616+
except Exception as e:
617+
self.logger.debug(f'File size polling failed: {e}')
618+
return True
619+
else:
620+
self.logger.debug(f'Polled file transfer, destination size: {curr_size}')
621+
if curr_size:
622+
active = curr_size > self.last_sample
623+
self.last_sample = curr_size
624+
return active
625+
# If the file is empty it will never grow in size, so we assume
626+
# everything is going well.
627+
else:
628+
return True
609629

610-
def set_transfer_and_wait(self, popen_bg_cmd):
611-
self.transfer = popen_bg_cmd
612-
self.last_sample = None
613-
ret = self.transfer.wait()
614630

615-
if ret and not self.transfer_aborted.is_set():
616-
raise subprocess.CalledProcessError(ret, self.transfer.popen.args)
617-
elif self.transfer_aborted.is_set():
618-
raise TimeoutError(self.transfer.popen.args)
631+
class SSHTransferHandle(TransferHandleBase):
619632

633+
def __init__(self, handle, *args, **kwargs):
634+
super().__init__(*args, **kwargs)
620635

621-
class SSHTransferManager(TransferManagerBase):
636+
# SFTPClient or SSHClient
637+
self.handle = handle
622638

623-
def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600):
624-
super().__init__(conn, poll_period, start_transfer_poll_delay, total_timeout)
625-
self.transferer = None
626639
self.progressed = False
627-
self.transferred = None
628-
self.to_transfer = None
640+
self.transferred = 0
641+
self.to_transfer = 0
629642

630-
def _cancel(self):
631-
self.transferer.close()
643+
def cancel(self):
644+
self.handle.close()
632645

633646
def isactive(self):
634647
progressed = self.progressed
635-
self.progressed = False
636-
msg = 'Polled transfer: {}% [{}B/{}B]'
637-
pc = format((self.transferred / self.to_transfer) * 100, '.2f')
638-
self.logger.debug(msg.format(pc, self.transferred, self.to_transfer))
648+
if progressed:
649+
self.progressed = False
650+
pc = (self.transferred / (self.transferred + self.to_transfer)) * 100
651+
self.logger.debug(
652+
f'Polled transfer: {pc:.2f}% [{self.transferred}B/{self.to_transfer}B]'
653+
)
639654
return progressed
640655

641-
@contextmanager
642-
def manage(self, sources, dest, direction, transferer):
643-
with super().manage(sources, dest, direction):
644-
try:
645-
self.progressed = False
646-
self.transferer = transferer # SFTPClient or SCPClient
647-
yield self
648-
except socket.error as e:
649-
if self.transfer_aborted.is_set():
650-
self.transfer_aborted.clear()
651-
method = 'SCP' if self.conn.use_scp else 'SFTP'
652-
raise TimeoutError('{} {}: {} -> {}'.format(method, self.direction, sources, self.dest))
653-
else:
654-
raise e
655-
656656
def progress_cb(self, to_transfer, transferred):
657-
if self.transfer_started.is_set():
658-
self.progressed = True
659-
self.transferred = transferred
660-
self.to_transfer = to_transfer
657+
self.progressed = True
658+
self.transferred = transferred
659+
self.to_transfer = to_transfer

devlib/utils/android.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
from devlib.exception import TargetTransientError, TargetStableError, HostError, TargetTransientCalledProcessError, TargetStableCalledProcessError, AdbRootError
4141
from devlib.utils.misc import check_output, which, ABI_MAP, redirect_streams, get_subprocess
42-
from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenBackgroundCommand, PopenTransferManager
42+
from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenBackgroundCommand, PopenTransferManager, NoopTransferManager
4343

4444

4545
logger = logging.getLogger('android')
@@ -278,26 +278,32 @@ def connected_as_root(self, state):
278278
self._connected_as_root[self.device] = state
279279

280280
# pylint: disable=unused-argument
281-
def __init__(self, device=None, timeout=None, platform=None, adb_server=None,
282-
adb_as_root=False, connection_attempts=MAX_ATTEMPTS,
283-
poll_transfers=False,
284-
start_transfer_poll_delay=30,
285-
total_transfer_timeout=3600,
286-
transfer_poll_period=30,):
287-
super().__init__()
281+
def __init__(
282+
self,
283+
device=None,
284+
timeout=None,
285+
platform=None,
286+
adb_server=None,
287+
adb_as_root=False,
288+
connection_attempts=MAX_ATTEMPTS,
289+
290+
poll_transfers=False,
291+
start_transfer_poll_delay=30,
292+
total_transfer_timeout=3600,
293+
transfer_poll_period=30,
294+
):
295+
super().__init__(
296+
poll_transfers=poll_transfers,
297+
start_transfer_poll_delay=start_transfer_poll_delay,
298+
total_transfer_timeout=total_transfer_timeout,
299+
transfer_poll_period=transfer_poll_period,
300+
)
288301
self.timeout = timeout if timeout is not None else self.default_timeout
289302
if device is None:
290303
device = adb_get_device(timeout=timeout, adb_server=adb_server)
291304
self.device = device
292305
self.adb_server = adb_server
293306
self.adb_as_root = adb_as_root
294-
self.poll_transfers = poll_transfers
295-
if poll_transfers:
296-
transfer_opts = {'start_transfer_poll_delay': start_transfer_poll_delay,
297-
'total_timeout': total_transfer_timeout,
298-
'poll_period': transfer_poll_period,
299-
}
300-
self.transfer_mgr = PopenTransferManager(self, **transfer_opts) if poll_transfers else None
301307
lock, nr_active = AdbConnection.active_connections
302308
with lock:
303309
nr_active[self.device] += 1
@@ -330,12 +336,18 @@ def _push_pull(self, action, sources, dest, timeout):
330336
paths = ' '.join(map(do_quote, paths))
331337

332338
command = "{} {}".format(action, paths)
333-
if timeout or not self.poll_transfers:
339+
if timeout:
334340
adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server)
335341
else:
336-
with self.transfer_mgr.manage(sources, dest, action):
337-
bg_cmd = adb_command_background(self.device, command, adb_server=self.adb_server)
338-
self.transfer_mgr.set_transfer_and_wait(bg_cmd)
342+
bg_cmd = adb_command_background(self.device, command, adb_server=self.adb_server)
343+
handle = PopenTransferHandle(
344+
mgr=self.transfer_mgr,
345+
bg_cmd=bg_cmd,
346+
dest=dest,
347+
direction=action
348+
)
349+
with bg_cmd, self.transfer_mgr.manage(sources, dest, action, handle):
350+
bg_cmd.communicate()
339351

340352
# pylint: disable=unused-argument
341353
def execute(self, command, timeout=None, check_exit_code=False,

0 commit comments

Comments
 (0)