Skip to content

Commit c91d08b

Browse files
committed
Return StreamFut instead of stream, small code refactoring
1 parent 7dc39b0 commit c91d08b

File tree

1 file changed

+31
-27
lines changed

1 file changed

+31
-27
lines changed

futures-util/src/stream/stream/flat_map_unordered.rs

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ impl ArcWake for PollWaker {
8585
}
8686

8787
/// Future which contains optional stream. If it's `Some`, it will attempt
88-
/// to call `poll_next` on it, returning `Some((item, stream))` in case of
89-
/// `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`.
88+
/// to call `poll_next` on it, returning `Some((item, next_item_fut))` in
89+
/// case of `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`.
9090
/// If `poll_next` will return `Poll::Pending`, it will be forwared to
9191
/// the future, and current task will be notified by waker.
9292
#[must_use = "futures do nothing unless you `.await` or poll them"]
@@ -96,12 +96,19 @@ struct StreamFut<St> {
9696

9797
impl<St> StreamFut<St> {
9898
unsafe_pinned!(stream: Option<St>);
99+
100+
/// Constructs new `StreamFut` using given `stream`.
101+
fn new(stream: St) -> Self {
102+
Self {
103+
stream: stream.into(),
104+
}
105+
}
99106
}
100107

101108
impl<St: Stream + Unpin> Unpin for StreamFut<St> {}
102109

103110
impl<St: Stream> Future for StreamFut<St> {
104-
type Output = Option<(St::Item, St)>;
111+
type Output = Option<(St::Item, StreamFut<St>)>;
105112

106113
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
107114
let item = if let Some(stream) = self.as_mut().stream().as_pin_mut() {
@@ -111,9 +118,12 @@ impl<St: Stream> Future for StreamFut<St> {
111118
};
112119

113120
Poll::Ready(item.map(|item| {
114-
(item, unsafe {
115-
self.get_unchecked_mut().stream.take().unwrap()
116-
})
121+
(
122+
item,
123+
StreamFut {
124+
stream: unsafe { self.get_unchecked_mut().stream.take() },
125+
},
126+
)
117127
}))
118128
}
119129
}
@@ -264,16 +274,17 @@ where
264274
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265275
let mut poll_state_value = self.as_mut().poll_state().begin_polling();
266276

277+
println!("STATE {}", poll_state_value);
278+
267279
let mut next_item = None;
268280
let mut need_to_poll_next = NONE;
269281
let mut polling_with_two_wakers =
270282
poll_state_value & NEED_TO_POLL == NEED_TO_POLL && self.not_exceeded_limit();
271-
let mut polled_stream = false;
272-
let mut polled_futures = false;
283+
let mut stream_will_be_woken = false;
284+
let mut futures_will_be_woken = false;
273285

274286
if poll_state_value & NEED_TO_POLL_STREAM != NONE {
275287
if self.not_exceeded_limit() {
276-
polled_stream = true;
277288
match if polling_with_two_wakers {
278289
let waker = self.create_poll_stream_waker(ctx);
279290
let mut ctx = Context::from_waker(&waker);
@@ -282,9 +293,7 @@ where
282293
self.as_mut().stream().poll_next(ctx)
283294
} {
284295
Poll::Ready(Some(inner_stream)) => {
285-
self.as_mut().futures().push(StreamFut {
286-
stream: Some(inner_stream),
287-
});
296+
self.as_mut().futures().push(StreamFut::new(inner_stream));
288297
need_to_poll_next |= NEED_TO_POLL_STREAM;
289298
// Polling futures in current iteration with the same context
290299
// is ok because we already received `Poll::Ready` from
@@ -300,6 +309,7 @@ where
300309
polling_with_two_wakers = false;
301310
}
302311
Poll::Pending => {
312+
stream_will_be_woken = true;
303313
if !polling_with_two_wakers {
304314
need_to_poll_next |= NEED_TO_POLL_STREAM;
305315
}
@@ -311,25 +321,23 @@ where
311321
}
312322

313323
if poll_state_value & NEED_TO_POLL_FUTURES != NONE {
314-
polled_futures = true;
315324
match if polling_with_two_wakers {
316325
let waker = self.create_poll_futures_waker(ctx);
317326
let mut ctx = Context::from_waker(&waker);
318327
self.as_mut().futures().poll_next(&mut ctx)
319328
} else {
320329
self.as_mut().futures().poll_next(ctx)
321330
} {
322-
Poll::Ready(Some(Some((item, stream)))) => {
323-
self.as_mut().futures().push(StreamFut {
324-
stream: Some(stream),
325-
});
331+
Poll::Ready(Some(Some((item, next_item_fut)))) => {
332+
self.as_mut().futures().push(next_item_fut);
326333
next_item = Some(item);
327334
need_to_poll_next |= NEED_TO_POLL_FUTURES;
328335
}
329336
Poll::Ready(Some(None)) => {
330337
need_to_poll_next |= NEED_TO_POLL_FUTURES;
331338
}
332339
Poll::Pending => {
340+
futures_will_be_woken = true;
333341
if !polling_with_two_wakers {
334342
need_to_poll_next |= NEED_TO_POLL_FUTURES;
335343
}
@@ -342,16 +350,12 @@ where
342350

343351
let poll_state_value = self.as_mut().poll_state().end_polling(need_to_poll_next);
344352

345-
if poll_state_value & NEED_TO_POLL != NONE {
346-
if !polling_with_two_wakers {
347-
if poll_state_value & NEED_TO_POLL_FUTURES != NONE && !polled_futures
348-
|| poll_state_value & NEED_TO_POLL_STREAM != NONE && !polled_stream
349-
{
350-
ctx.waker().wake_by_ref();
351-
}
352-
} else {
353-
ctx.waker().wake_by_ref();
354-
}
353+
if poll_state_value & NEED_TO_POLL != NONE
354+
&& (polling_with_two_wakers
355+
|| (poll_state_value & NEED_TO_POLL_FUTURES != NONE && !futures_will_be_woken
356+
|| poll_state_value & NEED_TO_POLL_STREAM != NONE && !stream_will_be_woken))
357+
{
358+
ctx.waker().wake_by_ref();
355359
}
356360

357361
if self.futures.is_empty() && self.is_stream_done || next_item.is_some() {

0 commit comments

Comments
 (0)