diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts new file mode 100644 index 0000000000..9b7c0b2df9 --- /dev/null +++ b/src/execution/__tests__/defer-test.ts @@ -0,0 +1,701 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { expectJSON } from '../../__testUtils__/expectJSON'; +import { expectPromise } from '../../__testUtils__/expectPromise'; +import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick'; + +import type { DocumentNode } from '../../language/ast'; +import { parse } from '../../language/parser'; + +import { + GraphQLList, + GraphQLNonNull, + GraphQLObjectType, +} from '../../type/definition'; +import { GraphQLID, GraphQLString } from '../../type/scalars'; +import { GraphQLSchema } from '../../type/schema'; + +import type { + InitialIncrementalExecutionResult, + SubsequentIncrementalExecutionResult, +} from '../execute'; +import { execute, experimentalExecuteIncrementally } from '../execute'; + +const friendType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + }, + name: 'Friend', +}); + +const friends = [ + { name: 'Han', id: 2 }, + { name: 'Leia', id: 3 }, + { name: 'C-3PO', id: 4 }, +]; + +const heroType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + slowField: { + type: GraphQLString, + resolve: async () => { + await resolveOnNextTick(); + return 'slow'; + }, + }, + errorField: { + type: GraphQLString, + resolve: () => { + throw new Error('bad'); + }, + }, + nonNullErrorField: { + type: new GraphQLNonNull(GraphQLString), + resolve: () => null, + }, + promiseNonNullErrorField: { + type: new GraphQLNonNull(GraphQLString), + resolve: () => Promise.resolve(null), + }, + friends: { + type: new GraphQLList(friendType), + resolve: () => friends, + }, + }, + name: 'Hero', +}); + +const hero = { name: 'Luke', id: 1 }; + +const query = new GraphQLObjectType({ + fields: { + hero: { + type: heroType, + resolve: () => hero, + }, + }, + name: 'Query', +}); + +const schema = new GraphQLSchema({ query }); + +async function complete(document: DocumentNode) { + const result = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: {}, + }); + + if ('initialResult' in result) { + const results: Array< + InitialIncrementalExecutionResult | SubsequentIncrementalExecutionResult + > = [result.initialResult]; + for await (const patch of result.subsequentResults) { + results.push(patch); + } + return results; + } + return result.singleResult; +} + +describe('Execute: defer directive', () => { + it('Can defer fragments containing scalar types', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + id + name + } + `); + const result = await complete(document); + + expectJSON(result).toDeepEqual([ + { + data: { + hero: { + id: '1', + }, + }, + hasNext: true, + }, + { + incremental: [ + { + data: { + id: '1', + name: 'Luke', + }, + path: ['hero'], + }, + ], + hasNext: false, + }, + ]); + }); + it('Can disable defer using if argument', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer(if: false) + } + } + fragment NameFragment on Hero { + name + } + `); + const result = await complete(document); + + expectJSON(result).toDeepEqual({ + data: { + hero: { + id: '1', + name: 'Luke', + }, + }, + }); + }); + it('Does not disable defer with null if argument', async () => { + const document = parse(` + query HeroNameQuery($shouldDefer: Boolean) { + hero { + id + ...NameFragment @defer(if: $shouldDefer) + } + } + fragment NameFragment on Hero { + name + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + incremental: [ + { + data: { name: 'Luke' }, + path: ['hero'], + }, + ], + hasNext: false, + }, + ]); + }); + it('Can defer fragments on the top level Query field', async () => { + const document = parse(` + query HeroNameQuery { + ...QueryFragment @defer(label: "DeferQuery") + } + fragment QueryFragment on Query { + hero { + id + } + } + `); + const result = await complete(document); + + expectJSON(result).toDeepEqual([ + { + data: {}, + hasNext: true, + }, + { + incremental: [ + { + data: { + hero: { + id: '1', + }, + }, + path: [], + label: 'DeferQuery', + }, + ], + hasNext: false, + }, + ]); + }); + it('Can defer fragments with errors on the top level Query field', async () => { + const document = parse(` + query HeroNameQuery { + ...QueryFragment @defer(label: "DeferQuery") + } + fragment QueryFragment on Query { + hero { + errorField + } + } + `); + const result = await complete(document); + + expectJSON(result).toDeepEqual([ + { + data: {}, + hasNext: true, + }, + { + incremental: [ + { + data: { + hero: { + errorField: null, + }, + }, + errors: [ + { + message: 'bad', + locations: [{ line: 7, column: 11 }], + path: ['hero', 'errorField'], + }, + ], + path: [], + label: 'DeferQuery', + }, + ], + hasNext: false, + }, + ]); + }); + it('Can defer a fragment within an already deferred fragment', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...TopFragment @defer(label: "DeferTop") + } + } + fragment TopFragment on Hero { + name + ...NestedFragment @defer(label: "DeferNested") + } + fragment NestedFragment on Hero { + friends { + name + } + } + `); + const result = await complete(document); + + expectJSON(result).toDeepEqual([ + { + data: { + hero: { + id: '1', + }, + }, + hasNext: true, + }, + { + incremental: [ + { + data: { + friends: [{ name: 'Han' }, { name: 'Leia' }, { name: 'C-3PO' }], + }, + path: ['hero'], + label: 'DeferNested', + }, + { + data: { + name: 'Luke', + }, + path: ['hero'], + label: 'DeferTop', + }, + ], + hasNext: false, + }, + ]); + }); + it('Can defer a fragment that is also not deferred, deferred fragment is first', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...TopFragment @defer(label: "DeferTop") + ...TopFragment + } + } + fragment TopFragment on Hero { + name + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { + hero: { + id: '1', + name: 'Luke', + }, + }, + hasNext: true, + }, + { + incremental: [ + { + data: { + name: 'Luke', + }, + path: ['hero'], + label: 'DeferTop', + }, + ], + hasNext: false, + }, + ]); + }); + it('Can defer a fragment that is also not deferred, non-deferred fragment is first', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...TopFragment + ...TopFragment @defer(label: "DeferTop") + } + } + fragment TopFragment on Hero { + name + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { + hero: { + id: '1', + name: 'Luke', + }, + }, + hasNext: true, + }, + { + incremental: [ + { + data: { + name: 'Luke', + }, + path: ['hero'], + label: 'DeferTop', + }, + ], + hasNext: false, + }, + ]); + }); + + it('Can defer an inline fragment', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ... on Hero @defer(label: "InlineDeferred") { + name + } + } + } + `); + const result = await complete(document); + + expectJSON(result).toDeepEqual([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + incremental: [ + { data: { name: 'Luke' }, path: ['hero'], label: 'InlineDeferred' }, + ], + hasNext: false, + }, + ]); + }); + it('Handles errors thrown in deferred fragments', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + errorField + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + incremental: [ + { + data: { errorField: null }, + path: ['hero'], + errors: [ + { + message: 'bad', + locations: [{ line: 9, column: 9 }], + path: ['hero', 'errorField'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles non-nullable errors thrown in deferred fragments', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + nonNullErrorField + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + incremental: [ + { + data: null, + path: ['hero'], + errors: [ + { + message: + 'Cannot return null for non-nullable field Hero.nonNullErrorField.', + locations: [{ line: 9, column: 9 }], + path: ['hero', 'nonNullErrorField'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles non-nullable errors thrown outside deferred fragments', async () => { + const document = parse(` + query HeroNameQuery { + hero { + nonNullErrorField + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + id + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'Cannot return null for non-nullable field Hero.nonNullErrorField.', + locations: [ + { + line: 4, + column: 11, + }, + ], + path: ['hero', 'nonNullErrorField'], + }, + ], + data: { + hero: null, + }, + }); + }); + it('Handles async non-nullable errors thrown in deferred fragments', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + promiseNonNullErrorField + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + incremental: [ + { + data: null, + path: ['hero'], + errors: [ + { + message: + 'Cannot return null for non-nullable field Hero.promiseNonNullErrorField.', + locations: [{ line: 9, column: 9 }], + path: ['hero', 'promiseNonNullErrorField'], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + it('Returns payloads in correct order', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + slowField + friends { + ...NestedFragment @defer + } + } + fragment NestedFragment on Friend { + name + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { + hero: { id: '1' }, + }, + hasNext: true, + }, + { + incremental: [ + { + data: { slowField: 'slow', friends: [{}, {}, {}] }, + path: ['hero'], + }, + ], + hasNext: true, + }, + { + incremental: [ + { data: { name: 'Han' }, path: ['hero', 'friends', 0] }, + { data: { name: 'Leia' }, path: ['hero', 'friends', 1] }, + { data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] }, + ], + hasNext: false, + }, + ]); + }); + it('Returns payloads from synchronous data in correct order', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + name + friends { + ...NestedFragment @defer + } + } + fragment NestedFragment on Friend { + name + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ + { + data: { + hero: { id: '1' }, + }, + hasNext: true, + }, + { + incremental: [ + { + data: { + name: 'Luke', + friends: [{}, {}, {}], + }, + path: ['hero'], + }, + ], + hasNext: true, + }, + { + incremental: [ + { data: { name: 'Han' }, path: ['hero', 'friends', 0] }, + { data: { name: 'Leia' }, path: ['hero', 'friends', 1] }, + { data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] }, + ], + hasNext: false, + }, + ]); + }); + + it('original execute function throws error if anything is deferred and everything else is sync', () => { + const doc = ` + query Deferred { + ... @defer { hero { id } } + } + `; + expect(() => + execute({ + schema, + document: parse(doc), + rootValue: {}, + }), + ).to.throw( + 'Executing this GraphQL operation would unexpectedly produce multiple payloads (due to @defer or @stream directive)', + ); + }); + + it('original execute function resolves to error if anything is deferred and something else is async', async () => { + const doc = ` + query Deferred { + hero { slowField } + ... @defer { hero { id } } + } + `; + expectJSON( + await expectPromise( + execute({ + schema, + document: parse(doc), + rootValue: {}, + }), + ).toResolve(), + ).toDeepEqual({ + errors: [ + { + message: + 'Executing this GraphQL operation would unexpectedly produce multiple payloads (due to @defer or @stream directive)', + }, + ], + }); + }); +}); diff --git a/src/execution/__tests__/flattenAsyncIterable-test.ts b/src/execution/__tests__/flattenAsyncIterable-test.ts new file mode 100644 index 0000000000..abd3b1d658 --- /dev/null +++ b/src/execution/__tests__/flattenAsyncIterable-test.ts @@ -0,0 +1,149 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { flattenAsyncIterable } from '../flattenAsyncIterable'; + +describe('flattenAsyncIterable', () => { + it('flatten nested async generators', async () => { + async function* source() { + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + ); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); + yield await Promise.resolve(2.2); + })(), + ); + } + + const doubles = flattenAsyncIterable(source()); + + const result = []; + for await (const x of doubles) { + result.push(x); + } + expect(result).to.deep.equal([1.1, 1.2, 2.1, 2.2]); + }); + + it('allows returning early from a nested async generator', async () => { + async function* source() { + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + ); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); /* c8 ignore start */ + // Not reachable, early return + yield await Promise.resolve(2.2); + })(), + ); + // Not reachable, early return + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(3.1); + yield await Promise.resolve(3.2); + })(), + ); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterable(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1.1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Early return + expect(await doubles.return()).to.deep.equal({ + value: undefined, + done: true, + }); + + // Subsequent next calls + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('allows throwing errors from a nested async generator', async () => { + async function* source() { + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + ); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); /* c8 ignore start */ + // Not reachable, early return + yield await Promise.resolve(2.2); + })(), + ); + // Not reachable, early return + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(3.1); + yield await Promise.resolve(3.2); + })(), + ); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterable(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1.1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Throw error + let caughtError; + try { + await doubles.throw('ouch'); /* c8 ignore start */ + } catch (e) { + caughtError = e; + } + expect(caughtError).to.equal('ouch'); + }); + it('completely yields sub-iterables even when next() called in parallel', async () => { + async function* source() { + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + ); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); + yield await Promise.resolve(2.2); + })(), + ); + } + + const result = flattenAsyncIterable(source()); + + const promise1 = result.next(); + const promise2 = result.next(); + expect(await promise1).to.deep.equal({ value: 1.1, done: false }); + expect(await promise2).to.deep.equal({ value: 1.2, done: false }); + expect(await result.next()).to.deep.equal({ value: 2.1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2.2, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); +}); diff --git a/src/execution/__tests__/mutations-test.ts b/src/execution/__tests__/mutations-test.ts index 0f0ad1cbf8..2b2b90ede5 100644 --- a/src/execution/__tests__/mutations-test.ts +++ b/src/execution/__tests__/mutations-test.ts @@ -1,4 +1,4 @@ -import { expect } from 'chai'; +import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON'; @@ -10,7 +10,11 @@ import { GraphQLObjectType } from '../../type/definition'; import { GraphQLInt } from '../../type/scalars'; import { GraphQLSchema } from '../../type/schema'; -import { execute, executeSync } from '../execute'; +import { + execute, + executeSync, + experimentalExecuteIncrementally, +} from '../execute'; class NumberHolder { theNumber: number; @@ -50,6 +54,13 @@ class Root { const numberHolderType = new GraphQLObjectType({ fields: { theNumber: { type: GraphQLInt }, + promiseToGetTheNumber: { + type: GraphQLInt, + resolve: async (root) => { + await new Promise((resolve) => setTimeout(resolve, 0)); + return root.theNumber; + }, + }, }, name: 'NumberHolder', }); @@ -191,4 +202,132 @@ describe('Execute: Handles mutation execution ordering', () => { ], }); }); + it('Mutation fields with @defer do not block next mutation', async () => { + const document = parse(` + mutation M { + first: promiseToChangeTheNumber(newNumber: 1) { + ...DeferFragment @defer(label: "defer-label") + }, + second: immediatelyChangeTheNumber(newNumber: 2) { + theNumber + } + } + fragment DeferFragment on NumberHolder { + promiseToGetTheNumber + } + `); + + const rootValue = new Root(6); + const mutationResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue, + }); + const patches = []; + + assert('initialResult' in mutationResult); + patches.push(mutationResult.initialResult); + for await (const patch of mutationResult.subsequentResults) { + patches.push(patch); + } + + expect(patches).to.deep.equal([ + { + data: { + first: {}, + second: { theNumber: 2 }, + }, + hasNext: true, + }, + { + incremental: [ + { + label: 'defer-label', + path: ['first'], + data: { + promiseToGetTheNumber: 2, + }, + }, + ], + hasNext: false, + }, + ]); + }); + it('Mutation inside of a fragment', async () => { + const document = parse(` + mutation M { + ...MutationFragment + second: immediatelyChangeTheNumber(newNumber: 2) { + theNumber + } + } + fragment MutationFragment on Mutation { + first: promiseToChangeTheNumber(newNumber: 1) { + theNumber + }, + } + `); + + const rootValue = new Root(6); + const mutationResult = await execute({ schema, document, rootValue }); + + expect(mutationResult).to.deep.equal({ + data: { + first: { theNumber: 1 }, + second: { theNumber: 2 }, + }, + }); + }); + it('Mutation with @defer is not executed serially', async () => { + const document = parse(` + mutation M { + ...MutationFragment @defer(label: "defer-label") + second: immediatelyChangeTheNumber(newNumber: 2) { + theNumber + } + } + fragment MutationFragment on Mutation { + first: promiseToChangeTheNumber(newNumber: 1) { + theNumber + }, + } + `); + + const rootValue = new Root(6); + const mutationResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue, + }); + const patches = []; + + assert('initialResult' in mutationResult); + patches.push(mutationResult.initialResult); + for await (const patch of mutationResult.subsequentResults) { + patches.push(patch); + } + + expect(patches).to.deep.equal([ + { + data: { + second: { theNumber: 2 }, + }, + hasNext: true, + }, + { + incremental: [ + { + label: 'defer-label', + path: [], + data: { + first: { + theNumber: 1, + }, + }, + }, + ], + hasNext: false, + }, + ]); + }); }); diff --git a/src/execution/__tests__/nonnull-test.ts b/src/execution/__tests__/nonnull-test.ts index 427f2a64d6..60d68c2b90 100644 --- a/src/execution/__tests__/nonnull-test.ts +++ b/src/execution/__tests__/nonnull-test.ts @@ -3,6 +3,8 @@ import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON'; +import type { PromiseOrValue } from '../../jsutils/PromiseOrValue'; + import { parse } from '../../language/parser'; import { GraphQLNonNull, GraphQLObjectType } from '../../type/definition'; @@ -109,7 +111,7 @@ const schema = buildSchema(` function executeQuery( query: string, rootValue: unknown, -): ExecutionResult | Promise { +): PromiseOrValue { return execute({ schema, document: parse(query), rootValue }); } diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts new file mode 100644 index 0000000000..4e6ea364b2 --- /dev/null +++ b/src/execution/__tests__/stream-test.ts @@ -0,0 +1,1455 @@ +import { assert } from 'chai'; +import { describe, it } from 'mocha'; + +import { expectJSON } from '../../__testUtils__/expectJSON'; + +import type { PromiseOrValue } from '../../jsutils/PromiseOrValue'; + +import type { DocumentNode } from '../../language/ast'; +import { parse } from '../../language/parser'; + +import { + GraphQLList, + GraphQLNonNull, + GraphQLObjectType, +} from '../../type/definition'; +import { GraphQLID, GraphQLString } from '../../type/scalars'; +import { GraphQLSchema } from '../../type/schema'; + +import type { + InitialIncrementalExecutionResult, + SubsequentIncrementalExecutionResult, +} from '../execute'; +import { experimentalExecuteIncrementally } from '../execute'; + +const friendType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + nonNullName: { type: new GraphQLNonNull(GraphQLString) }, + }, + name: 'Friend', +}); + +const friends = [ + { name: 'Luke', id: 1 }, + { name: 'Han', id: 2 }, + { name: 'Leia', id: 3 }, +]; + +const query = new GraphQLObjectType({ + fields: { + scalarList: { + type: new GraphQLList(GraphQLString), + }, + scalarListList: { + type: new GraphQLList(new GraphQLList(GraphQLString)), + }, + friendList: { + type: new GraphQLList(friendType), + }, + nonNullFriendList: { + type: new GraphQLList(new GraphQLNonNull(friendType)), + }, + nestedObject: { + type: new GraphQLObjectType({ + name: 'NestedObject', + fields: { + scalarField: { + type: GraphQLString, + }, + nestedFriendList: { type: new GraphQLList(friendType) }, + }, + }), + }, + }, + name: 'Query', +}); + +const schema = new GraphQLSchema({ query }); + +async function complete(document: DocumentNode, rootValue: unknown = {}) { + const result = await experimentalExecuteIncrementally({ + schema, + document, + rootValue, + }); + + if ('initialResult' in result) { + const results: Array< + InitialIncrementalExecutionResult | SubsequentIncrementalExecutionResult + > = [result.initialResult]; + for await (const patch of result.subsequentResults) { + results.push(patch); + } + return results; + } + return result.singleResult; +} + +async function completeAsync( + document: DocumentNode, + numCalls: number, + rootValue: unknown = {}, +) { + const result = await experimentalExecuteIncrementally({ + schema, + document, + rootValue, + }); + + assert('initialResult' in result); + + const iterator = result.subsequentResults[Symbol.asyncIterator](); + + const promises: Array< + PromiseOrValue< + IteratorResult< + InitialIncrementalExecutionResult | SubsequentIncrementalExecutionResult + > + > + > = [{ done: false, value: result.initialResult }]; + for (let i = 0; i < numCalls; i++) { + promises.push(iterator.next()); + } + return Promise.all(promises); +} + +function createResolvablePromise(): [Promise, (value?: T) => void] { + let resolveFn; + const promise = new Promise((resolve) => { + resolveFn = resolve; + }); + return [promise, resolveFn as unknown as (value?: T) => void]; +} + +describe('Execute: stream directive', () => { + it('Can stream a list field', async () => { + const document = parse('{ scalarList @stream(initialCount: 1) }'); + const result = await complete(document, { + scalarList: () => ['apple', 'banana', 'coconut'], + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarList: ['apple'], + }, + hasNext: true, + }, + { + incremental: [{ items: ['banana'], path: ['scalarList', 1] }], + hasNext: true, + }, + { + incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + hasNext: false, + }, + ]); + }); + it('Can use default value of initialCount', async () => { + const document = parse('{ scalarList @stream }'); + const result = await complete(document, { + scalarList: () => ['apple', 'banana', 'coconut'], + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarList: [], + }, + hasNext: true, + }, + { + incremental: [{ items: ['apple'], path: ['scalarList', 0] }], + hasNext: true, + }, + { + incremental: [{ items: ['banana'], path: ['scalarList', 1] }], + hasNext: true, + }, + { + incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + hasNext: false, + }, + ]); + }); + it('Negative values of initialCount throw field errors', async () => { + const document = parse('{ scalarList @stream(initialCount: -2) }'); + const result = await complete(document, { + scalarList: () => ['apple', 'banana', 'coconut'], + }); + expectJSON(result).toDeepEqual({ + errors: [ + { + message: 'initialCount must be a positive integer', + locations: [ + { + line: 1, + column: 3, + }, + ], + path: ['scalarList'], + }, + ], + data: { + scalarList: null, + }, + }); + }); + it('Returns label from stream directive', async () => { + const document = parse( + '{ scalarList @stream(initialCount: 1, label: "scalar-stream") }', + ); + const result = await complete(document, { + scalarList: () => ['apple', 'banana', 'coconut'], + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarList: ['apple'], + }, + hasNext: true, + }, + { + incremental: [ + { + items: ['banana'], + path: ['scalarList', 1], + label: 'scalar-stream', + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: ['coconut'], + path: ['scalarList', 2], + label: 'scalar-stream', + }, + ], + hasNext: false, + }, + ]); + }); + it('Can disable @stream using if argument', async () => { + const document = parse( + '{ scalarList @stream(initialCount: 0, if: false) }', + ); + const result = await complete(document, { + scalarList: () => ['apple', 'banana', 'coconut'], + }); + expectJSON(result).toDeepEqual({ + data: { scalarList: ['apple', 'banana', 'coconut'] }, + }); + }); + it('Does not disable stream with null if argument', async () => { + const document = parse( + 'query ($shouldStream: Boolean) { scalarList @stream(initialCount: 2, if: $shouldStream) }', + ); + const result = await complete(document, { + scalarList: () => ['apple', 'banana', 'coconut'], + }); + expectJSON(result).toDeepEqual([ + { + data: { scalarList: ['apple', 'banana'] }, + hasNext: true, + }, + { + incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + hasNext: false, + }, + ]); + }); + it('Can stream multi-dimensional lists', async () => { + const document = parse('{ scalarListList @stream(initialCount: 1) }'); + const result = await complete(document, { + scalarListList: () => [ + ['apple', 'apple', 'apple'], + ['banana', 'banana', 'banana'], + ['coconut', 'coconut', 'coconut'], + ], + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarListList: [['apple', 'apple', 'apple']], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [['banana', 'banana', 'banana']], + path: ['scalarListList', 1], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [['coconut', 'coconut', 'coconut']], + path: ['scalarListList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Can stream a field that returns a list of promises', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document, { + friendList: () => friends.map((f) => Promise.resolve(f)), + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [ + { + name: 'Leia', + id: '3', + }, + ], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Can stream in correct order with lists of promises', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 0) { + name + id + } + } + `); + const result = await complete(document, { + friendList: () => friends.map((f) => Promise.resolve(f)), + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Luke', id: '1' }], + path: ['friendList', 0], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Han', id: '2' }], + path: ['friendList', 1], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles rejections in a field that returns a list of promises before initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document, { + friendList: () => + friends.map((f, i) => { + if (i === 1) { + return Promise.reject(new Error('bad')); + } + return Promise.resolve(f); + }), + }); + expectJSON(result).toDeepEqual([ + { + errors: [ + { + message: 'bad', + locations: [{ line: 3, column: 9 }], + path: ['friendList', 1], + }, + ], + data: { + friendList: [{ name: 'Luke', id: '1' }, null], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles rejections in a field that returns a list of promises after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document, { + friendList: () => + friends.map((f, i) => { + if (i === 1) { + return Promise.reject(new Error('bad')); + } + return Promise.resolve(f); + }), + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ name: 'Luke', id: '1' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'bad', + locations: [{ line: 3, column: 9 }], + path: ['friendList', 1], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Can stream a field that returns an async iterable', async () => { + const document = parse(` + query { + friendList @stream { + name + id + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + yield await Promise.resolve(friends[2]); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Luke', id: '1' }], + path: ['friendList', 0], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Han', id: '2' }], + path: ['friendList', 1], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Can stream a field that returns an async iterable, using a non-zero initialCount', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + yield await Promise.resolve(friends[2]); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Negative values of initialCount throw field errors on a field that returns an async iterable', async () => { + const document = parse(` + query { + friendList @stream(initialCount: -2) { + name + id + } + } + `); + const result = await complete(document, { + // eslint-disable-next-line @typescript-eslint/no-empty-function + async *friendList() {}, + }); + expectJSON(result).toDeepEqual({ + errors: [ + { + message: 'initialCount must be a positive integer', + locations: [{ line: 3, column: 9 }], + path: ['friendList'], + }, + ], + data: { + friendList: null, + }, + }); + }); + it('Can handle concurrent calls to .next() without waiting', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await completeAsync(document, 3, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + yield await Promise.resolve(friends[2]); + }, + }); + expectJSON(result).toDeepEqual([ + { + done: false, + value: { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + hasNext: true, + }, + }, + { + done: false, + value: { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + }, + { done: false, value: { hasNext: false } }, + { done: true, value: undefined }, + ]); + }); + it('Handles error thrown in async iterable before initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + throw new Error('bad'); + }, + }); + expectJSON(result).toDeepEqual({ + errors: [ + { + message: 'bad', + locations: [{ line: 3, column: 9 }], + path: ['friendList', 1], + }, + ], + data: { + friendList: [{ name: 'Luke', id: '1' }, null], + }, + }); + }); + it('Handles error thrown in async iterable after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + throw new Error('bad'); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ name: 'Luke', id: '1' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'bad', + locations: [{ line: 3, column: 9 }], + path: ['friendList', 1], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles null returned in non-null list items after initialCount is reached', async () => { + const document = parse(` + query { + nonNullFriendList @stream(initialCount: 1) { + name + } + } + `); + const result = await complete(document, { + nonNullFriendList: () => [friends[0], null], + }); + + expectJSON(result).toDeepEqual([ + { + data: { + nonNullFriendList: [{ name: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: null, + path: ['nonNullFriendList', 1], + errors: [ + { + message: + 'Cannot return null for non-nullable field Query.nonNullFriendList.', + locations: [{ line: 3, column: 9 }], + path: ['nonNullFriendList', 1], + }, + ], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles null returned in non-null async iterable list items after initialCount is reached', async () => { + const document = parse(` + query { + nonNullFriendList @stream(initialCount: 1) { + name + } + } + `); + const result = await complete(document, { + async *nonNullFriendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(null); + yield await Promise.resolve(friends[1]); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + nonNullFriendList: [{ name: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: null, + path: ['nonNullFriendList', 1], + errors: [ + { + message: + 'Cannot return null for non-nullable field Query.nonNullFriendList.', + locations: [{ line: 3, column: 9 }], + path: ['nonNullFriendList', 1], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Han' }], + path: ['nonNullFriendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Handles errors thrown by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + scalarList @stream(initialCount: 1) + } + `); + const result = await complete(document, { + async *scalarList() { + yield await Promise.resolve(friends[0].name); + yield await Promise.resolve({}); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + scalarList: ['Luke'], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['scalarList', 1], + errors: [ + { + message: 'String cannot represent value: {}', + locations: [{ line: 3, column: 9 }], + path: ['scalarList', 1], + }, + ], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Handles async errors thrown by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + nonNullFriendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + nonNullFriendList: () => [ + Promise.resolve({ nonNullName: friends[0].name }), + Promise.resolve({ + nonNullName: () => Promise.reject(new Error('Oops')), + }), + Promise.resolve({ nonNullName: friends[1].name }), + ], + }); + expectJSON(result).toDeepEqual([ + { + data: { + nonNullFriendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: null, + path: ['nonNullFriendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['nonNullFriendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ nonNullName: 'Han' }], + path: ['nonNullFriendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles async errors thrown by completeValue after initialCount is reached from async iterable', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + nonNullName + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve({ nonNullName: friends[0].name }); + yield await Promise.resolve({ + nonNullName: () => Promise.reject(new Error('Oops')), + }); + yield await Promise.resolve({ nonNullName: friends[1].name }); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ nonNullName: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [null], + path: ['friendList', 1], + errors: [ + { + message: 'Oops', + locations: [{ line: 4, column: 11 }], + path: ['friendList', 1, 'nonNullName'], + }, + ], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ nonNullName: 'Han' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Handles promises returned by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 1) { + id + name + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + yield await Promise.resolve({ + id: friends[2].id, + name: () => Promise.resolve(friends[2].name), + }); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [{ id: '1', name: 'Luke' }], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [{ id: '2', name: 'Han' }], + path: ['friendList', 1], + }, + ], + hasNext: true, + }, + { + incremental: [ + { + items: [{ id: '3', name: 'Leia' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Returns payloads in correct order when parent deferred fragment resolves slower than stream', async () => { + const [slowFieldPromise, resolveSlowField] = createResolvablePromise(); + const document = parse(` + query { + nestedObject { + ... DeferFragment @defer + } + } + fragment DeferFragment on NestedObject { + scalarField + nestedFriendList @stream(initialCount: 0) { + name + } + } + `); + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + nestedObject: { + scalarField: () => slowFieldPromise, + async *nestedFriendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + }, + }, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + nestedObject: {}, + }, + hasNext: true, + }); + + const result2Promise = iterator.next(); + resolveSlowField('slow'); + const result2 = await result2Promise; + expectJSON(result2).toDeepEqual({ + value: { + incremental: [ + { + data: { scalarField: 'slow', nestedFriendList: [] }, + path: ['nestedObject'], + }, + ], + hasNext: true, + }, + done: false, + }); + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + incremental: [ + { + items: [{ name: 'Luke' }], + path: ['nestedObject', 'nestedFriendList', 0], + }, + ], + hasNext: true, + }, + done: false, + }); + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + value: { + incremental: [ + { + items: [{ name: 'Han' }], + path: ['nestedObject', 'nestedFriendList', 1], + }, + ], + hasNext: true, + }, + done: false, + }); + const result5 = await iterator.next(); + expectJSON(result5).toDeepEqual({ + value: { hasNext: false }, + done: false, + }); + const result6 = await iterator.next(); + expectJSON(result6).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('Can @defer fields that are resolved after async iterable is complete', async () => { + const [slowFieldPromise, resolveSlowField] = createResolvablePromise(); + const [iterableCompletionPromise, resolveIterableCompletion] = + createResolvablePromise(); + + const document = parse(` + query { + friendList @stream(initialCount: 1, label:"stream-label") { + ...NameFragment @defer(label: "DeferName") @defer(label: "DeferName") + id + } + } + fragment NameFragment on Friend { + name + } + `); + + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve({ + id: friends[1].id, + name: () => slowFieldPromise, + }); + await iterableCompletionPromise; + }, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + friendList: [{ id: '1' }], + }, + hasNext: true, + }); + + const result2Promise = iterator.next(); + resolveIterableCompletion(); + const result2 = await result2Promise; + expectJSON(result2).toDeepEqual({ + value: { + incremental: [ + { + data: { name: 'Luke' }, + path: ['friendList', 0], + label: 'DeferName', + }, + { + items: [{ id: '2' }], + path: ['friendList', 1], + label: 'stream-label', + }, + ], + hasNext: true, + }, + done: false, + }); + + const result3Promise = iterator.next(); + resolveSlowField('Han'); + const result3 = await result3Promise; + expectJSON(result3).toDeepEqual({ + value: { + incremental: [ + { + data: { name: 'Han' }, + path: ['friendList', 1], + label: 'DeferName', + }, + ], + hasNext: false, + }, + done: false, + }); + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('Can @defer fields that are resolved before async iterable is complete', async () => { + const [slowFieldPromise, resolveSlowField] = createResolvablePromise(); + const [iterableCompletionPromise, resolveIterableCompletion] = + createResolvablePromise(); + + const document = parse(` + query { + friendList @stream(initialCount: 1, label:"stream-label") { + ...NameFragment @defer(label: "DeferName") @defer(label: "DeferName") + id + } + } + fragment NameFragment on Friend { + name + } + `); + + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve({ + id: friends[1].id, + name: () => slowFieldPromise, + }); + await iterableCompletionPromise; + }, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + friendList: [{ id: '1' }], + }, + hasNext: true, + }); + + const result2Promise = iterator.next(); + resolveSlowField('Han'); + const result2 = await result2Promise; + expectJSON(result2).toDeepEqual({ + value: { + incremental: [ + { + data: { name: 'Luke' }, + path: ['friendList', 0], + label: 'DeferName', + }, + { + items: [{ id: '2' }], + path: ['friendList', 1], + label: 'stream-label', + }, + ], + hasNext: true, + }, + done: false, + }); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + incremental: [ + { + data: { name: 'Han' }, + path: ['friendList', 1], + label: 'DeferName', + }, + ], + hasNext: true, + }, + done: false, + }); + const result4Promise = iterator.next(); + resolveIterableCompletion(); + const result4 = await result4Promise; + expectJSON(result4).toDeepEqual({ + value: { hasNext: false }, + done: false, + }); + + const result5 = await iterator.next(); + expectJSON(result5).toDeepEqual({ + value: undefined, + done: true, + }); + }); + it('Returns underlying async iterables when returned generator is returned', async () => { + let returned = false; + let index = 0; + const iterable = { + [Symbol.asyncIterator]: () => ({ + next: () => { + const friend = friends[index++]; + if (!friend) { + return Promise.resolve({ done: true, value: undefined }); + } + return Promise.resolve({ done: false, value: friend }); + }, + return: () => { + returned = true; + }, + }), + }; + + const document = parse(` + query { + friendList @stream(initialCount: 1) { + id + ... @defer { + name + } + } + } + `); + + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + friendList: iterable, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + friendList: [ + { + id: '1', + }, + ], + }, + hasNext: true, + }); + const returnPromise = iterator.return(); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: true, + value: undefined, + }); + await returnPromise; + assert(returned); + }); + it('Can return async iterable when underlying iterable does not have a return method', async () => { + let index = 0; + const iterable = { + [Symbol.asyncIterator]: () => ({ + next: () => { + const friend = friends[index++]; + if (!friend) { + return Promise.resolve({ done: true, value: undefined }); + } + return Promise.resolve({ done: false, value: friend }); + }, + }), + }; + + const document = parse(` + query { + friendList @stream(initialCount: 1) { + name + id + } + } + `); + + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + friendList: iterable, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + friendList: [ + { + id: '1', + name: 'Luke', + }, + ], + }, + hasNext: true, + }); + + const returnPromise = iterator.return(); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: true, + value: undefined, + }); + await returnPromise; + }); + it('Returns underlying async iterables when returned generator is thrown', async () => { + let index = 0; + let returned = false; + const iterable = { + [Symbol.asyncIterator]: () => ({ + next: () => { + const friend = friends[index++]; + if (!friend) { + return Promise.resolve({ done: true, value: undefined }); + } + return Promise.resolve({ done: false, value: friend }); + }, + return: () => { + returned = true; + }, + }), + }; + const document = parse(` + query { + friendList @stream(initialCount: 1) { + ... @defer { + name + } + id + } + } + `); + + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + friendList: iterable, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + friendList: [ + { + id: '1', + }, + ], + }, + hasNext: true, + }); + + const throwPromise = iterator.throw(new Error('bad')); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: true, + value: undefined, + }); + try { + await throwPromise; /* c8 ignore start */ + // Not reachable, always throws + /* c8 ignore stop */ + } catch (e) { + // ignore error + } + assert(returned); + }); +}); diff --git a/src/execution/__tests__/subscribe-test.ts b/src/execution/__tests__/subscribe-test.ts index 793a53f59f..7144d09948 100644 --- a/src/execution/__tests__/subscribe-test.ts +++ b/src/execution/__tests__/subscribe-test.ts @@ -17,7 +17,11 @@ import { GraphQLBoolean, GraphQLInt, GraphQLString } from '../../type/scalars'; import { GraphQLSchema } from '../../type/schema'; import type { ExecutionArgs, ExecutionResult } from '../execute'; -import { createSourceEventStream, subscribe } from '../execute'; +import { + createSourceEventStream, + experimentalSubscribeIncrementally, + subscribe, +} from '../execute'; import { SimplePubSub } from './simplePubSub'; @@ -33,6 +37,10 @@ const EmailType = new GraphQLObjectType({ fields: { from: { type: GraphQLString }, subject: { type: GraphQLString }, + asyncSubject: { + type: GraphQLString, + resolve: (email) => Promise.resolve(email.subject), + }, message: { type: GraphQLString }, unread: { type: GraphQLBoolean }, }, @@ -84,17 +92,26 @@ const emailSchema = new GraphQLSchema({ }), }); -function createSubscription(pubsub: SimplePubSub) { +function createSubscription( + pubsub: SimplePubSub, + variableValues?: { readonly [variable: string]: unknown }, + originalSubscribe: boolean = false, +) { const document = parse(` - subscription ($priority: Int = 0) { + subscription ($priority: Int = 0, $shouldDefer: Boolean = false, $asyncResolver: Boolean = false) { importantEmail(priority: $priority) { email { from subject + ... @include(if: $asyncResolver) { + asyncSubject + } } - inbox { - unread - total + ... @defer(if: $shouldDefer) { + inbox { + unread + total + } } } } @@ -124,7 +141,12 @@ function createSubscription(pubsub: SimplePubSub) { }), }; - return subscribe({ schema: emailSchema, document, rootValue: data }); + return (originalSubscribe ? subscribe : experimentalSubscribeIncrementally)({ + schema: emailSchema, + document, + rootValue: data, + variableValues, + }); } const DummyQueryType = new GraphQLObjectType({ @@ -549,6 +571,45 @@ describe('Subscription Publish Phase', () => { expect(await payload2).to.deep.equal(expectedPayload); }); + it('produces a payload when queried fields are async', async () => { + const pubsub = new SimplePubSub(); + + const subscription = createSubscription(pubsub, { asyncResolver: true }); + assert(isAsyncIterable(subscription)); + + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + asyncSubject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + it('produces a payload per subscription event', async () => { const pubsub = new SimplePubSub(); const subscription = createSubscription(pubsub); @@ -638,6 +699,213 @@ describe('Subscription Publish Phase', () => { }); }); + it('produces additional payloads for subscriptions with @defer', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub, { + shouldDefer: true, + }); + assert(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + // The previously waited on payload now has a value. + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + }, + }, + hasNext: true, + }, + }); + + // Wait for the next payload from @defer + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + incremental: [ + { + data: { + inbox: { + unread: 1, + total: 2, + }, + }, + path: ['importantEmail'], + }, + ], + hasNext: false, + }, + }); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'hyo@graphql.org', + subject: 'Tools', + }, + }, + }, + hasNext: true, + }, + }); + + // Another new email arrives, before the incrementally delivered payloads from the last email was received. + expect( + pubsub.emit({ + from: 'adam@graphql.org', + subject: 'Important', + message: 'Read me please', + unread: true, + }), + ).to.equal(true); + + // Deferred payload from previous event is received. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + incremental: [ + { + data: { + inbox: { + unread: 2, + total: 3, + }, + }, + path: ['importantEmail'], + }, + ], + hasNext: false, + }, + }); + + // Next payload from last event + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'adam@graphql.org', + subject: 'Important', + }, + }, + }, + hasNext: true, + }, + }); + + // The client disconnects before the deferred payload is consumed. + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + + it('original subscribe function returns errors with @defer', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription( + pubsub, + { + shouldDefer: true, + }, + true, + ); + assert(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + const errorPayload = { + done: false, + value: { + errors: [ + { + message: + 'Executing this GraphQL operation would unexpectedly produce multiple payloads (due to @defer or @stream directive)', + }, + ], + }, + }; + + // The previously waited on payload now has a value. + expectJSON(await payload).toDeepEqual(errorPayload); + + // Wait for the next payload from @defer + expectJSON(await subscription.next()).toDeepEqual(errorPayload); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expectJSON(await subscription.next()).toDeepEqual(errorPayload); + // The next waited on payload will have a value. + expectJSON(await subscription.next()).toDeepEqual(errorPayload); + + // The client disconnects before the deferred payload is consumed. + expectJSON(await subscription.return()).toDeepEqual({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expectJSON(await subscription.next()).toDeepEqual({ + done: true, + value: undefined, + }); + }); + it('produces a payload when there are multiple events', async () => { const pubsub = new SimplePubSub(); const subscription = createSubscription(pubsub); diff --git a/src/execution/__tests__/sync-test.ts b/src/execution/__tests__/sync-test.ts index 021f09fa3c..2ea2ce8bd5 100644 --- a/src/execution/__tests__/sync-test.ts +++ b/src/execution/__tests__/sync-test.ts @@ -113,6 +113,24 @@ describe('Execute: synchronously when possible', () => { }); }).to.throw('GraphQL execution failed to complete synchronously.'); }); + + it('throws if encountering async iterable execution', () => { + const doc = ` + query Example { + ...deferFrag @defer(label: "deferLabel") + } + fragment deferFrag on Query { + syncField + } + `; + expect(() => { + executeSync({ + schema, + document: parse(doc), + rootValue: 'rootValue', + }); + }).to.throw('GraphQL execution failed to complete synchronously.'); + }); }); describe('graphqlSync', () => { diff --git a/src/execution/collectFields.ts b/src/execution/collectFields.ts index bd85f73dcc..43d003216b 100644 --- a/src/execution/collectFields.ts +++ b/src/execution/collectFields.ts @@ -13,6 +13,7 @@ import { Kind } from '../language/kinds'; import type { GraphQLObjectType } from '../type/definition'; import { isAbstractType } from '../type/definition'; import { + GraphQLDeferDirective, GraphQLIncludeDirective, GraphQLSkipDirective, } from '../type/directives'; @@ -22,6 +23,16 @@ import { typeFromAST } from '../utilities/typeFromAST'; import { getDirectiveValues } from './values'; +export interface PatchFields { + label: string | undefined; + fields: Map>; +} + +export interface FieldsAndPatches { + fields: Map>; + patches: Array; +} + /** * Given a selectionSet, collects all of the fields and returns them. * @@ -37,8 +48,9 @@ export function collectFields( variableValues: { [variable: string]: unknown }, runtimeType: GraphQLObjectType, selectionSet: SelectionSetNode, -): Map> { +): FieldsAndPatches { const fields = new AccumulatorMap(); + const patches: Array = []; collectFieldsImpl( schema, fragments, @@ -46,9 +58,10 @@ export function collectFields( runtimeType, selectionSet, fields, + patches, new Set(), ); - return fields; + return { fields, patches }; } /** @@ -67,9 +80,16 @@ export function collectSubfields( variableValues: { [variable: string]: unknown }, returnType: GraphQLObjectType, fieldNodes: ReadonlyArray, -): Map> { +): FieldsAndPatches { const subFieldNodes = new AccumulatorMap(); const visitedFragmentNames = new Set(); + + const subPatches: Array = []; + const subFieldsAndPatches = { + fields: subFieldNodes, + patches: subPatches, + }; + for (const node of fieldNodes) { if (node.selectionSet) { collectFieldsImpl( @@ -79,11 +99,12 @@ export function collectSubfields( returnType, node.selectionSet, subFieldNodes, + subPatches, visitedFragmentNames, ); } } - return subFieldNodes; + return subFieldsAndPatches; } // eslint-disable-next-line max-params @@ -94,6 +115,7 @@ function collectFieldsImpl( runtimeType: GraphQLObjectType, selectionSet: SelectionSetNode, fields: AccumulatorMap, + patches: Array, visitedFragmentNames: Set, ): void { for (const selection of selectionSet.selections) { @@ -112,26 +134,51 @@ function collectFieldsImpl( ) { continue; } - collectFieldsImpl( - schema, - fragments, - variableValues, - runtimeType, - selection.selectionSet, - fields, - visitedFragmentNames, - ); + + const defer = getDeferValues(variableValues, selection); + + if (defer) { + const patchFields = new AccumulatorMap(); + collectFieldsImpl( + schema, + fragments, + variableValues, + runtimeType, + selection.selectionSet, + patchFields, + patches, + visitedFragmentNames, + ); + patches.push({ + label: defer.label, + fields: patchFields, + }); + } else { + collectFieldsImpl( + schema, + fragments, + variableValues, + runtimeType, + selection.selectionSet, + fields, + patches, + visitedFragmentNames, + ); + } break; } case Kind.FRAGMENT_SPREAD: { const fragName = selection.name.value; - if ( - visitedFragmentNames.has(fragName) || - !shouldIncludeNode(variableValues, selection) - ) { + + if (!shouldIncludeNode(variableValues, selection)) { + continue; + } + + const defer = getDeferValues(variableValues, selection); + if (visitedFragmentNames.has(fragName) && !defer) { continue; } - visitedFragmentNames.add(fragName); + const fragment = fragments[fragName]; if ( !fragment || @@ -139,21 +186,69 @@ function collectFieldsImpl( ) { continue; } - collectFieldsImpl( - schema, - fragments, - variableValues, - runtimeType, - fragment.selectionSet, - fields, - visitedFragmentNames, - ); + + if (!defer) { + visitedFragmentNames.add(fragName); + } + + if (defer) { + const patchFields = new AccumulatorMap(); + collectFieldsImpl( + schema, + fragments, + variableValues, + runtimeType, + fragment.selectionSet, + patchFields, + patches, + visitedFragmentNames, + ); + patches.push({ + label: defer.label, + fields: patchFields, + }); + } else { + collectFieldsImpl( + schema, + fragments, + variableValues, + runtimeType, + fragment.selectionSet, + fields, + patches, + visitedFragmentNames, + ); + } break; } } } } +/** + * Returns an object containing the `@defer` arguments if a field should be + * deferred based on the experimental flag, defer directive present and + * not disabled by the "if" argument. + */ +function getDeferValues( + variableValues: { [variable: string]: unknown }, + node: FragmentSpreadNode | InlineFragmentNode, +): undefined | { label: string | undefined } { + const defer = getDirectiveValues(GraphQLDeferDirective, node, variableValues); + + if (!defer) { + return; + } + + if (defer.if === false) { + return; + } + + return { + label: typeof defer.label === 'string' ? defer.label : undefined, + }; +} + /** * Determines if a field should be included based on the `@include` and `@skip` * directives, where `@skip` has higher precedence than `@include`. diff --git a/src/execution/execute.ts b/src/execution/execute.ts index c288e5e591..7d38a179d2 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -44,6 +44,7 @@ import { isNonNullType, isObjectType, } from '../type/definition'; +import { GraphQLStreamDirective } from '../type/directives'; import type { GraphQLSchema } from '../type/schema'; import { assertValidSchema } from '../type/validate'; @@ -51,8 +52,13 @@ import { collectFields, collectSubfields as _collectSubfields, } from './collectFields'; +import { flattenAsyncIterable } from './flattenAsyncIterable'; import { mapAsyncIterable } from './mapAsyncIterable'; -import { getArgumentValues, getVariableValues } from './values'; +import { + getArgumentValues, + getDirectiveValues, + getVariableValues, +} from './values'; /* eslint-disable max-params */ // This file contains a lot of such errors but we plan to refactor it anyway @@ -115,6 +121,7 @@ export interface ExecutionContext { typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; errors: Array; + subsequentPayloads: Set; } /** @@ -122,7 +129,9 @@ export interface ExecutionContext { * * - `errors` is included when any errors occurred as a non-empty array. * - `data` is the result of a successful execution of the query. + * - `hasNext` is true if a future payload is expected. * - `extensions` is reserved for adding non-standard properties. + * - `incremental` is a list of the results from defer/stream directives. */ export interface ExecutionResult< TData = ObjMap, @@ -142,6 +151,108 @@ export interface FormattedExecutionResult< extensions?: TExtensions; } +export type ExperimentalExecuteIncrementallyResults< + TData = ObjMap, + TExtensions = ObjMap, +> = + | { singleResult: ExecutionResult } + | { + initialResult: InitialIncrementalExecutionResult; + subsequentResults: AsyncGenerator< + SubsequentIncrementalExecutionResult, + void, + void + >; + }; + +export interface InitialIncrementalExecutionResult< + TData = ObjMap, + TExtensions = ObjMap, +> extends ExecutionResult { + hasNext: boolean; + incremental?: ReadonlyArray>; + extensions?: TExtensions; +} + +export interface FormattedInitialIncrementalExecutionResult< + TData = ObjMap, + TExtensions = ObjMap, +> extends FormattedExecutionResult { + hasNext: boolean; + incremental?: ReadonlyArray>; + extensions?: TExtensions; +} + +export interface SubsequentIncrementalExecutionResult< + TData = ObjMap, + TExtensions = ObjMap, +> { + hasNext: boolean; + incremental?: ReadonlyArray>; + extensions?: TExtensions; +} + +export interface FormattedSubsequentIncrementalExecutionResult< + TData = ObjMap, + TExtensions = ObjMap, +> { + hasNext: boolean; + incremental?: ReadonlyArray>; + extensions?: TExtensions; +} + +export interface IncrementalDeferResult< + TData = ObjMap, + TExtensions = ObjMap, +> extends ExecutionResult { + path?: ReadonlyArray; + label?: string; +} + +export interface FormattedIncrementalDeferResult< + TData = ObjMap, + TExtensions = ObjMap, +> extends FormattedExecutionResult { + path?: ReadonlyArray; + label?: string; +} + +export interface IncrementalStreamResult< + TData = Array, + TExtensions = ObjMap, +> { + errors?: ReadonlyArray; + items?: TData | null; + path?: ReadonlyArray; + label?: string; + extensions?: TExtensions; +} + +export interface FormattedIncrementalStreamResult< + TData = Array, + TExtensions = ObjMap, +> { + errors?: ReadonlyArray; + items?: TData | null; + path?: ReadonlyArray; + label?: string; + extensions?: TExtensions; +} + +export type IncrementalResult< + TData = ObjMap, + TExtensions = ObjMap, +> = + | IncrementalDeferResult + | IncrementalStreamResult; + +export type FormattedIncrementalResult< + TData = ObjMap, + TExtensions = ObjMap, +> = + | FormattedIncrementalDeferResult + | FormattedIncrementalStreamResult; + export interface ExecutionArgs { schema: GraphQLSchema; document: DocumentNode; @@ -154,6 +265,9 @@ export interface ExecutionArgs { subscribeFieldResolver?: Maybe>; } +const UNEXPECTED_MULTIPLE_PAYLOADS = + 'Executing this GraphQL operation would unexpectedly produce multiple payloads (due to @defer or @stream directive)'; + /** * Implements the "Executing requests" section of the GraphQL specification. * @@ -163,15 +277,54 @@ export interface ExecutionArgs { * * If the arguments to this function do not result in a legal execution context, * a GraphQLError will be thrown immediately explaining the invalid input. + * + * This function does not support incremental delivery (`@defer` and `@stream`). + * If an operation which would defer or stream data is executed with this + * function, it will throw or resolve to an object containing an error instead. + * Use `experimentalExecuteIncrementally` if you want to support incremental + * delivery. */ export function execute(args: ExecutionArgs): PromiseOrValue { + const result = experimentalExecuteIncrementally(args); + if (!isPromise(result)) { + if ('singleResult' in result) { + return result.singleResult; + } + throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS); + } + + return result.then((incrementalResult) => { + if ('singleResult' in incrementalResult) { + return incrementalResult.singleResult; + } + return { + errors: [new GraphQLError(UNEXPECTED_MULTIPLE_PAYLOADS)], + }; + }); +} + +/** + * Implements the "Executing requests" section of the GraphQL specification, + * including `@defer` and `@stream` as proposed in + * https://github.com/graphql/graphql-spec/pull/742 + * + * This function returns a Promise of an ExperimentalExecuteIncrementallyResults + * object. This object either contains a single ExecutionResult as + * `singleResult`, or an `initialResult` and a stream of `subsequentResults`. + * + * If the arguments to this function do not result in a legal execution context, + * a GraphQLError will be thrown immediately explaining the invalid input. + */ +export function experimentalExecuteIncrementally( + args: ExecutionArgs, +): PromiseOrValue { // If a valid execution context cannot be created due to incorrect arguments, // a "Response" with only errors is returned. const exeContext = buildExecutionContext(args); // Return early errors if execution context failed. if (!('schema' in exeContext)) { - return { errors: exeContext }; + return { singleResult: { errors: exeContext } }; } return executeImpl(exeContext); @@ -179,7 +332,7 @@ export function execute(args: ExecutionArgs): PromiseOrValue { function executeImpl( exeContext: ExecutionContext, -): PromiseOrValue { +): PromiseOrValue { // Return a Promise that will eventually resolve to the data described by // The "Response" section of the GraphQL specification. // @@ -195,17 +348,39 @@ function executeImpl( const result = executeOperation(exeContext); if (isPromise(result)) { return result.then( - (data) => buildResponse(data, exeContext.errors), + (data) => { + const initialResult = buildResponse(data, exeContext.errors); + if (exeContext.subsequentPayloads.size > 0) { + return { + initialResult: { + ...initialResult, + hasNext: true, + }, + subsequentResults: yieldSubsequentPayloads(exeContext), + }; + } + return { singleResult: initialResult }; + }, (error) => { exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + return { singleResult: buildResponse(null, exeContext.errors) }; }, ); } - return buildResponse(result, exeContext.errors); + const initialResult = buildResponse(result, exeContext.errors); + if (exeContext.subsequentPayloads.size > 0) { + return { + initialResult: { + ...initialResult, + hasNext: true, + }, + subsequentResults: yieldSubsequentPayloads(exeContext), + }; + } + return { singleResult: initialResult }; } catch (error) { exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + return { singleResult: buildResponse(null, exeContext.errors) }; } } @@ -215,14 +390,14 @@ function executeImpl( * that all field resolvers are also synchronous. */ export function executeSync(args: ExecutionArgs): ExecutionResult { - const result = execute(args); + const result = experimentalExecuteIncrementally(args); // Assert that the execution was synchronous. - if (isPromise(result)) { + if (isPromise(result) || 'initialResult' in result) { throw new Error('GraphQL execution failed to complete synchronously.'); } - return result; + return result.singleResult; } /** @@ -321,6 +496,7 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, + subsequentPayloads: new Set(), errors: [], }; } @@ -352,7 +528,7 @@ function executeOperation( ); } - const rootFields = collectFields( + const { fields: rootFields, patches } = collectFields( schema, fragments, variableValues, @@ -360,23 +536,40 @@ function executeOperation( operation.selectionSet, ); const path = undefined; + let result; switch (operation.operation) { case OperationTypeNode.QUERY: - return executeFields(exeContext, rootType, rootValue, path, rootFields); + result = executeFields(exeContext, rootType, rootValue, path, rootFields); + break; case OperationTypeNode.MUTATION: - return executeFieldsSerially( + result = executeFieldsSerially( exeContext, rootType, rootValue, path, rootFields, ); + break; case OperationTypeNode.SUBSCRIPTION: // TODO: deprecate `subscribe` and move all logic here // Temporary solution until we finish merging execute and subscribe together - return executeFields(exeContext, rootType, rootValue, path, rootFields); + result = executeFields(exeContext, rootType, rootValue, path, rootFields); } + + for (const patch of patches) { + const { label, fields: patchFields } = patch; + executeDeferredFragment( + exeContext, + rootType, + rootValue, + patchFields, + label, + path, + ); + } + + return result; } /** @@ -427,6 +620,7 @@ function executeFields( sourceValue: unknown, path: Path | undefined, fields: Map>, + asyncPayloadRecord?: AsyncPayloadRecord, ): PromiseOrValue> { const results = Object.create(null); let containsPromise = false; @@ -439,6 +633,7 @@ function executeFields( sourceValue, fieldNodes, fieldPath, + asyncPayloadRecord, ); if (result !== undefined) { @@ -472,7 +667,9 @@ function executeField( source: unknown, fieldNodes: ReadonlyArray, path: Path, + asyncPayloadRecord?: AsyncPayloadRecord, ): PromiseOrValue { + const errors = asyncPayloadRecord?.errors ?? exeContext.errors; const fieldName = fieldNodes[0].name.value; const fieldDef = exeContext.schema.getField(parentType, fieldName); if (!fieldDef) { @@ -511,7 +708,15 @@ function executeField( let completed; if (isPromise(result)) { completed = result.then((resolved) => - completeValue(exeContext, returnType, fieldNodes, info, path, resolved), + completeValue( + exeContext, + returnType, + fieldNodes, + info, + path, + resolved, + asyncPayloadRecord, + ), ); } else { completed = completeValue( @@ -521,6 +726,7 @@ function executeField( info, path, result, + asyncPayloadRecord, ); } @@ -529,13 +735,13 @@ function executeField( // to take a second callback for the error case. return completed.then(undefined, (rawError) => { const error = locatedError(rawError, fieldNodes, pathToArray(path)); - return handleFieldError(error, returnType, exeContext); + return handleFieldError(error, returnType, errors); }); } return completed; } catch (rawError) { const error = locatedError(rawError, fieldNodes, pathToArray(path)); - return handleFieldError(error, returnType, exeContext); + return handleFieldError(error, returnType, errors); } } @@ -569,7 +775,7 @@ export function buildResolveInfo( function handleFieldError( error: GraphQLError, returnType: GraphQLOutputType, - exeContext: ExecutionContext, + errors: Array, ): null { // If the field type is non-nullable, then it is resolved without any // protection from errors, however it still properly locates the error. @@ -579,7 +785,7 @@ function handleFieldError( // Otherwise, error protection is applied, logging the error and resolving // a null value for this field if one is encountered. - exeContext.errors.push(error); + errors.push(error); return null; } @@ -611,6 +817,7 @@ function completeValue( info: GraphQLResolveInfo, path: Path, result: unknown, + asyncPayloadRecord?: AsyncPayloadRecord, ): PromiseOrValue { // If result is an Error, throw a located error. if (result instanceof Error) { @@ -627,6 +834,7 @@ function completeValue( info, path, result, + asyncPayloadRecord, ); if (completed === null) { throw new Error( @@ -650,6 +858,7 @@ function completeValue( info, path, result, + asyncPayloadRecord, ); } @@ -669,6 +878,7 @@ function completeValue( info, path, result, + asyncPayloadRecord, ); } @@ -681,6 +891,7 @@ function completeValue( info, path, result, + asyncPayloadRecord, ); } /* c8 ignore next 6 */ @@ -691,6 +902,58 @@ function completeValue( ); } +/** + * Returns an object containing the `@stream` arguments if a field should be + * streamed based on the experimental flag, stream directive present and + * not disabled by the "if" argument. + */ +function getStreamValues( + exeContext: ExecutionContext, + fieldNodes: ReadonlyArray, + path: Path, +): + | undefined + | { + initialCount: number | undefined; + label: string | undefined; + } { + // do not stream inner lists of multi-dimensional lists + if (typeof path.key === 'number') { + return; + } + + // validation only allows equivalent streams on multiple fields, so it is + // safe to only check the first fieldNode for the stream directive + const stream = getDirectiveValues( + GraphQLStreamDirective, + fieldNodes[0], + exeContext.variableValues, + ); + + if (!stream) { + return; + } + + if (stream.if === false) { + return; + } + + invariant( + typeof stream.initialCount === 'number', + 'initialCount must be a number', + ); + + invariant( + stream.initialCount >= 0, + 'initialCount must be a positive integer', + ); + + return { + initialCount: stream.initialCount, + label: typeof stream.label === 'string' ? stream.label : undefined, + }; +} + /** * Complete a async iterator value by completing the result and calling * recursively until all the results are completed. @@ -702,12 +965,35 @@ async function completeAsyncIteratorValue( info: GraphQLResolveInfo, path: Path, iterator: AsyncIterator, + asyncPayloadRecord?: AsyncPayloadRecord, ): Promise> { + const errors = asyncPayloadRecord?.errors ?? exeContext.errors; + const stream = getStreamValues(exeContext, fieldNodes, path); let containsPromise = false; const completedResults = []; let index = 0; // eslint-disable-next-line no-constant-condition while (true) { + if ( + stream && + typeof stream.initialCount === 'number' && + index >= stream.initialCount + ) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + executeStreamIterator( + index, + iterator, + exeContext, + fieldNodes, + info, + itemType, + path, + stream.label, + asyncPayloadRecord, + ); + break; + } + const fieldPath = addPath(path, index, undefined); try { // eslint-disable-next-line no-await-in-loop @@ -725,6 +1011,7 @@ async function completeAsyncIteratorValue( info, fieldPath, value, + asyncPayloadRecord, ); if (isPromise(completedItem)) { containsPromise = true; @@ -737,12 +1024,12 @@ async function completeAsyncIteratorValue( fieldNodes, pathToArray(fieldPath), ); - handleFieldError(error, itemType, exeContext); + handleFieldError(error, itemType, errors); } } catch (rawError) { completedResults.push(null); const error = locatedError(rawError, fieldNodes, pathToArray(fieldPath)); - handleFieldError(error, itemType, exeContext); + handleFieldError(error, itemType, errors); break; } index += 1; @@ -761,8 +1048,10 @@ function completeListValue( info: GraphQLResolveInfo, path: Path, result: unknown, + asyncPayloadRecord?: AsyncPayloadRecord, ): PromiseOrValue> { const itemType = returnType.ofType; + const errors = asyncPayloadRecord?.errors ?? exeContext.errors; if (isAsyncIterable(result)) { const iterator = result[Symbol.asyncIterator](); @@ -774,6 +1063,7 @@ function completeListValue( info, path, iterator, + asyncPayloadRecord, ); } @@ -783,15 +1073,39 @@ function completeListValue( ); } + const stream = getStreamValues(exeContext, fieldNodes, path); + // This is specified as a simple map, however we're optimizing the path // where the list contains no Promises by avoiding creating another Promise. let containsPromise = false; - const completedResults = Array.from(result, (item, index) => { + let previousAsyncPayloadRecord = asyncPayloadRecord; + const completedResults = []; + let index = 0; + for (const item of result) { // No need to modify the info object containing the path, // since from here on it is not ever accessed by resolver functions. const itemPath = addPath(path, index, undefined); try { let completedItem; + + if ( + stream && + typeof stream.initialCount === 'number' && + index >= stream.initialCount + ) { + previousAsyncPayloadRecord = executeStreamField( + itemPath, + item, + exeContext, + fieldNodes, + info, + itemType, + stream.label, + previousAsyncPayloadRecord, + ); + index++; + continue; + } if (isPromise(item)) { completedItem = item.then((resolved) => completeValue( @@ -801,6 +1115,7 @@ function completeListValue( info, itemPath, resolved, + asyncPayloadRecord, ), ); } else { @@ -811,6 +1126,7 @@ function completeListValue( info, itemPath, item, + asyncPayloadRecord, ); } @@ -818,21 +1134,25 @@ function completeListValue( containsPromise = true; // Note: we don't rely on a `catch` method, but we do expect "thenable" // to take a second callback for the error case. - return completedItem.then(undefined, (rawError) => { - const error = locatedError( - rawError, - fieldNodes, - pathToArray(itemPath), - ); - return handleFieldError(error, itemType, exeContext); - }); + completedResults.push( + completedItem.then(undefined, (rawError) => { + const error = locatedError( + rawError, + fieldNodes, + pathToArray(itemPath), + ); + return handleFieldError(error, itemType, errors); + }), + ); + } else { + completedResults.push(completedItem); } - return completedItem; } catch (rawError) { const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); - return handleFieldError(error, itemType, exeContext); + completedResults.push(handleFieldError(error, itemType, errors)); } - }); + index++; + } return containsPromise ? Promise.all(completedResults) : completedResults; } @@ -866,6 +1186,7 @@ function completeAbstractValue( info: GraphQLResolveInfo, path: Path, result: unknown, + asyncPayloadRecord?: AsyncPayloadRecord, ): PromiseOrValue> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; @@ -887,6 +1208,7 @@ function completeAbstractValue( info, path, result, + asyncPayloadRecord, ), ); } @@ -905,6 +1227,7 @@ function completeAbstractValue( info, path, result, + asyncPayloadRecord, ); } @@ -973,10 +1296,8 @@ function completeObjectValue( info: GraphQLResolveInfo, path: Path, result: unknown, + asyncPayloadRecord?: AsyncPayloadRecord, ): PromiseOrValue> { - // Collect sub-fields to execute to complete this value. - const subFieldNodes = collectSubfields(exeContext, returnType, fieldNodes); - // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather // than continuing execution. @@ -988,12 +1309,13 @@ function completeObjectValue( if (!resolvedIsTypeOf) { throw invalidReturnTypeError(returnType, result, fieldNodes); } - return executeFields( + return collectAndExecuteSubfields( exeContext, returnType, - result, + fieldNodes, path, - subFieldNodes, + result, + asyncPayloadRecord, ); }); } @@ -1003,7 +1325,14 @@ function completeObjectValue( } } - return executeFields(exeContext, returnType, result, path, subFieldNodes); + return collectAndExecuteSubfields( + exeContext, + returnType, + fieldNodes, + path, + result, + asyncPayloadRecord, + ); } function invalidReturnTypeError( @@ -1017,6 +1346,46 @@ function invalidReturnTypeError( ); } +function collectAndExecuteSubfields( + exeContext: ExecutionContext, + returnType: GraphQLObjectType, + fieldNodes: ReadonlyArray, + path: Path, + result: unknown, + asyncPayloadRecord?: AsyncPayloadRecord, +): PromiseOrValue> { + // Collect sub-fields to execute to complete this value. + const { fields: subFieldNodes, patches: subPatches } = collectSubfields( + exeContext, + returnType, + fieldNodes, + ); + + const subFields = executeFields( + exeContext, + returnType, + result, + path, + subFieldNodes, + asyncPayloadRecord, + ); + + for (const subPatch of subPatches) { + const { label, fields: subPatchFieldNodes } = subPatch; + executeDeferredFragment( + exeContext, + returnType, + result, + subPatchFieldNodes, + label, + path, + asyncPayloadRecord, + ); + } + + return subFields; +} + /** * If a resolveType function is not given, then a default resolve behavior is * used which attempts two strategies: @@ -1090,22 +1459,102 @@ export const defaultFieldResolver: GraphQLFieldResolver = * is not an async iterable. * * If the client-provided arguments to this function do not result in a - * compliant subscription, a GraphQL Response (ExecutionResult) with - * descriptive errors and no data will be returned. + * compliant subscription, a GraphQL Response (ExecutionResult) with descriptive + * errors and no data will be returned. * - * If the source stream could not be created due to faulty subscription - * resolver logic or underlying systems, the promise will resolve to a single + * If the source stream could not be created due to faulty subscription resolver + * logic or underlying systems, the promise will resolve to a single * ExecutionResult containing `errors` and no `data`. * * If the operation succeeded, the promise resolves to an AsyncIterator, which * yields a stream of ExecutionResults representing the response stream. * - * Accepts either an object with named arguments, or individual arguments. + * This function does not support incremental delivery (`@defer` and `@stream`). + * If an operation which would defer or stream data is executed with this + * function, each `InitialIncrementalExecutionResult` and + * `SubsequentIncrementalExecutionResult` in the result stream will be replaced + * with an `ExecutionResult` with a single error stating that defer/stream is + * not supported. Use `experimentalSubscribeIncrementally` if you want to + * support incremental delivery. + * + * Accepts an object with named arguments. */ export function subscribe( args: ExecutionArgs, ): PromiseOrValue< AsyncGenerator | ExecutionResult +> { + const maybePromise = experimentalSubscribeIncrementally(args); + if (isPromise(maybePromise)) { + return maybePromise.then((resultOrIterable) => + isAsyncIterable(resultOrIterable) + ? mapAsyncIterable(resultOrIterable, ensureSingleExecutionResult) + : resultOrIterable, + ); + } + return isAsyncIterable(maybePromise) + ? mapAsyncIterable(maybePromise, ensureSingleExecutionResult) + : maybePromise; +} + +function ensureSingleExecutionResult( + result: + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, +): ExecutionResult { + if ('hasNext' in result) { + return { + errors: [new GraphQLError(UNEXPECTED_MULTIPLE_PAYLOADS)], + }; + } + return result; +} + +/** + * Implements the "Subscribe" algorithm described in the GraphQL specification, + * including `@defer` and `@stream` as proposed in + * https://github.com/graphql/graphql-spec/pull/742 + * + * Returns a Promise which resolves to either an AsyncIterator (if successful) + * or an ExecutionResult (error). The promise will be rejected if the schema or + * other arguments to this function are invalid, or if the resolved event stream + * is not an async iterable. + * + * If the client-provided arguments to this function do not result in a + * compliant subscription, a GraphQL Response (ExecutionResult) with descriptive + * errors and no data will be returned. + * + * If the source stream could not be created due to faulty subscription resolver + * logic or underlying systems, the promise will resolve to a single + * ExecutionResult containing `errors` and no `data`. + * + * If the operation succeeded, the promise resolves to an AsyncIterator, which + * yields a stream of result representing the response stream. + * + * Each result may be an ExecutionResult with no `hasNext` (if executing the + * event did not use `@defer` or `@stream`), or an + * `InitialIncrementalExecutionResult` or `SubsequentIncrementalExecutionResult` + * (if executing the event used `@defer` or `@stream`). In the case of + * incremental execution results, each event produces a single + * `InitialIncrementalExecutionResult` followed by one or more + * `SubsequentIncrementalExecutionResult`s; all but the last have `hasNext: true`, + * and the last has `hasNext: false`. There is no interleaving between results + * generated from the same original event. + * + * Accepts an object with named arguments. + */ +export function experimentalSubscribeIncrementally( + args: ExecutionArgs, +): PromiseOrValue< + | AsyncGenerator< + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, + void, + void + > + | ExecutionResult > { // If a valid execution context cannot be created due to incorrect arguments, // a "Response" with only errors is returned. @@ -1127,11 +1576,35 @@ export function subscribe( return mapSourceToResponse(exeContext, resultOrStream); } +async function* ensureAsyncIterable( + someExecutionResult: ExperimentalExecuteIncrementallyResults, +): AsyncGenerator< + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, + void, + void +> { + if ('initialResult' in someExecutionResult) { + yield someExecutionResult.initialResult; + yield* someExecutionResult.subsequentResults; + } else { + yield someExecutionResult.singleResult; + } +} + function mapSourceToResponse( exeContext: ExecutionContext, resultOrStream: ExecutionResult | AsyncIterable, ): PromiseOrValue< - AsyncGenerator | ExecutionResult + | AsyncGenerator< + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, + void, + void + > + | ExecutionResult > { if (!isAsyncIterable(resultOrStream)) { return resultOrStream; @@ -1143,8 +1616,12 @@ function mapSourceToResponse( // the GraphQL specification. The `execute` function provides the // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the // "ExecuteQuery" algorithm, for which `execute` is also used. - return mapAsyncIterable(resultOrStream, (payload: unknown) => - executeImpl(buildPerEventExecutionContext(exeContext, payload)), + return flattenAsyncIterable( + mapAsyncIterable(resultOrStream, async (payload: unknown) => + ensureAsyncIterable( + await executeImpl(buildPerEventExecutionContext(exeContext, payload)), + ), + ), ); } @@ -1220,7 +1697,7 @@ function executeSubscription( ); } - const rootFields = collectFields( + const { fields: rootFields } = collectFields( schema, fragments, variableValues, @@ -1294,3 +1771,454 @@ function assertEventStream(result: unknown): AsyncIterable { return result; } + +function executeDeferredFragment( + exeContext: ExecutionContext, + parentType: GraphQLObjectType, + sourceValue: unknown, + fields: Map>, + label?: string, + path?: Path, + parentContext?: AsyncPayloadRecord, +): void { + const asyncPayloadRecord = new DeferredFragmentRecord({ + label, + path, + parentContext, + exeContext, + }); + let promiseOrData; + try { + promiseOrData = executeFields( + exeContext, + parentType, + sourceValue, + path, + fields, + asyncPayloadRecord, + ); + + if (isPromise(promiseOrData)) { + promiseOrData = promiseOrData.then(null, (e) => { + asyncPayloadRecord.errors.push(e); + return null; + }); + } + } catch (e) { + asyncPayloadRecord.errors.push(e); + promiseOrData = null; + } + asyncPayloadRecord.addData(promiseOrData); +} + +function executeStreamField( + path: Path, + item: PromiseOrValue, + exeContext: ExecutionContext, + fieldNodes: ReadonlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + label?: string, + parentContext?: AsyncPayloadRecord, +): AsyncPayloadRecord { + const asyncPayloadRecord = new StreamRecord({ + label, + path, + parentContext, + exeContext, + }); + let completedItem: PromiseOrValue; + let completedItems: PromiseOrValue | null>; + try { + try { + if (isPromise(item)) { + completedItem = item.then((resolved) => + completeValue( + exeContext, + itemType, + fieldNodes, + info, + path, + resolved, + asyncPayloadRecord, + ), + ); + } else { + completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + path, + item, + asyncPayloadRecord, + ); + } + + if (isPromise(completedItem)) { + // Note: we don't rely on a `catch` method, but we do expect "thenable" + // to take a second callback for the error case. + completedItem = completedItem.then(undefined, (rawError) => { + const error = locatedError(rawError, fieldNodes, pathToArray(path)); + return handleFieldError(error, itemType, asyncPayloadRecord.errors); + }); + } + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(path)); + completedItems = handleFieldError( + error, + itemType, + asyncPayloadRecord.errors, + ); + } + } catch (error) { + asyncPayloadRecord.errors.push(error); + asyncPayloadRecord.addItems(null); + return asyncPayloadRecord; + } + + if (isPromise(completedItem)) { + completedItems = completedItem.then( + (value) => [value], + (error) => { + asyncPayloadRecord.errors.push(error); + return null; + }, + ); + } else { + completedItems = [completedItem]; + } + + asyncPayloadRecord.addItems(completedItems); + return asyncPayloadRecord; +} + +async function executeStreamIteratorItem( + iterator: AsyncIterator, + exeContext: ExecutionContext, + fieldNodes: ReadonlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + asyncPayloadRecord: StreamRecord, + fieldPath: Path, +): Promise> { + let item; + try { + const { value, done } = await iterator.next(); + if (done) { + asyncPayloadRecord.setIsCompletedIterator(); + return { done, value: undefined }; + } + item = value; + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(fieldPath)); + const value = handleFieldError(error, itemType, asyncPayloadRecord.errors); + // don't continue if iterator throws + return { done: true, value }; + } + let completedItem; + try { + completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + fieldPath, + item, + asyncPayloadRecord, + ); + + if (isPromise(completedItem)) { + completedItem = completedItem.then(undefined, (rawError) => { + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + return handleFieldError(error, itemType, asyncPayloadRecord.errors); + }); + } + return { done: false, value: completedItem }; + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(fieldPath)); + const value = handleFieldError(error, itemType, asyncPayloadRecord.errors); + return { done: false, value }; + } +} + +async function executeStreamIterator( + initialIndex: number, + iterator: AsyncIterator, + exeContext: ExecutionContext, + fieldNodes: ReadonlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + path?: Path, + label?: string, + parentContext?: AsyncPayloadRecord, +): Promise { + let index = initialIndex; + let previousAsyncPayloadRecord = parentContext ?? undefined; + // eslint-disable-next-line no-constant-condition + while (true) { + const fieldPath = addPath(path, index, undefined); + const asyncPayloadRecord = new StreamRecord({ + label, + path: fieldPath, + parentContext: previousAsyncPayloadRecord, + iterator, + exeContext, + }); + + const dataPromise = executeStreamIteratorItem( + iterator, + exeContext, + fieldNodes, + info, + itemType, + asyncPayloadRecord, + fieldPath, + ); + + asyncPayloadRecord.addItems( + dataPromise + .then(({ value }) => value) + .then( + (value) => [value], + (err) => { + asyncPayloadRecord.errors.push(err); + return null; + }, + ), + ); + try { + // eslint-disable-next-line no-await-in-loop + const { done } = await dataPromise; + if (done) { + break; + } + } catch (err) { + // do nothing, error is already handled above + } + previousAsyncPayloadRecord = asyncPayloadRecord; + index++; + } +} + +function getCompletedIncrementalResults( + exeContext: ExecutionContext, +): Array { + const incrementalResults: Array = []; + for (const asyncPayloadRecord of exeContext.subsequentPayloads) { + const incrementalResult: IncrementalResult = {}; + if (!asyncPayloadRecord.isCompleted) { + continue; + } + exeContext.subsequentPayloads.delete(asyncPayloadRecord); + if (isStreamPayload(asyncPayloadRecord)) { + const items = asyncPayloadRecord.items; + if (asyncPayloadRecord.isCompletedIterator) { + // async iterable resolver just finished but there may be pending payloads + continue; + } + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = asyncPayloadRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; + } + + incrementalResult.path = asyncPayloadRecord.path + ? pathToArray(asyncPayloadRecord.path) + : []; + if (asyncPayloadRecord.label) { + incrementalResult.label = asyncPayloadRecord.label; + } + if (asyncPayloadRecord.errors.length > 0) { + incrementalResult.errors = asyncPayloadRecord.errors; + } + incrementalResults.push(incrementalResult); + } + return incrementalResults; +} + +function yieldSubsequentPayloads( + exeContext: ExecutionContext, +): AsyncGenerator { + let isDone = false; + + async function next(): Promise< + IteratorResult + > { + if (isDone) { + return { value: undefined, done: true }; + } + + await Promise.race( + Array.from(exeContext.subsequentPayloads).map((p) => p.promise), + ); + + if (isDone) { + // a different call to next has exhausted all payloads + return { value: undefined, done: true }; + } + + const incremental = getCompletedIncrementalResults(exeContext); + const hasNext = exeContext.subsequentPayloads.size > 0; + + if (!incremental.length && hasNext) { + return next(); + } + + if (!hasNext) { + isDone = true; + } + + return { + value: incremental.length ? { incremental, hasNext } : { hasNext }, + done: false, + }; + } + + function returnStreamIterators() { + const promises: Array>> = []; + exeContext.subsequentPayloads.forEach((asyncPayloadRecord) => { + if ( + isStreamPayload(asyncPayloadRecord) && + asyncPayloadRecord.iterator?.return + ) { + promises.push(asyncPayloadRecord.iterator.return()); + } + }); + return Promise.all(promises); + } + + return { + [Symbol.asyncIterator]() { + return this; + }, + next, + async return(): Promise< + IteratorResult + > { + await returnStreamIterators(); + isDone = true; + return { value: undefined, done: true }; + }, + async throw( + error?: unknown, + ): Promise> { + await returnStreamIterators(); + isDone = true; + return Promise.reject(error); + }, + }; +} + +class DeferredFragmentRecord { + type: 'defer'; + errors: Array; + label: string | undefined; + path: Path | undefined; + promise: Promise; + data: ObjMap | null; + parentContext: AsyncPayloadRecord | undefined; + isCompleted: boolean; + _exeContext: ExecutionContext; + _resolve?: (arg: PromiseOrValue | null>) => void; + constructor(opts: { + label: string | undefined; + path: Path | undefined; + parentContext: AsyncPayloadRecord | undefined; + exeContext: ExecutionContext; + }) { + this.type = 'defer'; + this.label = opts.label; + this.path = opts.path; + this.parentContext = opts.parentContext; + this.errors = []; + this._exeContext = opts.exeContext; + this._exeContext.subsequentPayloads.add(this); + this.isCompleted = false; + this.data = null; + this.promise = new Promise | null>((resolve) => { + this._resolve = (promiseOrValue) => { + resolve(promiseOrValue); + }; + }).then((data) => { + this.data = data; + this.isCompleted = true; + }); + } + + addData(data: PromiseOrValue | null>) { + const parentData = this.parentContext?.promise; + if (parentData) { + this._resolve?.(parentData.then(() => data)); + return; + } + this._resolve?.(data); + } +} + +class StreamRecord { + type: 'stream'; + errors: Array; + label: string | undefined; + path: Path | undefined; + items: Array | null; + promise: Promise; + parentContext: AsyncPayloadRecord | undefined; + iterator: AsyncIterator | undefined; + isCompletedIterator?: boolean; + isCompleted: boolean; + _exeContext: ExecutionContext; + _resolve?: (arg: PromiseOrValue | null>) => void; + constructor(opts: { + label: string | undefined; + path: Path | undefined; + iterator?: AsyncIterator; + parentContext: AsyncPayloadRecord | undefined; + exeContext: ExecutionContext; + }) { + this.type = 'stream'; + this.items = null; + this.label = opts.label; + this.path = opts.path; + this.parentContext = opts.parentContext; + this.iterator = opts.iterator; + this.errors = []; + this._exeContext = opts.exeContext; + this._exeContext.subsequentPayloads.add(this); + this.isCompleted = false; + this.items = null; + this.promise = new Promise | null>((resolve) => { + this._resolve = (promiseOrValue) => { + resolve(promiseOrValue); + }; + }).then((items) => { + this.items = items; + this.isCompleted = true; + }); + } + + addItems(items: PromiseOrValue | null>) { + const parentData = this.parentContext?.promise; + if (parentData) { + this._resolve?.(parentData.then(() => items)); + return; + } + this._resolve?.(items); + } + + setIsCompletedIterator() { + this.isCompletedIterator = true; + } +} + +type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord; + +function isStreamPayload( + asyncPayload: AsyncPayloadRecord, +): asyncPayload is StreamRecord { + return asyncPayload.type === 'stream'; +} diff --git a/src/execution/flattenAsyncIterable.ts b/src/execution/flattenAsyncIterable.ts new file mode 100644 index 0000000000..22bdb02338 --- /dev/null +++ b/src/execution/flattenAsyncIterable.ts @@ -0,0 +1,105 @@ +type AsyncIterableOrGenerator = + | AsyncGenerator + | AsyncIterable; + +/** + * Given an AsyncIterable of AsyncIterables, flatten all yielded results into a + * single AsyncIterable. + */ +export function flattenAsyncIterable( + iterable: AsyncIterableOrGenerator>, +): AsyncGenerator { + // You might think this whole function could be replaced with + // + // async function* flattenAsyncIterable(iterable) { + // for await (const subIterator of iterable) { + // yield* subIterator; + // } + // } + // + // but calling `.return()` on the iterator it returns won't interrupt the `for await`. + + const topIterator = iterable[Symbol.asyncIterator](); + let currentNestedIterator: AsyncIterator | undefined; + let waitForCurrentNestedIterator: Promise | undefined; + let done = false; + + async function next(): Promise> { + if (done) { + return { value: undefined, done: true }; + } + + try { + if (!currentNestedIterator) { + // Somebody else is getting it already. + if (waitForCurrentNestedIterator) { + await waitForCurrentNestedIterator; + return await next(); + } + // Nobody else is getting it. We should! + let resolve: () => void; + waitForCurrentNestedIterator = new Promise((r) => { + resolve = r; + }); + const topIteratorResult = await topIterator.next(); + if (topIteratorResult.done) { + // Given that done only ever transitions from false to true, + // require-atomic-updates is being unnecessarily cautious. + // eslint-disable-next-line require-atomic-updates + done = true; + return await next(); + } + // eslint is making a reasonable point here, but we've explicitly protected + // ourself from the race condition by ensuring that only the single call + // that assigns to waitForCurrentNestedIterator is allowed to assign to + // currentNestedIterator or waitForCurrentNestedIterator. + // eslint-disable-next-line require-atomic-updates + currentNestedIterator = topIteratorResult.value[Symbol.asyncIterator](); + // eslint-disable-next-line require-atomic-updates + waitForCurrentNestedIterator = undefined; + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + resolve!(); + return await next(); + } + + const rememberCurrentNestedIterator = currentNestedIterator; + const nestedIteratorResult = await currentNestedIterator.next(); + if (!nestedIteratorResult.done) { + return nestedIteratorResult; + } + + // The nested iterator is done. If it's still the current one, make it not + // current. (If it's not the current one, somebody else has made us move on.) + if (currentNestedIterator === rememberCurrentNestedIterator) { + currentNestedIterator = undefined; + } + return await next(); + } catch (err) { + done = true; + throw err; + } + } + return { + next, + async return() { + done = true; + await Promise.all([ + currentNestedIterator?.return?.(), + topIterator.return?.(), + ]); + return { value: undefined, done: true }; + }, + async throw(error?: unknown): Promise> { + done = true; + await Promise.all([ + currentNestedIterator?.throw?.(error), + topIterator.throw?.(error), + ]); + /* c8 ignore next */ + throw error; + }, + [Symbol.asyncIterator]() { + return this; + }, + }; +} diff --git a/src/execution/index.ts b/src/execution/index.ts index b27a2c291c..d0cf442e34 100644 --- a/src/execution/index.ts +++ b/src/execution/index.ts @@ -3,16 +3,29 @@ export { pathToArray as responsePathAsArray } from '../jsutils/Path'; export { createSourceEventStream, execute, + experimentalExecuteIncrementally, executeSync, defaultFieldResolver, defaultTypeResolver, subscribe, + experimentalSubscribeIncrementally, } from './execute'; export type { ExecutionArgs, ExecutionResult, + ExperimentalExecuteIncrementallyResults, + InitialIncrementalExecutionResult, + SubsequentIncrementalExecutionResult, + IncrementalDeferResult, + IncrementalStreamResult, + IncrementalResult, FormattedExecutionResult, + FormattedInitialIncrementalExecutionResult, + FormattedSubsequentIncrementalExecutionResult, + FormattedIncrementalDeferResult, + FormattedIncrementalStreamResult, + FormattedIncrementalResult, } from './execute'; export { diff --git a/src/graphql.ts b/src/graphql.ts index ffad9123c1..3036e60677 100644 --- a/src/graphql.ts +++ b/src/graphql.ts @@ -26,6 +26,8 @@ import { execute } from './execution/execute'; * may wish to separate the validation and execution phases to a static time * tooling step, and a server runtime step. * + * This function does not support incremental delivery (`@defer` and `@stream`). + * * Accepts either an object with named arguments, or individual arguments: * * schema: diff --git a/src/index.ts b/src/index.ts index b3a06a18cd..4cf82a24e5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -62,6 +62,8 @@ export { specifiedDirectives, GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // "Enum" of Type Kinds @@ -317,6 +319,7 @@ export type { // Execute GraphQL queries. export { execute, + experimentalExecuteIncrementally, executeSync, defaultFieldResolver, defaultTypeResolver, @@ -325,13 +328,25 @@ export { getVariableValues, getDirectiveValues, subscribe, + experimentalSubscribeIncrementally, createSourceEventStream, } from './execution/index'; export type { ExecutionArgs, ExecutionResult, + ExperimentalExecuteIncrementallyResults, + InitialIncrementalExecutionResult, + SubsequentIncrementalExecutionResult, + IncrementalDeferResult, + IncrementalStreamResult, + IncrementalResult, FormattedExecutionResult, + FormattedInitialIncrementalExecutionResult, + FormattedSubsequentIncrementalExecutionResult, + FormattedIncrementalDeferResult, + FormattedIncrementalStreamResult, + FormattedIncrementalResult, } from './execution/index'; // Validate GraphQL documents. diff --git a/src/type/directives.ts b/src/type/directives.ts index 13f4ad2721..c1df87fa1c 100644 --- a/src/type/directives.ts +++ b/src/type/directives.ts @@ -16,7 +16,7 @@ import { defineArguments, GraphQLNonNull, } from './definition'; -import { GraphQLBoolean, GraphQLString } from './scalars'; +import { GraphQLBoolean, GraphQLInt, GraphQLString } from './scalars'; /** * Test if the given value is a GraphQL directive. @@ -153,6 +153,56 @@ export const GraphQLSkipDirective: GraphQLDirective = new GraphQLDirective({ }, }); +/** + * Used to conditionally defer fragments. + */ +export const GraphQLDeferDirective = new GraphQLDirective({ + name: 'defer', + description: + 'Directs the executor to defer this fragment when the `if` argument is true or undefined.', + locations: [ + DirectiveLocation.FRAGMENT_SPREAD, + DirectiveLocation.INLINE_FRAGMENT, + ], + args: { + if: { + type: new GraphQLNonNull(GraphQLBoolean), + description: 'Deferred when true or undefined.', + defaultValue: true, + }, + label: { + type: GraphQLString, + description: 'Unique name', + }, + }, +}); + +/** + * Used to conditionally stream list fields. + */ +export const GraphQLStreamDirective = new GraphQLDirective({ + name: 'stream', + description: + 'Directs the executor to stream plural fields when the `if` argument is true or undefined.', + locations: [DirectiveLocation.FIELD], + args: { + if: { + type: new GraphQLNonNull(GraphQLBoolean), + description: 'Stream when true or undefined.', + defaultValue: true, + }, + label: { + type: GraphQLString, + description: 'Unique name', + }, + initialCount: { + defaultValue: 0, + type: GraphQLInt, + description: 'Number of items to return immediately', + }, + }, +}); + /** * Constant string used for default reason for a deprecation. */ diff --git a/src/type/index.ts b/src/type/index.ts index 43b867f999..8c3e28e2c7 100644 --- a/src/type/index.ts +++ b/src/type/index.ts @@ -133,6 +133,8 @@ export { specifiedDirectives, GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // Constant Deprecation Reason diff --git a/src/validation/__tests__/DeferStreamDirectiveLabelRule-test.ts b/src/validation/__tests__/DeferStreamDirectiveLabelRule-test.ts new file mode 100644 index 0000000000..c00ef14228 --- /dev/null +++ b/src/validation/__tests__/DeferStreamDirectiveLabelRule-test.ts @@ -0,0 +1,171 @@ +import { describe, it } from 'mocha'; + +import { DeferStreamDirectiveLabelRule } from '../rules/DeferStreamDirectiveLabelRule'; + +import { expectValidationErrors } from './harness'; + +function expectErrors(queryStr: string) { + return expectValidationErrors(DeferStreamDirectiveLabelRule, queryStr); +} + +function expectValid(queryStr: string) { + expectErrors(queryStr).toDeepEqual([]); +} + +describe('Validate: Defer/Stream directive on root field', () => { + it('Defer fragments with no label', () => { + expectValid(` + { + dog { + ...dogFragmentA @defer + ...dogFragmentB @defer + } + } + fragment dogFragmentA on Dog { + name + } + fragment dogFragmentB on Dog { + nickname + } + `); + }); + + it('Defer fragments, one with label, one without', () => { + expectValid(` + { + dog { + ...dogFragmentA @defer(label: "fragA") + ...dogFragmentB @defer + } + } + fragment dogFragmentA on Dog { + name + } + fragment dogFragmentB on Dog { + nickname + } + `); + }); + + it('Defer fragment with variable label', () => { + expectErrors(` + query($label: String) { + dog { + ...dogFragmentA @defer(label: $label) + ...dogFragmentB @defer(label: "fragA") + } + } + fragment dogFragmentA on Dog { + name + } + fragment dogFragmentB on Dog { + nickname + } + `).toDeepEqual([ + { + message: 'Directive "defer"\'s label argument must be a static string.', + locations: [{ line: 4, column: 25 }], + }, + ]); + }); + + it('Defer fragments with different labels', () => { + expectValid(` + { + dog { + ...dogFragmentA @defer(label: "fragB") + ...dogFragmentB @defer(label: "fragA") + } + } + fragment dogFragmentA on Dog { + name + } + fragment dogFragmentB on Dog { + nickname + } + `); + }); + it('Defer fragments with same label', () => { + expectErrors(` + { + dog { + ...dogFragmentA @defer(label: "fragA") + ...dogFragmentB @defer(label: "fragA") + } + } + fragment dogFragmentA on Dog { + name + } + fragment dogFragmentB on Dog { + nickname + } + `).toDeepEqual([ + { + message: 'Defer/Stream directive label argument must be unique.', + locations: [ + { line: 4, column: 25 }, + { line: 5, column: 25 }, + ], + }, + ]); + }); + it('Defer and stream with no label', () => { + expectValid(` + { + dog { + ...dogFragment @defer + } + pets @stream(initialCount: 0) @stream { + name + } + } + fragment dogFragment on Dog { + name + } + `); + }); + it('Stream with variable label', () => { + expectErrors(` + query ($label: String!) { + dog { + ...dogFragment @defer + } + pets @stream(initialCount: 0) @stream(label: $label) { + name + } + } + fragment dogFragment on Dog { + name + } + `).toDeepEqual([ + { + message: + 'Directive "stream"\'s label argument must be a static string.', + locations: [{ line: 6, column: 39 }], + }, + ]); + }); + it('Defer and stream with the same label', () => { + expectErrors(` + { + dog { + ...dogFragment @defer(label: "MyLabel") + } + pets @stream(initialCount: 0) @stream(label: "MyLabel") { + name + } + } + fragment dogFragment on Dog { + name + } + `).toDeepEqual([ + { + message: 'Defer/Stream directive label argument must be unique.', + locations: [ + { line: 4, column: 26 }, + { line: 6, column: 39 }, + ], + }, + ]); + }); +}); diff --git a/src/validation/__tests__/DeferStreamDirectiveOnRootFieldRule-test.ts b/src/validation/__tests__/DeferStreamDirectiveOnRootFieldRule-test.ts new file mode 100644 index 0000000000..5798047258 --- /dev/null +++ b/src/validation/__tests__/DeferStreamDirectiveOnRootFieldRule-test.ts @@ -0,0 +1,258 @@ +import { describe, it } from 'mocha'; + +import { buildSchema } from '../../utilities/buildASTSchema'; + +import { DeferStreamDirectiveOnRootFieldRule } from '../rules/DeferStreamDirectiveOnRootFieldRule'; + +import { expectValidationErrorsWithSchema } from './harness'; + +function expectErrors(queryStr: string) { + return expectValidationErrorsWithSchema( + schema, + DeferStreamDirectiveOnRootFieldRule, + queryStr, + ); +} + +function expectValid(queryStr: string) { + expectErrors(queryStr).toDeepEqual([]); +} + +const schema = buildSchema(` + type Message { + body: String + sender: String + } + + type SubscriptionRoot { + subscriptionField: Message + subscriptionListField: [Message] + } + + type MutationRoot { + mutationField: Message + mutationListField: [Message] + } + + type QueryRoot { + message: Message + messages: [Message] + } + + schema { + query: QueryRoot + mutation: MutationRoot + subscription: SubscriptionRoot + } +`); + +describe('Validate: Defer/Stream directive on root field', () => { + it('Defer fragment spread on root query field', () => { + expectValid(` + { + ...rootQueryFragment @defer + } + fragment rootQueryFragment on QueryRoot { + message { + body + } + } + `); + }); + + it('Defer inline fragment spread on root query field', () => { + expectValid(` + { + ... @defer { + message { + body + } + } + } + `); + }); + + it('Defer fragment spread on root mutation field', () => { + expectErrors(` + mutation { + ...rootFragment @defer + } + fragment rootFragment on MutationRoot { + mutationField { + body + } + } + `).toDeepEqual([ + { + message: + 'Defer directive cannot be used on root mutation type "MutationRoot".', + locations: [{ line: 3, column: 25 }], + }, + ]); + }); + it('Defer inline fragment spread on root mutation field', () => { + expectErrors(` + mutation { + ... @defer { + mutationField { + body + } + } + } + `).toDeepEqual([ + { + message: + 'Defer directive cannot be used on root mutation type "MutationRoot".', + locations: [{ line: 3, column: 13 }], + }, + ]); + }); + + it('Defer fragment spread on nested mutation field', () => { + expectValid(` + mutation { + mutationField { + ... @defer { + body + } + } + } + `); + }); + + it('Defer fragment spread on root subscription field', () => { + expectErrors(` + subscription { + ...rootFragment @defer + } + fragment rootFragment on SubscriptionRoot { + subscriptionField { + body + } + } + `).toDeepEqual([ + { + message: + 'Defer directive cannot be used on root subscription type "SubscriptionRoot".', + locations: [{ line: 3, column: 25 }], + }, + ]); + }); + it('Defer inline fragment spread on root subscription field', () => { + expectErrors(` + subscription { + ... @defer { + subscriptionField { + body + } + } + } + `).toDeepEqual([ + { + message: + 'Defer directive cannot be used on root subscription type "SubscriptionRoot".', + locations: [{ line: 3, column: 13 }], + }, + ]); + }); + + it('Defer fragment spread on nested subscription field', () => { + expectValid(` + subscription { + subscriptionField { + ...nestedFragment + } + } + fragment nestedFragment on Message { + body + } + `); + }); + it('Stream field on root query field', () => { + expectValid(` + { + messages @stream { + name + } + } + `); + }); + it('Stream field on fragment on root query field', () => { + expectValid(` + { + ...rootFragment + } + fragment rootFragment on QueryType { + messages @stream { + name + } + } + `); + }); + it('Stream field on root mutation field', () => { + expectErrors(` + mutation { + mutationListField @stream { + name + } + } + `).toDeepEqual([ + { + message: + 'Stream directive cannot be used on root mutation type "MutationRoot".', + locations: [{ line: 3, column: 27 }], + }, + ]); + }); + it('Stream field on fragment on root mutation field', () => { + expectErrors(` + mutation { + ...rootFragment + } + fragment rootFragment on MutationRoot { + mutationListField @stream { + name + } + } + `).toDeepEqual([ + { + message: + 'Stream directive cannot be used on root mutation type "MutationRoot".', + locations: [{ line: 6, column: 27 }], + }, + ]); + }); + it('Stream field on root subscription field', () => { + expectErrors(` + subscription { + subscriptionListField @stream { + name + } + } + `).toDeepEqual([ + { + message: + 'Stream directive cannot be used on root subscription type "SubscriptionRoot".', + locations: [{ line: 3, column: 31 }], + }, + ]); + }); + it('Stream field on fragment on root subscription field', () => { + expectErrors(` + subscription { + ...rootFragment + } + fragment rootFragment on SubscriptionRoot { + subscriptionListField @stream { + name + } + } + `).toDeepEqual([ + { + message: + 'Stream directive cannot be used on root subscription type "SubscriptionRoot".', + locations: [{ line: 6, column: 31 }], + }, + ]); + }); +}); diff --git a/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.ts b/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.ts index 46cf014e46..c3fee6114d 100644 --- a/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.ts +++ b/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.ts @@ -98,6 +98,114 @@ describe('Validate: Overlapping fields can be merged', () => { `); }); + it('Same stream directives supported', () => { + expectValid(` + fragment differentDirectivesWithDifferentAliases on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "streamLabel", initialCount: 1) + } + `); + }); + + it('different stream directive label', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "anotherLabel", initialCount: 1) + } + `).toDeepEqual([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive initialCount', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "streamLabel", initialCount: 2) + } + `).toDeepEqual([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive first missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name @stream(label: "streamLabel", initialCount: 1) + } + `).toDeepEqual([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive second missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream + } + `).toDeepEqual([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('mix of stream and no stream', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name + } + `).toDeepEqual([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive both missing args', () => { + expectValid(` + fragment conflictingArgs on Dog { + name @stream + name @stream + } + `); + }); + it('Same aliases with different field targets', () => { expectErrors(` fragment sameAliasesWithDifferentFieldTargets on Dog { diff --git a/src/validation/__tests__/StreamDirectiveOnListFieldRule-test.ts b/src/validation/__tests__/StreamDirectiveOnListFieldRule-test.ts new file mode 100644 index 0000000000..261f3e6951 --- /dev/null +++ b/src/validation/__tests__/StreamDirectiveOnListFieldRule-test.ts @@ -0,0 +1,79 @@ +import { describe, it } from 'mocha'; + +import { StreamDirectiveOnListFieldRule } from '../rules/StreamDirectiveOnListFieldRule'; + +import { expectValidationErrors } from './harness'; + +function expectErrors(queryStr: string) { + return expectValidationErrors(StreamDirectiveOnListFieldRule, queryStr); +} + +function expectValid(queryStr: string) { + expectErrors(queryStr).toDeepEqual([]); +} + +describe('Validate: Stream directive on list field', () => { + it('Stream on list field', () => { + expectValid(` + fragment objectFieldSelection on Human { + pets @stream(initialCount: 0) { + name + } + } + `); + }); + + it('Stream on non-null list field', () => { + expectValid(` + fragment objectFieldSelection on Human { + relatives @stream(initialCount: 0) { + name + } + } + `); + }); + + it("Doesn't validate other directives on list fields", () => { + expectValid(` + fragment objectFieldSelection on Human { + pets @include(if: true) { + name + } + } + `); + }); + + it("Doesn't validate other directives on non-list fields", () => { + expectValid(` + fragment objectFieldSelection on Human { + pets { + name @include(if: true) + } + } + `); + }); + + it("Doesn't validate misplaced stream directives", () => { + expectValid(` + fragment objectFieldSelection on Human { + ... @stream(initialCount: 0) { + name + } + } + `); + }); + + it('reports errors when stream is used on non-list field', () => { + expectErrors(` + fragment objectFieldSelection on Human { + name @stream(initialCount: 0) + } + `).toDeepEqual([ + { + message: + 'Stream directive cannot be used on non-list field "name" on type "Human".', + locations: [{ line: 3, column: 14 }], + }, + ]); + }); +}); diff --git a/src/validation/__tests__/harness.ts b/src/validation/__tests__/harness.ts index 661256c56d..ced039d6cf 100644 --- a/src/validation/__tests__/harness.ts +++ b/src/validation/__tests__/harness.ts @@ -58,7 +58,7 @@ export const testSchema: GraphQLSchema = buildSchema(` type Human { name(surname: Boolean): String pets: [Pet] - relatives: [Human] + relatives: [Human]! } enum FurColor { diff --git a/src/validation/index.ts b/src/validation/index.ts index 58cc012ee8..bea947b121 100644 --- a/src/validation/index.ts +++ b/src/validation/index.ts @@ -6,6 +6,12 @@ export type { ValidationRule } from './ValidationContext'; // All validation rules in the GraphQL Specification. export { specifiedRules } from './specifiedRules'; +// Spec Section: "Defer And Stream Directive Labels Are Unique" +export { DeferStreamDirectiveLabelRule } from './rules/DeferStreamDirectiveLabelRule'; + +// Spec Section: "Defer And Stream Directives Are Used On Valid Root Field" +export { DeferStreamDirectiveOnRootFieldRule } from './rules/DeferStreamDirectiveOnRootFieldRule'; + // Spec Section: "Executable Definitions" export { ExecutableDefinitionsRule } from './rules/ExecutableDefinitionsRule'; @@ -57,6 +63,9 @@ export { ScalarLeafsRule } from './rules/ScalarLeafsRule'; // Spec Section: "Subscriptions with Single Root Field" export { SingleFieldSubscriptionsRule } from './rules/SingleFieldSubscriptionsRule'; +// Spec Section: "Stream Directives Are Used On List Fields" +export { StreamDirectiveOnListFieldRule } from './rules/StreamDirectiveOnListFieldRule'; + // Spec Section: "Argument Uniqueness" export { UniqueArgumentNamesRule } from './rules/UniqueArgumentNamesRule'; diff --git a/src/validation/rules/DeferStreamDirectiveLabelRule.ts b/src/validation/rules/DeferStreamDirectiveLabelRule.ts new file mode 100644 index 0000000000..2e96714a4a --- /dev/null +++ b/src/validation/rules/DeferStreamDirectiveLabelRule.ts @@ -0,0 +1,55 @@ +import { GraphQLError } from '../../error/GraphQLError'; + +import { Kind } from '../../language/kinds'; +import type { ASTVisitor } from '../../language/visitor'; + +import { + GraphQLDeferDirective, + GraphQLStreamDirective, +} from '../../type/directives'; + +import type { ValidationContext } from '../ValidationContext'; + +/** + * Stream directive on list field + * + * A GraphQL document is only valid if defer and stream directives' label argument is static and unique. + */ +export function DeferStreamDirectiveLabelRule( + context: ValidationContext, +): ASTVisitor { + const knownLabels = Object.create(null); + return { + Directive(node) { + if ( + node.name.value === GraphQLDeferDirective.name || + node.name.value === GraphQLStreamDirective.name + ) { + const labelArgument = node.arguments?.find( + (arg) => arg.name.value === 'label', + ); + const labelValue = labelArgument?.value; + if (!labelValue) { + return; + } + if (labelValue.kind !== Kind.STRING) { + context.reportError( + new GraphQLError( + `Directive "${node.name.value}"'s label argument must be a static string.`, + { nodes: node }, + ), + ); + } else if (knownLabels[labelValue.value]) { + context.reportError( + new GraphQLError( + 'Defer/Stream directive label argument must be unique.', + { nodes: [knownLabels[labelValue.value], node] }, + ), + ); + } else { + knownLabels[labelValue.value] = node; + } + } + }, + }; +} diff --git a/src/validation/rules/DeferStreamDirectiveOnRootFieldRule.ts b/src/validation/rules/DeferStreamDirectiveOnRootFieldRule.ts new file mode 100644 index 0000000000..8bf462936e --- /dev/null +++ b/src/validation/rules/DeferStreamDirectiveOnRootFieldRule.ts @@ -0,0 +1,63 @@ +import { GraphQLError } from '../../error/GraphQLError'; + +import type { ASTVisitor } from '../../language/visitor'; + +import { + GraphQLDeferDirective, + GraphQLStreamDirective, +} from '../../type/directives'; + +import type { ValidationContext } from '../ValidationContext'; + +/** + * Stream directive on list field + * + * A GraphQL document is only valid if defer directives are not used on root mutation or subscription types. + */ +export function DeferStreamDirectiveOnRootFieldRule( + context: ValidationContext, +): ASTVisitor { + return { + Directive(node) { + const mutationType = context.getSchema().getMutationType(); + const subscriptionType = context.getSchema().getSubscriptionType(); + const parentType = context.getParentType(); + if (parentType && node.name.value === GraphQLDeferDirective.name) { + if (mutationType && parentType === mutationType) { + context.reportError( + new GraphQLError( + `Defer directive cannot be used on root mutation type "${parentType.name}".`, + { nodes: node }, + ), + ); + } + if (subscriptionType && parentType === subscriptionType) { + context.reportError( + new GraphQLError( + `Defer directive cannot be used on root subscription type "${parentType.name}".`, + { nodes: node }, + ), + ); + } + } + if (parentType && node.name.value === GraphQLStreamDirective.name) { + if (mutationType && parentType === mutationType) { + context.reportError( + new GraphQLError( + `Stream directive cannot be used on root mutation type "${parentType.name}".`, + { nodes: node }, + ), + ); + } + if (subscriptionType && parentType === subscriptionType) { + context.reportError( + new GraphQLError( + `Stream directive cannot be used on root subscription type "${parentType.name}".`, + { nodes: node }, + ), + ); + } + } + }, + }; +} diff --git a/src/validation/rules/OverlappingFieldsCanBeMergedRule.ts b/src/validation/rules/OverlappingFieldsCanBeMergedRule.ts index 341f768ebe..b26ca1dd9e 100644 --- a/src/validation/rules/OverlappingFieldsCanBeMergedRule.ts +++ b/src/validation/rules/OverlappingFieldsCanBeMergedRule.ts @@ -5,6 +5,7 @@ import type { ObjMap } from '../../jsutils/ObjMap'; import { GraphQLError } from '../../error/GraphQLError'; import type { + DirectiveNode, FieldNode, FragmentDefinitionNode, ObjectValueNode, @@ -601,6 +602,17 @@ function findConflict( } } + // FIXME https://github.com/graphql/graphql-js/issues/2203 + const directives1 = /* c8 ignore next */ node1.directives ?? []; + const directives2 = /* c8 ignore next */ node2.directives ?? []; + if (!sameStreams(directives1, directives2)) { + return [ + [responseName, 'they have differing stream directives'], + [node1], + [node2], + ]; + } + // The return type for each field. const type1 = def1?.type; const type2 = def2?.type; @@ -638,7 +650,7 @@ function findConflict( } } -function stringifyArguments(fieldNode: FieldNode): string { +function stringifyArguments(fieldNode: FieldNode | DirectiveNode): string { // FIXME https://github.com/graphql/graphql-js/issues/2203 const args = /* c8 ignore next */ fieldNode.arguments ?? []; @@ -653,6 +665,29 @@ function stringifyArguments(fieldNode: FieldNode): string { return print(sortValueNode(inputObjectWithArgs)); } +function getStreamDirective( + directives: ReadonlyArray, +): DirectiveNode | undefined { + return directives.find((directive) => directive.name.value === 'stream'); +} + +function sameStreams( + directives1: ReadonlyArray, + directives2: ReadonlyArray, +): boolean { + const stream1 = getStreamDirective(directives1); + const stream2 = getStreamDirective(directives2); + if (!stream1 && !stream2) { + // both fields do not have streams + return true; + } else if (stream1 && stream2) { + // check if both fields have equivalent streams + return stringifyArguments(stream1) === stringifyArguments(stream2); + } + // fields have a mix of stream and no stream + return false; +} + // Two types conflict if both types could not apply to a value simultaneously. // Composite types are ignored as their individual field types will be compared // later recursively. However List and Non-Null types must match. diff --git a/src/validation/rules/SingleFieldSubscriptionsRule.ts b/src/validation/rules/SingleFieldSubscriptionsRule.ts index 01fdc9f966..dff6f3b370 100644 --- a/src/validation/rules/SingleFieldSubscriptionsRule.ts +++ b/src/validation/rules/SingleFieldSubscriptionsRule.ts @@ -41,7 +41,7 @@ export function SingleFieldSubscriptionsRule( fragments[definition.name.value] = definition; } } - const fields = collectFields( + const { fields } = collectFields( schema, fragments, variableValues, diff --git a/src/validation/rules/StreamDirectiveOnListFieldRule.ts b/src/validation/rules/StreamDirectiveOnListFieldRule.ts new file mode 100644 index 0000000000..ef94f3dd6f --- /dev/null +++ b/src/validation/rules/StreamDirectiveOnListFieldRule.ts @@ -0,0 +1,41 @@ +import { GraphQLError } from '../../error/GraphQLError'; + +import type { DirectiveNode } from '../../language/ast'; +import type { ASTVisitor } from '../../language/visitor'; + +import { isListType, isWrappingType } from '../../type/definition'; +import { GraphQLStreamDirective } from '../../type/directives'; + +import type { ValidationContext } from '../ValidationContext'; + +/** + * Stream directive on list field + * + * A GraphQL document is only valid if stream directives are used on list fields. + */ +export function StreamDirectiveOnListFieldRule( + context: ValidationContext, +): ASTVisitor { + return { + Directive(node: DirectiveNode) { + const fieldDef = context.getFieldDef(); + const parentType = context.getParentType(); + if ( + fieldDef && + parentType && + node.name.value === GraphQLStreamDirective.name && + !( + isListType(fieldDef.type) || + (isWrappingType(fieldDef.type) && isListType(fieldDef.type.ofType)) + ) + ) { + context.reportError( + new GraphQLError( + `Stream directive cannot be used on non-list field "${fieldDef.name}" on type "${parentType.name}".`, + { nodes: node }, + ), + ); + } + }, + }; +} diff --git a/src/validation/specifiedRules.ts b/src/validation/specifiedRules.ts index 16e555db8a..9b5db96f3a 100644 --- a/src/validation/specifiedRules.ts +++ b/src/validation/specifiedRules.ts @@ -1,3 +1,7 @@ +// Spec Section: "Defer And Stream Directive Labels Are Unique" +import { DeferStreamDirectiveLabelRule } from './rules/DeferStreamDirectiveLabelRule'; +// Spec Section: "Defer And Stream Directives Are Used On Valid Root Field" +import { DeferStreamDirectiveOnRootFieldRule } from './rules/DeferStreamDirectiveOnRootFieldRule'; // Spec Section: "Executable Definitions" import { ExecutableDefinitionsRule } from './rules/ExecutableDefinitionsRule'; // Spec Section: "Field Selections on Objects, Interfaces, and Unions Types" @@ -41,6 +45,8 @@ import { import { ScalarLeafsRule } from './rules/ScalarLeafsRule'; // Spec Section: "Subscriptions with Single Root Field" import { SingleFieldSubscriptionsRule } from './rules/SingleFieldSubscriptionsRule'; +// Spec Section: "Stream Directives Are Used On List Fields" +import { StreamDirectiveOnListFieldRule } from './rules/StreamDirectiveOnListFieldRule'; import { UniqueArgumentDefinitionNamesRule } from './rules/UniqueArgumentDefinitionNamesRule'; // Spec Section: "Argument Uniqueness" import { UniqueArgumentNamesRule } from './rules/UniqueArgumentNamesRule'; @@ -93,6 +99,9 @@ export const specifiedRules: ReadonlyArray = Object.freeze([ NoUnusedVariablesRule, KnownDirectivesRule, UniqueDirectivesPerLocationRule, + DeferStreamDirectiveOnRootFieldRule, + DeferStreamDirectiveLabelRule, + StreamDirectiveOnListFieldRule, KnownArgumentNamesRule, UniqueArgumentNamesRule, ValuesOfCorrectTypeRule, diff --git a/website/docs/tutorials/defer-stream.md b/website/docs/tutorials/defer-stream.md new file mode 100644 index 0000000000..5235cab4fd --- /dev/null +++ b/website/docs/tutorials/defer-stream.md @@ -0,0 +1,26 @@ +--- +title: Enabling Defer & Stream +sidebar_label: Enabling Defer & Stream +--- + +The `@defer` and `@stream` directives are not enabled by default. In order to use these directives, you must add them to your GraphQL Schema. + +```js +import { + GraphQLSchema, + GraphQLDeferDirective, + GraphQLStreamDirective, + specifiedDirectives, +} from 'graphql'; + +const schema = new GraphQLSchema({ + query, + directives: [ + ...specifiedDirectives, + GraphQLDeferDirective, + GraphQLStreamDirective, + ], +}); +``` + +If the `directives` option is passed to `GraphQLSchema`, the default directives will not be included. `specifiedDirectives` must be passed to ensure all standard directives are added in addition to `defer` & `stream`. diff --git a/website/sidebars.js b/website/sidebars.js index 79fe5e9d8b..5201b4fd95 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -16,5 +16,6 @@ module.exports = { items: ['tutorials/constructing-types'], }, 'tutorials/express-graphql', + 'tutorials/defer-stream', ], };