Skip to content

Commit e32c7b9

Browse files
authored
Fix StreamGroup.broadcast() close() not completing when streams close. (#876)
1 parent af37fe5 commit e32c7b9

File tree

4 files changed

+47
-2
lines changed

4 files changed

+47
-2
lines changed

pkgs/async/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 2.13.1-wip
2+
3+
- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
4+
15
## 2.13.0
26

37
- Fix type check and cast in SubscriptionStream's cancelOnError wrapper

pkgs/async/lib/src/stream_group.dart

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,32 @@ class StreamGroup<T> implements Sink<Stream<T>> {
289289
if (_closed) return _controller.done;
290290

291291
_closed = true;
292-
if (_subscriptions.isEmpty) _controller.close();
292+
293+
if (_subscriptions.isEmpty) {
294+
_onIdleController?.close();
295+
_controller.close();
296+
return _controller.done;
297+
}
298+
299+
if (_controller.stream.isBroadcast) {
300+
// For a broadcast group that's closed, we must listen to streams with
301+
// null subscriptions to detect when they complete. This ensures the
302+
// group itself can close once all its streams have closed.
303+
List<Stream<T>>? streamsToRemove;
304+
305+
_subscriptions.updateAll((stream, subscription) {
306+
if (subscription != null) return subscription;
307+
308+
try {
309+
return _listenToStream(stream);
310+
} on Object {
311+
(streamsToRemove ??= []).add(stream);
312+
return null;
313+
}
314+
});
315+
316+
streamsToRemove?.forEach(_subscriptions.remove);
317+
}
293318

294319
return _controller.done;
295320
}

pkgs/async/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: async
2-
version: 2.13.0
2+
version: 2.13.1-wip
33
description: Utility functions and classes related to the 'dart:async' library.
44
repository: https://github.com/dart-lang/core/tree/main/pkgs/async
55
issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync

pkgs/async/test/stream_group_test.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,22 @@ void main() {
491491
controller.add('first');
492492
expect(streamGroup.close(), completes);
493493
});
494+
495+
test('completes close() when streams close without being removed',
496+
() async {
497+
var controller = StreamController.broadcast();
498+
var group = StreamGroup.broadcast();
499+
group.add(controller.stream);
500+
var closeCompleted = false;
501+
group.close().then((_) => closeCompleted = true);
502+
503+
await flushMicrotasks();
504+
expect(closeCompleted, isFalse);
505+
506+
await controller.close();
507+
await flushMicrotasks();
508+
expect(closeCompleted, isTrue);
509+
});
494510
});
495511

496512
group('regardless of type', () {

0 commit comments

Comments
 (0)