diff --git a/src/devices/src/virtio/net/device.rs b/src/devices/src/virtio/net/device.rs
index 3f1095217c3..0c209eaf2f5 100644
--- a/src/devices/src/virtio/net/device.rs
+++ b/src/devices/src/virtio/net/device.rs
@@ -625,7 +625,12 @@ impl Net {
             DeviceState::Inactive => unreachable!(),
         };
         METRICS.net.rx_tap_event_count.inc();
-        if self.queues[RX_INDEX].is_empty(mem) {
+
+        // While there are no available RX queue buffers and there's a deferred_frame,
+        // don't process any more incoming frames. Otherwise, start processing a frame;
+        // along the way, the deferred_frame flag will be set in order to avoid
+        // freezing the RX queue.
+        if self.queues[RX_INDEX].is_empty(mem) && self.rx_deferred_frame {
             METRICS.net.no_rx_avail_buffer.inc();
             return;
         }
@@ -1699,7 +1704,8 @@ pub mod tests {
         th.activate_net();
         th.net().mocks.set_read_tap(ReadTapMock::Failure);
 
-        // The RX queue is empty.
+        // The RX queue is empty and rx_deferred_frame is set.
+        th.net().rx_deferred_frame = true;
         check_metric_after_block!(
             &METRICS.net.no_rx_avail_buffer,
             1,
diff --git a/tests/framework/microvm.py b/tests/framework/microvm.py
index a5a33bb46e3..5d686ef6bd0 100644
--- a/tests/framework/microvm.py
+++ b/tests/framework/microvm.py
@@ -22,6 +22,7 @@
 from retry.api import retry_call
 
 import host_tools.logging as log_tools
+import host_tools.cpu_load as cpu_tools
 import host_tools.memory as mem_tools
 import host_tools.network as net_tools
 
@@ -127,6 +128,10 @@ def __init__(
         else:
             self._memory_events_queue = None
 
+        # CPU load monitoring has to be explicitly enabled using
+        # the `enable_cpu_load_monitor` method.
+        self._cpu_load_monitor = None
+
         # External clone/exec tool, because Python can't into clone
         self.bin_cloner_path = bin_cloner_path
 
@@ -149,6 +154,11 @@ def kill(self):
             raise mem_tools.MemoryUsageExceededException(
                 self._memory_events_queue.get())
 
+        if self._cpu_load_monitor:
+            self._cpu_load_monitor.signal_stop()
+            self._cpu_load_monitor.join()
+            self._cpu_load_monitor.check_samples()
+
     @property
     def api_session(self):
         """Return the api session associated with this microVM."""
@@ -272,6 +282,20 @@ def append_to_log_data(self, data):
         """Append a message to the log data."""
         self._log_data += data
 
+    def enable_cpu_load_monitor(self, threshold):
+        """Enable the CPU load monitor."""
+        process_pid = self.jailer_clone_pid
+        # We want to monitor the emulation thread, which is currently
+        # the first one created.
+        # A possible improvement is to find it by name.
+        thread_pid = self.jailer_clone_pid
+        self._cpu_load_monitor = cpu_tools.CpuLoadMonitor(
+            process_pid,
+            thread_pid,
+            threshold
+        )
+        self._cpu_load_monitor.start()
+
     def create_jailed_resource(self, path, create_jail=False):
         """Create a hard link to some resource inside this microvm."""
         return self.jailer.jailed_path(path, create=True,
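Note: `enable_cpu_load_monitor` above reuses the jailer clone PID for the thread, on the assumption that the emulation thread is the first one created. As the in-code comment suggests, looking the thread up by name would be more robust. A minimal sketch of that idea, hedged on the thread's `comm` value being known (the name 'firecracker' below is only a placeholder):

    import os

    def find_thread_by_name(process_pid, name='firecracker'):
        """Return the TID of the first thread of `process_pid` whose
        comm value matches `name`, or None if there is no match."""
        task_dir = '/proc/{}/task'.format(process_pid)
        for tid in os.listdir(task_dir):
            try:
                with open('{}/{}/comm'.format(task_dir, tid)) as comm_file:
                    if comm_file.read().strip() == name:
                        return int(tid)
            except IOError:
                # The thread may have exited while we were scanning.
                continue
        return None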
diff --git a/tests/host_tools/cpu_load.py b/tests/host_tools/cpu_load.py
new file mode 100644
index 00000000000..17239cb4ea7
--- /dev/null
+++ b/tests/host_tools/cpu_load.py
@@ -0,0 +1,126 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+"""Utilities for measuring cpu utilisation for a process."""
+import time
+from threading import Thread
+
+import framework.utils as utils
+
+# /proc/<pid>/stat field layout taken from
+# https://www.man7.org/linux/man-pages/man5/proc.5.html
+STAT_UTIME_IDX = 13
+STAT_STIME_IDX = 14
+STAT_STARTTIME_IDX = 21
+
+
+class CpuLoadExceededException(Exception):
+    """A custom exception containing details on excessive cpu load."""
+
+    def __init__(self, cpu_load_samples, threshold):
+        """Compose the error message containing the cpu load details."""
+        super(CpuLoadExceededException, self).__init__(
+            'Cpu load samples {} exceeded maximum threshold {}.\n'
+            .format(cpu_load_samples, threshold)
+        )
+
+
+class CpuLoadMonitor(Thread):
+    """Class to represent a cpu load monitor for a thread."""
+
+    CPU_LOAD_SAMPLES_TIMEOUT_S = 1
+
+    def __init__(
+            self,
+            process_pid,
+            thread_pid,
+            threshold
+    ):
+        """Set up monitor attributes."""
+        Thread.__init__(self)
+        self._process_pid = process_pid
+        self._thread_pid = thread_pid
+        self._cpu_load_samples = []
+        self._threshold = threshold
+        self._should_stop = False
+
+    @property
+    def process_pid(self):
+        """Get the process pid."""
+        return self._process_pid
+
+    @property
+    def thread_pid(self):
+        """Get the thread pid."""
+        return self._thread_pid
+
+    @property
+    def threshold(self):
+        """Get the cpu load threshold."""
+        return self._threshold
+
+    @property
+    def cpu_load_samples(self):
+        """Get the cpu load samples."""
+        return self._cpu_load_samples
+
+    def signal_stop(self):
+        """Signal that the thread should stop."""
+        self._should_stop = True
+
+    def run(self):
+        """Thread that monitors the cpu load of some pid.
+
+        `/proc/<pid>/task/<tid>/stat` is used to compute
+        the cpu load, which is then added to the list.
+        It is up to the caller to check the list.
+        """
+        clock_ticks_cmd = 'getconf CLK_TCK'
+        try:
+            stdout = utils.run_cmd(
+                clock_ticks_cmd,
+            ).stdout.decode('utf-8')
+        except ChildProcessError:
+            return
+        try:
+            clock_ticks = int(stdout.strip("\n"))
+        except ValueError:
+            return
+
+        while not self._should_stop:
+            try:
+                with open('/proc/uptime') as uptime_file:
+                    uptime = uptime_file.readline().strip("\n").split()[0]
+
+                with open('/proc/{pid}/task/{tid}/stat'.format(
+                        pid=self.process_pid,
+                        tid=self.thread_pid)
+                ) as stat_file:
+                    stat = stat_file.readline().strip("\n").split()
+            except IOError:
+                break
+
+            try:
+                uptime = float(uptime)
+                utime = int(stat[STAT_UTIME_IDX])
+                stime = int(stat[STAT_STIME_IDX])
+                starttime = int(stat[STAT_STARTTIME_IDX])
+            except ValueError:
+                break
+
+            # Total time spent on the cpu, in clock ticks.
+            total_time = utime + stime
+            # Seconds elapsed since the thread started.
+            seconds = uptime - (starttime / clock_ticks)
+            # Average cpu load, as a percentage, over the thread's lifetime.
+            cpu_load = (total_time * 100 / clock_ticks) / seconds
+
+            if cpu_load > self.threshold:
+                self.cpu_load_samples.append(cpu_load)
+
+            time.sleep(self.CPU_LOAD_SAMPLES_TIMEOUT_S)
+
+    def check_samples(self):
+        """Check that there are no samples above the threshold."""
+        if len(self.cpu_load_samples) > 0:
+            raise CpuLoadExceededException(
+                self._cpu_load_samples, self._threshold)
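Note: a minimal usage sketch of `CpuLoadMonitor` outside the `Microvm` wrapper, with placeholder PID values; `check_samples()` raises `CpuLoadExceededException` if any sample went over the threshold:

    import time

    import host_tools.cpu_load as cpu_tools

    monitor = cpu_tools.CpuLoadMonitor(
        process_pid=1234,  # placeholder: the process to inspect
        thread_pid=1234,   # placeholder: the thread whose load is sampled
        threshold=20       # percent
    )
    monitor.start()
    time.sleep(10)         # the workload under observation runs here
    monitor.signal_stop()
    monitor.join()
    monitor.check_samples()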
diff --git a/tests/host_tools/network.py b/tests/host_tools/network.py
index cbcabbad8a5..b6841a098b0 100644
--- a/tests/host_tools/network.py
+++ b/tests/host_tools/network.py
@@ -319,3 +319,13 @@ def name(self):
     def netns(self):
         """Return the network namespace of this tap."""
         return self._netns
+
+    def set_tx_queue_len(self, tx_queue_len):
+        """Set the length of the tap's TX queue."""
+        utils.run_cmd(
+            'ip netns exec {} ip link set {} txqueuelen {}'.format(
+                self.netns,
+                self.name,
+                tx_queue_len
+            )
+        )
diff --git a/tests/integration_tests/functional/test_net.py b/tests/integration_tests/functional/test_net.py
new file mode 100644
index 00000000000..d2891d95e30
--- /dev/null
+++ b/tests/integration_tests/functional/test_net.py
@@ -0,0 +1,52 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+"""Tests for the net device."""
+import time
+
+import framework.utils as utils
+import host_tools.network as net_tools
+
+# The iperf version to run these tests with.
+IPERF_BINARY = 'iperf3'
+
+
+def test_high_ingress_traffic(test_microvm_with_ssh, network_config):
+    """Run iperf rx with high UDP traffic."""
+    test_microvm = test_microvm_with_ssh
+    test_microvm.spawn()
+
+    test_microvm.basic_config()
+
+    # Create the tap before configuring the interface.
+    tap, _host_ip, guest_ip = test_microvm.ssh_network_config(
+        network_config,
+        '1'
+    )
+    # Set the tap's TX queue length to 5. This increases the probability
+    # of filling the tap under high ingress traffic.
+    tap.set_tx_queue_len(5)
+
+    # Start the microvm.
+    test_microvm.start()
+
+    # Start the iperf3 server on the guest.
+    ssh_connection = net_tools.SSHConnection(test_microvm.ssh_config)
+    ssh_connection.execute_command('{} -sD\n'.format(IPERF_BINARY))
+    time.sleep(1)
+
+    # Start the iperf3 client on the host. Send 1 Gbps UDP traffic.
+    # If the net device breaks, iperf will freeze, so we have to use a timeout.
+    utils.run_cmd(
+        'timeout 30 {} {} -c {} -u -V -b 1000000000 -t 30'.format(
+            test_microvm.jailer.netns_cmd_prefix(),
+            IPERF_BINARY,
+            guest_ip,
+        ),
+        ignore_return_code=True
+    )
+
+    # Check if the high ingress traffic broke the net interface.
+    # If the net interface still works, we should be able to execute
+    # ssh commands.
+    exit_code, _, _ = ssh_connection.execute_command('echo success\n')
+    assert exit_code == 0
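Note: if the test ever needs to verify that `set_tx_queue_len` took effect, the value can be read back inside the namespace. A sketch using the same `utils.run_cmd` helper; this assumes `run_cmd` exposes raw bytes on `stdout`, as `cpu_load.py` above does, and that the `ip link show` output contains the standard `qlen <N>` token:

    import framework.utils as utils

    def get_tx_queue_len(netns, tap_name):
        """Read back a tap's TX queue length inside its namespace."""
        ret = utils.run_cmd(
            'ip netns exec {} ip link show {}'.format(netns, tap_name)
        )
        tokens = ret.stdout.decode('utf-8').split()
        # The value follows the 'qlen' token, e.g. '... qlen 5'.
        return int(tokens[tokens.index('qlen') + 1])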
diff --git a/tests/integration_tests/functional/test_rate_limiter.py b/tests/integration_tests/functional/test_rate_limiter.py
index 459ad9ce4a5..9ec0419c7b1 100644
--- a/tests/integration_tests/functional/test_rate_limiter.py
+++ b/tests/integration_tests/functional/test_rate_limiter.py
@@ -151,6 +151,48 @@ def test_rx_rate_limiting(test_microvm_with_ssh, network_config):
     _check_rx_rate_limit_patch(test_microvm, guest_ips)
 
 
+def test_rx_rate_limiting_cpu_load(test_microvm_with_ssh, network_config):
+    """Run iperf rx with rate limiting; verify cpu load is below threshold."""
+    test_microvm = test_microvm_with_ssh
+    test_microvm.spawn()
+
+    test_microvm.basic_config()
+
+    # Enable the monitor that checks if the cpu load is over the threshold.
+    # After multiple runs, the average value for the cpu load
+    # seems to be around 10%. Set the threshold a little
+    # higher to avoid false positives.
+    threshold = 20
+    test_microvm.enable_cpu_load_monitor(threshold)
+
+    # Create an interface with aggressive rate limiting enabled.
+    rx_rate_limiter_no_burst = {
+        'bandwidth': {
+            'size': 65536,  # 64KBytes
+            'refill_time': 1000  # 1s
+        }
+    }
+    _tap, _host_ip, guest_ip = test_microvm.ssh_network_config(
+        network_config,
+        '1',
+        rx_rate_limiter=rx_rate_limiter_no_burst
+    )
+
+    test_microvm.start()
+
+    # Start the iperf server on the guest.
+    _start_iperf_on_guest(test_microvm, guest_ip)
+
+    # Run the iperf client, sending UDP traffic.
+    iperf_cmd = '{} {} -u -c {} -b 1000000000 -t{} -f KBytes'.format(
+        test_microvm.jailer.netns_cmd_prefix(),
+        IPERF_BINARY,
+        guest_ip,
+        IPERF_TRANSMIT_TIME * 5
+    )
+    _iperf_out = _run_local_iperf(iperf_cmd)
+
+
 def _check_tx_rate_limiting(test_microvm, guest_ips, host_ips):
     """Check that the transmit rate is within expectations."""
     # Start iperf on the host as this is the tx rate limiting test.
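Note: as a sanity check on the bucket parameters above, a `size` of 65536 bytes refilled every 1000 ms allows a sustained rate of only 64 KiB/s, while the iperf client offers 1 Gbps (125 MB/s), so the rate limiter stays saturated for the entire run; that is the pressure the cpu load monitor is meant to observe. The arithmetic:

    # Back-of-the-envelope numbers for the rate limiter configuration.
    size_bytes = 65536       # token bucket size (64 KiB)
    refill_time_s = 1.0      # bucket refill interval (1000 ms)

    allowed_rate = size_bytes / refill_time_s  # 65536 B/s, i.e. 64 KiB/s
    offered_rate = 1000000000 / 8              # 1 Gbps, i.e. 125 MB/s
    print(offered_rate / allowed_rate)         # ~1907x oversubscription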