Skip to content

Commit 8421f91

Browse files
author
Gleb Pomykalov
committed
Introduce ready_chunks adaptor
1 parent 2f8943d commit 8421f91

File tree

5 files changed

+221
-1
lines changed

5 files changed

+221
-1
lines changed

futures-util/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ pub use self::stream::CatchUnwind;
2424
#[cfg(feature = "alloc")]
2525
pub use self::stream::Chunks;
2626

27+
#[cfg(feature = "alloc")]
28+
pub use self::stream::ReadyChunks;
29+
2730
#[cfg(feature = "sink")]
2831
pub use self::stream::Forward;
2932

futures-util/src/stream/stream/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ mod chunks;
128128
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
129129
pub use self::chunks::Chunks;
130130

131+
#[cfg(feature = "alloc")]
132+
mod ready_chunks;
133+
#[cfg(feature = "alloc")]
134+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
135+
pub use self::ready_chunks::ReadyChunks;
136+
131137
mod scan;
132138
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
133139
pub use self::scan::Scan;
@@ -1186,6 +1192,32 @@ pub trait StreamExt: Stream {
11861192
Chunks::new(self, capacity)
11871193
}
11881194

1195+
/// An adaptor for chunking up ready items of the stream inside a vector.
1196+
///
1197+
/// This combinator will attempt to pull ready items from this stream and
1198+
/// buffer them into a local vector. At most `capacity` items will get
1199+
/// buffered before they're yielded from the returned stream. If underlying
1200+
/// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1201+
/// be immediately returned.
1202+
///
1203+
/// If the underlying stream ended and only a partial vector was created,
1204+
/// it'll be returned. Additionally if an error happens from the underlying
1205+
/// stream then the currently buffered items will be yielded.
1206+
///
1207+
/// This method is only available when the `std` or `alloc` feature of this
1208+
/// library is activated, and it is activated by default.
1209+
///
1210+
/// # Panics
1211+
///
1212+
/// This method will panic if `capacity` is zero.
1213+
#[cfg(feature = "alloc")]
1214+
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
1215+
where
1216+
Self: Sized,
1217+
{
1218+
ReadyChunks::new(self, capacity)
1219+
}
1220+
11891221
/// A future that completes after the given stream has been fully processed
11901222
/// into the sink and the sink has been flushed and closed.
11911223
///
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
use crate::stream::Fuse;
2+
use futures_core::stream::{Stream, FusedStream};
3+
use futures_core::task::{Context, Poll};
4+
#[cfg(feature = "sink")]
5+
use futures_sink::Sink;
6+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
7+
use core::mem;
8+
use core::pin::Pin;
9+
use alloc::vec::Vec;
10+
11+
/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
12+
#[derive(Debug)]
13+
#[must_use = "streams do nothing unless polled"]
14+
pub struct ReadyChunks<St: Stream> {
15+
stream: Fuse<St>,
16+
items: Vec<St::Item>,
17+
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
18+
}
19+
20+
impl<St: Unpin + Stream> Unpin for ReadyChunks<St> {}
21+
22+
impl<St: Stream> ReadyChunks<St> where St: Stream {
23+
unsafe_unpinned!(items: Vec<St::Item>);
24+
unsafe_pinned!(stream: Fuse<St>);
25+
26+
pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> {
27+
assert!(capacity > 0);
28+
29+
ReadyChunks {
30+
stream: super::Fuse::new(stream),
31+
items: Vec::with_capacity(capacity),
32+
cap: capacity,
33+
}
34+
}
35+
36+
fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> {
37+
let cap = self.cap;
38+
mem::replace(self.as_mut().items(), Vec::with_capacity(cap))
39+
}
40+
41+
/// Acquires a reference to the underlying stream that this combinator is
42+
/// pulling from.
43+
pub fn get_ref(&self) -> &St {
44+
self.stream.get_ref()
45+
}
46+
47+
/// Acquires a mutable reference to the underlying stream that this
48+
/// combinator is pulling from.
49+
///
50+
/// Note that care must be taken to avoid tampering with the state of the
51+
/// stream which may otherwise confuse this combinator.
52+
pub fn get_mut(&mut self) -> &mut St {
53+
self.stream.get_mut()
54+
}
55+
56+
/// Acquires a pinned mutable reference to the underlying stream that this
57+
/// combinator is pulling from.
58+
///
59+
/// Note that care must be taken to avoid tampering with the state of the
60+
/// stream which may otherwise confuse this combinator.
61+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
62+
self.stream().get_pin_mut()
63+
}
64+
65+
/// Consumes this combinator, returning the underlying stream.
66+
///
67+
/// Note that this may discard intermediate state of this combinator, so
68+
/// care should be taken to avoid losing resources when this is called.
69+
pub fn into_inner(self) -> St {
70+
self.stream.into_inner()
71+
}
72+
}
73+
74+
impl<St: Stream> Stream for ReadyChunks<St> {
75+
type Item = Vec<St::Item>;
76+
77+
fn poll_next(
78+
mut self: Pin<&mut Self>,
79+
cx: &mut Context<'_>,
80+
) -> Poll<Option<Self::Item>> {
81+
loop {
82+
match self.as_mut().stream().poll_next(cx) {
83+
// Flush all collected data if underlying stream doesn't contain
84+
// more ready values
85+
Poll::Pending => {
86+
return if self.items.is_empty() {
87+
Poll::Pending
88+
} else {
89+
Poll::Ready(Some(self.as_mut().take()))
90+
}
91+
}
92+
93+
// Push the ready item into the buffer and check whether it is full.
94+
// If so, replace our buffer with a new and empty one and return
95+
// the full one.
96+
Poll::Ready(Some(item)) => {
97+
self.as_mut().items().push(item);
98+
if self.items.len() >= self.cap {
99+
return Poll::Ready(Some(self.as_mut().take()))
100+
}
101+
}
102+
103+
// Since the underlying stream ran out of values, return what we
104+
// have buffered, if we have anything.
105+
Poll::Ready(None) => {
106+
let last = if self.items.is_empty() {
107+
None
108+
} else {
109+
let full_buf = mem::replace(self.as_mut().items(), Vec::new());
110+
Some(full_buf)
111+
};
112+
113+
return Poll::Ready(last);
114+
}
115+
}
116+
}
117+
}
118+
119+
fn size_hint(&self) -> (usize, Option<usize>) {
120+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
121+
let (lower, upper) = self.stream.size_hint();
122+
let lower = lower.saturating_add(chunk_len);
123+
let upper = match upper {
124+
Some(x) => x.checked_add(chunk_len),
125+
None => None,
126+
};
127+
(lower, upper)
128+
}
129+
}
130+
131+
impl<St: FusedStream> FusedStream for ReadyChunks<St> {
132+
fn is_terminated(&self) -> bool {
133+
self.stream.is_terminated() && self.items.is_empty()
134+
}
135+
}
136+
137+
// Forwarding impl of Sink from the underlying stream
138+
#[cfg(feature = "sink")]
139+
impl<S, Item> Sink<Item> for ReadyChunks<S>
140+
where
141+
S: Stream + Sink<Item>,
142+
{
143+
type Error = S::Error;
144+
145+
delegate_sink!(stream, Item);
146+
}

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ pub mod stream {
458458
#[cfg(feature = "alloc")]
459459
pub use futures_util::stream::{
460460
// For StreamExt:
461-
Chunks,
461+
Chunks, ReadyChunks,
462462
};
463463

464464
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]

