Commit 8fab384

agraf authored and NipaLocal committed
vsock/virtio: Remove queued_replies pushback logic
Ever since the introduction of the virtio vsock driver, it included
pushback logic that blocks it from taking any new RX packets until the
TX queue backlog becomes shallower than the virtqueue size.

This logic works fine when you connect a user space application on the
hypervisor with a virtio-vsock target, because the guest will stop
receiving data until the host has pulled all outstanding data from the
VM.

With Nitro Enclaves however, we connect 2 VMs directly via vsock:

  Parent         Enclave

    RX -------- TX
    TX -------- RX

This means we now have 2 virtio-vsock backends that both have the
pushback logic. If the parent's TX queue runs full at the same time as
the Enclave's, both virtio-vsock drivers fall into the pushback path
and no longer accept RX traffic. However, that RX traffic is TX traffic
on the other side, which blocks that driver from making any forward
progress. We're now in a deadlock.

To resolve this, let's remove that pushback logic altogether and rely
on higher levels (like credits) to ensure we do not consume unbounded
memory.

RX and TX queues share the same work queue. To prevent starvation of TX
by an RX flood and vice versa now that the pushback logic is gone,
let's deliberately reschedule RX and TX work after a fixed threshold
(256) of packets to process.

Fixes: 0ea9e1d ("VSOCK: Introduce virtio_transport.ko")
Signed-off-by: Alexander Graf <[email protected]>
Acked-by: Michael S. Tsirkin <[email protected]>
Signed-off-by: NipaLocal <nipa@local>
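Editor's note: the rescheduling described in the last paragraph is the common self-requeueing workqueue pattern — the work function handles at most a fixed budget of packets per invocation and then requeues itself so other work items sharing the workqueue get a turn. Below is a minimal, illustrative sketch of that pattern; struct my_ctx, MY_PKT_BUDGET and handle_one_packet() are invented for illustration and are not part of the driver, whose actual change is in the diff further down.

#include <linux/workqueue.h>

/* Illustrative budget, mirrors VSOCK_MAX_PKTS_PER_WORK in the patch. */
#define MY_PKT_BUDGET	256

struct my_ctx {
	struct work_struct work;
	/* queues, locks, ... */
};

/* Hypothetical helper: handle one pending packet, return false when idle. */
static bool handle_one_packet(struct my_ctx *ctx);

static void my_pkt_work(struct work_struct *work)
{
	struct my_ctx *ctx = container_of(work, struct my_ctx, work);
	int pkts = 0;

	for (;;) {
		if (++pkts > MY_PKT_BUDGET) {
			/* Budget used up: requeue ourselves so other work
			 * items on the same workqueue are not starved.
			 */
			queue_work(system_wq, work);
			break;
		}
		if (!handle_one_packet(ctx))
			break;	/* nothing left to process */
	}
}

Requeueing with queue_work() while the work function is still running is safe: the pending bit was cleared before execution started, so the item is simply marked pending again and re-executed after the current invocation returns, letting other queued works run in between.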
1 parent 5e3bf0e commit 8fab384

File tree

1 file changed (+19, -54)

net/vmw_vsock/virtio_transport.c

Lines changed: 19 additions & 54 deletions
@@ -26,6 +26,12 @@ static struct virtio_vsock __rcu *the_virtio_vsock;
 static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
 static struct virtio_transport virtio_transport; /* forward declaration */
 
+/*
+ * Max number of RX packets transferred before requeueing so we do
+ * not starve TX traffic because they share the same work queue.
+ */
+#define VSOCK_MAX_PKTS_PER_WORK 256
+
 struct virtio_vsock {
 	struct virtio_device *vdev;
 	struct virtqueue *vqs[VSOCK_VQ_MAX];
@@ -44,8 +50,6 @@ struct virtio_vsock {
 	struct work_struct send_pkt_work;
 	struct sk_buff_head send_pkt_queue;
 
-	atomic_t queued_replies;
-
 	/* The following fields are protected by rx_lock. vqs[VSOCK_VQ_RX]
 	 * must be accessed with rx_lock held.
 	 */
@@ -158,7 +162,7 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 		container_of(work, struct virtio_vsock, send_pkt_work);
 	struct virtqueue *vq;
 	bool added = false;
-	bool restart_rx = false;
+	int pkts = 0;
 
 	mutex_lock(&vsock->tx_lock);
 
@@ -169,32 +173,24 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 
 	for (;;) {
 		struct sk_buff *skb;
-		bool reply;
 		int ret;
 
+		if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
+			/* Allow other works on the same queue to run */
+			queue_work(virtio_vsock_workqueue, work);
+			break;
+		}
+
 		skb = virtio_vsock_skb_dequeue(&vsock->send_pkt_queue);
 		if (!skb)
 			break;
 
-		reply = virtio_vsock_skb_reply(skb);
-
 		ret = virtio_transport_send_skb(skb, vq, vsock, GFP_KERNEL);
 		if (ret < 0) {
 			virtio_vsock_skb_queue_head(&vsock->send_pkt_queue, skb);
 			break;
 		}
 
-		if (reply) {
-			struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
-			int val;
-
-			val = atomic_dec_return(&vsock->queued_replies);
-
-			/* Do we now have resources to resume rx processing? */
-			if (val + 1 == virtqueue_get_vring_size(rx_vq))
-				restart_rx = true;
-		}
-
 		added = true;
 	}
 
@@ -203,9 +199,6 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 
 out:
 	mutex_unlock(&vsock->tx_lock);
-
-	if (restart_rx)
-		queue_work(virtio_vsock_workqueue, &vsock->rx_work);
 }
 
 /* Caller need to hold RCU for vsock.
@@ -261,9 +254,6 @@ virtio_transport_send_pkt(struct sk_buff *skb)
 	 */
 	if (!skb_queue_empty_lockless(&vsock->send_pkt_queue) ||
 	    virtio_transport_send_skb_fast_path(vsock, skb)) {
-		if (virtio_vsock_skb_reply(skb))
-			atomic_inc(&vsock->queued_replies);
-
 		virtio_vsock_skb_queue_tail(&vsock->send_pkt_queue, skb);
 		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
 	}
@@ -277,7 +267,7 @@ static int
 virtio_transport_cancel_pkt(struct vsock_sock *vsk)
 {
 	struct virtio_vsock *vsock;
-	int cnt = 0, ret;
+	int ret;
 
 	rcu_read_lock();
 	vsock = rcu_dereference(the_virtio_vsock);
@@ -286,17 +276,7 @@ virtio_transport_cancel_pkt(struct vsock_sock *vsk)
 		goto out_rcu;
 	}
 
-	cnt = virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
-
-	if (cnt) {
-		struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
-		int new_cnt;
-
-		new_cnt = atomic_sub_return(cnt, &vsock->queued_replies);
-		if (new_cnt + cnt >= virtqueue_get_vring_size(rx_vq) &&
-		    new_cnt < virtqueue_get_vring_size(rx_vq))
-			queue_work(virtio_vsock_workqueue, &vsock->rx_work);
-	}
+	virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
 
 	ret = 0;
 
@@ -367,18 +347,6 @@ static void virtio_transport_tx_work(struct work_struct *work)
 		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
 }
 
-/* Is there space left for replies to rx packets? */
-static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
-{
-	struct virtqueue *vq = vsock->vqs[VSOCK_VQ_RX];
-	int val;
-
-	smp_rmb(); /* paired with atomic_inc() and atomic_dec_return() */
-	val = atomic_read(&vsock->queued_replies);
-
-	return val < virtqueue_get_vring_size(vq);
-}
-
 /* event_lock must be held */
 static int virtio_vsock_event_fill_one(struct virtio_vsock *vsock,
 				       struct virtio_vsock_event *event)
@@ -613,6 +581,7 @@ static void virtio_transport_rx_work(struct work_struct *work)
 	struct virtio_vsock *vsock =
 		container_of(work, struct virtio_vsock, rx_work);
 	struct virtqueue *vq;
+	int pkts = 0;
 
 	vq = vsock->vqs[VSOCK_VQ_RX];
 
@@ -627,11 +596,9 @@ static void virtio_transport_rx_work(struct work_struct *work)
 			struct sk_buff *skb;
 			unsigned int len;
 
-			if (!virtio_transport_more_replies(vsock)) {
-				/* Stop rx until the device processes already
-				 * pending replies. Leave rx virtqueue
-				 * callbacks disabled.
-				 */
+			if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
+				/* Allow other works on the same queue to run */
+				queue_work(virtio_vsock_workqueue, work);
 				goto out;
 			}
 
@@ -675,8 +642,6 @@ static int virtio_vsock_vqs_init(struct virtio_vsock *vsock)
 	vsock->rx_buf_max_nr = 0;
 	mutex_unlock(&vsock->rx_lock);
 
-	atomic_set(&vsock->queued_replies, 0);
-
 	ret = virtio_find_vqs(vdev, VSOCK_VQ_MAX, vsock->vqs, vqs_info, NULL);
 	if (ret < 0)
 		return ret;
