Skip to content

Commit 1b1f563

Browse files
author
Martin KaFai Lau
committed
Merge branch 'bpf-udp-exactly-once-socket-iteration'
Jordan Rife says: ==================== bpf: udp: Exactly-once socket iteration Both UDP and TCP socket iterators use iter->offset to track progress through a bucket, which is a measure of the number of matching sockets from the current bucket that have been seen or processed by the iterator. On subsequent iterations, if the current bucket has unprocessed items, we skip at least iter->offset matching items in the bucket before adding any remaining items to the next batch. However, iter->offset isn't always an accurate measure of "things already seen" when the underlying bucket changes between reads which can lead to repeated or skipped sockets. Instead, this series remembers the cookies of the sockets we haven't seen yet in the current bucket and resumes from the first cookie in that list that we can find on the next iteration. This series focuses on UDP socket iterators, but a later series will apply a similar approach to TCP socket iterators. To be more specific, this series replaces struct sock **batch inside struct bpf_udp_iter_state with union bpf_udp_iter_batch_item *batch, where union bpf_udp_iter_batch_item can contain either a pointer to a socket or a socket cookie. During reads, batch contains pointers to all sockets in the current batch while between reads batch contains all the cookies of the sockets in the current bucket that have yet to be processed. On subsequent reads, when iteration resumes, bpf_iter_udp_batch finds the first saved cookie that matches a socket in the bucket's socket list and picks up from there to construct the next batch. On average, assuming it's rare that the next socket disappears before the next read occurs, we should only need to scan as much as we did with the offset-based approach to find the starting point. In the case that the next socket is no longer there, we keep scanning through the saved cookies list until we find a match. The worst case is when none of the sockets from last time exist anymore, but again, this should be rare. CHANGES ======= v6 -> v7: * Move initialization of iter->state.bucket to -1 from patch five ("bpf: udp: Avoid socket skips and repeats during iteration") to patch three ("bpf: udp: Get rid of st_bucket_done") to avoid skipping the first bucket in the patch three and four (Martin). * Rename sock to sk in bpf_iter_batch_item (Martin). * Use ASSERT_OK_PTR in do_resume_test to check if counts is NULL (Martin). * goto done in do_resume_test when calloc or sock_iter_batch__open fails to make sure things are cleaned up properly, and initialize pointers to NULL explicitly to silence warnings from llvm 20 in CI. v5 -> v6: * Rework the logic in patch two ("bpf: udp: Make sure iter->batch always contains a full bucket snapshot") again to simplify it: * Only try realloc with GFP_USER one time instead of two (Alexei). * v5 introduced a second call to bpf_iter_udp_realloc_batch inside the loop to handle the GFP_ATOMIC case. In v6, move the GFP_USER case inside the loop as well, so it's all in once place. This, I feel, makes it a bit easier to understand the control flow. Consequently, it also simplifies the logic outside the loop. * Use GFP_NOWAIT instead of GFP_ATOMIC to avoid depleting memory reserves, since iterators are not critical operation (Alexei). Alexei suggested using __GFP_NOWARN as well with GFP_NOWAIT, but this is already set inside bpf_iter_udp_realloc_batch, so no change was needed there. * Introduce patch three ("bpf: udp: Get rid of st_bucket_done") to simplify things further, since with patch two, st_bucket_done == true is equivalent to iter->cur_sk == iter->end_sk. * In patch five ("bpf: udp: Avoid socket skips and repeats during iteration"), initialize iter->state.bucket to -1 so that on the first call to bpf_iter_udp_batch, the resume_bucket condition is not hit. This avoids adding a special case to the condition around bpf_iter_udp_resume for bucket zero. v4 -> v5: * Rework the logic from patch two ("bpf: udp: Make sure iter->batch always contains a full bucket snapshot") to move the handling of the GFP_ATOMIC case inside the main loop and get rid of the extra lock variable. This makes the logic clearer and makes it clearer that the bucket lock is always released (Martin). * Introduce udp_portaddr_for_each_entry_from in patch two instead of patch four ("bpf: udp: Avoid socket skips and repeats during iteration"), since patch two now needs to be able to resume list iteration from an arbitrary point in the GFP_ATOMIC case. * Similarly, introduce the memcpy inside bpf_iter_udp_realloc_batch in patch two instead of patch four, since in the GFP_ATOMIC case the new batch needs to remember the sockets from the old batch. * Use sock_gen_cookie instead of __sock_gen_cookie inside bpf_iter_udp_put_batch, since it can be called from a preemptible context (Martin). v3 -> v4: * Explicitly assign sk = NULL on !iter->end_sk exit condition (Kuniyuki). * Reword the commit message of patch two ("bpf: udp: Make sure iter->batch always contains a full bucket snapshot") to make the reasoning for GFP_ATOMIC more clear. v2 -> v3: * Guarantee that iter->batch is always a full snapshot of a bucket to prevent socket repeat scenarios [3]. This supercedes the patch from v2 that simply propagated ENOMEM up from bpf_iter_udp_batch and covers the scenario where the batch size is still too small after a realloc. * Fix up self tests (Martin) * ASSERT_EQ(nread, sizeof(out), "nread") instead of ASSERT_GE(nread, 1, "nread) in read_n. * Use ASSERT_OK and ASSERT_OK_FD in several places. * Add missing free(counts) to do_resume_test. * Move int local_port declaration to the top of do_resume_test. * Remove unnecessary guards before close and free. v1 -> v2: * Drop WARN_ON_ONCE from bpf_iter_udp_realloc_batch (Kuniyuki). * Fixed memcpy size parameter in bpf_iter_udp_realloc_batch; before it was missing sizeof(elem) * (Kuniyuki). * Move "bpf: udp: Propagate ENOMEM up from bpf_iter_udp_batch" to patch two in the series (Kuniyuki). rfc [1] -> v1: * Use hlist_entry_safe directly to retrieve the first socket in the current bucket's linked list instead of immediately breaking from udp_portaddr_for_each_entry (Martin). * Cancel iteration if bpf_iter_udp_realloc_batch() can't grab enough memory to contain a full snapshot of the current bucket to prevent unwanted skips or repeats [2]. [1]: https://lore.kernel.org/bpf/[email protected]/ [2]: https://lore.kernel.org/bpf/CABi4-ogUtMrH8-NVB6W8Xg_F_KDLq=yy-yu-tKr2udXE2Mu1Lg@mail.gmail.com/ [3]: https://lore.kernel.org/bpf/[email protected]/ ==================== Link: https://patch.msgid.link/[email protected] Signed-off-by: Martin KaFai Lau <[email protected]>
2 parents 7625645 + c58dcc1 commit 1b1f563

