Skip to content

Commit 768f69e

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 0451aef commit 768f69e

File tree

2 files changed

+225
-6
lines changed

2 files changed

+225
-6
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it } from 'mocha';
22

33
import { expectJSON } from '../../__testUtils__/expectJSON';
44

5+
import { invariant } from '../../jsutils/invariant';
56
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
67

78
import type { DocumentNode } from '../../language/ast';
@@ -134,6 +135,37 @@ const query = new GraphQLObjectType({
134135
yield await Promise.resolve({ string: friends[1].name });
135136
},
136137
},
138+
asyncIterableListDelayed: {
139+
type: new GraphQLList(friendType),
140+
async *resolve() {
141+
for (const friend of friends) {
142+
// pause an additional ms before yielding to allow time
143+
// for tests to return or throw before next value is processed.
144+
// eslint-disable-next-line no-await-in-loop
145+
await new Promise((r) => setTimeout(r, 1));
146+
yield friend; /* c8 ignore start */
147+
// Not reachable, early return
148+
}
149+
} /* c8 ignore stop */,
150+
},
151+
asyncIterableListNoReturn: {
152+
type: new GraphQLList(friendType),
153+
resolve() {
154+
let i = 0;
155+
return {
156+
[Symbol.asyncIterator]: () => ({
157+
async next() {
158+
const friend = friends[i++];
159+
if (friend) {
160+
await new Promise((r) => setTimeout(r, 1));
161+
return { value: friend, done: false };
162+
}
163+
return { value: undefined, done: true };
164+
},
165+
}),
166+
};
167+
},
168+
},
137169
asyncIterableListDelayedClose: {
138170
type: new GraphQLList(friendType),
139171
async *resolve() {
@@ -1080,4 +1112,175 @@ describe('Execute: stream directive', () => {
10801112
},
10811113
]);
10821114
});
1115+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1116+
const document = parse(`
1117+
query {
1118+
asyncIterableListDelayed @stream(initialCount: 1) {
1119+
name
1120+
id
1121+
}
1122+
}
1123+
`);
1124+
const schema = new GraphQLSchema({ query });
1125+
1126+
const executeResult = await execute({ schema, document, rootValue: {} });
1127+
invariant(isAsyncIterable(executeResult));
1128+
const iterator = executeResult[Symbol.asyncIterator]();
1129+
1130+
const result1 = await iterator.next();
1131+
expectJSON(result1).toDeepEqual({
1132+
done: false,
1133+
value: {
1134+
data: {
1135+
asyncIterableListDelayed: [
1136+
{
1137+
id: '1',
1138+
name: 'Luke',
1139+
},
1140+
],
1141+
},
1142+
hasNext: true,
1143+
},
1144+
});
1145+
1146+
const returnPromise = iterator.return();
1147+
1148+
// this result had started processing before return was called
1149+
const result2 = await iterator.next();
1150+
expectJSON(result2).toDeepEqual({
1151+
done: false,
1152+
value: {
1153+
data: {
1154+
id: '2',
1155+
name: 'Han',
1156+
},
1157+
hasNext: true,
1158+
path: ['asyncIterableListDelayed', 1],
1159+
},
1160+
});
1161+
1162+
// third result is not returned because async iterator has returned
1163+
const result3 = await iterator.next();
1164+
expectJSON(result3).toDeepEqual({
1165+
done: true,
1166+
value: undefined,
1167+
});
1168+
await returnPromise;
1169+
});
1170+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1171+
const document = parse(`
1172+
query {
1173+
asyncIterableListNoReturn @stream(initialCount: 1) {
1174+
name
1175+
id
1176+
}
1177+
}
1178+
`);
1179+
const schema = new GraphQLSchema({ query });
1180+
1181+
const executeResult = await execute({ schema, document, rootValue: {} });
1182+
invariant(isAsyncIterable(executeResult));
1183+
const iterator = executeResult[Symbol.asyncIterator]();
1184+
1185+
const result1 = await iterator.next();
1186+
expectJSON(result1).toDeepEqual({
1187+
done: false,
1188+
value: {
1189+
data: {
1190+
asyncIterableListNoReturn: [
1191+
{
1192+
id: '1',
1193+
name: 'Luke',
1194+
},
1195+
],
1196+
},
1197+
hasNext: true,
1198+
},
1199+
});
1200+
1201+
const returnPromise = iterator.return();
1202+
1203+
// this result had started processing before return was called
1204+
const result2 = await iterator.next();
1205+
expectJSON(result2).toDeepEqual({
1206+
done: false,
1207+
value: {
1208+
data: {
1209+
id: '2',
1210+
name: 'Han',
1211+
},
1212+
hasNext: true,
1213+
path: ['asyncIterableListNoReturn', 1],
1214+
},
1215+
});
1216+
1217+
// third result is not returned because async iterator has returned
1218+
const result3 = await iterator.next();
1219+
expectJSON(result3).toDeepEqual({
1220+
done: true,
1221+
value: undefined,
1222+
});
1223+
await returnPromise;
1224+
});
1225+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1226+
const document = parse(`
1227+
query {
1228+
asyncIterableListDelayed @stream(initialCount: 1) {
1229+
name
1230+
id
1231+
}
1232+
}
1233+
`);
1234+
const schema = new GraphQLSchema({ query });
1235+
1236+
const executeResult = await execute({ schema, document, rootValue: {} });
1237+
invariant(isAsyncIterable(executeResult));
1238+
const iterator = executeResult[Symbol.asyncIterator]();
1239+
1240+
const result1 = await iterator.next();
1241+
expectJSON(result1).toDeepEqual({
1242+
done: false,
1243+
value: {
1244+
data: {
1245+
asyncIterableListDelayed: [
1246+
{
1247+
id: '1',
1248+
name: 'Luke',
1249+
},
1250+
],
1251+
},
1252+
hasNext: true,
1253+
},
1254+
});
1255+
1256+
const throwPromise = iterator.throw(new Error('bad'));
1257+
1258+
// this result had started processing before return was called
1259+
const result2 = await iterator.next();
1260+
expectJSON(result2).toDeepEqual({
1261+
done: false,
1262+
value: {
1263+
data: {
1264+
id: '2',
1265+
name: 'Han',
1266+
},
1267+
hasNext: true,
1268+
path: ['asyncIterableListDelayed', 1],
1269+
},
1270+
});
1271+
1272+
// third result is not returned because async iterator has returned
1273+
const result3 = await iterator.next();
1274+
expectJSON(result3).toDeepEqual({
1275+
done: true,
1276+
value: undefined,
1277+
});
1278+
try {
1279+
await throwPromise; /* c8 ignore start */
1280+
// Not reachable, always throws
1281+
/* c8 ignore stop */
1282+
} catch (e) {
1283+
// ignore error
1284+
}
1285+
});
10831286
});

