diff --git a/pkgs/cronet_http/CHANGELOG.md b/pkgs/cronet_http/CHANGELOG.md index 253357951c..0dd75e4e1f 100644 --- a/pkgs/cronet_http/CHANGELOG.md +++ b/pkgs/cronet_http/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.3.4-wip + +* Cancel requests when the response stream is cancelled. + ## 1.3.3 * Throw `ClientException` if `CronetClient.send` runs out of Java heap while diff --git a/pkgs/cronet_http/example/integration_test/client_profile_test.dart b/pkgs/cronet_http/example/integration_test/client_profile_test.dart index 3e17327ce8..08da828099 100644 --- a/pkgs/cronet_http/example/integration_test/client_profile_test.dart +++ b/pkgs/cronet_http/example/integration_test/client_profile_test.dart @@ -214,6 +214,55 @@ void main() { }); }); + group('cancel streaming GET response', () { + late HttpServer successServer; + late Uri successServerUri; + late HttpClientRequestProfile profile; + late List receivedData; + + setUpAll(() async { + successServer = (await HttpServer.bind('localhost', 0)) + ..listen((request) async { + await request.drain(); + request.response.headers.set('Content-Type', 'text/plain'); + while (true) { + request.response.write('Hello World'); + await request.response.flush(); + await Future.delayed(const Duration(seconds: 0)); + } + }); + final cancelCompleter = Completer(); + successServerUri = Uri.http('localhost:${successServer.port}'); + final client = CronetClientWithProfile.defaultCronetEngine(); + final request = StreamedRequest('GET', successServerUri); + unawaited(request.sink.close()); + final response = await client.send(request); + + var i = 0; + late final StreamSubscription> s; + receivedData = []; + s = response.stream.listen((d) { + receivedData += d; + if (++i == 1000) { + s.cancel(); + cancelCompleter.complete(); + } + }); + await cancelCompleter.future; + profile = client.profile!; + }); + tearDownAll(() { + successServer.close(); + }); + + test('request attributes', () async { + expect(profile.requestData.contentLength, isNull); + expect(profile.requestData.startTime, isNotNull); + expect(profile.requestData.endTime, isNotNull); + expect(profile.responseData.bodyBytes, receivedData); + }); + }); + group('redirects', () { late HttpServer successServer; late Uri successServerUri; diff --git a/pkgs/cronet_http/lib/src/cronet_client.dart b/pkgs/cronet_http/lib/src/cronet_client.dart index 8341e1caa5..b3ccc7237e 100644 --- a/pkgs/cronet_http/lib/src/cronet_client.dart +++ b/pkgs/cronet_http/lib/src/cronet_client.dart @@ -153,13 +153,23 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( StreamController>? responseStream; JByteBuffer? jByteBuffer; var numRedirects = 0; + var done = false; // The order of callbacks generated by Cronet is documented here: // https://developer.android.com/guide/topics/connectivity/cronet/lifecycle return jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement( jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterface( onResponseStarted: (urlRequest, responseInfo) { - responseStream = StreamController(); + responseStream = StreamController(onCancel: () { + // The user did `response.stream.cancel()`. We can just pretend that + // the response completed normally. + if (done) return; + done = true; + urlRequest.cancel(); + responseStream!.sink.close(); + jByteBuffer?.release(); + profile?.responseData.close(); + }); final responseHeaders = _cronetToClientHeaders(responseInfo.getAllHeaders()); int? contentLength; @@ -203,6 +213,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( urlRequest.read(jByteBuffer!); }, onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) { + if (done) return; final responseHeaders = _cronetToClientHeaders(responseInfo.getAllHeaders()); @@ -247,6 +258,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( } }, onReadCompleted: (urlRequest, responseInfo, byteBuffer) { + if (done) return; byteBuffer.flip(); final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining); responseStream!.add(data); @@ -256,11 +268,15 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( urlRequest.read(byteBuffer); }, onSucceeded: (urlRequest, responseInfo) { + if (done) return; + done = true; responseStream!.sink.close(); jByteBuffer?.release(); profile?.responseData.close(); }, onFailed: (urlRequest, responseInfo, cronetException) { + if (done) return; + done = true; final error = ClientException( 'Cronet exception: ${cronetException.toString()}', request.url); if (responseStream == null) { diff --git a/pkgs/cronet_http/pubspec.yaml b/pkgs/cronet_http/pubspec.yaml index ab55257ae2..d712cc0e7b 100644 --- a/pkgs/cronet_http/pubspec.yaml +++ b/pkgs/cronet_http/pubspec.yaml @@ -1,5 +1,5 @@ name: cronet_http -version: 1.3.3 +version: 1.3.4-wip description: >- An Android Flutter plugin that provides access to the Cronet HTTP client. repository: https://github.com/dart-lang/http/tree/master/pkgs/cronet_http diff --git a/pkgs/cupertino_http/CHANGELOG.md b/pkgs/cupertino_http/CHANGELOG.md index 83ffb8b581..e21c2fd886 100644 --- a/pkgs/cupertino_http/CHANGELOG.md +++ b/pkgs/cupertino_http/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.1.2-wip + +* Cancel requests when the response stream is cancelled. + ## 2.1.1 * Support `package:web_socket` 1.0.0. diff --git a/pkgs/cupertino_http/example/integration_test/client_profile_test.dart b/pkgs/cupertino_http/example/integration_test/client_profile_test.dart index d823370618..aa74415881 100644 --- a/pkgs/cupertino_http/example/integration_test/client_profile_test.dart +++ b/pkgs/cupertino_http/example/integration_test/client_profile_test.dart @@ -261,6 +261,59 @@ void main() { }); }); + group('cancel streaming GET response', () { + late HttpServer successServer; + late Uri successServerUri; + late HttpClientRequestProfile profile; + late List receivedData; + + setUpAll(() async { + successServer = (await HttpServer.bind('localhost', 0)) + ..listen((request) async { + await request.drain(); + request.response.headers.set('Content-Type', 'text/plain'); + while (true) { + request.response.write('Hello World'); + await request.response.flush(); + await Future.delayed(const Duration(seconds: 0)); + } + }); + final cancelCompleter = Completer(); + successServerUri = Uri.http('localhost:${successServer.port}'); + final client = CupertinoClientWithProfile.defaultSessionConfiguration(); + final request = StreamedRequest('GET', successServerUri); + unawaited(request.sink.close()); + final response = await client.send(request); + + var i = 0; + late final StreamSubscription> s; + receivedData = []; + s = response.stream.listen((d) { + receivedData += d; + if (++i == 1000) { + s.cancel(); + cancelCompleter.complete(); + } + }); + await cancelCompleter.future; + profile = client.profile!; + }); + tearDownAll(() { + successServer.close(); + }); + + test('request attributes', () async { + expect(profile.requestData.contentLength, isNull); + expect(profile.requestData.startTime, isNotNull); + expect(profile.requestData.endTime, isNotNull); + // Extra data could be received before the cancel event is dispatched + // by the url loading framework so check that + // `profile.responseData.bodyBytes` starts with `receivedData`. + expect(profile.responseData.bodyBytes.sublist(0, receivedData.length), + receivedData); + }); + }); + group('redirects', () { late HttpServer successServer; late Uri successServerUri; diff --git a/pkgs/cupertino_http/lib/src/cupertino_client.dart b/pkgs/cupertino_http/lib/src/cupertino_client.dart index 7618fe53bc..72db68525e 100644 --- a/pkgs/cupertino_http/lib/src/cupertino_client.dart +++ b/pkgs/cupertino_http/lib/src/cupertino_client.dart @@ -15,6 +15,8 @@ import 'cupertino_api.dart'; final _digitRegex = RegExp(r'^\d+$'); +const _nsurlErrorCancelled = -999; + /// This class can be removed when `package:http` v2 is released. class _StreamedResponseWithUrl extends StreamedResponse implements BaseResponseWithUrl { @@ -33,12 +35,12 @@ class _StreamedResponseWithUrl extends StreamedResponse class _TaskTracker { final responseCompleter = Completer(); final BaseRequest request; - final responseController = StreamController(); + final StreamController responseController; final HttpClientRequestProfile? profile; int numRedirects = 0; Uri? lastUrl; // The last URL redirected to. - _TaskTracker(this.request, this.profile); + _TaskTracker(this.request, this.responseController, this.profile); void close() { responseController.close(); @@ -167,7 +169,13 @@ class CupertinoClient extends BaseClient { static void _onComplete( URLSession session, URLSessionTask task, NSError? error) { final taskTracker = _tracker(task); - if (error != null) { + // The task will only be cancelled if the user calls + // `StreamedResponse.stream.cancel()`, which can only happen if the response + // has already been received. Therefore, it is safe to handle task + // cancellation errors as if the response completed normally. + if (error != null && + !(error.domain.toDartString() == 'NSURLErrorDomain' && + error.code == _nsurlErrorCancelled)) { final exception = ClientException( error.localizedDescription.toDartString(), taskTracker.request.url); if (taskTracker.profile != null && @@ -338,7 +346,10 @@ class CupertinoClient extends BaseClient { // This will preserve Apple default headers - is that what we want? request.headers.forEach(urlRequest.setValueForHttpHeaderField); final task = urlSession.dataTaskWithRequest(urlRequest); - final taskTracker = _TaskTracker(request, profile); + final subscription = StreamController(onCancel: () { + task.cancel(); + }); + final taskTracker = _TaskTracker(request, subscription, profile); _tasks[task] = taskTracker; task.resume(); diff --git a/pkgs/cupertino_http/pubspec.yaml b/pkgs/cupertino_http/pubspec.yaml index 2631aa840d..585b5ea7ac 100644 --- a/pkgs/cupertino_http/pubspec.yaml +++ b/pkgs/cupertino_http/pubspec.yaml @@ -1,5 +1,5 @@ name: cupertino_http -version: 2.1.1 +version: 2.1.2-wip description: >- A macOS/iOS Flutter plugin that provides access to the Foundation URL Loading System. diff --git a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart index f355d6c8de..8ae49d1f80 100644 --- a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart +++ b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:convert'; import 'package:async/async.dart'; @@ -21,16 +22,16 @@ import 'response_body_streamed_server_vm.dart' void testResponseBodyStreamed(Client client, {bool canStreamResponseBody = true}) async { group('streamed response body', () { - late final String host; - late final StreamChannel httpServerChannel; - late final StreamQueue httpServerQueue; + late String host; + late StreamChannel httpServerChannel; + late StreamQueue httpServerQueue; - setUpAll(() async { + setUp(() async { httpServerChannel = await startServer(); httpServerQueue = StreamQueue(httpServerChannel.stream); host = 'localhost:${await httpServerQueue.nextAsInt}'; }); - tearDownAll(() => httpServerChannel.sink.add(null)); + tearDown(() => httpServerChannel.sink.add(null)); test('large response streamed without content length', () async { // The server continuously streams data to the client until @@ -56,6 +57,25 @@ void testResponseBodyStreamed(Client client, expect(response.reasonPhrase, 'OK'); expect(response.request!.method, 'GET'); expect(response.statusCode, 200); - }, skip: canStreamResponseBody ? false : 'does not stream response bodies'); - }); + }); + + test('cancel streamed response', () async { + final request = Request('GET', Uri.http(host, '')); + final response = await client.send(request); + final cancelled = Completer(); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + late StreamSubscription subscription; + subscription = const LineSplitter() + .bind(const Utf8Decoder().bind(response.stream)) + .listen((s) async { + final lastReceived = int.parse(s.trim()); + if (lastReceived == 1000) { + unawaited(subscription.cancel()); + cancelled.complete(); + } + }); + await cancelled.future; + }); + }, skip: canStreamResponseBody ? false : 'does not stream response bodies'); }