diff --git a/src/lib.rs b/src/lib.rs index ba1b439..b1b4254 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -526,37 +526,10 @@ impl Receiver { /// assert_eq!(r.recv().await, Err(RecvError)); /// # }); /// ``` - pub async fn recv(&self) -> Result { - let mut listener = None; - - loop { - // Attempt to receive a message. - match self.try_recv() { - Ok(msg) => { - // If the capacity is larger than 1, notify another blocked receive operation. - // There is no need to notify stream operations because all of them get - // notified every time a message is sent into the channel. - match self.channel.queue.capacity() { - Some(1) => {} - Some(_) | None => self.channel.recv_ops.notify(1), - } - return Ok(msg); - } - Err(TryRecvError::Closed) => return Err(RecvError), - Err(TryRecvError::Empty) => {} - } - - // Receiving failed - now start listening for notifications or wait for one. - match listener.take() { - None => { - // Start listening and then try receiving again. - listener = Some(self.channel.recv_ops.listen()); - } - Some(l) => { - // Wait for a notification. - l.await; - } - } + pub fn recv(&self) -> Recv<'_, T> { + Recv { + receiver: self, + listener: None, } } @@ -934,3 +907,113 @@ impl fmt::Display for TryRecvError { } } } + +/// A future returned by [`Sender::send()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless .awaited"] +pub struct Send<'a, T> { + sender: &'a Sender, + listener: Option, + msg: Option, +} + +impl<'a, T> Unpin for Send<'a, T> {} + +impl<'a, T> Future for Send<'a, T> { + type Output = Result<(), SendError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = Pin::new(self); + + loop { + let msg = this.msg.take().unwrap(); + // Attempt to send a message. + match this.sender.try_send(msg) { + Ok(()) => { + // If the capacity is larger than 1, notify another blocked send operation. + match this.sender.channel.queue.capacity() { + Some(1) => {} + Some(_) | None => this.sender.channel.send_ops.notify(1), + } + return Poll::Ready(Ok(())); + } + Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), + Err(TrySendError::Full(m)) => this.msg = Some(m), + } + + // Sending failed - now start listening for notifications or wait for one. + match &mut this.listener { + None => { + // Start listening and then try receiving again. + this.listener = Some(this.sender.channel.send_ops.listen()); + } + Some(l) => { + // Wait for a notification. + match Pin::new(l).poll(cx) { + Poll::Ready(_) => { + this.listener = None; + continue; + } + + Poll::Pending => return Poll::Pending, + } + } + } + } + } +} + +/// A future returned by [`Receiver::recv()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless .awaited"] +pub struct Recv<'a, T> { + receiver: &'a Receiver, + listener: Option, +} + +impl<'a, T> Unpin for Recv<'a, T> {} + +impl<'a, T> Future for Recv<'a, T> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = Pin::new(self); + + loop { + // Attempt to receive a message. + match this.receiver.try_recv() { + Ok(msg) => { + // If the capacity is larger than 1, notify another blocked receive operation. + // There is no need to notify stream operations because all of them get + // notified every time a message is sent into the channel. + match this.receiver.channel.queue.capacity() { + Some(1) => {} + Some(_) | None => this.receiver.channel.recv_ops.notify(1), + } + return Poll::Ready(Ok(msg)); + } + Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), + Err(TryRecvError::Empty) => {} + } + + // Receiving failed - now start listening for notifications or wait for one. + match &mut this.listener { + None => { + // Start listening and then try receiving again. + this.listener = Some(this.receiver.channel.recv_ops.listen()); + } + Some(l) => { + // Wait for a notification. + match Pin::new(l).poll(cx) { + Poll::Ready(_) => { + this.listener = None; + continue; + } + + Poll::Pending => return Poll::Pending, + } + } + } + } + } +}