Skip to content

Commit fc4bcab

Browse files
mralephCommit Queue
authored and
Commit Queue
committed
[io] Propagate cancellation in _HttpOutgoing.addStream
If HttpResponse is being closed prematurally (e.g. because client decided to close its request) we need to propagate cancellation to the stream which is being piped into the response. Otherwise we will keep that stream forever hanging around and leak underlying resources. Fixes #55886 TEST=tests/standalone/io/regress_55886_test.dart [email protected] Change-Id: I7c294ed19cc7c350fd101b078bd650ce8a6526a2 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/369061 Reviewed-by: Martin Kustermann <[email protected]> Commit-Queue: Martin Kustermann <[email protected]>
1 parent a4d3ee0 commit fc4bcab

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed

sdk/lib/_http/http_impl.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,6 +1793,7 @@ class _HttpOutgoing implements StreamConsumer<List<int>> {
17931793
onError: controller.addError,
17941794
onDone: controller.close,
17951795
cancelOnError: true);
1796+
controller.onCancel = sub.cancel;
17961797
controller.onPause = sub.pause;
17971798
controller.onResume = sub.resume;
17981799
// Write headers now that we are listening to the stream.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
// Regression test for https://dartbug.com/55886: [HttpResponse.addStream]
6+
// should cancel subscription to the stream which is being added if
7+
// [HttpResponse] itself is being closed.
8+
9+
import 'dart:async';
10+
import 'dart:convert';
11+
import 'dart:io';
12+
13+
import 'package:async_helper/async_helper.dart';
14+
import 'package:expect/expect.dart';
15+
16+
Future<void> pipeStream(Stream<List<int>> from, IOSink to) async {
17+
bool wasCancelled = false;
18+
19+
StreamSubscription<List<int>>? subscription;
20+
late final StreamController<List<int>> streamController;
21+
streamController = StreamController<List<int>>(
22+
onPause: () {
23+
subscription?.pause();
24+
},
25+
onResume: () {
26+
subscription?.resume();
27+
},
28+
onCancel: () {
29+
wasCancelled = true;
30+
subscription?.cancel();
31+
subscription = null;
32+
},
33+
onListen: () {
34+
subscription = from.listen(
35+
(data) {
36+
streamController.add(data);
37+
},
38+
onDone: () {
39+
streamController.close();
40+
subscription?.cancel();
41+
subscription = null;
42+
},
43+
onError: (e, st) {
44+
streamController.addError(e, st);
45+
subscription?.cancel();
46+
subscription = null;
47+
},
48+
);
49+
},
50+
);
51+
52+
await streamController.stream.pipe(to);
53+
Expect.isTrue(wasCancelled);
54+
}
55+
56+
Stream<List<int>> generateSlowly() async* {
57+
for (var i = 0; i < 100; i++) {
58+
yield utf8.encode("item $i");
59+
await Future.delayed(Duration(milliseconds: 100));
60+
}
61+
}
62+
63+
Future<void> serve(HttpServer server) async {
64+
await for (var rq in server) {
65+
rq.response.bufferOutput = false;
66+
await pipeStream(generateSlowly(), rq.response);
67+
break;
68+
}
69+
}
70+
71+
void main() async {
72+
asyncStart();
73+
74+
final server = await HttpServer.bind('localhost', 0);
75+
serve(server).then((_) => asyncEnd());
76+
77+
// Send request and then cancel response stream subscription after
78+
// the first chunk. This should cause server to close the connection
79+
// and cancel subscription to the stream which is being piped into
80+
// the response.
81+
final client = HttpClient();
82+
final rq = await client.get('localhost', server.port, '/');
83+
final rs = await rq.close();
84+
late StreamSubscription sub;
85+
sub = rs.map(utf8.decode).listen((msg) {
86+
sub.cancel();
87+
});
88+
}

0 commit comments

Comments
 (0)