futures/tests/stream.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,42 @@ fn take_until() {
124124
assert_eq!(stream.next().await, None);
125125
});
126126
}
127+
128+
#[test]
129+
#[should_panic]
130+
fn ready_chunks_panic_on_cap_zero() {
131+
use futures::channel::mpsc;
132+
use futures::stream::StreamExt;
133+
134+
let (_, rx1) = mpsc::channel::<()>(1);
135+
136+
let _ = rx1.ready_chunks(0);
137+
}
138+
139+
#[cfg(feature = "executor")] // executor::
140+
#[test]
141+
fn ready_chunks() {
142+
use futures::channel::mpsc;
143+
use futures::stream::StreamExt;
144+
use futures::sink::SinkExt;
145+
use futures::FutureExt;
146+
use futures_test::task::noop_context;
147+
148+
let (mut tx, rx1) = mpsc::channel::<i32>(16);
149+
150+
let mut s = rx1.ready_chunks(2);
151+
152+
let mut cx = noop_context();
153+
assert!(s.next().poll_unpin(&mut cx).is_pending());
154+
155+
futures::executor::block_on(async {
156+
tx.send(1).await.unwrap();
157+
158+
assert_eq!(s.next().await.unwrap(), vec![1]);
159+
tx.send(2).await.unwrap();
160+
tx.send(3).await.unwrap();
161+
tx.send(4).await.unwrap();
162+
assert_eq!(s.next().await.unwrap(), vec![2,3]);
163+
assert_eq!(s.next().await.unwrap(), vec![4]);
164+
});
165+
}

0 commit comments

Comments
 (0)