 
 struct kmem_cache *sk_msg_cachep;
 
-static bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
+bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
 {
         if (msg->sg.end > msg->sg.start &&
             elem_first_coalesce < msg->sg.end)
@@ -713,6 +713,118 @@ static void sk_psock_backlog(struct work_struct *work)
         mutex_unlock(&psock->work_mutex);
 }
 
+static bool backlog_notify(struct sk_psock *psock, bool m_sched_failed,
+                           bool ingress_empty)
+{
+        /* Notify if:
+         * 1. We have corked enough bytes
+         * 2. We have already delayed notification
+         * 3. Memory allocation failed
+         * 4. Ingress queue was empty and we're about to add data
+         */
+        return psock->backlog_since_notify >= TCP_BPF_GSO_SIZE ||
+               psock->backlog_work_delayed ||
+               m_sched_failed ||
+               ingress_empty;
+}
+
+static bool backlog_xfer_to_local(struct sk_psock *psock, struct sock *sk_from,
+                                  struct list_head *local_head, u32 *tot_size)
+{
+        struct sock *sk = psock->sk;
+        struct sk_msg *msg, *tmp;
+        u32 size = 0;
+
+        list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
+                if (msg->sk != sk_from)
+                        break;
+
+                if (!__sk_rmem_schedule(sk, msg->sg.size, false))
+                        return true;
+
+                list_move_tail(&msg->list, local_head);
+                sk_wmem_queued_add(msg->sk, -msg->sg.size);
+                sock_put(msg->sk);
+                msg->sk = NULL;
+                psock->backlog_since_notify += msg->sg.size;
+                size += msg->sg.size;
+        }
+
+        *tot_size = size;
+        return false;
+}
+
+/* This function handles the transfer of backlogged messages from the sender
+ * backlog queue to the ingress queue of the peer socket. Notification of data
+ * availability will be sent under some conditions.
+ */
+void sk_psock_backlog_msg(struct sk_psock *psock)
+{
+        bool rmem_schedule_failed = false;
+        struct sock *sk_from = NULL;
+        struct sock *sk = psock->sk;
+        LIST_HEAD(local_head);
+        struct sk_msg *msg;
+        bool should_notify;
+        u32 tot_size = 0;
+
+        if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
+                return;
+
+        lock_sock(sk);
+        spin_lock(&psock->backlog_msg_lock);
+
+        msg = list_first_entry_or_null(&psock->backlog_msg,
+                                       struct sk_msg, list);
+        if (!msg) {
+                should_notify = !list_empty(&psock->ingress_msg);
+                spin_unlock(&psock->backlog_msg_lock);
+                goto notify;
+        }
+
+        sk_from = msg->sk;
+        sock_hold(sk_from);
+
+        rmem_schedule_failed = backlog_xfer_to_local(psock, sk_from,
+                                                     &local_head, &tot_size);
+        should_notify = backlog_notify(psock, rmem_schedule_failed,
+                                       list_empty(&psock->ingress_msg));
+        spin_unlock(&psock->backlog_msg_lock);
+
+        spin_lock_bh(&psock->ingress_lock);
+        list_splice_tail_init(&local_head, &psock->ingress_msg);
+        spin_unlock_bh(&psock->ingress_lock);
+
+        atomic_add(tot_size, &sk->sk_rmem_alloc);
+        sk_mem_charge(sk, tot_size);
+
+notify:
+        if (should_notify) {
+                psock->backlog_since_notify = 0;
+                sk_psock_data_ready(sk, psock);
+                if (!list_empty(&psock->backlog_msg))
+                        sk_psock_run_backlog_work(psock, rmem_schedule_failed);
+        } else {
+                sk_psock_run_backlog_work(psock, true);
+        }
+        release_sock(sk);
+
+        if (sk_from) {
+                bool slow = lock_sock_fast(sk_from);
+
+                sk_mem_uncharge(sk_from, tot_size);
+                unlock_sock_fast(sk_from, slow);
+                sock_put(sk_from);
+        }
+}
+
+static void sk_psock_backlog_msg_work(struct work_struct *work)
+{
+        struct delayed_work *dwork = to_delayed_work(work);
+
+        sk_psock_backlog_msg(container_of(dwork, struct sk_psock, backlog_work));
+}
+
 struct sk_psock *sk_psock_init(struct sock *sk, int node)
 {
         struct sk_psock *psock;
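For context, sk_psock_run_backlog_work() is called in the hunk above but not defined in this excerpt; it presumably lives next to the other psock work helpers in include/linux/skmsg.h. Below is a minimal sketch of what such a helper could look like, assuming it records the "delayed" decision in psock->backlog_work_delayed (which backlog_notify() reads as condition 2) and (re)schedules psock->backlog_work. The body and the delay constant are assumptions for illustration, not taken from this commit.

/* Sketch only: assumed helper, not part of this diff. */
#define SK_PSOCK_BACKLOG_DELAY	1	/* jiffies; illustrative value */

static inline void sk_psock_run_backlog_work(struct sk_psock *psock,
					     bool delayed)
{
	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
		return;

	/* Remember that notification was deferred so backlog_notify()
	 * forces one on the next worker pass.
	 */
	psock->backlog_work_delayed = delayed;
	schedule_delayed_work(&psock->backlog_work,
			      delayed ? SK_PSOCK_BACKLOG_DELAY : 0);
}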
@@ -750,8 +862,11 @@ struct sk_psock *sk_psock_init(struct sock *sk, int node)
 
         INIT_DELAYED_WORK(&psock->work, sk_psock_backlog);
         mutex_init(&psock->work_mutex);
+        INIT_DELAYED_WORK(&psock->backlog_work, sk_psock_backlog_msg_work);
         INIT_LIST_HEAD(&psock->ingress_msg);
         spin_lock_init(&psock->ingress_lock);
+        INIT_LIST_HEAD(&psock->backlog_msg);
+        spin_lock_init(&psock->backlog_msg_lock);
         skb_queue_head_init(&psock->ingress_skb);
 
         sk_psock_set_state(psock, SK_PSOCK_TX_ENABLED);
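The initialization above implies new members on struct sk_psock, whose declaration (in include/linux/skmsg.h) is outside this excerpt. The sketch below lists only the fields the code in this file touches; the types are inferred from how each field is used and are not authoritative.

/* Sketch only: fields implied by this patch, shown in isolation. */
struct sk_psock {
	/* ... existing members (sk, ingress_msg, ingress_lock, work, ...) ... */
	struct delayed_work	backlog_work;		/* runs sk_psock_backlog_msg_work() */
	struct list_head	backlog_msg;		/* sk_msg entries awaiting transfer */
	spinlock_t		backlog_msg_lock;	/* protects backlog_msg */
	u32			backlog_since_notify;	/* bytes moved since last data_ready */
	bool			backlog_work_delayed;	/* a worker run was deferred */
	/* ... */
};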
@@ -805,6 +920,26 @@ static void __sk_psock_zap_ingress(struct sk_psock *psock)
         __sk_psock_purge_ingress_msg(psock);
 }
 
+static void __sk_psock_purge_backlog_msg(struct sk_psock *psock)
+{
+        struct sk_msg *msg, *tmp;
+
+        spin_lock(&psock->backlog_msg_lock);
+        list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
+                struct sock *sk_from = msg->sk;
+                bool slow;
+
+                list_del(&msg->list);
+                slow = lock_sock_fast(sk_from);
+                sk_wmem_queued_add(sk_from, -msg->sg.size);
+                sock_put(sk_from);
+                sk_msg_free(sk_from, msg);
+                unlock_sock_fast(sk_from, slow);
+                kfree_sk_msg(msg);
+        }
+        spin_unlock(&psock->backlog_msg_lock);
+}
+
 static void sk_psock_link_destroy(struct sk_psock *psock)
 {
         struct sk_psock_link *link, *tmp;
@@ -834,7 +969,9 @@ static void sk_psock_destroy(struct work_struct *work)
         sk_psock_done_strp(psock);
 
         cancel_delayed_work_sync(&psock->work);
+        cancel_delayed_work_sync(&psock->backlog_work);
         __sk_psock_zap_ingress(psock);
+        __sk_psock_purge_backlog_msg(psock);
         mutex_destroy(&psock->work_mutex);
 
         psock_progs_drop(&psock->progs);
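The producer side that feeds psock->backlog_msg (for example a redirect path in tcp_bpf.c) is not part of this excerpt. Working backwards from what backlog_xfer_to_local() and __sk_psock_purge_backlog_msg() later undo (sock_put, negative sk_wmem_queued_add, list removal), a hypothetical enqueue helper might look roughly like the sketch below; the function name, placement, and locking context are illustrative assumptions only.

/* Sketch only: hypothetical enqueue side, inferred from the cleanup paths. */
static void sk_psock_backlog_msg_enqueue(struct sk_psock *psock,
					 struct sock *sk_from,
					 struct sk_msg *msg)
{
	/* Mirror what the transfer/purge paths undo: pin the sender,
	 * account the bytes as queued on it, and park the message on
	 * the psock backlog list. Memory for msg->sg is assumed to have
	 * been charged to sk_from by the send path already, which is why
	 * sk_psock_backlog_msg() uncharges it after the transfer.
	 */
	msg->sk = sk_from;
	sock_hold(sk_from);
	sk_wmem_queued_add(sk_from, msg->sg.size);

	spin_lock(&psock->backlog_msg_lock);
	list_add_tail(&msg->list, &psock->backlog_msg);
	spin_unlock(&psock->backlog_msg_lock);

	/* Kick the worker; "false" requests an immediate run. */
	sk_psock_run_backlog_work(psock, false);
}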