Skip to content

Commit f2ddcfd

Browse files
committed
Updated tests + stronger ordering
1 parent 6adcfee commit f2ddcfd

File tree

2 files changed

+50
-32
lines changed

2 files changed

+50
-32
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use pin_project::pin_project;
1919
const NONE: u8 = 0;
2020

2121
/// Indicates that `futures` need to be polled.
22-
const NEED_TO_POLL_FUTURES: u8 = 0b1;
22+
const NEED_TO_POLL_FUTURES: u8 = 1;
2323

2424
/// Indicates that `stream` needs to be polled.
2525
const NEED_TO_POLL_STREAM: u8 = 0b10;
@@ -30,7 +30,7 @@ const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;
3030
/// Indicates that current stream is polled at the moment.
3131
const POLLING: u8 = 0b100;
3232

33-
// Indicates that we already called one of wakers.
33+
// Indicates that it already called one of wakers.
3434
const WOKEN: u8 = 0b1000;
3535

3636
/// State which used to determine what needs to be polled, and are we polling
@@ -50,20 +50,20 @@ impl SharedPollState {
5050

5151
/// Swaps state with `POLLING`, returning previous state.
5252
fn begin_polling(&self) -> u8 {
53-
self.state.swap(POLLING, Ordering::AcqRel)
53+
self.state.swap(POLLING, Ordering::SeqCst)
5454
}
5555

5656
/// Performs bitwise or with `to_poll` and given state, returning
5757
/// previous state.
5858
fn set_or(&self, to_poll: u8) -> u8 {
59-
self.state.fetch_or(to_poll, Ordering::AcqRel)
59+
self.state.fetch_or(to_poll, Ordering::SeqCst)
6060
}
6161

6262
/// Performs bitwise or with `to_poll` and current state, stores result
6363
/// with non-`POLLING` state, and returns disjunction result.
64-
fn end_polling(&self, to_poll: u8) -> u8 {
65-
let to_poll = to_poll | self.state.load(Ordering::Acquire);
66-
self.state.store(to_poll & !POLLING & !WOKEN, Ordering::Release);
64+
fn end_polling(&self, mut to_poll: u8) -> u8 {
65+
to_poll |= self.state.swap(!POLLING & !WOKEN, Ordering::SeqCst);
66+
self.state.fetch_and(to_poll & !POLLING & !WOKEN, Ordering::SeqCst);
6767
to_poll
6868
}
6969
}
@@ -319,7 +319,7 @@ where
319319

320320
let poll_state_value = this.poll_state.end_polling(need_to_poll_next);
321321

322-
let is_done = this.futures.is_empty() && *this.is_stream_done;
322+
let is_done = *this.is_stream_done && this.futures.is_empty();
323323

324324
if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL != NONE
325325
&& (polling_with_two_wakers

futures/tests/stream.rs

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,16 @@ fn flatten_unordered() {
8787
let sleep_time = Duration::from_millis(*self.data.last().unwrap_or(&0) as u64);
8888
thread::spawn(move || {
8989
thread::sleep(sleep_time);
90-
woken.swap(true, Ordering::Relaxed);
90+
woken.swap(true, Ordering::SeqCst);
9191
waker.wake_by_ref();
9292
});
9393
} else {
94-
self.woken.swap(true, Ordering::Relaxed);
94+
self.woken.swap(true, Ordering::SeqCst);
9595
ctx.waker().wake_by_ref();
9696
}
9797
self.polled = true;
9898
Poll::Pending
9999
} else {
100-
assert!(
101-
self.woken.swap(false, Ordering::AcqRel),
102-
"Inner stream polled before wake!"
103-
);
104100
self.polled = false;
105101
Poll::Ready(self.data.pop())
106102
}
@@ -126,27 +122,25 @@ fn flatten_unordered() {
126122
let sleep_time = Duration::from_millis(self.base as u64);
127123
thread::spawn(move || {
128124
thread::sleep(sleep_time);
129-
woken.swap(true, Ordering::Relaxed);
125+
woken.swap(true, Ordering::SeqCst);
130126
waker.wake_by_ref();
131127
});
132128
} else {
133-
self.woken.swap(true, Ordering::Relaxed);
129+
self.woken.swap(true, Ordering::SeqCst);
134130
ctx.waker().wake_by_ref();
135131
}
136132
Poll::Pending
137133
} else {
138134
assert!(
139-
self.woken.swap(false, Ordering::AcqRel),
140-
"Stream polled before wake!"
135+
self.woken.swap(false, Ordering::SeqCst),
136+
format!("Stream polled before wake! {}", self.base)
141137
);
138+
let data: Vec<_> = (0..6).into_iter().map(|v| v + self.base * 6).collect();
142139
self.base += 1;
143140
self.polled = false;
144141
Poll::Ready(Some(DataStream {
145142
polled: false,
146-
data: vec![9, 8, 7, 6, 5]
147-
.into_iter()
148-
.map(|v| v * self.base)
149-
.collect(),
143+
data,
150144
wake_immediately: self.wake_immediately && self.base % 2 == 0,
151145
woken: Arc::new(AtomicBool::new(false)),
152146
}))
@@ -156,9 +150,28 @@ fn flatten_unordered() {
156150

157151
// concurrent tests
158152
block_on(async {
159-
let fm_unordered = Interchanger {
153+
let mut fl_unordered = Interchanger {
154+
polled: false,
155+
base: 0,
156+
woken: Arc::new(AtomicBool::new(false)),
157+
wake_immediately: false,
158+
}
159+
.take(10)
160+
.map(|s| s.map(identity))
161+
.flatten()
162+
.collect::<Vec<_>>()
163+
.await;
164+
165+
fl_unordered.sort();
166+
167+
assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
168+
});
169+
170+
// concurrent tests
171+
block_on(async {
172+
let mut fm_unordered = Interchanger {
160173
polled: false,
161-
base: 1,
174+
base: 0,
162175
woken: Arc::new(AtomicBool::new(false)),
163176
wake_immediately: false,
164177
}
@@ -167,7 +180,9 @@ fn flatten_unordered() {
167180
.collect::<Vec<_>>()
168181
.await;
169182

170-
assert_eq!(fm_unordered.len(), 50);
183+
fm_unordered.sort();
184+
185+
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
171186
});
172187

173188
// basic behaviour
@@ -209,9 +224,9 @@ fn flatten_unordered() {
209224

210225
// wake up immmediately
211226
block_on(async {
212-
let fl_unordered = Interchanger {
227+
let mut fl_unordered = Interchanger {
213228
polled: false,
214-
base: 1,
229+
base: 0,
215230
woken: Arc::new(AtomicBool::new(false)),
216231
wake_immediately: true,
217232
}
@@ -221,14 +236,16 @@ fn flatten_unordered() {
221236
.collect::<Vec<_>>()
222237
.await;
223238

224-
assert_eq!(fl_unordered.len(), 50);
239+
fl_unordered.sort();
240+
241+
assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
225242
});
226243

227244
// wake up immmediately
228245
block_on(async {
229-
let fm_unordered = Interchanger {
246+
let mut fm_unordered = Interchanger {
230247
polled: false,
231-
base: 1,
248+
base: 0,
232249
woken: Arc::new(AtomicBool::new(false)),
233250
wake_immediately: true,
234251
}
@@ -237,7 +254,9 @@ fn flatten_unordered() {
237254
.collect::<Vec<_>>()
238255
.await;
239256

240-
assert_eq!(fm_unordered.len(), 50);
257+
fm_unordered.sort();
258+
259+
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
241260
});
242261
}
243262

@@ -310,7 +329,6 @@ fn take_until() {
310329
});
311330
}
312331

313-
#[cfg(feature = "executor")] // executor::
314332
#[test]
315333
#[should_panic]
316334
fn ready_chunks_panic_on_cap_zero() {

0 commit comments

Comments
 (0)