Redefine Stream/Sink/AsyncRead/AsyncWrite/etc on top of Future #1365

@Matthias247

In the async/await world, I think it might be preferable to redefine traits and types such as those mentioned in the title on top of Futures, e.g. by moving from poll_something() signatures to methods that return Futures.

E.g.

trait Stream<T> {
    type Item: Future<Item = T>;
    fn read(&mut self) -> Self::Item;
    // or something along the following lines, whatever works best:
    // fn read(&mut self) -> impl Future<Item = T>;
}

The important reason for this is that it should be possible to implement those types on top of Futures as primitives, which isn't possible today.
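For comparison, here is a minimal sketch of how such a trait could look against today's `std::future::Future` (whose associated type is `Output`), using a generic associated type so that the returned Future may borrow the stream. The names `FutureStream`, `Counter`, `collect_all` and `block_on` are hypothetical and only serve the illustration; this is not the `futures` crate's `Stream`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stream trait whose read operation returns a Future.
trait FutureStream {
    type Item;
    /// The Future returned by `next`; it may borrow the stream for `'a`.
    type Next<'a>: Future<Output = Option<Self::Item>>
    where
        Self: 'a;
    fn next(&mut self) -> Self::Next<'_>;
}

/// Toy stream that yields 0, 1, ..., limit - 1 and then `None`.
struct Counter {
    current: u32,
    limit: u32,
}

impl FutureStream for Counter {
    type Item = u32;
    type Next<'a> = std::future::Ready<Option<u32>> where Self: 'a;

    fn next(&mut self) -> Self::Next<'_> {
        let item = if self.current < self.limit {
            self.current += 1;
            Some(self.current - 1)
        } else {
            None
        };
        std::future::ready(item)
    }
}

/// Generic consumer: awaits one Future per item until the stream ends.
async fn collect_all<S: FutureStream>(stream: &mut S) -> Vec<S::Item> {
    let mut items = Vec::new();
    while let Some(item) = stream.next().await {
        items.push(item);
    }
    items
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for the sketch: polls the Future in a loop until it is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Drives the toy stream to completion and returns everything it yielded.
fn collect_counter() -> Vec<u32> {
    let mut counter = Counter { current: 0, limit: 3 };
    block_on(collect_all(&mut counter))
}
```

Only the Future created by `collect_all` is pinned here (inside `block_on`); the trait itself has no `Pin` in its signature.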

As a motivating example I'm thinking about an MPMC channel, which can be built around some async primitives as shown below:

struct ChannelImpl<T> {
    state: AsyncMutex<Queue<T>>,
    has_space: AsyncManualResetEvent,
    has_element: AsyncManualResetEvent,
}

impl<T> ChannelImpl<T> {
    async fn write(&mut self, item: T) {
        loop {
            // Wait until there might be free space, then take the lock.
            await!(self.has_space.wait());
            let mut queue = await!(self.state.lock());
            if queue.free_space() == 0 {
                // Someone else filled the queue in the meantime; wait again.
                continue;
            }
            queue.push(item);
            break;
        }
        self.has_element.set();
    }

    async fn read(&mut self) -> T {
        let item = loop {
            // Wait until there might be an element, then take the lock.
            await!(self.has_element.wait());
            let mut queue = await!(self.state.lock());
            if queue.len() == 0 {
                // Someone else emptied the queue in the meantime; wait again.
                continue;
            }
            break queue.pop();
        };
        self.has_space.set();
        item
    }
}

I'm not yet sure whether the current state of async functions and traits allows all this (please educate me!), but I think enabling it should be one of the goals of async programming in Rust.

However, this kind of implementation currently cannot be wrapped in a Stream or Sink implementation.
We can't put this kind of code inside a poll_xyz method of a Stream/etc., since it requires temporary Futures, each carrying its own poll state, to be instantiated in the task. Those Futures must maintain their state between calls to poll(). E.g. polling an asynchronous mutex for the first time creates a registration in the waiting list, and dropping it cancels the registration. If we tried to use an asynchronous mutex inside a poll_next method, we would always have to drop it before poll_next() returns, which would cancel the registration and never allow us to acquire the mutex.

I think the more general observation is that, compared to the current Stream definitions, Futures allow persisting additional temporary state per operation and provide additional lifecycle hooks per operation (the Future gets polled for the first time, and the Future gets dropped).
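The two lifecycle hooks can be illustrated with a small sketch. `WaitList` and `Wait` are hypothetical stand-ins for the waiter queue of an async mutex or event, not types from any real library: the first poll registers the waiter, and dropping the Future cancels the registration again.

```rust
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state of an async primitive: a flag plus its waiters.
#[derive(Default)]
struct WaitList {
    signaled: Cell<bool>,
    next_id: Cell<u64>,
    waiters: RefCell<Vec<(u64, Waker)>>,
}

/// One wait operation; `id` is the per-operation state kept between polls.
struct Wait {
    list: Rc<WaitList>,
    id: Option<u64>,
}

impl Future for Wait {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.list.signaled.get() {
            return Poll::Ready(());
        }
        if self.id.is_none() {
            // Hook 1: the first poll creates the registration in the wait list.
            let id = self.list.next_id.get();
            self.list.next_id.set(id + 1);
            self.list.waiters.borrow_mut().push((id, cx.waker().clone()));
            self.id = Some(id);
        }
        Poll::Pending
    }
}

impl Drop for Wait {
    fn drop(&mut self) {
        // Hook 2: dropping the Future cancels its registration.
        if let Some(id) = self.id {
            self.list.waiters.borrow_mut().retain(|(waiter, _)| *waiter != id);
        }
    }
}

/// Waker that does nothing; the sketch polls by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns the number of registered waiters before the first poll,
/// after the first poll, and after the Future has been dropped.
fn registration_lifecycle() -> (usize, usize, usize) {
    let list = Rc::new(WaitList::default());
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut wait = Wait { list: Rc::clone(&list), id: None };
    let before = list.waiters.borrow().len();
    let pending = Pin::new(&mut wait).poll(&mut cx).is_pending();
    assert!(pending);
    let registered = list.waiters.borrow().len();
    drop(wait);
    let after_drop = list.waiters.borrow().len();
    (before, registered, after_drop)
}
```

A `Wait` created and dropped within a single poll_next() call would go through both hooks before that call returns, which is the problem described above.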

Another side effect is that currently Streams/etc. must all support pinning. For Streams that return Futures that wouldn't be the case, since it is the Futures that get pinned. The Streams might get pinned indirectly, because the respective Futures might reference the Stream through a reference with a certain lifetime, but that relationship is already checked through Rust's normal borrowing rules and doesn't put special pinning requirements on the implementation of the type. And often we would like Streams to be movable after they have been partly consumed (e.g. a TCP socket), which is not allowed if they are pinned. Of course some of them might be Unpin, but this kind of change would allow moves again for all Streams.
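A small sketch of that last point, with a hypothetical `Lines` type and a minimal busy-polling `block_on` (both are assumptions for illustration only): the Future returned by `read` is pinned while it runs, but once it has completed its borrow ends and the stream itself can be moved before the next read.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stream-like type whose read operation returns a Future.
struct Lines {
    remaining: Vec<&'static str>,
}

impl Lines {
    /// The returned Future borrows `self`; `Lines` itself is never pinned.
    async fn read(&mut self) -> Option<&'static str> {
        self.remaining.pop()
    }
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for the sketch: pins the Future locally and polls until ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Reads one item, moves the partly consumed stream to the heap, reads again.
fn read_move_read() -> (Option<&'static str>, Option<&'static str>) {
    let mut stream = Lines { remaining: vec!["second", "first"] };
    let first = block_on(stream.read());

    // No Future borrows `stream` any more, so moving it is an ordinary move.
    let mut moved = Box::new(stream);
    let second = block_on(moved.read());
    (first, second)
}
```

If a read Future were still alive at the point of the move, the borrow checker would reject the program, which is the "normal means" check mentioned above.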
