Skip to content

Commit 5704b0c

Browse files
authored
[cronet_http/cupertino_http]: Fixes bugs where cancelling StreamedResponse.stream did not sever the connection (#1760)
1 parent 63c477b commit 5704b0c

File tree

9 files changed

+171
-14
lines changed

9 files changed

+171
-14
lines changed

pkgs/cronet_http/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.3.4-wip
2+
3+
* Cancel requests when the response stream is cancelled.
4+
15
## 1.3.3
26

37
* Throw `ClientException` if `CronetClient.send` runs out of Java heap while

pkgs/cronet_http/example/integration_test/client_profile_test.dart

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,55 @@ void main() {
214214
});
215215
});
216216

217+
group('cancel streaming GET response', () {
218+
late HttpServer successServer;
219+
late Uri successServerUri;
220+
late HttpClientRequestProfile profile;
221+
late List<int> receivedData;
222+
223+
setUpAll(() async {
224+
successServer = (await HttpServer.bind('localhost', 0))
225+
..listen((request) async {
226+
await request.drain<void>();
227+
request.response.headers.set('Content-Type', 'text/plain');
228+
while (true) {
229+
request.response.write('Hello World');
230+
await request.response.flush();
231+
await Future<void>.delayed(const Duration(seconds: 0));
232+
}
233+
});
234+
final cancelCompleter = Completer<void>();
235+
successServerUri = Uri.http('localhost:${successServer.port}');
236+
final client = CronetClientWithProfile.defaultCronetEngine();
237+
final request = StreamedRequest('GET', successServerUri);
238+
unawaited(request.sink.close());
239+
final response = await client.send(request);
240+
241+
var i = 0;
242+
late final StreamSubscription<List<int>> s;
243+
receivedData = [];
244+
s = response.stream.listen((d) {
245+
receivedData += d;
246+
if (++i == 1000) {
247+
s.cancel();
248+
cancelCompleter.complete();
249+
}
250+
});
251+
await cancelCompleter.future;
252+
profile = client.profile!;
253+
});
254+
tearDownAll(() {
255+
successServer.close();
256+
});
257+
258+
test('request attributes', () async {
259+
expect(profile.requestData.contentLength, isNull);
260+
expect(profile.requestData.startTime, isNotNull);
261+
expect(profile.requestData.endTime, isNotNull);
262+
expect(profile.responseData.bodyBytes, receivedData);
263+
});
264+
});
265+
217266
group('redirects', () {
218267
late HttpServer successServer;
219268
late Uri successServerUri;

pkgs/cronet_http/lib/src/cronet_client.dart

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,23 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
153153
StreamController<List<int>>? responseStream;
154154
JByteBuffer? jByteBuffer;
155155
var numRedirects = 0;
156+
var done = false;
156157

157158
// The order of callbacks generated by Cronet is documented here:
158159
// https://developer.android.com/guide/topics/connectivity/cronet/lifecycle
159160
return jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement(
160161
jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterface(
161162
onResponseStarted: (urlRequest, responseInfo) {
162-
responseStream = StreamController();
163+
responseStream = StreamController(onCancel: () {
164+
// The user did `response.stream.cancel()`. We can just pretend that
165+
// the response completed normally.
166+
if (done) return;
167+
done = true;
168+
urlRequest.cancel();
169+
responseStream!.sink.close();
170+
jByteBuffer?.release();
171+
profile?.responseData.close();
172+
});
163173
final responseHeaders =
164174
_cronetToClientHeaders(responseInfo.getAllHeaders());
165175
int? contentLength;
@@ -203,6 +213,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
203213
urlRequest.read(jByteBuffer!);
204214
},
205215
onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) {
216+
if (done) return;
206217
final responseHeaders =
207218
_cronetToClientHeaders(responseInfo.getAllHeaders());
208219

@@ -247,6 +258,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
247258
}
248259
},
249260
onReadCompleted: (urlRequest, responseInfo, byteBuffer) {
261+
if (done) return;
250262
byteBuffer.flip();
251263
final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining);
252264
responseStream!.add(data);
@@ -256,11 +268,15 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
256268
urlRequest.read(byteBuffer);
257269
},
258270
onSucceeded: (urlRequest, responseInfo) {
271+
if (done) return;
272+
done = true;
259273
responseStream!.sink.close();
260274
jByteBuffer?.release();
261275
profile?.responseData.close();
262276
},
263277
onFailed: (urlRequest, responseInfo, cronetException) {
278+
if (done) return;
279+
done = true;
264280
final error = ClientException(
265281
'Cronet exception: ${cronetException.toString()}', request.url);
266282
if (responseStream == null) {

pkgs/cronet_http/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: cronet_http
2-
version: 1.3.3
2+
version: 1.3.4-wip
33
description: >-
44
An Android Flutter plugin that provides access to the Cronet HTTP client.
55
repository: https://github.com/dart-lang/http/tree/master/pkgs/cronet_http

pkgs/cupertino_http/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 2.1.2-wip
2+
3+
* Cancel requests when the response stream is cancelled.
4+
15
## 2.1.1
26

37
* Support `package:web_socket` 1.0.0.

pkgs/cupertino_http/example/integration_test/client_profile_test.dart

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,59 @@ void main() {
261261
});
262262
});
263263

