From 4d18cbf7f48f7ea93d1b0fe491217c40d21f0be7 Mon Sep 17 00:00:00 2001 From: Agufa Tech Date: Wed, 21 May 2025 13:54:55 -0400 Subject: [PATCH 1/5] feat(gql_socket_link): BREAKING CHANGE multiplex websocket connection --- links/gql_websocket_link/CHANGELOG.md | 4 + links/gql_websocket_link/README.md | 3 + .../lib/src/graphql_transport_ws.dart | 29 ++- links/gql_websocket_link/pubspec.yaml | 4 +- .../test/gql_websocket_link_test.dart | 221 +++++++++--------- 5 files changed, 147 insertions(+), 114 deletions(-) diff --git a/links/gql_websocket_link/CHANGELOG.md b/links/gql_websocket_link/CHANGELOG.md index 7f5958952..1b6e93617 100644 --- a/links/gql_websocket_link/CHANGELOG.md +++ b/links/gql_websocket_link/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.0.0 + +- BREAKING: graphql-transport-ws: streaming operations and single result operations are now multiplexed on a single connection, user has to manually resubscribe subscription when connection is broken + ## 2.0.1 - support uuid 4.0.0 diff --git a/links/gql_websocket_link/README.md b/links/gql_websocket_link/README.md index 0b0d476a0..efb2885c0 100644 --- a/links/gql_websocket_link/README.md +++ b/links/gql_websocket_link/README.md @@ -84,7 +84,10 @@ The `WebSocketLink` class has some known issues, see: - https://github.com/gql-dart/gql/issues/430 +#### TransportWebSocketLink (`graphql-transport-ws`) +- Streaming operations and single result operations are now multiplexed on a single connection. This means you have to manually resubscribe on subscriptions. + On IOS and Android, when you app is in background (lock screen, etc.), all open sockets will be closed to save battery, and your app is freezed, no code from your app will run. Thus there is no way to reconnect a websocket connection when it is broken because no code from your app will run, you have to manually resubscribe on app resume (use `WidgetsBindingObserver` or related packages). You can use old TransportWebSocketLink client to resubscribe, the underlying socket is newly acquried when you resubscribe. ## Features and bugs diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index dab9253c8..9562f4241 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -693,6 +693,7 @@ class _ConnectionState { // subscriptions might complete while waiting for retry if (locks == 0) { + // print("retry connecting = null"); connecting = null; return denied( LikeCloseEvent(code: 1000, reason: "All Subscriptions Gone"), @@ -736,6 +737,7 @@ class _ConnectionState { (Object event) => emitter.emit(TransportWsEvent.closed(event)); errorOrClosed((errOrEvent) { options.log?.call("errorOrClosed $errOrEvent"); + // print("errorOrClosed $errOrEvent connecting = null"); connecting = null; isOpen = false; connectionAckTimeout?.cancel(); @@ -904,7 +906,6 @@ class _ConnectionState { )); } catch (err) { // stop reading messages as soon as reading breaks once - print("_messageSubs.cancel()"); // ignore: unawaited_futures _messageSubs.cancel(); emitter.emit(TransportWsEvent.error(err)); @@ -944,7 +945,6 @@ class _ConnectionState { Future<_Connection> connect() async { connecting ??= _startConnecting(); final _connection = await connecting!; - options.log?.call("_connection"); final socket = _connection.socket; @@ -1017,6 +1017,7 @@ class _Client extends TransportWsClient { ) { final id = options.generateID(); options.log?.call("subscribe $id"); + // print("subscribe step 1 generate id $id ${state.hashCode}"); bool done = false; bool errored = false; @@ -1031,6 +1032,7 @@ class _Client extends TransportWsClient { for (;;) { try { final _c = await state.connect(); + // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); final socket = _c.socket; final release = _c.release; final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose; @@ -1041,9 +1043,12 @@ class _Client extends TransportWsClient { // print(payload.context.toString()); // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); // if done while waiting for connect, release the connection lock right away + final serializedRequest = + options.serializer.serializeRequest(payload); final _subscribeMsg = await options.graphQLSocketMessageEncoder( - SubscribeMessage(id, options.serializer.serializeRequest(payload)), + SubscribeMessage(id, serializedRequest), ); + // print("subscribe step 3 operation $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); if (done) { if (!release.isCompleted) release.complete(); @@ -1057,6 +1062,7 @@ class _Client extends TransportWsClient { completer.complete(); } state.nextOrErrorMsgWaitMap.remove(id); + // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } else if (message is ErrorMessage) { errored = true; done = true; @@ -1067,9 +1073,11 @@ class _Client extends TransportWsClient { } state.nextOrErrorMsgWaitMap.remove(id); releaser(); + // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } else if (message is CompleteMessage) { done = true; releaser(); // release completes the sink + // print("subscribe step 7 receive CompleteMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } }); @@ -1087,19 +1095,29 @@ class _Client extends TransportWsClient { state.locks--; done = true; if (!release.isCompleted) release.complete(); + // print("subscribe step 8 releaser $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); }; + // print("subscribe step 4 waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); // either the releaser will be called, connection completed and // the promise resolved or the socket closed and the promise rejected. // whatever happens though, we want to stop listening for messages - await waitForReleaseOrThrowOnClose.whenComplete(unlisten); + final likeCloseEvent = await Future.any( + [waitForReleaseOrThrowOnClose, waitForLikeCloseEvent]) + .whenComplete(() async { + unlisten(); + // print( + // "subscribe step 9 complete waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }); // workground for dart linux bug: complete error not being caught by try..catch block - final likeCloseEvent = await waitForLikeCloseEvent; if (likeCloseEvent != null) { + // print("subscribe step 10 error likeCloseEvent $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); throw likeCloseEvent; } + // print("subscribe step 11 success $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + return; // completed, shouldnt try again } catch (errOrCloseEvent) { if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return; @@ -1319,6 +1337,7 @@ class TransportWebSocketLink extends Link { // TODO pass more data? ); } + return response; }, ); diff --git a/links/gql_websocket_link/pubspec.yaml b/links/gql_websocket_link/pubspec.yaml index bd5ea2de5..9b11629be 100644 --- a/links/gql_websocket_link/pubspec.yaml +++ b/links/gql_websocket_link/pubspec.yaml @@ -1,7 +1,8 @@ name: gql_websocket_link -version: 2.0.1 +version: 2.1.0 description: GQL Websocket Link repository: https://github.com/gql-dart/gql +publish_to: none environment: sdk: '>=2.15.0 <4.0.0' dependencies: @@ -12,6 +13,7 @@ dependencies: rxdart: '>=0.26.0 <=0.28.0' uuid: '>=3.0.0 <5.0.0' web_socket_channel: ^3.0.3 + # path: ../../../http/pkgs/web_socket_channel dev_dependencies: gql_pedantic: ^1.0.2 mockito: ^5.0.0 diff --git a/links/gql_websocket_link/test/gql_websocket_link_test.dart b/links/gql_websocket_link/test/gql_websocket_link_test.dart index 5869ae0d9..d2b319f2d 100644 --- a/links/gql_websocket_link/test/gql_websocket_link_test.dart +++ b/links/gql_websocket_link/test/gql_websocket_link_test.dart @@ -335,7 +335,7 @@ void _testLinks( .map((dynamic s) => json.decode(s as String)) .listen( (dynamic message) { - print("message $message"); + // print("message $message"); if (message["type"] == "connection_init") { channel.sink.add( json.encode( @@ -343,7 +343,7 @@ void _testLinks( ), ); } else if (message["type"] == startMessageType) { - print("enter subscribe"); + // print("enter subscribe"); channel.sink.add( json.encode( { @@ -1553,114 +1553,119 @@ void _testLinks( ), ); - server1 = await HttpServer.bind("localhost", 0); - server1.transform(WebSocketTransformer()).listen( - expectAsync1( - (webSocket) async { - final channel = IOWebSocketChannel(webSocket); - var messageCount = 0; - channel.stream.listen( - expectAsync1( - (dynamic message) { - final map = json.decode(message as String) - as Map?; - if (messageCount == 0) { - expect(map!["type"], MessageTypes.connectionInit); - channel.sink.add( - json.encode( - ConnectionAck(), - ), - ); - } else if (messageCount == 1) { - expect(map!["id"], isA()); - expect(map["type"], startMessageType); - subId = map["id"] as String?; - // disconnect - webSocket.close(websocket_status.goingAway); - } - messageCount++; - }, - count: 2, - reason: - "server1 should only receive 2 messages, init and start", - id: "server1:websocket_messages", - ), - ); - }, - count: 1, - reason: "server 1 should only be connected once", - id: "server1:websocket_connections", - ), - ); + if (isApolloSubProtocol) { + server1 = await HttpServer.bind("localhost", 0); + server1.transform(WebSocketTransformer()).listen( + expectAsync1( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + var messageCount = 0; + channel.stream.listen( + expectAsync1( + (dynamic message) { + final map = json.decode(message as String) + as Map?; + if (messageCount == 0) { + expect(map!["type"], MessageTypes.connectionInit); + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (messageCount == 1) { + expect(map!["id"], isA()); + expect(map["type"], startMessageType); + subId = map["id"] as String?; + // disconnect + webSocket.close(websocket_status.goingAway); + } + messageCount++; + }, + count: 2, + reason: + "server1 should only receive 2 messages, init and start", + id: "server1:websocket_messages", + ), + ); + }, + count: 1, + reason: "server 1 should only be connected once", + id: "server1:websocket_connections", + ), + ); - server2 = await HttpServer.bind("localhost", 0); - server2.transform(WebSocketTransformer()).listen( - expectAsync1( - (webSocket) async { - final channel = IOWebSocketChannel(webSocket); - var messageCount = 0; - channel.stream.listen( - expectAsync1( - (dynamic message) { - final map = json.decode(message as String) - as Map?; - if (messageCount == 0) { - expect(map!["type"], MessageTypes.connectionInit); - channel.sink.add( - json.encode( - ConnectionAck(), - ), - ); - } else if (messageCount == 1) { - expect(map!["id"], isA()); - expect(map["type"], startMessageType); - expect(map["id"], subId); - completer.complete(); - } else { - expect(map!["id"], isA()); - expect( - map["type"], - isApolloSubProtocol - ? MessageTypes.stop - : MessageTypes.complete); - expect(map["id"], subId); - - stopReceivedCompleter.complete(); - } - messageCount++; - }, - count: 3, - id: "server2:websocket_messages", - reason: - "server 2 should receive init, subscription and complete/stop msg", - ), - ); - }, - count: 1, - reason: "server 2 should only receive one connection", - ), - ); + server2 = await HttpServer.bind("localhost", 0); + server2.transform(WebSocketTransformer()).listen( + expectAsync1( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + var messageCount = 0; + channel.stream.listen( + expectAsync1( + (dynamic message) { + final map = json.decode(message as String) + as Map?; + if (messageCount == 0) { + expect(map!["type"], MessageTypes.connectionInit); + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (messageCount == 1) { + expect(map!["id"], isA()); + expect(map["type"], startMessageType); + expect(map["id"], subId); + completer.complete(); + } else { + expect(map!["id"], isA()); + expect( + map["type"], + isApolloSubProtocol + ? MessageTypes.stop + : MessageTypes.complete); + expect(map["id"], subId); + + stopReceivedCompleter.complete(); + } + messageCount++; + }, + count: 3, + id: "server2:websocket_messages", + reason: + "server 2 should receive init, subscription and complete/stop msg", + ), + ); + }, + count: 1, + reason: "server 2 should only receive one connection", + ), + ); - link = makeLink( - null, - channelGenerator: () async { - if (connectToServer == 1) { - connectToServer++; - final webSocket = - await WebSocket.connect("ws://localhost:${server1.port}"); - return IOWebSocketChannel(webSocket); - } else { - final webSocket = - await WebSocket.connect("ws://localhost:${server2.port}"); - return IOWebSocketChannel(webSocket); - } - }, - reconnectInterval: Duration(milliseconds: 500), - ); - final sub = link.request(request).listen(print, onError: print); - await completer.future; - await sub.cancel(); - await stopReceivedCompleter.future; + link = makeLink( + null, + channelGenerator: () async { + if (connectToServer == 1) { + connectToServer++; + final webSocket = + await WebSocket.connect("ws://localhost:${server1.port}"); + return IOWebSocketChannel(webSocket); + } else { + final webSocket = + await WebSocket.connect("ws://localhost:${server2.port}"); + return IOWebSocketChannel(webSocket); + } + }, + reconnectInterval: Duration(milliseconds: 500), + ); + final sub = link.request(request).listen(print, onError: print); + await completer.future; + await sub.cancel(); + await stopReceivedCompleter.future; + } else { + // only test for transport ws sub-protocol + return; + } }, ); From 59046d4a2b2365d478ffc5312d93e7ee48023749 Mon Sep 17 00:00:00 2001 From: Agufa Tech Date: Wed, 21 May 2025 13:57:53 -0400 Subject: [PATCH 2/5] chore(gql_websocket_link): version change in pubspec.yaml --- links/gql_websocket_link/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/links/gql_websocket_link/pubspec.yaml b/links/gql_websocket_link/pubspec.yaml index 9b11629be..6a85764ab 100644 --- a/links/gql_websocket_link/pubspec.yaml +++ b/links/gql_websocket_link/pubspec.yaml @@ -1,5 +1,5 @@ name: gql_websocket_link -version: 2.1.0 +version: 3.0.0 description: GQL Websocket Link repository: https://github.com/gql-dart/gql publish_to: none From 303fb0502658cd4bd089827cef21ca47900372f7 Mon Sep 17 00:00:00 2001 From: Agufa Tech Date: Wed, 21 May 2025 14:06:58 -0400 Subject: [PATCH 3/5] doc(gql_websocket_link): remove line break in README --- links/gql_websocket_link/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/links/gql_websocket_link/README.md b/links/gql_websocket_link/README.md index efb2885c0..1da73f91c 100644 --- a/links/gql_websocket_link/README.md +++ b/links/gql_websocket_link/README.md @@ -86,8 +86,7 @@ The `WebSocketLink` class has some known issues, see: #### TransportWebSocketLink (`graphql-transport-ws`) -- Streaming operations and single result operations are now multiplexed on a single connection. This means you have to manually resubscribe on subscriptions. - On IOS and Android, when you app is in background (lock screen, etc.), all open sockets will be closed to save battery, and your app is freezed, no code from your app will run. Thus there is no way to reconnect a websocket connection when it is broken because no code from your app will run, you have to manually resubscribe on app resume (use `WidgetsBindingObserver` or related packages). You can use old TransportWebSocketLink client to resubscribe, the underlying socket is newly acquried when you resubscribe. +- Streaming operations and single result operations are now multiplexed on a single connection. This means you have to manually resubscribe on subscriptions. On IOS and Android, when you app is in background (lock screen, etc.), all open sockets will be closed to save battery, and your app is freezed, no code from your app will run. Thus there is no way to reconnect a websocket connection when it is broken because no code from your app will run, you have to manually resubscribe on app resume (use `WidgetsBindingObserver` or related packages). You can use old TransportWebSocketLink client to resubscribe, the underlying socket is newly acquried when you resubscribe. ## Features and bugs From 1875c48d407516a62735bbf77b0f829d59bd5df8 Mon Sep 17 00:00:00 2001 From: Agufa Tech Date: Wed, 21 May 2025 14:11:00 -0400 Subject: [PATCH 4/5] doc(gql_websocket_link): typo in README --- links/gql_websocket_link/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/links/gql_websocket_link/README.md b/links/gql_websocket_link/README.md index 1da73f91c..7e75bc995 100644 --- a/links/gql_websocket_link/README.md +++ b/links/gql_websocket_link/README.md @@ -86,7 +86,7 @@ The `WebSocketLink` class has some known issues, see: #### TransportWebSocketLink (`graphql-transport-ws`) -- Streaming operations and single result operations are now multiplexed on a single connection. This means you have to manually resubscribe on subscriptions. On IOS and Android, when you app is in background (lock screen, etc.), all open sockets will be closed to save battery, and your app is freezed, no code from your app will run. Thus there is no way to reconnect a websocket connection when it is broken because no code from your app will run, you have to manually resubscribe on app resume (use `WidgetsBindingObserver` or related packages). You can use old TransportWebSocketLink client to resubscribe, the underlying socket is newly acquried when you resubscribe. +- Streaming operations and single result operations are now multiplexed on a single connection. This means you have to manually resubscribe on subscriptions. On IOS and Android, when you app is in background (lock screen, etc.), all open sockets will be closed to save battery, and your app is freezed, no code from your app will run. Thus there is no way to reconnect a websocket connection when it is broken because no code from your app will run, you have to manually resubscribe on app resume (use `WidgetsBindingObserver` or related packages). You can use old TransportWebSocketLink client to resubscribe, the underlying socket is newly acquired when you resubscribe. ## Features and bugs From 9ed2ef886f9d848b346808484794c5f5a8ec2767 Mon Sep 17 00:00:00 2001 From: Agufa Tech Date: Fri, 23 May 2025 02:29:20 -0400 Subject: [PATCH 5/5] fix(gql_websocket_link): single-result-operation shouldn not be in for loop --- .../lib/src/graphql_transport_ws.dart | 258 +++++++++++++++++- 1 file changed, 251 insertions(+), 7 deletions(-) diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index 9562f4241..532fc3d66 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -1029,7 +1029,121 @@ class _Client extends TransportWsClient { (() async { state.locks++; - for (;;) { + + final serializedRequest = options.serializer.serializeRequest(payload); + + final query = serializedRequest["query"] as String; + + if (query.startsWith("subscription")) { + for (;;) { + try { + final _c = await state.connect(); + // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); + final socket = _c.socket; + final release = _c.release; + final waitForReleaseOrThrowOnClose = + _c.waitForReleaseOrThrowOnClose; + final waitForLikeCloseEvent = _c.waitForLikeCloseEvent; + // print("isolate debug name: ${Isolate.current.debugName}"); + // print(payload.operation.toString()); + // print(payload.variables.toString()); + // print(payload.context.toString()); + // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + // if done while waiting for connect, release the connection lock right away + final _subscribeMsg = await options.graphQLSocketMessageEncoder( + SubscribeMessage(id, serializedRequest), + ); + // print("subscribe step 3 operation $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + if (done) { + if (!release.isCompleted) release.complete(); + } + + final unlisten = emitter.onMessage(id, (message) { + if (message is NextMessage) { + sink.add(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is ErrorMessage) { + errored = true; + done = true; + sink.addError(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + releaser(); + // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is CompleteMessage) { + done = true; + releaser(); // release completes the sink + // print("subscribe step 7 receive CompleteMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } + }); + + socket.sink.add(_subscribeMsg); + + state.nextOrErrorMsgWaitMap[id] = Completer(); + + releaser = () async { + final _completeMsg = await options + .graphQLSocketMessageEncoder(CompleteMessage(id)); + if (!done && state.isOpen) { + // if not completed already and socket is open, send complete message to server on release + socket.sink.add(_completeMsg); + } + state.locks--; + done = true; + if (!release.isCompleted) release.complete(); + // print("subscribe step 8 releaser $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }; + + // print("subscribe step 4 waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // either the releaser will be called, connection completed and + // the promise resolved or the socket closed and the promise rejected. + // whatever happens though, we want to stop listening for messages + final likeCloseEvent = await Future.any( + [waitForReleaseOrThrowOnClose, waitForLikeCloseEvent]) + .whenComplete(() async { + unlisten(); + // print( + // "subscribe step 9 complete waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }); + + // workground for dart linux bug: complete error not being caught by try..catch block + if (likeCloseEvent != null) { + // print("subscribe step 10 error likeCloseEvent $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + throw likeCloseEvent; + } + + // print("subscribe step 11 success $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + + return; // completed, shouldnt try again + } catch (errOrCloseEvent) { + if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return; + } + // final finish = await processMessage( + // id: id, + // serializedRequest: serializedRequest, + // done: done, + // errored: errored, + // releaser: releaser, + // setDone: (value) => done = value, + // setErrored: (value) => errored = value, + // isSubscription: true, + // sink: sink, + // ); + + // if (finish) { + // return; + // } + } + } else { try { final _c = await state.connect(); // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); @@ -1043,8 +1157,6 @@ class _Client extends TransportWsClient { // print(payload.context.toString()); // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); // if done while waiting for connect, release the connection lock right away - final serializedRequest = - options.serializer.serializeRequest(payload); final _subscribeMsg = await options.graphQLSocketMessageEncoder( SubscribeMessage(id, serializedRequest), ); @@ -1056,12 +1168,14 @@ class _Client extends TransportWsClient { final unlisten = emitter.onMessage(id, (message) { if (message is NextMessage) { + done = true; sink.add(message.payload); final completer = state.nextOrErrorMsgWaitMap[id]; if (completer != null && !completer.isCompleted) { completer.complete(); } state.nextOrErrorMsgWaitMap.remove(id); + releaser(); // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } else if (message is ErrorMessage) { errored = true; @@ -1074,10 +1188,6 @@ class _Client extends TransportWsClient { state.nextOrErrorMsgWaitMap.remove(id); releaser(); // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); - } else if (message is CompleteMessage) { - done = true; - releaser(); // release completes the sink - // print("subscribe step 7 receive CompleteMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } }); @@ -1122,6 +1232,17 @@ class _Client extends TransportWsClient { } catch (errOrCloseEvent) { if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return; } + // await processMessage( + // id: id, + // serializedRequest: serializedRequest, + // done: done, + // errored: errored, + // releaser: releaser, + // setDone: (value) => done = value, + // setErrored: (value) => errored = value, + // isSubscription: false, + // sink: sink, + // ); } })() .then((_) { @@ -1138,6 +1259,129 @@ class _Client extends TransportWsClient { }; } + // TODO: there seems to be a bug either in dart or web_socket_channel that's causing this common function failing test + Future processMessage({ + required String id, + required Map serializedRequest, + required bool done, + required bool errored, + required Function() releaser, + required Function(bool) setDone, + required Function(bool) setErrored, + required bool isSubscription, + required EventSink sink, + }) async { + try { + bool localDone = done; + final _c = await state.connect(); + // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); + final socket = _c.socket; + final release = _c.release; + final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose; + final waitForLikeCloseEvent = _c.waitForLikeCloseEvent; + // print("isolate debug name: ${Isolate.current.debugName}"); + // print(payload.operation.toString()); + // print(payload.variables.toString()); + // print(payload.context.toString()); + // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + // if done while waiting for connect, release the connection lock right away + final _subscribeMsg = await options.graphQLSocketMessageEncoder( + SubscribeMessage(id, serializedRequest), + ); + // print("subscribe step 3 operation $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + if (localDone) { + if (!release.isCompleted) release.complete(); + } + + final unlisten = emitter.onMessage(id, (message) { + if (message is NextMessage) { + if (!isSubscription) { + setDone(true); + localDone = true; + } + sink.add(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + if (!isSubscription) { + releaser(); // release completes the sink + } + // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is ErrorMessage) { + setErrored(true); + setDone(true); + localDone = true; + sink.addError(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + releaser(); + // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is CompleteMessage) { + if (isSubscription) { + setDone(true); + localDone = true; + releaser(); // release completes the sink + } + // print( + // "subscribe step 7 receive CompleteMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } + }); + + socket.sink.add(_subscribeMsg); + + state.nextOrErrorMsgWaitMap[id] = Completer(); + + releaser = () async { + final _completeMsg = + await options.graphQLSocketMessageEncoder(CompleteMessage(id)); + if (!localDone && state.isOpen) { + // if not completed already and socket is open, send complete message to server on release + socket.sink.add(_completeMsg); + } + state.locks--; + setDone(true); + localDone = true; + if (!release.isCompleted) release.complete(); + // print( + // "subscribe step 8 releaser $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }; + + // print( + // "subscribe step 4 waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // either the releaser will be called, connection completed and + // the promise resolved or the socket closed and the promise rejected. + // whatever happens though, we want to stop listening for messages + final likeCloseEvent = await Future.any( + [waitForReleaseOrThrowOnClose, waitForLikeCloseEvent]) + .whenComplete(() async { + unlisten(); + // print( + // "subscribe step 9 complete waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }); + + // workground for dart linux bug: complete error not being caught by try..catch block + if (likeCloseEvent != null) { + // print( + // "subscribe step 10 error likeCloseEvent $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + throw likeCloseEvent; + } + + // print( + // "subscribe step 11 success $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + + return true; // completed, shouldnt try again + } catch (errOrCloseEvent) { + if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return true; + return false; + } + } + @override Future dispose() async { options.log?.call("dispose");