Skip to content

Commit 46da75f

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 559975e commit 46da75f

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
243243
return result;
244244
}
245245

246+
async function completeAsync(document: DocumentNode, numCalls: number) {
247+
const schema = new GraphQLSchema({ query });
248+
249+
const result = await execute({ schema, document, rootValue: {} });
250+
251+
invariant(isAsyncIterable(result));
252+
253+
const iterator = result[Symbol.asyncIterator]();
254+
255+
const promises = [];
256+
for (let i = 0; i < numCalls; i++) {
257+
promises.push(iterator.next());
258+
}
259+
return Promise.all(promises);
260+
}
261+
246262
describe('Execute: stream directive', () => {
247263
it('Can stream a list field', async () => {
248264
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -683,6 +699,60 @@ describe('Execute: stream directive', () => {
683699
},
684700
});
685701
});
702+
it('Can handle concurrent calls to .next() without waiting', async () => {
703+
const document = parse(`
704+
query {
705+
asyncIterableList @stream(initialCount: 2) {
706+
name
707+
id
708+
}
709+
}
710+
`);
711+
const result = await completeAsync(document, 4);
712+
expectJSON(result).toDeepEqual([
713+
{
714+
done: false,
715+
value: {
716+
data: {
717+
asyncIterableList: [
718+
{
719+
name: 'Luke',
720+
id: '1',
721+
},
722+
{
723+
name: 'Han',
724+
id: '2',
725+
},
726+
],
727+
},
728+
hasNext: true,
729+
},
730+
},
731+
{
732+
done: false,
733+
value: {
734+
data: [
735+
{
736+
name: 'Leia',
737+
id: '3',
738+
},
739+
],
740+
path: ['asyncIterableList', 2],
741+
hasNext: true,
742+
},
743+
},
744+
{
745+
done: false,
746+
value: {
747+
hasNext: false,
748+
},
749+
},
750+
{
751+
done: true,
752+
value: undefined,
753+
},
754+
]);
755+
});
686756
it('Handles error thrown in async iterable before initialCount is reached', async () => {
687757
const document = parse(`
688758
query {

src/execution/execute.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,7 +1648,18 @@ function yieldSubsequentPayloads(
16481648

16491649
const data = await asyncPayloadRecord.data;
16501650

1651+
if (exeContext.subsequentPayloads.length === 0) {
1652+
// a different call to next has exhausted all payloads
1653+
return { value: undefined, done: true };
1654+
}
1655+
16511656
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1657+
1658+
if (index === -1) {
1659+
// a different call to next has consumed this payload
1660+
return race();
1661+
}
1662+
16521663
exeContext.subsequentPayloads.splice(index, 1);
16531664

16541665
if (asyncPayloadRecord.isCompletedIterator) {

0 commit comments

Comments
 (0)