Skip to content

Commit 4b7fc00

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent b1e3ea8 commit 4b7fc00

File tree

2 files changed

+232
-7
lines changed

2 files changed

+232
-7
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it } from 'mocha';
22

33
import { expectJSON } from '../../__testUtils__/expectJSON';
44

5+
import { invariant } from '../../jsutils/invariant';
56
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
67

78
import type { DocumentNode } from '../../language/ast';
@@ -112,6 +113,37 @@ const query = new GraphQLObjectType({
112113
yield await Promise.resolve({});
113114
},
114115
},
116+
asyncIterableListDelayed: {
117+
type: new GraphQLList(friendType),
118+
async *resolve() {
119+
for (const friend of friends) {
120+
// pause an additional ms before yielding to allow time
121+
// for tests to return or throw before next value is processed.
122+
// eslint-disable-next-line no-await-in-loop
123+
await new Promise((r) => setTimeout(r, 1));
124+
yield friend; /* c8 ignore start */
125+
// Not reachable, early return
126+
}
127+
} /* c8 ignore stop */,
128+
},
129+
asyncIterableListNoReturn: {
130+
type: new GraphQLList(friendType),
131+
resolve() {
132+
let i = 0;
133+
return {
134+
[Symbol.asyncIterator]: () => ({
135+
async next() {
136+
const friend = friends[i++];
137+
if (friend) {
138+
await new Promise((r) => setTimeout(r, 1));
139+
return { value: friend, done: false };
140+
}
141+
return { value: undefined, done: true };
142+
},
143+
}),
144+
};
145+
},
146+
},
115147
asyncIterableListDelayedClose: {
116148
type: new GraphQLList(friendType),
117149
async *resolve() {
@@ -1002,4 +1034,175 @@ describe('Execute: stream directive', () => {
10021034
},
10031035
]);
10041036
});
1037+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1038+
const document = parse(`
1039+
query {
1040+
asyncIterableListDelayed @stream(initialCount: 1) {
1041+
name
1042+
id
1043+
}
1044+
}
1045+
`);
1046+
const schema = new GraphQLSchema({ query });
1047+
1048+
const executeResult = await execute({ schema, document, rootValue: {} });
1049+
invariant(isAsyncIterable(executeResult));
1050+
const iterator = executeResult[Symbol.asyncIterator]();
1051+
1052+
const result1 = await iterator.next();
1053+
expectJSON(result1).toDeepEqual({
1054+
done: false,
1055+
value: {
1056+
data: {
1057+
asyncIterableListDelayed: [
1058+
{
1059+
id: '1',
1060+
name: 'Luke',
1061+
},
1062+
],
1063+
},
1064+
hasNext: true,
1065+
},
1066+
});
1067+
1068+
const returnPromise = iterator.return();
1069+
1070+
// this result had started processing before return was called
1071+
const result2 = await iterator.next();
1072+
expectJSON(result2).toDeepEqual({
1073+
done: false,
1074+
value: {
1075+
data: {
1076+
id: '2',
1077+
name: 'Han',
1078+
},
1079+
hasNext: true,
1080+
path: ['asyncIterableListDelayed', 1],
1081+
},
1082+
});
1083+
1084+
// third result is not returned because async iterator has returned
1085+
const result3 = await iterator.next();
1086+
expectJSON(result3).toDeepEqual({
1087+
done: true,
1088+
value: undefined,
1089+
});
1090+
await returnPromise;
1091+
});
1092+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1093+
const document = parse(`
1094+
query {
1095+
asyncIterableListNoReturn @stream(initialCount: 1) {
1096+
name
1097+
id
1098+
}
1099+
}
1100+
`);
1101+
const schema = new GraphQLSchema({ query });
1102+
1103+
const executeResult = await execute({ schema, document, rootValue: {} });
1104+
invariant(isAsyncIterable(executeResult));
1105+
const iterator = executeResult[Symbol.asyncIterator]();
1106+
1107+
const result1 = await iterator.next();
1108+
expectJSON(result1).toDeepEqual({
1109+
done: false,
1110+
value: {
1111+
data: {
1112+
asyncIterableListNoReturn: [
1113+
{
1114+
id: '1',
1115+
name: 'Luke',
1116+
},
1117+
],
1118+
},
1119+
hasNext: true,
1120+
},
1121+
});
1122+
1123+
const returnPromise = iterator.return();
1124+
1125+
// this result had started processing before return was called
1126+
const result2 = await iterator.next();
1127+
expectJSON(result2).toDeepEqual({
1128+
done: false,
1129+
value: {
1130+
data: {
1131+
id: '2',
1132+
name: 'Han',
1133+
},
1134+
hasNext: true,
1135+
path: ['asyncIterableListNoReturn', 1],
1136+
},
1137+
});
1138+
1139+
// third result is not returned because async iterator has returned
1140+
const result3 = await iterator.next();
1141+
expectJSON(result3).toDeepEqual({
1142+
done: true,
1143+
value: undefined,
1144+
});
1145+
await returnPromise;
1146+
});
1147+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1148+
const document = parse(`
1149+
query {
1150+
asyncIterableListDelayed @stream(initialCount: 1) {
1151+
name
1152+
id
1153+
}
1154+
}
1155+
`);
1156+
const schema = new GraphQLSchema({ query });
1157+
1158+
const executeResult = await execute({ schema, document, rootValue: {} });
1159+
invariant(isAsyncIterable(executeResult));
1160+
const iterator = executeResult[Symbol.asyncIterator]();
1161+
1162+
const result1 = await iterator.next();
1163+
expectJSON(result1).toDeepEqual({
1164+
done: false,
1165+
value: {
1166+
data: {
1167+
asyncIterableListDelayed: [
1168+
{
1169+
id: '1',
1170+
name: 'Luke',
1171+
},
1172+
],
1173+
},
1174+
hasNext: true,
1175+
},
1176+
});
1177+
1178+
const throwPromise = iterator.throw(new Error('bad'));
1179+
1180+
// this result had started processing before return was called
1181+
const result2 = await iterator.next();
1182+
expectJSON(result2).toDeepEqual({
1183+
done: false,
1184+
value: {
1185+
data: {
1186+
id: '2',
1187+
name: 'Han',
1188+
},
1189+
hasNext: true,
1190+
path: ['asyncIterableListDelayed', 1],
1191+
},
1192+
});
1193+
1194+
// third result is not returned because async iterator has returned
1195+
const result3 = await iterator.next();
1196+
expectJSON(result3).toDeepEqual({
1197+
done: true,
1198+
value: undefined,
1199+
});
1200+
try {
1201+
await throwPromise; /* c8 ignore start */
1202+
// Not reachable, always throws
1203+
/* c8 ignore stop */
1204+
} catch (e) {
1205+
// ignore error
1206+
}
1207+
});
10051208
});

