From cb3ad3324cc2549b2ef251bc3bf7df8605ce1148 Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Wed, 30 Apr 2025 15:28:06 -0700 Subject: [PATCH 1/5] [cronet_http/cupertino_http]: Fixes bugs were cancelling the response stream did not terminate the connection --- pkgs/cronet_http/CHANGELOG.md | 4 ++ .../integration_test/client_profile_test.dart | 49 +++++++++++++++++ pkgs/cronet_http/lib/src/cronet_client.dart | 16 +++++- pkgs/cronet_http/pubspec.yaml | 2 +- pkgs/cupertino_http/CHANGELOG.md | 4 ++ .../integration_test/client_profile_test.dart | 53 +++++++++++++++++++ .../lib/src/cupertino_client.dart | 19 +++++-- pkgs/cupertino_http/pubspec.yaml | 2 +- .../lib/src/response_body_streamed_test.dart | 34 +++++++++--- 9 files changed, 169 insertions(+), 14 deletions(-) diff --git a/pkgs/cronet_http/CHANGELOG.md b/pkgs/cronet_http/CHANGELOG.md index 253357951c..0dd75e4e1f 100644 --- a/pkgs/cronet_http/CHANGELOG.md +++ b/pkgs/cronet_http/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.3.4-wip + +* Cancel requests when the response stream is cancelled. + ## 1.3.3 * Throw `ClientException` if `CronetClient.send` runs out of Java heap while diff --git a/pkgs/cronet_http/example/integration_test/client_profile_test.dart b/pkgs/cronet_http/example/integration_test/client_profile_test.dart index 3e17327ce8..08da828099 100644 --- a/pkgs/cronet_http/example/integration_test/client_profile_test.dart +++ b/pkgs/cronet_http/example/integration_test/client_profile_test.dart @@ -214,6 +214,55 @@ void main() { }); }); + group('cancel streaming GET response', () { + late HttpServer successServer; + late Uri successServerUri; + late HttpClientRequestProfile profile; + late List receivedData; + + setUpAll(() async { + successServer = (await HttpServer.bind('localhost', 0)) + ..listen((request) async { + await request.drain(); + request.response.headers.set('Content-Type', 'text/plain'); + while (true) { + request.response.write('Hello World'); + await request.response.flush(); + await Future.delayed(const Duration(seconds: 0)); + } + }); + final cancelCompleter = Completer(); + successServerUri = Uri.http('localhost:${successServer.port}'); + final client = CronetClientWithProfile.defaultCronetEngine(); + final request = StreamedRequest('GET', successServerUri); + unawaited(request.sink.close()); + final response = await client.send(request); + + var i = 0; + late final StreamSubscription> s; + receivedData = []; + s = response.stream.listen((d) { + receivedData += d; + if (++i == 1000) { + s.cancel(); + cancelCompleter.complete(); + } + }); + await cancelCompleter.future; + profile = client.profile!; + }); + tearDownAll(() { + successServer.close(); + }); + + test('request attributes', () async { + expect(profile.requestData.contentLength, isNull); + expect(profile.requestData.startTime, isNotNull); + expect(profile.requestData.endTime, isNotNull); + expect(profile.responseData.bodyBytes, receivedData); + }); + }); + group('redirects', () { late HttpServer successServer; late Uri successServerUri; diff --git a/pkgs/cronet_http/lib/src/cronet_client.dart b/pkgs/cronet_http/lib/src/cronet_client.dart index 8341e1caa5..4154462c95 100644 --- a/pkgs/cronet_http/lib/src/cronet_client.dart +++ b/pkgs/cronet_http/lib/src/cronet_client.dart @@ -153,13 +153,23 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( StreamController>? responseStream; JByteBuffer? jByteBuffer; var numRedirects = 0; + var cancelled = false; // The order of callbacks generated by Cronet is documented here: // https://developer.android.com/guide/topics/connectivity/cronet/lifecycle return jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement( jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterface( onResponseStarted: (urlRequest, responseInfo) { - responseStream = StreamController(); + /// + responseStream = StreamController(onCancel: () { + // The user did `response.stream.cancel()`. We can just pretend that + // the response completed normally. + cancelled = true; + urlRequest.cancel(); + responseStream!.sink.close(); + jByteBuffer?.release(); + profile?.responseData.close(); + }); final responseHeaders = _cronetToClientHeaders(responseInfo.getAllHeaders()); int? contentLength; @@ -203,6 +213,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( urlRequest.read(jByteBuffer!); }, onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) { + if (cancelled) return; final responseHeaders = _cronetToClientHeaders(responseInfo.getAllHeaders()); @@ -247,6 +258,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( } }, onReadCompleted: (urlRequest, responseInfo, byteBuffer) { + if (cancelled) return; byteBuffer.flip(); final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining); responseStream!.add(data); @@ -256,11 +268,13 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( urlRequest.read(byteBuffer); }, onSucceeded: (urlRequest, responseInfo) { + if (cancelled) return; responseStream!.sink.close(); jByteBuffer?.release(); profile?.responseData.close(); }, onFailed: (urlRequest, responseInfo, cronetException) { + if (cancelled) return; final error = ClientException( 'Cronet exception: ${cronetException.toString()}', request.url); if (responseStream == null) { diff --git a/pkgs/cronet_http/pubspec.yaml b/pkgs/cronet_http/pubspec.yaml index ab55257ae2..d712cc0e7b 100644 --- a/pkgs/cronet_http/pubspec.yaml +++ b/pkgs/cronet_http/pubspec.yaml @@ -1,5 +1,5 @@ name: cronet_http -version: 1.3.3 +version: 1.3.4-wip description: >- An Android Flutter plugin that provides access to the Cronet HTTP client. repository: https://github.com/dart-lang/http/tree/master/pkgs/cronet_http diff --git a/pkgs/cupertino_http/CHANGELOG.md b/pkgs/cupertino_http/CHANGELOG.md index 83ffb8b581..e21c2fd886 100644 --- a/pkgs/cupertino_http/CHANGELOG.md +++ b/pkgs/cupertino_http/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.1.2-wip + +* Cancel requests when the response stream is cancelled. + ## 2.1.1 * Support `package:web_socket` 1.0.0. diff --git a/pkgs/cupertino_http/example/integration_test/client_profile_test.dart b/pkgs/cupertino_http/example/integration_test/client_profile_test.dart index d823370618..47785d5bca 100644 --- a/pkgs/cupertino_http/example/integration_test/client_profile_test.dart +++ b/pkgs/cupertino_http/example/integration_test/client_profile_test.dart @@ -261,6 +261,59 @@ void main() { }); }); + group('cancel streaming GET response', () { + late HttpServer successServer; + late Uri successServerUri; + late HttpClientRequestProfile profile; + late List receivedData; + + setUpAll(() async { + successServer = (await HttpServer.bind('localhost', 0)) + ..listen((request) async { + await request.drain(); + request.response.headers.set('Content-Type', 'text/plain'); + while (true) { + request.response.write('Hello World'); + await request.response.flush(); + await Future.delayed(Duration(seconds: 0)); + } + }); + final cancelCompleter = Completer(); + successServerUri = Uri.http('localhost:${successServer.port}'); + final client = CupertinoClientWithProfile.defaultSessionConfiguration(); + final request = StreamedRequest('GET', successServerUri); + request.sink.close(); + final response = await client.send(request); + + var i = 0; + late final StreamSubscription> s; + receivedData = []; + s = response.stream.listen((d) { + receivedData += d; + if (++i == 1000) { + s.cancel(); + cancelCompleter.complete(); + } + }); + await cancelCompleter.future; + profile = client.profile!; + }); + tearDownAll(() { + successServer.close(); + }); + + test('request attributes', () async { + expect(profile.requestData.contentLength, isNull); + expect(profile.requestData.startTime, isNotNull); + expect(profile.requestData.endTime, isNotNull); + // Extra data could be received before the cancel event is dispatched + // by the url loading framework so check that + // `profile.responseData.bodyBytes` starts with `receivedData`. + expect(profile.responseData.bodyBytes.sublist(0, receivedData.length), + receivedData); + }); + }); + group('redirects', () { late HttpServer successServer; late Uri successServerUri; diff --git a/pkgs/cupertino_http/lib/src/cupertino_client.dart b/pkgs/cupertino_http/lib/src/cupertino_client.dart index 7618fe53bc..72db68525e 100644 --- a/pkgs/cupertino_http/lib/src/cupertino_client.dart +++ b/pkgs/cupertino_http/lib/src/cupertino_client.dart @@ -15,6 +15,8 @@ import 'cupertino_api.dart'; final _digitRegex = RegExp(r'^\d+$'); +const _nsurlErrorCancelled = -999; + /// This class can be removed when `package:http` v2 is released. class _StreamedResponseWithUrl extends StreamedResponse implements BaseResponseWithUrl { @@ -33,12 +35,12 @@ class _StreamedResponseWithUrl extends StreamedResponse class _TaskTracker { final responseCompleter = Completer(); final BaseRequest request; - final responseController = StreamController(); + final StreamController responseController; final HttpClientRequestProfile? profile; int numRedirects = 0; Uri? lastUrl; // The last URL redirected to. - _TaskTracker(this.request, this.profile); + _TaskTracker(this.request, this.responseController, this.profile); void close() { responseController.close(); @@ -167,7 +169,13 @@ class CupertinoClient extends BaseClient { static void _onComplete( URLSession session, URLSessionTask task, NSError? error) { final taskTracker = _tracker(task); - if (error != null) { + // The task will only be cancelled if the user calls + // `StreamedResponse.stream.cancel()`, which can only happen if the response + // has already been received. Therefore, it is safe to handle task + // cancellation errors as if the response completed normally. + if (error != null && + !(error.domain.toDartString() == 'NSURLErrorDomain' && + error.code == _nsurlErrorCancelled)) { final exception = ClientException( error.localizedDescription.toDartString(), taskTracker.request.url); if (taskTracker.profile != null && @@ -338,7 +346,10 @@ class CupertinoClient extends BaseClient { // This will preserve Apple default headers - is that what we want? request.headers.forEach(urlRequest.setValueForHttpHeaderField); final task = urlSession.dataTaskWithRequest(urlRequest); - final taskTracker = _TaskTracker(request, profile); + final subscription = StreamController(onCancel: () { + task.cancel(); + }); + final taskTracker = _TaskTracker(request, subscription, profile); _tasks[task] = taskTracker; task.resume(); diff --git a/pkgs/cupertino_http/pubspec.yaml b/pkgs/cupertino_http/pubspec.yaml index 2631aa840d..585b5ea7ac 100644 --- a/pkgs/cupertino_http/pubspec.yaml +++ b/pkgs/cupertino_http/pubspec.yaml @@ -1,5 +1,5 @@ name: cupertino_http -version: 2.1.1 +version: 2.1.2-wip description: >- A macOS/iOS Flutter plugin that provides access to the Foundation URL Loading System. diff --git a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart index f355d6c8de..c293de99d0 100644 --- a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart +++ b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:convert'; import 'package:async/async.dart'; @@ -21,16 +22,16 @@ import 'response_body_streamed_server_vm.dart' void testResponseBodyStreamed(Client client, {bool canStreamResponseBody = true}) async { group('streamed response body', () { - late final String host; - late final StreamChannel httpServerChannel; - late final StreamQueue httpServerQueue; + late String host; + late StreamChannel httpServerChannel; + late StreamQueue httpServerQueue; - setUpAll(() async { + setUp(() async { httpServerChannel = await startServer(); httpServerQueue = StreamQueue(httpServerChannel.stream); host = 'localhost:${await httpServerQueue.nextAsInt}'; }); - tearDownAll(() => httpServerChannel.sink.add(null)); + tearDown(() => httpServerChannel.sink.add(null)); test('large response streamed without content length', () async { // The server continuously streams data to the client until @@ -56,6 +57,25 @@ void testResponseBodyStreamed(Client client, expect(response.reasonPhrase, 'OK'); expect(response.request!.method, 'GET'); expect(response.statusCode, 200); - }, skip: canStreamResponseBody ? false : 'does not stream response bodies'); - }); + }); + + test('cancel streamed response', () async { + final request = Request('GET', Uri.http(host, '')); + final response = await client.send(request); + final cancelled = Completer(); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + late StreamSubscription subscription; + subscription = const LineSplitter() + .bind(const Utf8Decoder().bind(response.stream)) + .listen((s) async { + final lastReceived = int.parse(s.trim()); + if (lastReceived == 1000) { + subscription.cancel(); + cancelled.complete(); + } + }); + await cancelled.future; + }); + }, skip: canStreamResponseBody ? false : 'does not stream response bodies'); } From d335d55b07061ec914330d266d79153b7a68d950 Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Wed, 30 Apr 2025 15:43:27 -0700 Subject: [PATCH 2/5] Fix lints --- .../example/integration_test/client_profile_test.dart | 4 ++-- .../lib/src/response_body_streamed_test.dart | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkgs/cupertino_http/example/integration_test/client_profile_test.dart b/pkgs/cupertino_http/example/integration_test/client_profile_test.dart index 47785d5bca..aa74415881 100644 --- a/pkgs/cupertino_http/example/integration_test/client_profile_test.dart +++ b/pkgs/cupertino_http/example/integration_test/client_profile_test.dart @@ -275,14 +275,14 @@ void main() { while (true) { request.response.write('Hello World'); await request.response.flush(); - await Future.delayed(Duration(seconds: 0)); + await Future.delayed(const Duration(seconds: 0)); } }); final cancelCompleter = Completer(); successServerUri = Uri.http('localhost:${successServer.port}'); final client = CupertinoClientWithProfile.defaultSessionConfiguration(); final request = StreamedRequest('GET', successServerUri); - request.sink.close(); + unawaited(request.sink.close()); final response = await client.send(request); var i = 0; diff --git a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart index c293de99d0..8ae49d1f80 100644 --- a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart +++ b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart @@ -71,7 +71,7 @@ void testResponseBodyStreamed(Client client, .listen((s) async { final lastReceived = int.parse(s.trim()); if (lastReceived == 1000) { - subscription.cancel(); + unawaited(subscription.cancel()); cancelled.complete(); } }); From dd31d608101b5d3ff9e25eb5def7e1610cfc4efd Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Wed, 30 Apr 2025 16:00:06 -0700 Subject: [PATCH 3/5] Update cronet_client.dart --- pkgs/cronet_http/lib/src/cronet_client.dart | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkgs/cronet_http/lib/src/cronet_client.dart b/pkgs/cronet_http/lib/src/cronet_client.dart index 4154462c95..1fbc925d58 100644 --- a/pkgs/cronet_http/lib/src/cronet_client.dart +++ b/pkgs/cronet_http/lib/src/cronet_client.dart @@ -153,7 +153,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( StreamController>? responseStream; JByteBuffer? jByteBuffer; var numRedirects = 0; - var cancelled = false; + var done = false; // The order of callbacks generated by Cronet is documented here: // https://developer.android.com/guide/topics/connectivity/cronet/lifecycle @@ -164,7 +164,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( responseStream = StreamController(onCancel: () { // The user did `response.stream.cancel()`. We can just pretend that // the response completed normally. - cancelled = true; + done = true; urlRequest.cancel(); responseStream!.sink.close(); jByteBuffer?.release(); @@ -213,7 +213,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( urlRequest.read(jByteBuffer!); }, onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) { - if (cancelled) return; + if (done) return; final responseHeaders = _cronetToClientHeaders(responseInfo.getAllHeaders()); @@ -258,7 +258,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( } }, onReadCompleted: (urlRequest, responseInfo, byteBuffer) { - if (cancelled) return; + if (done) return; byteBuffer.flip(); final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining); responseStream!.add(data); @@ -268,13 +268,15 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( urlRequest.read(byteBuffer); }, onSucceeded: (urlRequest, responseInfo) { - if (cancelled) return; + if (done) return; + done = true; responseStream!.sink.close(); jByteBuffer?.release(); profile?.responseData.close(); }, onFailed: (urlRequest, responseInfo, cronetException) { - if (cancelled) return; + if (done) return; + done = true; final error = ClientException( 'Cronet exception: ${cronetException.toString()}', request.url); if (responseStream == null) { From 1d9b0a5afc2a2baa37ec838a2e057237c956b58f Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Wed, 30 Apr 2025 16:09:09 -0700 Subject: [PATCH 4/5] Update cronet_client.dart --- pkgs/cronet_http/lib/src/cronet_client.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/pkgs/cronet_http/lib/src/cronet_client.dart b/pkgs/cronet_http/lib/src/cronet_client.dart index 1fbc925d58..ba15fd246c 100644 --- a/pkgs/cronet_http/lib/src/cronet_client.dart +++ b/pkgs/cronet_http/lib/src/cronet_client.dart @@ -164,6 +164,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( responseStream = StreamController(onCancel: () { // The user did `response.stream.cancel()`. We can just pretend that // the response completed normally. + if (done) return; done = true; urlRequest.cancel(); responseStream!.sink.close(); From be90b8099699642bb1630023d98e0484541b2cb0 Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Thu, 1 May 2025 09:46:35 -0700 Subject: [PATCH 5/5] Remove spurious comment --- pkgs/cronet_http/lib/src/cronet_client.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/pkgs/cronet_http/lib/src/cronet_client.dart b/pkgs/cronet_http/lib/src/cronet_client.dart index ba15fd246c..b3ccc7237e 100644 --- a/pkgs/cronet_http/lib/src/cronet_client.dart +++ b/pkgs/cronet_http/lib/src/cronet_client.dart @@ -160,7 +160,6 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks( return jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement( jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterface( onResponseStarted: (urlRequest, responseInfo) { - /// responseStream = StreamController(onCancel: () { // The user did `response.stream.cancel()`. We can just pretend that // the response completed normally.