Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 114 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,37 +526,10 @@ impl<T> Receiver<T> {
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub async fn recv(&self) -> Result<T, RecvError> {
let mut listener = None;

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// If the capacity is larger than 1, notify another blocked receive operation.
// There is no need to notify stream operations because all of them get
// notified every time a message is sent into the channel.
match self.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => self.channel.recv_ops.notify(1),
}
return Ok(msg);
}
Err(TryRecvError::Closed) => return Err(RecvError),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match listener.take() {
None => {
// Start listening and then try receiving again.
listener = Some(self.channel.recv_ops.listen());
}
Some(l) => {
// Wait for a notification.
l.await;
}
}
pub fn recv(&self) -> Recv<'_, T> {
Recv {
receiver: self,
listener: None,
}
}

Expand Down Expand Up @@ -934,3 +907,113 @@ impl fmt::Display for TryRecvError {
}
}
}

/// A future returned by [`Sender::send()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,
}

impl<'a, T> Unpin for Send<'a, T> {}

impl<'a, T> Future for Send<'a, T> {
type Output = Result<(), SendError<T>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);

loop {
let msg = this.msg.take().unwrap();
// Attempt to send a message.
match this.sender.try_send(msg) {
Ok(()) => {
// If the capacity is larger than 1, notify another blocked send operation.
match this.sender.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => this.sender.channel.send_ops.notify(1),
}
return Poll::Ready(Ok(()));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => this.msg = Some(m),
}

// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
this.listener = Some(this.sender.channel.send_ops.listen());
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
this.listener = None;
continue;
}

Poll::Pending => return Poll::Pending,
}
}
}
}
}
}

/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
}

impl<'a, T> Unpin for Recv<'a, T> {}

impl<'a, T> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);

loop {
// Attempt to receive a message.
match this.receiver.try_recv() {
Ok(msg) => {
// If the capacity is larger than 1, notify another blocked receive operation.
// There is no need to notify stream operations because all of them get
// notified every time a message is sent into the channel.
match this.receiver.channel.queue.capacity() {
Some(1) => {}
Some(_) | None => this.receiver.channel.recv_ops.notify(1),
}
return Poll::Ready(Ok(msg));
}
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
this.listener = Some(this.receiver.channel.recv_ops.listen());
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
this.listener = None;
continue;
}

Poll::Pending => return Poll::Pending,
}
}
}
}
}
}