File tree

5 files changed

+575
-73
lines changed

5 files changed

+575
-73
lines changed

include/linux/udp.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,9 @@ static inline void udp_allow_gso(struct sock *sk)
216216
#define udp_portaddr_for_each_entry(__sk, list) \
217217
hlist_for_each_entry(__sk, list, __sk_common.skc_portaddr_node)
218218

219+
#define udp_portaddr_for_each_entry_from(__sk) \
220+
hlist_for_each_entry_from(__sk, __sk_common.skc_portaddr_node)
221+
219222
#define udp_portaddr_for_each_entry_rcu(__sk, list) \
220223
hlist_for_each_entry_rcu(__sk, list, __sk_common.skc_portaddr_node)
221224

net/ipv4/udp.c

Lines changed: 117 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
#include <linux/inet.h>
9494
#include <linux/netdevice.h>
9595
#include <linux/slab.h>
96+
#include <linux/sock_diag.h>
9697
#include <net/tcp_states.h>
9798
#include <linux/skbuff.h>
9899
#include <linux/proc_fs.h>
@@ -3413,34 +3414,55 @@ struct bpf_iter__udp {
34133414
int bucket __aligned(8);
34143415
};
34153416

3417+
union bpf_udp_iter_batch_item {
3418+
struct sock *sk;
3419+
__u64 cookie;
3420+
};
3421+
34163422
struct bpf_udp_iter_state {
34173423
struct udp_iter_state state;
34183424
unsigned int cur_sk;
34193425
unsigned int end_sk;
34203426
unsigned int max_sk;
3421-
int offset;
3422-
struct sock **batch;
3423-
bool st_bucket_done;
3427+
union bpf_udp_iter_batch_item *batch;
34243428
};
34253429