264+
group('cancel streaming GET response', () {
265+
late HttpServer successServer;
266+
late Uri successServerUri;
267+
late HttpClientRequestProfile profile;
268+
late List<int> receivedData;
269+
270+
setUpAll(() async {
271+
successServer = (await HttpServer.bind('localhost', 0))
272+
..listen((request) async {
273+
await request.drain<void>();
274+
request.response.headers.set('Content-Type', 'text/plain');
275+
while (true) {
276+
request.response.write('Hello World');
277+
await request.response.flush();
278+
await Future<void>.delayed(const Duration(seconds: 0));
279+
}
280+
});
281+
final cancelCompleter = Completer<void>();
282+
successServerUri = Uri.http('localhost:${successServer.port}');
283+
final client = CupertinoClientWithProfile.defaultSessionConfiguration();
284+
final request = StreamedRequest('GET', successServerUri);
285+
unawaited(request.sink.close());
286+
final response = await client.send(request);
287+
288+
var i = 0;
289+
late final StreamSubscription<List<int>> s;
290+
receivedData = [];
291+
s = response.stream.listen((d) {
292+
receivedData += d;
293+
if (++i == 1000) {
294+
s.cancel();
295+
cancelCompleter.complete();
296+
}
297+
});
298+
await cancelCompleter.future;
299+
profile = client.profile!;
300+
});
301+
tearDownAll(() {
302+
successServer.close();
303+
});
304+
305+
test('request attributes', () async {
306+
expect(profile.requestData.contentLength, isNull);
307+
expect(profile.requestData.startTime, isNotNull);
308+
expect(profile.requestData.endTime, isNotNull);
309+
// Extra data could be received before the cancel event is dispatched
310+
// by the url loading framework so check that
311+
// `profile.responseData.bodyBytes` starts with `receivedData`.
312+
expect(profile.responseData.bodyBytes.sublist(0, receivedData.length),
313+
receivedData);
314+
});
315+
});
316+
264317
group('redirects', () {
265318
late HttpServer successServer;
266319
late Uri successServerUri;

pkgs/cupertino_http/lib/src/cupertino_client.dart

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import 'cupertino_api.dart';
1515

1616
final _digitRegex = RegExp(r'^\d+$');
1717

18+
const _nsurlErrorCancelled = -999;
19+
1820
/// This class can be removed when `package:http` v2 is released.
1921
class _StreamedResponseWithUrl extends StreamedResponse
2022
implements BaseResponseWithUrl {
@@ -33,12 +35,12 @@ class _StreamedResponseWithUrl extends StreamedResponse
3335
class _TaskTracker {
3436
final responseCompleter = Completer<URLResponse>();
3537
final BaseRequest request;
36-
final responseController = StreamController<Uint8List>();
38+
final StreamController<Uint8List> responseController;
3739
final HttpClientRequestProfile? profile;
3840
int numRedirects = 0;
3941
Uri? lastUrl; // The last URL redirected to.
4042

41-
_TaskTracker(this.request, this.profile);
43+
_TaskTracker(this.request, this.responseController, this.profile);
4244

4345
void close() {
4446
responseController.close();
@@ -167,7 +169,13 @@ class CupertinoClient extends BaseClient {
167169
static void _onComplete(
168170
URLSession session, URLSessionTask task, NSError? error) {
169171
final taskTracker = _tracker(task);
170-
if (error != null) {
172+
// The task will only be cancelled if the user calls
173+
// `StreamedResponse.stream.cancel()`, which can only happen if the response
174+
// has already been received. Therefore, it is safe to handle task
175+
// cancellation errors as if the response completed normally.
176+
if (error != null &&
177+
!(error.domain.toDartString() == 'NSURLErrorDomain' &&
178+
error.code == _nsurlErrorCancelled)) {
171179
final exception = ClientException(
172180
error.localizedDescription.toDartString(), taskTracker.request.url);
173181
if (taskTracker.profile != null &&
@@ -338,7 +346,10 @@ class CupertinoClient extends BaseClient {
338346
// This will preserve Apple default headers - is that what we want?
339347
request.headers.forEach(urlRequest.setValueForHttpHeaderField);
340348
final task = urlSession.dataTaskWithRequest(urlRequest);
341-
final taskTracker = _TaskTracker(request, profile);
349+
final subscription = StreamController<Uint8List>(onCancel: () {
350+
task.cancel();
351+
});
352+
final taskTracker = _TaskTracker(request, subscription, profile);
342353
_tasks[task] = taskTracker;
343354
task.resume();
344355

pkgs/cupertino_http/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: cupertino_http
2-
version: 2.1.1
2+
version: 2.1.2-wip
33
description: >-
44
A macOS/iOS Flutter plugin that provides access to the Foundation URL
55
Loading System.

pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
44

5+
import 'dart:async';
56
import 'dart:convert';
67

78
import 'package:async/async.dart';
@@ -21,16 +22,16 @@ import 'response_body_streamed_server_vm.dart'
2122
void testResponseBodyStreamed(Client client,
2223
{bool canStreamResponseBody = true}) async {
2324
group('streamed response body', () {
24-
late final String host;
25-
late final StreamChannel<Object?> httpServerChannel;
26-
late final StreamQueue<Object?> httpServerQueue;
25+
late String host;
26+
late StreamChannel<Object?> httpServerChannel;
27+
late StreamQueue<Object?> httpServerQueue;
2728

28-
setUpAll(() async {
29+
setUp(() async {
2930
httpServerChannel = await startServer();
3031
httpServerQueue = StreamQueue(httpServerChannel.stream);
3132
host = 'localhost:${await httpServerQueue.nextAsInt}';
3233
});
33-
tearDownAll(() => httpServerChannel.sink.add(null));
34+
tearDown(() => httpServerChannel.sink.add(null));
3435

3536
test('large response streamed without content length', () async {
3637
// The server continuously streams data to the client until
@@ -56,6 +57,25 @@ void testResponseBodyStreamed(Client client,
5657
expect(response.reasonPhrase, 'OK');
5758
expect(response.request!.method, 'GET');
5859
expect(response.statusCode, 200);
59-
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
60-
});
60+
});
61+
62+
test('cancel streamed response', () async {
63+
final request = Request('GET', Uri.http(host, ''));
64+
final response = await client.send(request);
65+
final cancelled = Completer<void>();
66+
expect(response.reasonPhrase, 'OK');
67+
expect(response.statusCode, 200);
68+
late StreamSubscription<String> subscription;
69+
subscription = const LineSplitter()
70+
.bind(const Utf8Decoder().bind(response.stream))
71+
.listen((s) async {
72+
final lastReceived = int.parse(s.trim());
73+
if (lastReceived == 1000) {
74+
unawaited(subscription.cancel());
75+
cancelled.complete();
76+
}
77+
});
78+
await cancelled.future;
79+
});
80+
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
6181
}

0 commit comments

Comments
 (0)