Skip to content

Commit e408ccd

Browse files
authored
bugfix: Tracking down a deadlock in async-channel
There was a deadlock where notifications would not get delivered for listeners that were stuck in the fallback queue. There is an optimization where the notification count is checked to see if the notifications delivered by the user would make any impact on the listener chain; if it wouldn't, then it isn't done at all. For no_std, I've removed this optimization, as I doubt that it can be done consistently due to the nature of the fallback queue. During testing I also uncovered problems with the current implementation of the fallback queue itself. I've replaced it with concurrent-queue for now, as I doubt that a fully functioning queue wouldn't be trivial enough to justify inlining it into event-listener itself. Unfortunately it adds three additional dependencies to the tree of event-listener. With this change, the async-channel tests pass consistently. Signed-off-by: John Nunley <[email protected]>
1 parent 564b84b commit e408ccd

File tree

7 files changed

+239
-296
lines changed

7 files changed

+239
-296
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ exclude = ["/.*"]
1616

1717
[features]
1818
default = ["std"]
19-
std = ["parking"]
19+
std = ["concurrent-queue/std", "parking"]
2020
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
2121

2222
[dependencies]
23+
concurrent-queue = { version = "2.2.0", default-features = false }
2324
parking = { version = "2.0.0", optional = true }
2425
pin-project-lite = "0.2.12"
2526
portable-atomic-util = { version = "0.1.2", default-features = false, optional = true, features = ["alloc"] }

src/lib.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ impl<T> Event<T> {
388388

389389
// Notify if there is at least one unnotified listener and the number of notified
390390
// listeners is less than `limit`.
391-
if inner.notified.load(Ordering::Acquire) < limit {
391+
if inner.needs_notification(limit) {
392392
return inner.notify(notify);
393393
}
394394
}
@@ -993,7 +993,7 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
993993
}
994994

995995
/// The state of a listener.
996-
#[derive(Debug, PartialEq)]
996+
#[derive(PartialEq)]
997997
enum State<T> {
998998
/// The listener was just created.
999999
Created,
@@ -1016,6 +1016,20 @@ enum State<T> {
10161016
NotifiedTaken,
10171017
}
10181018

1019+
impl<T> fmt::Debug for State<T> {
1020+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1021+
match self {
1022+
Self::Created => f.write_str("Created"),
1023+
Self::Notified { additional, .. } => f
1024+
.debug_struct("Notified")
1025+
.field("additional", additional)
1026+
.finish(),
1027+
Self::Task(_) => f.write_str("Task(_)"),
1028+
Self::NotifiedTaken => f.write_str("NotifiedTaken"),
1029+
}
1030+
}
1031+
}
1032+
10191033
impl<T> State<T> {
10201034
fn is_notified(&self) -> bool {
10211035
matches!(self, Self::Notified { .. } | Self::NotifiedTaken)

0 commit comments

Comments
 (0)