34263430
static int bpf_iter_udp_realloc_batch(struct bpf_udp_iter_state *iter,
3427-
unsigned int new_batch_sz);
3431+
unsigned int new_batch_sz, gfp_t flags);
3432+
static struct sock *bpf_iter_udp_resume(struct sock *first_sk,
3433+
union bpf_udp_iter_batch_item *cookies,
3434+
int n_cookies)
3435+
{
3436+
struct sock *sk = NULL;
3437+
int i;
3438+
3439+
for (i = 0; i < n_cookies; i++) {
3440+
sk = first_sk;
3441+
udp_portaddr_for_each_entry_from(sk)
3442+
if (cookies[i].cookie == atomic64_read(&sk->sk_cookie))
3443+
goto done;
3444+
}
3445+
done:
3446+
return sk;
3447+
}
3448+
34283449
static struct sock *bpf_iter_udp_batch(struct seq_file *seq)
34293450
{
34303451
struct bpf_udp_iter_state *iter = seq->private;
34313452
struct udp_iter_state *state = &iter->state;
3453+
unsigned int find_cookie, end_cookie;
34323454
struct net *net = seq_file_net(seq);
3433-
int resume_bucket, resume_offset;
34343455
struct udp_table *udptable;
34353456
unsigned int batch_sks = 0;
3436-
bool resized = false;
3457+
int resume_bucket;
3458+
int resizes = 0;
34373459
struct sock *sk;
3460+
int err = 0;
34383461

34393462
resume_bucket = state->bucket;
3440-
resume_offset = iter->offset;
34413463

34423464
/* The current batch is done, so advance the bucket. */
3443-
if (iter->st_bucket_done)
3465+
if (iter->cur_sk == iter->end_sk)
34443466
state->bucket++;
34453467

34463468
udptable = udp_get_table_seq(seq, net);
@@ -3453,62 +3475,89 @@ static struct sock *bpf_iter_udp_batch(struct seq_file *seq)
34533475
* before releasing the bucket lock. This allows BPF programs that are
34543476
* called in seq_show to acquire the bucket lock if needed.
34553477
*/
3478+
find_cookie = iter->cur_sk;
3479+
end_cookie = iter->end_sk;
34563480
iter->cur_sk = 0;
34573481
iter->end_sk = 0;
3458-
iter->st_bucket_done = false;
34593482
batch_sks = 0;
34603483

34613484
for (; state->bucket <= udptable->mask; state->bucket++) {
34623485
struct udp_hslot *hslot2 = &udptable->hash2[state->bucket].hslot;
34633486

34643487
if (hlist_empty(&hslot2->head))
3465-
continue;
3488+
goto next_bucket;
34663489

3467-
iter->offset = 0;
34683490
spin_lock_bh(&hslot2->lock);
3469-
udp_portaddr_for_each_entry(sk, &hslot2->head) {
3491+
sk = hlist_entry_safe(hslot2->head.first, struct sock,
3492+
__sk_common.skc_portaddr_node);
3493+
/* Resume from the first (in iteration order) unseen socket from
3494+
* the last batch that still exists in resume_bucket. Most of
3495+
* the time this will just be where the last iteration left off
3496+
* in resume_bucket unless that socket disappeared between
3497+
* reads.
3498+
*/
3499+
if (state->bucket == resume_bucket)
3500+
sk = bpf_iter_udp_resume(sk, &iter->batch[find_cookie],
3501+
end_cookie - find_cookie);
3502+
fill_batch:
3503+
udp_portaddr_for_each_entry_from(sk) {
34703504
if (seq_sk_match(seq, sk)) {
3471-
/* Resume from the last iterated socket at the
3472-
* offset in the bucket before iterator was stopped.
3473-
*/
3474-
if (state->bucket == resume_bucket &&
3475-
iter->offset < resume_offset) {
3476-
++iter->offset;
3477-
continue;
3478-
}
34793505
if (iter->end_sk < iter->max_sk) {
34803506
sock_hold(sk);
3481-
iter->batch[iter->end_sk++] = sk;
3507+
iter->batch[iter->end_sk++].sk = sk;
34823508
}
34833509
batch_sks++;
34843510
}
34853511
}
3512+
3513+
/* Allocate a larger batch and try again. */
3514+
if (unlikely(resizes <= 1 && iter->end_sk &&
3515+
iter->end_sk != batch_sks)) {
3516+
resizes++;
3517+
3518+
/* First, try with GFP_USER to maximize the chances of
3519+
* grabbing more memory.
3520+
*/
3521+
if (resizes == 1) {
3522+
spin_unlock_bh(&hslot2->lock);
3523+
err = bpf_iter_udp_realloc_batch(iter,
3524+
batch_sks * 3 / 2,
3525+
GFP_USER);
3526+
if (err)
3527+
return ERR_PTR(err);
3528+
/* Start over. */
3529+
goto again;
3530+
}
3531+
3532+
/* Next, hold onto the lock, so the bucket doesn't
3533+
* change while we get the rest of the sockets.
3534+
*/
3535+
err = bpf_iter_udp_realloc_batch(iter, batch_sks,
3536+
GFP_NOWAIT);
3537+
if (err) {
3538+
spin_unlock_bh(&hslot2->lock);
3539+
return ERR_PTR(err);
3540+
}
3541+
3542+
/* Pick up where we left off. */
3543+
sk = iter->batch[iter->end_sk - 1].sk;
3544+
sk = hlist_entry_safe(sk->__sk_common.skc_portaddr_node.next,
3545+
struct sock,
3546+
__sk_common.skc_portaddr_node);
3547+
batch_sks = iter->end_sk;
3548+
goto fill_batch;
3549+
}
3550+
34863551
spin_unlock_bh(&hslot2->lock);
34873552

34883553
if (iter->end_sk)
34893554
break;
3555+
next_bucket:
3556+
resizes = 0;
34903557
}
34913558

3492-
/* All done: no batch made. */
3493-
if (!iter->end_sk)
3494-
return NULL;
3495-
3496-
if (iter->end_sk == batch_sks) {
3497-
/* Batching is done for the current bucket; return the first
3498-
* socket to be iterated from the batch.
3499-
*/
3500-
iter->st_bucket_done = true;
3501-
goto done;
3502-
}
3503-
if (!resized && !bpf_iter_udp_realloc_batch(iter, batch_sks * 3 / 2)) {
3504-
resized = true;
3505-
/* After allocating a larger batch, retry one more time to grab
3506-
* the whole bucket.
3507-
*/
3508-
goto again;
3509-
}
3510-
done:
3511-
return iter->batch[0];
3559+
WARN_ON_ONCE(iter->end_sk != batch_sks);
3560+
return iter->end_sk ? iter->batch[0].sk : NULL;
35123561
}
35133562

35143563
static void *bpf_iter_udp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
@@ -3519,16 +3568,14 @@ static void *bpf_iter_udp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
35193568
/* Whenever seq_next() is called, the iter->cur_sk is
35203569
* done with seq_show(), so unref the iter->cur_sk.
35213570
*/
3522-
if (iter->cur_sk < iter->end_sk) {
3523-
sock_put(iter->batch[iter->cur_sk++]);
3524-
++iter->offset;
3525-
}
3571+
if (iter->cur_sk < iter->end_sk)
3572+
sock_put(iter->batch[iter->cur_sk++].sk);
35263573

35273574
/* After updating iter->cur_sk, check if there are more sockets
35283575
* available in the current bucket batch.
35293576
*/
35303577
if (iter->cur_sk < iter->end_sk)
3531-
sk = iter->batch[iter->cur_sk];
3578+
sk = iter->batch[iter->cur_sk].sk;
35323579
else
35333580
/* Prepare a new batch. */
35343581
sk = bpf_iter_udp_batch(seq);
@@ -3592,8 +3639,19 @@ static int bpf_iter_udp_seq_show(struct seq_file *seq, void *v)
35923639

35933640
static void bpf_iter_udp_put_batch(struct bpf_udp_iter_state *iter)
35943641
{
3595-
while (iter->cur_sk < iter->end_sk)
3596-
sock_put(iter->batch[iter->cur_sk++]);
3642+
union bpf_udp_iter_batch_item *item;
3643+
unsigned int cur_sk = iter->cur_sk;
3644+
__u64 cookie;
3645+
3646+
/* Remember the cookies of the sockets we haven't seen yet, so we can
3647+
* pick up where we left off next time around.
3648+
*/
3649+
while (cur_sk < iter->end_sk) {
3650+
item = &iter->batch[cur_sk++];
3651+
cookie = sock_gen_cookie(item->sk);
3652+
sock_put(item->sk);
3653+
item->cookie = cookie;
3654+
}
35973655
}
35983656

35993657
static void bpf_iter_udp_seq_stop(struct seq_file *seq, void *v)
@@ -3609,10 +3667,8 @@ static void bpf_iter_udp_seq_stop(struct seq_file *seq, void *v)
36093667
(void)udp_prog_seq_show(prog, &meta, v, 0, 0);
36103668
}
36113669

