Skip to content

Fix net RX freeze bug #2064

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/devices/src/virtio/net/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,12 @@ impl Net {
DeviceState::Inactive => unreachable!(),
};
METRICS.net.rx_tap_event_count.inc();
if self.queues[RX_INDEX].is_empty(mem) {

// While there are no available RX queue buffers and there's a deferred_frame
// don't process any more incoming frames. Otherwise start processing a frame. In the
// process the deferred_frame flag will be set in order to avoid freezing the
// RX queue.
if self.queues[RX_INDEX].is_empty(mem) && self.rx_deferred_frame {
METRICS.net.no_rx_avail_buffer.inc();
return;
}
Expand Down Expand Up @@ -1699,7 +1704,8 @@ pub mod tests {
th.activate_net();
th.net().mocks.set_read_tap(ReadTapMock::Failure);

// The RX queue is empty.
// The RX queue is empty and rx_deferred_frame is set.
th.net().rx_deferred_frame = true;
check_metric_after_block!(
&METRICS.net.no_rx_avail_buffer,
1,
Expand Down
24 changes: 24 additions & 0 deletions tests/framework/microvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from retry.api import retry_call

import host_tools.logging as log_tools
import host_tools.cpu_load as cpu_tools
import host_tools.memory as mem_tools
import host_tools.network as net_tools

Expand Down Expand Up @@ -127,6 +128,10 @@ def __init__(
else:
self._memory_events_queue = None

# Cpu load monitoring has to be explicitly enabled using
# the `enable_cpu_load_monitor` method.
self._cpu_load_monitor = None

# External clone/exec tool, because Python can't into clone
self.bin_cloner_path = bin_cloner_path

Expand All @@ -149,6 +154,11 @@ def kill(self):
raise mem_tools.MemoryUsageExceededException(
self._memory_events_queue.get())

if self._cpu_load_monitor:
self._cpu_load_monitor.signal_stop()
self._cpu_load_monitor.join()
self._cpu_load_monitor.check_samples()

@property
def api_session(self):
"""Return the api session associated with this microVM."""
Expand Down Expand Up @@ -272,6 +282,20 @@ def append_to_log_data(self, data):
"""Append a message to the log data."""
self._log_data += data

def enable_cpu_load_monitor(self, threshold):
    """Start monitoring this microVM's cpu load against `threshold`."""
    # We want to monitor the emulation thread, which is currently the
    # first thread the process creates, so its tid is the process pid.
    # A possible improvement is to find the thread by name.
    monitored_pid = self.jailer_clone_pid
    monitor = cpu_tools.CpuLoadMonitor(
        monitored_pid,
        monitored_pid,
        threshold
    )
    monitor.start()
    self._cpu_load_monitor = monitor

def create_jailed_resource(self, path, create_jail=False):
"""Create a hard link to some resource inside this microvm."""
return self.jailer.jailed_path(path, create=True,
Expand Down
123 changes: 123 additions & 0 deletions tests/host_tools/cpu_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Utilities for measuring cpu utilisation for a process."""
import time
from threading import Thread

import framework.utils as utils

# Zero-based indices into the space-separated /proc/<pid>/stat output,
# taken from
# https://www.man7.org/linux/man-pages/man5/proc.5.html
# (the man page numbers these fields 14, 15 and 22, counting from 1).
# utime: time spent in user mode, in clock ticks.
STAT_UTIME_IDX = 13
# stime: time spent in kernel mode, in clock ticks.
STAT_STIME_IDX = 14
# starttime: time the process started after boot, in clock ticks.
STAT_STARTTIME_IDX = 21


class CpuLoadExceededException(Exception):
    """A custom exception containing details on excessive cpu load."""

    def __init__(self, cpu_load_samples, threshold):
        """Compose the error message containing the cpu load details.

        :param cpu_load_samples: list of cpu load values (percentages)
            that were observed above the threshold.
        :param threshold: the maximum allowed cpu load percentage.
        """
        # Python 3 zero-argument form; the explicit
        # `super(CpuLoadExceededException, self)` spelling is a
        # Python 2 leftover.
        super().__init__(
            'Cpu load samples {} exceeded maximum threshold {}.\n'
            .format(cpu_load_samples, threshold)
        )


class CpuLoadMonitor(Thread):
    """Class to represent a cpu load monitor for a thread.

    The monitor periodically samples the cpu load of `thread_pid`
    (a thread of `process_pid`) and records every sample that exceeds
    `threshold`. Call `check_samples` after stopping the monitor to
    raise if any excessive samples were collected.
    """

    # Seconds to sleep between two consecutive cpu load samples.
    CPU_LOAD_SAMPLES_TIMEOUT_S = 1

    def __init__(
        self,
        process_pid,
        thread_pid,
        threshold
    ):
        """Set up monitor attributes.

        :param process_pid: pid of the process owning the thread.
        :param thread_pid: tid of the thread whose load is sampled.
        :param threshold: cpu load percentage; samples above it are
            recorded as excessive.
        """
        Thread.__init__(self)
        self._process_pid = process_pid
        self._thread_pid = thread_pid
        self._cpu_load_samples = []
        self._threshold = threshold
        self._should_stop = False

    @property
    def process_pid(self):
        """Get the process pid."""
        return self._process_pid

    @property
    def thread_pid(self):
        """Get the thread pid."""
        return self._thread_pid

    @property
    def threshold(self):
        """Get the cpu load threshold."""
        return self._threshold

    @property
    def cpu_load_samples(self):
        """Get the cpu load samples collected so far."""
        return self._cpu_load_samples

    def signal_stop(self):
        """Signal that the thread should stop."""
        self._should_stop = True

    def run(self):
        """Thread body: monitor the cpu load of some pid.

        `/proc/<process pid>/task/<thread pid>/stat` is used to compute
        the cpu load, which is then added to the samples list whenever
        it exceeds the threshold. It is up to the caller to check the
        samples (see `check_samples`).
        """
        # Clock ticks per second, needed to convert the tick counters
        # read from /proc into seconds.
        clock_ticks_cmd = 'getconf CLK_TCK'
        try:
            # NOTE(review): fixed `utils.cmd_run` -> `utils.run_cmd`,
            # matching the helper used elsewhere in the test framework
            # (e.g. host_tools/network.py).
            stdout = utils.run_cmd(
                clock_ticks_cmd,
            ).stdout.decode('utf-8')
        except ChildProcessError:
            return
        try:
            clock_ticks = int(stdout.strip("\n"))
        except ValueError:
            return

        while not self._should_stop:
            try:
                with open('/proc/uptime') as uptime_file:
                    uptime = uptime_file.readline().strip("\n").split()[0]

                with open('/proc/{pid}/task/{tid}/stat'.format(
                    pid=self.process_pid,
                    tid=self.thread_pid)
                ) as stat_file:
                    stat = stat_file.readline().strip("\n").split()
            except IOError:
                # The monitored thread went away; stop sampling.
                break

            try:
                uptime = float(uptime)
                utime = int(stat[STAT_UTIME_IDX])
                stime = int(stat[STAT_STIME_IDX])
                starttime = int(stat[STAT_STARTTIME_IDX])
            except ValueError:
                break

            # Total cpu time (user + system) in clock ticks, and the
            # thread's age in seconds. Note this yields the *average*
            # load since the thread started, not an instantaneous one.
            total_time = utime + stime
            seconds = uptime - starttime / clock_ticks
            cpu_load = (total_time * 100 / clock_ticks) / seconds

            if cpu_load > self.threshold:
                self.cpu_load_samples.append(cpu_load)

            time.sleep(self.CPU_LOAD_SAMPLES_TIMEOUT_S)

    def check_samples(self):
        """Check that there are no samples above the threshold.

        :raise CpuLoadExceededException: if any excessive cpu load
            sample was collected while the monitor was running.
        """
        if self.cpu_load_samples:
            raise CpuLoadExceededException(
                self._cpu_load_samples, self._threshold)
10 changes: 10 additions & 0 deletions tests/host_tools/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,13 @@ def name(self):
def netns(self):
"""Return the network namespace of this tap."""
return self._netns

def set_tx_queue_len(self, tx_queue_len):
    """Set the length of the tap's TX queue."""
    # The `ip link` command must run inside the tap's network
    # namespace.
    cmd = 'ip netns exec {} ip link set {} txqueuelen {}'.format(
        self.netns,
        self.name,
        tx_queue_len
    )
    utils.run_cmd(cmd)
52 changes: 52 additions & 0 deletions tests/integration_tests/functional/test_net.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for the net device."""
import time

import framework.utils as utils
import host_tools.network as net_tools

# The iperf version to run these tests with
IPERF_BINARY = 'iperf3'


def test_high_ingress_traffic(test_microvm_with_ssh, network_config):
    """Run iperf rx with high UDP traffic."""
    vm = test_microvm_with_ssh
    vm.spawn()
    vm.basic_config()

    # The tap must exist before the guest interface is configured.
    tap, _host_ip, guest_ip = vm.ssh_network_config(
        network_config,
        '1'
    )
    # Set the tap's tx queue len to 5. This increases the probability
    # of filling the tap under high ingress traffic.
    tap.set_tx_queue_len(5)

    vm.start()

    # Run an iperf3 server inside the guest as a daemon.
    conn = net_tools.SSHConnection(vm.ssh_config)
    conn.execute_command('{} -sD\n'.format(IPERF_BINARY))
    time.sleep(1)

    # Send 1Gbps UDP traffic from the host. If the net device breaks,
    # iperf will freeze, so the run is bounded by a timeout.
    utils.run_cmd(
        'timeout 30 {} {} -c {} -u -V -b 1000000000 -t 30'.format(
            vm.jailer.netns_cmd_prefix(),
            IPERF_BINARY,
            guest_ip,
        ),
        ignore_return_code=True
    )

    # The net interface survived the high ingress traffic iff the
    # guest still answers over ssh.
    exit_code, _, _ = conn.execute_command('echo success\n')
    assert exit_code == 0
42 changes: 42 additions & 0 deletions tests/integration_tests/functional/test_rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,48 @@ def test_rx_rate_limiting(test_microvm_with_ssh, network_config):
_check_rx_rate_limit_patch(test_microvm, guest_ips)


def test_rx_rate_limiting_cpu_load(test_microvm_with_ssh, network_config):
    """Run iperf rx with rate limiting; verify cpu load is below threshold."""
    vm = test_microvm_with_ssh
    vm.spawn()
    vm.basic_config()

    # Enable the monitor that records cpu load samples above the
    # threshold; they are checked when the microVM is killed.
    # After multiple runs, the average value for the cpu load
    # seems to be around 10%. Setting the threshold a little
    # higher to skip false positives.
    vm.enable_cpu_load_monitor(20)

    # Create interface with aggressive rate limiting enabled.
    rx_limiter_no_burst = {
        'bandwidth': {
            'size': 65536,  # 64KBytes
            'refill_time': 1000  # 1s
        }
    }
    _tap, _host_ip, guest_ip = vm.ssh_network_config(
        network_config,
        '1',
        rx_rate_limiter=rx_limiter_no_burst
    )

    vm.start()

    # Start iperf server on guest.
    _start_iperf_on_guest(vm, guest_ip)

    # Run iperf client sending UDP traffic; the output is not needed,
    # only the cpu load observed during the transfer matters.
    iperf_cmd = '{} {} -u -c {} -b 1000000000 -t{} -f KBytes'.format(
        vm.jailer.netns_cmd_prefix(),
        IPERF_BINARY,
        guest_ip,
        IPERF_TRANSMIT_TIME * 5
    )
    _run_local_iperf(iperf_cmd)


def _check_tx_rate_limiting(test_microvm, guest_ips, host_ips):
"""Check that the transmit rate is within expectations."""
# Start iperf on the host as this is the tx rate limiting test.
Expand Down