src/execution/execute.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,7 @@ function executeStreamIterator(
14951495
const asyncPayloadRecord = new AsyncPayloadRecord({
14961496
label,
14971497
path: fieldPath,
1498+
iterator,
14981499
});
14991500
const dataPromise: Promise<unknown> = iterator
15001501
.next()
@@ -1560,6 +1561,7 @@ function yieldSubsequentPayloads(
15601561
initialResult: ExecutionResult,
15611562
): AsyncGenerator<AsyncExecutionResult, void, void> {
15621563
let _hasReturnedInitialResult = false;
1564+
let isDone = false;
15631565

15641566
async function race(): Promise<IteratorResult<AsyncExecutionResult>> {
15651567
if (exeContext.subsequentPayloads.length === 0) {
@@ -1625,17 +1627,31 @@ function yieldSubsequentPayloads(
16251627
},
16261628
done: false,
16271629
});
1628-
} else if (exeContext.subsequentPayloads.length === 0) {
1630+
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
16291631
return Promise.resolve({ value: undefined, done: true });
16301632
}
16311633
return race();
16321634
},
1633-
// TODO: implement return & throw
1634-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1635-
Promise.resolve({ value: undefined, done: true }),
1636-
throw: /* istanbul ignore next: will be covered in follow up */ (
1635+
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1636+
await Promise.all(
1637+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1638+
asyncPayloadRecord.iterator?.return?.(),
1639+
),
1640+
);
1641+
isDone = true;
1642+
return { value: undefined, done: true };
1643+
},
1644+
async throw(
16371645
error?: unknown,
1638-
) => Promise.reject(error),
1646+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1647+
await Promise.all(
1648+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1649+
asyncPayloadRecord.iterator?.return?.(),
1650+
),
1651+
);
1652+
isDone = true;
1653+
return Promise.reject(error);
1654+
},
16391655
};
16401656
}
16411657

@@ -1644,10 +1660,16 @@ class AsyncPayloadRecord {
16441660
label?: string;
16451661
path?: Path;
16461662
dataPromise?: Promise<unknown | null | undefined>;
1663+
iterator?: AsyncIterator<unknown>;
16471664
isCompletedIterator?: boolean;
1648-
constructor(opts: { label?: string; path?: Path }) {
1665+
constructor(opts: {
1666+
label?: string;
1667+
path?: Path;
1668+
iterator?: AsyncIterator<unknown>;
1669+
}) {
16491670
this.label = opts.label;
16501671
this.path = opts.path;
1672+
this.iterator = opts.iterator;
16511673
this.errors = [];
16521674
}
16531675

0 commit comments

Comments
 (0)