@@ -38,43 +38,21 @@ func subscribe(
38
38
)
39
39
40
40
return sourceResult. map { sourceStream in
41
- // We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
42
- // (which we cannot know),
43
- // and we need the result to be a concrete type.
44
- let subscriptionStream = AsyncThrowingStream < GraphQLResult , Error > { continuation in
45
- let task = Task {
46
- do {
47
- for try await eventPayload in sourceStream {
48
- // For each payload yielded from a subscription, map it over the normal
49
- // GraphQL `execute` function, with `payload` as the rootValue.
50
- // This implements the "MapSourceToResponseEvent" algorithm described in
51
- // the GraphQL specification. The `execute` function provides the
52
- // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
53
- // "ExecuteQuery" algorithm, for which `execute` is also used.
54
- let newEvent = try await execute (
55
- queryStrategy: queryStrategy,
56
- mutationStrategy: mutationStrategy,
57
- subscriptionStrategy: subscriptionStrategy,
58
- schema: schema,
59
- documentAST: documentAST,
60
- rootValue: eventPayload,
61
- context: context,
62
- variableValues: variableValues,
63
- operationName: operationName
64
- )
65
- continuation. yield ( newEvent)
66
- }
67
- continuation. finish ( )
68
- } catch {
69
- continuation. finish ( throwing: error)
70
- }
71
- }
72
-
73
- continuation. onTermination = { @Sendable reason in
74
- task. cancel ( )
75
- }
41
+ return AsyncThrowingStream < GraphQLResult , Error > {
42
+ var iterator = sourceStream. makeAsyncIterator ( )
43
+ let eventPayload = try await iterator. next ( )
44
+ return try await execute (
45
+ queryStrategy: queryStrategy,
46
+ mutationStrategy: mutationStrategy,
47
+ subscriptionStrategy: subscriptionStrategy,
48
+ schema: schema,
49
+ documentAST: documentAST,
50
+ rootValue: eventPayload,
51
+ context: context,
52
+ variableValues: variableValues,
53
+ operationName: operationName
54
+ )
76
55
}
77
- return subscriptionStream
78
56
}
79
57
}
80
58
@@ -111,7 +89,7 @@ func createSourceEventStream(
111
89
context: Any ,
112
90
variableValues: [ String : Map ] = [ : ] ,
113
91
operationName: String ? = nil
114
- ) async throws -> Result < any AsyncSequence , GraphQLErrors > {
92
+ ) async throws -> Result < any AsyncSequence & Sendable , GraphQLErrors > {
115
93
// If a valid context cannot be created due to incorrect arguments,
116
94
// this will throw an error.
117
95
let exeContext = try buildExecutionContext (
@@ -141,7 +119,7 @@ func createSourceEventStream(
141
119
142
120
func executeSubscription(
143
121
context: ExecutionContext
144
- ) async throws -> Result < any AsyncSequence , GraphQLErrors > {
122
+ ) async throws -> Result < any AsyncSequence & Sendable , GraphQLErrors > {
145
123
// Get the first node
146
124
let type = try getOperationRootType ( schema: context. schema, operation: context. operation)
147
125
var inputFields : OrderedDictionary < String , [ Field ] > = [ : ]
0 commit comments