src/execution/execute.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,6 +1554,7 @@ async function executeStreamIterator(
15541554
label,
15551555
path: fieldPath,
15561556
parentContext,
1557+
iterator,
15571558
});
15581559
const dataPromise = executeStreamIteratorItem(
15591560
iterator,
@@ -1581,6 +1582,7 @@ function yieldSubsequentPayloads(
15811582
initialResult: ExecutionResult,
15821583
): AsyncGenerator<AsyncExecutionResult, void, void> {
15831584
let _hasReturnedInitialResult = false;
1585+
let isDone = false;
15841586

15851587
async function race(): Promise<IteratorResult<AsyncExecutionResult>> {
15861588
if (exeContext.subsequentPayloads.length === 0) {
@@ -1651,17 +1653,31 @@ function yieldSubsequentPayloads(
16511653
},
16521654
done: false,
16531655
});
1654-
} else if (exeContext.subsequentPayloads.length === 0) {
1656+
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
16551657
return Promise.resolve({ value: undefined, done: true });
16561658
}
16571659
return race();
16581660
},
1659-
// TODO: implement return & throw
1660-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1661-
Promise.resolve({ value: undefined, done: true }),
1662-
throw: /* istanbul ignore next: will be covered in follow up */ (
1661+
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1662+
await Promise.all(
1663+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1664+
asyncPayloadRecord.iterator?.return?.(),
1665+
),
1666+
);
1667+
isDone = true;
1668+
return { value: undefined, done: true };
1669+
},
1670+
async throw(
16631671
error?: unknown,
1664-
) => Promise.reject(error),
1672+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1673+
await Promise.all(
1674+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1675+
asyncPayloadRecord.iterator?.return?.(),
1676+
),
1677+
);
1678+
isDone = true;
1679+
return Promise.reject(error);
1680+
},
16651681
};
16661682
}
16671683

0 commit comments

Comments
 (0)