3612-
if (iter->cur_sk < iter->end_sk) {
3670+
if (iter->cur_sk < iter->end_sk)
36133671
bpf_iter_udp_put_batch(iter);
3614-
iter->st_bucket_done = false;
3615-
}
36163672
}
36173673

36183674
static const struct seq_operations bpf_iter_udp_seq_ops = {
@@ -3863,16 +3919,19 @@ DEFINE_BPF_ITER_FUNC(udp, struct bpf_iter_meta *meta,
38633919
struct udp_sock *udp_sk, uid_t uid, int bucket)
38643920

38653921
static int bpf_iter_udp_realloc_batch(struct bpf_udp_iter_state *iter,
3866-
unsigned int new_batch_sz)
3922+
unsigned int new_batch_sz, gfp_t flags)
38673923
{
3868-
struct sock **new_batch;
3924+
union bpf_udp_iter_batch_item *new_batch;
38693925

38703926
new_batch = kvmalloc_array(new_batch_sz, sizeof(*new_batch),
3871-
GFP_USER | __GFP_NOWARN);
3927+
flags | __GFP_NOWARN);
38723928
if (!new_batch)
38733929
return -ENOMEM;
38743930

3875-
bpf_iter_udp_put_batch(iter);
3931+
if (flags != GFP_NOWAIT)
3932+
bpf_iter_udp_put_batch(iter);
3933+
3934+
memcpy(new_batch, iter->batch, sizeof(*iter->batch) * iter->end_sk);
38763935
kvfree(iter->batch);
38773936
iter->batch = new_batch;
38783937
iter->max_sk = new_batch_sz;
@@ -3891,10 +3950,12 @@ static int bpf_iter_init_udp(void *priv_data, struct bpf_iter_aux_info *aux)
38913950
if (ret)
38923951
return ret;
38933952

3894-
ret = bpf_iter_udp_realloc_batch(iter, INIT_BATCH_SZ);
3953+
ret = bpf_iter_udp_realloc_batch(iter, INIT_BATCH_SZ, GFP_USER);
38953954
if (ret)
38963955
bpf_iter_fini_seq_net(priv_data);
38973956

3957+
iter->state.bucket = -1;
3958+
38983959
return ret;
38993960
}
39003961

0 commit comments

Comments
 (0)