Skip to content

Commit 9852727

Browse files
committed
fix(common): replace usage of Notify::notify_waiters with Notify::notify_one
1 parent c004653 commit 9852727

File tree

1 file changed

+38
-21
lines changed

1 file changed

+38
-21
lines changed

src/common/buf.rs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,28 @@ use std::sync::atomic::{AtomicBool, Ordering};
33
use std::sync::Arc;
44
use tokio::sync::Notify;
55

6-
#[derive(Clone)]
6+
/// Signaler returned as part of `NotifyOnEos::new` that can be polled to receive information,
7+
/// when the buffer gets advanced to the end.
8+
// Cannot be Clone due to usage of `Notify::notify_one` in `NotifyOnEos::advance`,
9+
// revisit once `Notify::notify_all` stabilizes.
710
pub struct EosSignaler {
811
notifier: Arc<Notify>,
912
}
1013

1114
impl EosSignaler {
12-
fn notify_eos(&self) {
13-
self.notifier.notify_waiters();
14-
}
15-
1615
pub async fn wait_till_eos(self) {
1716
self.notifier.notified().await;
1817
}
1918
}
2019

21-
pub struct AlertOnEos<B> {
20+
/// Wrapper for `bytes::Buf` that returns a `EosSignaler` that can be polled to receive information,
21+
/// when the buffer gets advanced to the end.
22+
///
23+
/// NOTE: For the notification to work, caller must ensure that `Buf::advance` gets called
24+
/// enough times to advance to the end of the buffer (so that `Buf::has_remaining` afterwards returns `0`).
25+
pub struct NotifyOnEos<B> {
2226
inner: B,
23-
signaler: EosSignaler,
27+
notifier: Arc<Notify>,
2428
// It'd be better if we consumed the signaler, making it inaccessible after notification.
2529
// Unfortunately, that would require something like AtomicOption.
2630
// arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
@@ -29,21 +33,20 @@ pub struct AlertOnEos<B> {
2933
has_already_signaled: AtomicBool,
3034
}
3135

32-
impl<B> AlertOnEos<B> {
36+
impl<B> NotifyOnEos<B> {
3337
pub fn new(inner: B) -> (Self, EosSignaler) {
34-
let signal = EosSignaler {
35-
notifier: Arc::new(Notify::new()),
36-
};
38+
let notifier = Arc::new(Notify::new());
3739
let this = Self {
3840
inner,
39-
signaler: signal.clone(),
41+
notifier: notifier.clone(),
4042
has_already_signaled: AtomicBool::new(false),
4143
};
44+
let signal = EosSignaler { notifier };
4245
(this, signal)
4346
}
4447
}
4548

46-
impl<B: Buf> Buf for AlertOnEos<B> {
49+
impl<B: Buf> Buf for NotifyOnEos<B> {
4750
fn remaining(&self) -> usize {
4851
self.inner.remaining()
4952
}
@@ -55,22 +58,36 @@ impl<B: Buf> Buf for AlertOnEos<B> {
5558
fn advance(&mut self, cnt: usize) {
5659
self.inner.advance(cnt);
5760
if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
58-
self.signaler.notify_eos();
61+
// tokio::sync::Notify has private method `notify_all` that, once stabilized,
62+
// would allow us to make `EosSignaler` Cloneable with better ergonomics
63+
// to await EOS from multiple places.
64+
self.notifier.notify_one();
5965
}
6066
}
6167
}
6268

6369
#[cfg(test)]
6470
mod tests {
65-
use crate::common::buf::AlertOnEos;
66-
use hyper::body::Bytes;
71+
use crate::common::buf::NotifyOnEos;
72+
use hyper::body::{Buf, Bytes};
6773
use std::time::Duration;
6874

6975
#[tokio::test]
70-
async fn test_get_notified() {
71-
let buf = Bytes::from_static(b"");
72-
let (_buf, signaler) = AlertOnEos::new(buf);
73-
let result = tokio::time::timeout(Duration::from_secs(1), signaler.wait_till_eos()).await;
74-
assert_eq!(result, Ok(()));
76+
async fn test_get_notified_immediately() {
77+
let buf = Bytes::from_static(b"abc");
78+
let (mut buf, signaler) = NotifyOnEos::new(buf);
79+
buf.advance(3);
80+
signaler.wait_till_eos().await;
81+
}
82+
83+
#[tokio::test]
84+
async fn test_get_notified_after_1ms() {
85+
let buf = Bytes::from_static(b"abc");
86+
let (mut buf, signaler) = NotifyOnEos::new(buf);
87+
tokio::spawn(async move {
88+
tokio::time::sleep(Duration::from_millis(1)).await;
89+
buf.advance(3);
90+
});
91+
signaler.wait_till_eos().await;
7592
}
7693
}

0 commit comments

Comments
 (0)