From bc18f800c45bed5b9218fe502497538f6a1ec049 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 12 Aug 2022 18:45:33 +0300 Subject: [PATCH 1/3] getSubscriber does not return an AsyncGenerator Returns an object that conforms to the AsyncGenerator type signature, but the behavior of the properties do not conform to the AsyncGenerator specification(https://tc39.es/ecma262/#sec-properties-of-asyncgenerator-prototype). In particular, with async generators, all promises returned by next, return, and throw resolve in call order. TS does not include a type that is AsyncGeneratorLike, and AsyncIterableIterator has an optional return, while here return is present, so the best things seems to be to just let TS infer the type. --- src/execution/__tests__/simplePubSub.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/execution/__tests__/simplePubSub.ts b/src/execution/__tests__/simplePubSub.ts index f535ac454b..231595d5b7 100644 --- a/src/execution/__tests__/simplePubSub.ts +++ b/src/execution/__tests__/simplePubSub.ts @@ -18,7 +18,7 @@ export class SimplePubSub { return this._subscribers.size > 0; } - getSubscriber(transform: (value: T) => R): AsyncGenerator { + getSubscriber(transform: (value: T) => R) { const pullQueue: Array<(result: IteratorResult) => void> = []; const pushQueue: Array = []; let listening = true; From e5408a87672df563b1b6cc3e6365bf2c67da6605 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 12 Aug 2022 18:26:55 +0300 Subject: [PATCH 2/3] add repeaters --- src/jsutils/Repeater.ts | 488 ++++++++ src/jsutils/__tests__/Repeater-test.ts | 1500 ++++++++++++++++++++++++ 2 files changed, 1988 insertions(+) create mode 100644 src/jsutils/Repeater.ts create mode 100644 src/jsutils/__tests__/Repeater-test.ts diff --git a/src/jsutils/Repeater.ts b/src/jsutils/Repeater.ts new file mode 100644 index 0000000000..5ead24bed7 --- /dev/null +++ b/src/jsutils/Repeater.ts @@ -0,0 +1,488 @@ +/** + * Implementation from: https://github.com/repeaterjs/repeater/blob/219a0c8faf2c2768d234ecfe8dd21d455a4a98fe/packages/repeater/src/repeater.ts + * Original Author: Brian Kim + * License: MIT + * + * Changes to return() method: + * - Calls to return() should occur sequentially with calls to next just like throw. + * - Calls to return() must be recoverable to mimic finally block in generators. + * - Therefore: return() now simply throws an object of RepeaterClosureSignal class into the Repeater to allow handling of early returns. + * - Unlike throw(), when unhandled, return() simply returns! + * Removal of functionality:: + * - Remove static utility methods. + * - Remove buffer functionality. + * - Remove queue limits. + * Additional minor changes: + * - Use `PromiseOrValue` utility type. + * - Try to avoid a few extra ticks at the expense of readability. + * - Use named arguments for the executor. + * - Separate the stop function from the stopped promise. + * - Use TS enum for internal repeater states. + * - A few TS tweaks. + * - Rename state 'Done' to 'Finished' to match the function. + * - Variable renaming to make spell-check happy. + */ + +import type { PromiseOrValue } from './PromiseOrValue'; + +/** Makes sure promise-likes don't cause unhandled or asynchronously handled rejections. */ +function swallow(value: any): void { + if (value != null && typeof value.then === 'function') { + value.then(undefined, NOOP); + } +} + +/** CLASSES **/ + +/** + * Objects of this class indicate closure of a Repeater. + * @internal + * */ +export class RepeaterClosureSignal { + returnValue: TReturn; + + constructor(returnValue: TReturn) { + this.returnValue = returnValue; + } +} + +/** TYPES **/ +/** The type of the push property passed to the executor callback. */ +export type Push = ( + promiseOrValue: PromiseOrValue, +) => Promise; + +/** The type of the stop property passed to the executor callback. */ +export type Stop = (err?: unknown) => undefined; + +/** The type of the stopped property passed to the executor callback. */ +export type Stopped = Promise; + +export interface RepeaterExecutorOptions { + push: Push; + stop: Stop; + stopped: Promise; +} + +/** The type of the callback passed to the Repeater constructor. */ +export type RepeaterExecutor = ( + options: RepeaterExecutorOptions, +) => PromiseOrValue; + +/** The type of the object passed to the push queue. */ +interface PushOperation { + // The value passed to the push function. + value: Promise; + // The resolve function of the promise returned from push. + resolve: (next?: PromiseLike | TNext) => unknown; +} + +/** The type of the object passed to the next queue. */ +interface NextOperation { + // The value passed to the next method. + value: PromiseLike | TNext | undefined; + // The resolve function of the promise returned from next. + resolve: (iteration: PromiseOrValue>) => unknown; +} + +/** REPEATER STATES **/ + +/** The following is an enumeration of all possible repeater states. These states are ordered, and a repeater may only advance to higher states. */ +enum RepeaterState { + /** The initial state of the repeater. */ + Initial = 0, + /** Repeaters advance to this state the first time the next method is called on the repeater. */ + Started = 1, + /** Repeaters advance to this state when the stop function is called. */ + Stopped = 2, + /** Repeaters advance to this state when there are no values left to be pulled from the repeater. */ + Finished = 3, + /** Repeaters advance to this state if an error is thrown into the repeater. */ + Rejected = 4, +} + +const NOOP = () => undefined; + +/** An interface containing the private data of repeaters, only accessible through a private WeakMap. */ +interface RepeaterRecord { + // A number enum. States are ordered and the repeater will move through these states over the course of its lifetime. See REPEATER STATES. + state: RepeaterState; + + // The function passed to the repeater constructor. + executor: RepeaterExecutor; + + // A queue of values which were pushed. + pushQueue: Array>; + + // A queue of requests for values. + nextQueue: Array>; + // NOTE: both the push queue and the next queue will never contain values at the same time. + + // A promise which is continuously reassigned and chained so that all repeater iterations settle in order. + pending: Promise | undefined; + + // The return value of the executor. + execution: Promise | undefined; + + // An error passed to the stop function. + err: unknown; + + // A callback set to the resolve function of the promise returned from push. + onNext: (value?: PromiseLike | TNext) => unknown; + + // A callback set to the resolve function of the stopped promise. + onStopped: (value?: any) => unknown; +} + +/** A helper function used to mimic the behavior of async generators where the final iteration is consumed. */ +function consumeExecution( + r: RepeaterRecord, +): Promise { + const err = r.err; + + const execution: Promise = Promise.resolve(r.execution) + .then((value) => { + if (err != null) { + throw err; + } + + return value; + }) + .catch((possibleClosureSignal) => { + if (possibleClosureSignal instanceof RepeaterClosureSignal) { + return possibleClosureSignal.returnValue; + } + throw possibleClosureSignal; + }); + r.execution = execution.then( + () => undefined, + () => undefined, + ); + + r.err = undefined; + + return r.pending === undefined ? execution : r.pending.then(() => execution); +} + +/** Helper functions for building iterations from values. Promises are unwrapped, so that iterations never have their value property set to a promise. */ +async function createIteration( + r: RepeaterRecord, + value: Promise, +): Promise> { + const done = r.state >= RepeaterState.Finished; + + const resolvedValue = await value; + + if (!done && r.state >= RepeaterState.Rejected) { + // If the repeater entered the 'Rejected' state while awaiting the value, the just resolved value is swallowed. + const finalValue = await consumeExecution(r); + return { + // This cast is necessary because if the executor was already consumed, re-consuming returns undefined. Type safety is only strictly guaranteed (with respect to undefined) in the case of the first final iteration produced! + value: finalValue as TReturn, + done: true, + }; + } + + // if the resolvedValue is of type TReturn, done is true. + return { value: resolvedValue, done } as IteratorResult; +} + +/** + * This function is bound and passed to the executor as the stop argument. + * + * Advances state to Stopped. + */ +function stop( + r: RepeaterRecord, + err?: unknown, +): void { + if (r.state >= RepeaterState.Stopped) { + return; + } + + r.state = RepeaterState.Stopped; + r.onNext(); + r.onStopped(); + if (r.err == null) { + r.err = err; + } + + if (r.pushQueue.length === 0) { + finish(r); + } else { + for (const pendingPush of r.pushQueue) { + pendingPush.resolve(); + } + } +} + +/** + * The difference between stopping a repeater vs finishing a repeater is that stopping a repeater allows next to continue to drain values from the push queue, while finishing a repeater will clear all pending values and end iteration immediately. Once, a repeater is finished, all iterations will have the done property set to true. + * + * Advances state to Finished. + */ +function finish(r: RepeaterRecord): void { + if (r.state >= RepeaterState.Finished) { + return; + } + + if (r.state < RepeaterState.Stopped) { + stop(r); + } + + r.state = RepeaterState.Finished; + for (const next of r.nextQueue) { + const execution: Promise = + r.pending === undefined + ? consumeExecution(r) + : r.pending.then(() => consumeExecution(r)); + next.resolve(createIteration(r, execution)); + } + + r.pushQueue = []; + r.nextQueue = []; +} + +/** + * Called when a promise passed to push rejects, or when a push call is unhandled. + * + * Advances state to Rejected. + */ +function reject(r: RepeaterRecord): void { + if (r.state >= RepeaterState.Rejected) { + return; + } + + if (r.state < RepeaterState.Finished) { + finish(r); + } + + r.state = RepeaterState.Rejected; +} + +/** This function is bound and passed to the executor as the push argument. */ +function push( + r: RepeaterRecord, + value: PromiseOrValue, +): Promise { + swallow(value); + if (r.state >= RepeaterState.Stopped) { + return Promise.resolve(undefined); + } + + let valueP: Promise = + r.pending === undefined + ? Promise.resolve(value) + : r.pending.then(() => value); + + valueP = valueP.catch((err) => { + if (r.state < RepeaterState.Stopped) { + r.err = err; + } + + reject(r); + return undefined; // void :( + }); + + let nextP: Promise; + const [pendingNext, ...nextQueue] = r.nextQueue; + if (pendingNext) { + r.nextQueue = nextQueue; + pendingNext.resolve(createIteration(r, valueP)); + if (nextQueue.length) { + nextP = Promise.resolve(nextQueue[0].value); + } else { + nextP = new Promise((resolve) => (r.onNext = resolve)); + } + } else { + nextP = new Promise((resolve) => + r.pushQueue.push({ resolve, value: valueP }), + ); + } + + // If an error is thrown into the repeater via the next, throw, or return methods, we give the repeater a chance to handle this by rejecting the promise returned from push. If the push call is not immediately handled we throw the next iteration of the repeater. + // To check that the originalPromise is floating, we modify the then and catch methods of the returned promise so that they flip the floating flag. This function actually does not return a promise, because modern engines do not call the then and catch methods on native promises. By making the returned promise a plain old javascript object, we ensure that the then and catch methods will be called. + let floating = true; + const unhandled = nextP.catch((err) => { + if (floating) { + throw err; + } + + return undefined; // void :( + }); + swallow(unhandled); + + const next = {} as Promise; + next.then = (onfulfilled, onrejected): any => { + floating = false; + return Promise.prototype.then.call(nextP, onfulfilled, onrejected); + }; + + next.catch = (onrejected): any => { + floating = false; + return Promise.prototype.catch.call(nextP, onrejected); + }; + + next.finally = nextP.finally.bind(nextP); + + r.pending = valueP + .then(() => unhandled) + .catch((err) => { + r.err = err; + reject(r); + }); + + return next; +} + +/** + * Calls the executor passed into the constructor. This function is called the first time the next method is called on the repeater. + * + * Advances state to Started. + */ +function execute( + r: RepeaterRecord, +): void { + r.state = RepeaterState.Started; + const push1 = (push as typeof push).bind(null, r); + const stop1 = stop.bind(null, r) as Stop; + const stopped1 = new Promise((resolve) => (r.onStopped = resolve)); + // See: https://stackoverflow.com/questions/26711243/promise-resolve-vs-new-promiseresolve + try { + r.execution = Promise.resolve( + r.executor({ + push: push1, + stop: stop1, + stopped: stopped1, + }), + ); + } catch (err) { + r.execution = Promise.reject(err); + } + // TODO: We should consider stopping all repeaters when the executor settles. + r.execution.catch(() => stop(r)); +} + +type RecordMap = WeakMap< + Repeater, + RepeaterRecord +>; + +let records: RecordMap; + +function createRepeaterRecord( + repeater: Repeater, + executor: RepeaterExecutor, +): void { + if (records === undefined) { + records = new WeakMap(); + } + + records.set(repeater, { + executor, + err: undefined, + state: RepeaterState.Initial, + pushQueue: [], + nextQueue: [], + pending: undefined, + execution: undefined, + onNext: NOOP, + onStopped: NOOP, + }); +} + +function getRepeaterRecord( + repeater: Repeater, +): RepeaterRecord { + const r = records.get(repeater); + + if (r === undefined) { + throw new Error('WeakMap error'); + } + + return r; +} + +/** + * An error subclass which is thrown when there are too many pending push or next operations on a single repeater. + * NOTE: While repeaters implement and are assignable to the AsyncGenerator interface, and you can use the types interchangeably, we don't use typescript's implements syntax here because this would make supporting earlier versions of typescript trickier. This is because TypeScript version 3.6 changed the iterator types by adding the TReturn and TNext type parameters. + * + * @internal + */ +export class Repeater { + constructor(executor: RepeaterExecutor) { + createRepeaterRecord(this, executor); + } + + next( + value?: PromiseLike | TNext, + ): Promise> { + swallow(value); + const r = getRepeaterRecord(this); + + if (r.state <= RepeaterState.Initial) { + execute(r); + } + + // Call existing next handler with the passed value. + r.onNext(value); + + const [pendingPush, ...pushQueue] = r.pushQueue; + // If the push queue is not empty, we return a promise that resolves when the last pushed value resolves. + if (pendingPush) { + // Pop the next push operation from the queue. + r.pushQueue = pushQueue; + + // Reset the next handler. + r.onNext = pendingPush.resolve; + + // Return the value. + return createIteration(r, pendingPush.value); + } + + if (r.state >= RepeaterState.Stopped) { + finish(r); + return createIteration(r, consumeExecution(r)); + } + + return new Promise((resolve) => r.nextQueue.push({ resolve, value })); + } + + return( + value?: PromiseLike | TReturn, + ): Promise> { + const r = getRepeaterRecord(this); + + if (r.state <= RepeaterState.Initial || r.state >= RepeaterState.Stopped) { + finish(r); + + return createIteration( + r, + // We override the execution because return should always return the value passed in. + consumeExecution(r).then(() => value), + ); + } + + return this.next(Promise.reject(new RepeaterClosureSignal(value))); + } + + throw(err: unknown): Promise> { + const r = getRepeaterRecord(this); + + if (r.state <= RepeaterState.Initial || r.state >= RepeaterState.Stopped) { + finish(r); + // If r.err is already set, that mean the repeater has already produced an error, so we throw that error rather than the error passed in, because doing so might be more informative for the caller. + if (r.err == null) { + r.err = err; + } + + return createIteration(r, consumeExecution(r)); + } + + return this.next(Promise.reject(err)); + } + + [Symbol.asyncIterator](): this { + return this; + } +} diff --git a/src/jsutils/__tests__/Repeater-test.ts b/src/jsutils/__tests__/Repeater-test.ts new file mode 100644 index 0000000000..496f44ada9 --- /dev/null +++ b/src/jsutils/__tests__/Repeater-test.ts @@ -0,0 +1,1500 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { expectPromise } from '../../__testUtils__/expectPromise'; + +import { Repeater } from '../Repeater'; + +function fn() { + const callArgs: Array> = []; + const mock = (...args: Array) => { + callArgs.push(args); + }; + mock.callArgs = callArgs; + return mock; +} + +function expectMock(mock: ReturnType) { + return { + toHaveBeenCalledTimes(i: number) { + expect(mock.callArgs.length).to.equal(i); + }, + toHaveBeenCalledWith(...values: Array) { + expect( + mock.callArgs.some((args) => { + for (const [index, value] of values.entries()) { + if (args[index] !== value) { + return false; + } + } + return true; + }), + ).to.equal(true); + }, + }; +} + +function spyOn( + object: { [key in T]: Function }, + methodName: T, +) { + const originalFn = object[methodName].bind(object); + let count = 0; + object[methodName] = (...args: Array) => { + count++; + return originalFn(...args); + }; + return { + count, + }; +} + +function expectSpy(spy: ReturnType) { + return { + toHaveBeenCalledTimes(i: number) { + return spy.count === i; + }, + }; +} + +function delayPromise( + wait: number, + value?: T, + error?: Error, +): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + if (error == null) { + resolve(value); + } else { + reject(error); + } + }, wait); + }); +} + +describe('Repeater', () => { + it('push', async () => { + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + }); + + it('async push', async () => { + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(3); + await push(4); /* c8 ignore start */ + }); /* c8 ignore stop */ + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + }); + + it('push promises', async () => { + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(3)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(4)); + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + }); + + it('async push promises', async () => { + const r = new Repeater(async ({ push }) => { + await push(Promise.resolve(1)); + await push(Promise.resolve(2)); + await push(Promise.resolve(3)); + await push(Promise.resolve(4)); /* c8 ignore start */ + }); /* c8 ignore stop */ + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + }); + + it('push delayed promises', async () => { + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(5, 1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(10, 4)); + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + }); + + it('async push delayed promises', async () => { + const r = new Repeater(async ({ push }) => { + await push(delayPromise(5, 1)); + await push(Promise.resolve(2)); + await push(3); + await push(delayPromise(10, 4)); /* c8 ignore start */ + }); /* c8 ignore stop */ + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + }); + + it('push rejection', async () => { + const error = new Error('push rejection'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(4)); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith('push rejection'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('async push rejection', async () => { + const error = new Error('async push rejection'); + const r = new Repeater(async ({ push }) => { + await push(Promise.resolve(1)); + await push(Promise.resolve(2)); + await push(Promise.reject(error)); + await push(Promise.resolve(4)); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith('async push rejection'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push rejection immediately', async () => { + const error = new Error('push rejection immediately'); + const r = new Repeater(({ push }) => push(Promise.reject(error))); + await expectPromise(r.next()).toRejectWith('push rejection immediately'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push delayed rejection', async () => { + const error = new Error('push delayed rejection'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(5, 1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(1, null, error)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + return -1; + }); + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith('push delayed rejection'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('async push delayed rejection', async () => { + const error = new Error('async push delayed rejection'); + const r = new Repeater(async ({ push }) => { + await push(delayPromise(5, 1)); + await push(Promise.resolve(2)); + await push(delayPromise(1, null, error)); + await push(4); + return -1; + }); + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith('async push delayed rejection'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push multiple rejections', async () => { + const error1 = new Error('push multiple rejections 1'); + const error2 = new Error('push multiple rejections 2'); + const error3 = new Error('push multiple rejections 3'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error3)); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith('push multiple rejections 1'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('stop', async () => { + const r = new Repeater(({ stop }) => { + stop(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('stop with error', async () => { + const error = new Error('stop with error'); + const r = new Repeater(({ stop }) => { + stop(error); + return -1; + }); + await expectPromise(r.next()).toRejectWith('stop with error'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push then stop', async () => { + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('async push and stop', async () => { + const r = new Repeater(async ({ push, stop }) => { + await push(1); + await push(2); + await push(3); + await push(4); + stop(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push and stop with error', async () => { + const error = new Error('push and stop with error'); + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(error); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + await expectPromise(r.next()).toRejectWith('push and stop with error'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('async push and stop with error', async () => { + const error = new Error('async push and stop with error'); + const r = new Repeater(async ({ push, stop }) => { + await push(1); + await push(2); + await push(3); + await push(4); + stop(error); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + await expectPromise(r.next()).toRejectWith( + 'async push and stop with error', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push promise and stop', async () => { + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(3)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push delayed promise and stop', async () => { + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(10, 3)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push rejection and stop', async () => { + const error = new Error('push rejection and stop'); + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(); + return -1; + }); + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('push delayed rejection and stop', async () => { + const error = new Error('push delayed rejection and stop'); + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(50, null, error)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(); + return -1; + }); + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('async push rejection and stop with error', async () => { + const error1 = new Error('async push rejection and stop with error 1'); + const error2 = new Error('async push rejection and stop with error 2'); + const r = new Repeater(async ({ push, stop }) => { + await push(1); + await push(2); + await push(Promise.reject(error1)); + await push(4); + stop(error2); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith( + 'async push rejection and stop with error 1', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('async push delayed promises and stop with pending next', async () => { + const r = new Repeater(async ({ push, stop }) => { + await push(delayPromise(50, 1)); + await push(delayPromise(50, 2)); + stop(); + return -1; + }); + const result1 = r.next(); + const result2 = r.next(); + const result3 = r.next(); + expect(await result1).to.deep.equal({ value: 1, done: false }); + expect(await result2).to.deep.equal({ value: 2, done: false }); + expect(await result3).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('awaiting stop promise', async () => { + const mock = fn(); + const r = new Repeater(async ({ push, stop, stopped }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + setTimeout(() => stop()); + await stopped; + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + mock(); + }); + expect(await r.next()).to.deep.equal({ done: false, value: 1 }); + expect(await r.next()).to.deep.equal({ done: false, value: 2 }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expectMock(mock).toHaveBeenCalledTimes(1); + }); + + it('throw error in executor', async () => { + const error = new Error('throw error in executor'); + const r = new Repeater(() => { + throw error; + }); + await expectPromise(r.next()).toRejectWith('throw error in executor'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error in executor after push', async () => { + const error = new Error('throw error in executor after push'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + throw error; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error in executor after push', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error in executor after async push', async () => { + const error = new Error('throw error in executor after async push'); + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(3); + await push(4); + throw error; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error in executor after async push', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error in executor after push and stop', async () => { + const error = new Error('throw error in executor after push and stop'); + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + stop(); + throw error; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error in executor after push and stop', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error in executor after async push and stop', async () => { + const error = new Error( + 'throw error in executor after async push and stop', + ); + const r = new Repeater(async ({ push, stop }) => { + await push(1); + await push(2); + await push(3); + await push(4); + stop(); + throw error; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error in executor after async push and stop', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error after stop with error', async () => { + const error1 = new Error('throw error after stop with error 1'); + const error2 = new Error('throw error after stop with error 2'); + const r = new Repeater(({ stop }) => { + stop(error1); + throw error2; + }); + await expectPromise(r.next()).toRejectWith( + 'throw error after stop with error 2', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error after stop with error and delay', async () => { + const error1 = new Error('throw error after stop with error and delay 1'); + const error2 = new Error('throw error after stop with error and delay 2'); + const r = new Repeater(async ({ stop }) => { + stop(error1); + await delayPromise(10); + throw error2; + }); + await expectPromise(r.next()).toRejectWith( + 'throw error after stop with error and delay 2', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error after pushing rejection', async () => { + const error1 = new Error('throw error after pushing rejection 1'); + const error2 = new Error('throw error after pushing rejection 2'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + throw error2; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error after pushing rejection 2', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error after async pushing rejection', async () => { + const error1 = new Error('throw error after async pushing rejection 1'); + const error2 = new Error('throw error after async pushing rejection 2'); + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(Promise.reject(error1)); + await push(4); + throw error2; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error after async pushing rejection 2', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw error after stopping with error and pushing rejection', async () => { + const error1 = new Error( + 'throw error after stopping with error and pushing rejection 1', + ); + const error2 = new Error( + 'throw error after stopping with error and pushing rejection 2', + ); + const error3 = new Error( + 'throw error after stopping with error and pushing rejection 3', + ); + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + stop(error2); + throw error3; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith( + 'throw error after stopping with error and pushing rejection 3', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return rejection from executor', async () => { + const error = new Error('return rejection from executor'); + const r = new Repeater(() => Promise.reject(error)); + await expectPromise(r.next()).toRejectWith( + 'return rejection from executor', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return rejection from executor after async pushes', async () => { + const error = new Error( + 'return rejection from executor after async pushes', + ); + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + return Promise.reject(error); + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + await expectPromise(r.next()).toRejectWith( + 'return rejection from executor after async pushes', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('ignored repeater', () => { + const mock = fn(); + // eslint-disable-next-line no-new + new Repeater(() => mock()); + expectMock(mock).toHaveBeenCalledTimes(0); + }); + + it('pushes await next', async () => { + const mock = fn(); + const r = new Repeater(async ({ push }) => { + for (let i = 0; i < 100; i++) { + // eslint-disable-next-line no-await-in-loop + mock(await push(i)); /* c8 ignore start */ + } + }); /* c8 ignore stop */ + expect(await r.next()).to.deep.equal({ value: 0, done: false }); + expectMock(mock).toHaveBeenCalledTimes(0); + for (let i = 1; i < 50; i++) { + expectMock(mock).toHaveBeenCalledTimes(i - 1); + // eslint-disable-next-line no-await-in-loop + expect(await r.next(i)).to.deep.equal({ + value: i, + done: false, + }); + expectMock(mock).toHaveBeenCalledWith(i); + expectMock(mock).toHaveBeenCalledTimes(i); + } + + expect(await r.next()).to.deep.equal({ value: 50, done: false }); + expectMock(mock).toHaveBeenCalledTimes(50); + await delayPromise(1); + expectMock(mock).toHaveBeenCalledTimes(50); + }); + + it('pushes resolve to value passed to next', async () => { + let push!: (value: unknown) => Promise; + const r = new Repeater(({ push: push1 }) => (push = push1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-4); + const push1 = push(1); + const push2 = push(2); + const push3 = push(3); + const push4 = push(4); + expect(await push1).to.deep.equal(-2); + expect(await push2).to.deep.equal(-3); + expect(await push3).to.deep.equal(-4); + expect(await Promise.race([push4, delayPromise(100, -1000)])).to.deep.equal( + -1000, + ); + }); + + it('pushes resolve to value passed to next alternating', async () => { + let push!: (value: unknown) => Promise; + const r = new Repeater(({ push: push1 }) => (push = push1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-1); + const push1 = push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-2); + const push2 = push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-3); + const push3 = push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-4); + const push4 = push(4); + expect(await push1).to.deep.equal(-2); + expect(await push2).to.deep.equal(-3); + expect(await push3).to.deep.equal(-4); + expect(await Promise.race([push4, delayPromise(100, -1000)])).to.deep.equal( + -1000, + ); + }); + + it('pushes resolve to value passed to next irregular', async () => { + let push!: (value: unknown) => Promise; + const r = new Repeater(({ push: push1 }) => (push = push1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-1); + const push1 = push(1); + const push2 = push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-3); + const push3 = push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-4); + const push4 = push(4); + expect(await push1).to.deep.equal(-2); + expect(await push2).to.deep.equal(-3); + expect(await push3).to.deep.equal(-4); + expect(await Promise.race([push4, delayPromise(1, -1000)])).to.deep.equal( + -1000, + ); + }); + + it('pushes resolve to value passed to next pushes first', async () => { + let push!: (value: unknown) => Promise; + const r = new Repeater(({ push: push1 }) => (push = push1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-1); + const push1 = push(1); + const push2 = push(2); + const push3 = push(3); + const push4 = push(4); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + r.next(-4); + expect(await push1).to.deep.equal(-2); + expect(await push2).to.deep.equal(-3); + expect(await push3).to.deep.equal(-4); + expect(await Promise.race([push4, delayPromise(1, -1000)])).to.deep.equal( + -1000, + ); + }); + + it('break in for await', async () => { + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(3); + await push(4); /* c8 ignore start */ + }); /* c8 ignore stop */ + const spy = spyOn(r, 'return'); + const result: Array = []; + for await (const num of r) { + result.push(num); + if (num === 3) { + break; + } + } + expect(result).to.deep.equal([1, 2, 3]); + expectSpy(spy).toHaveBeenCalledTimes(1); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw in for await', async () => { + const error = new Error('throw in for await'); + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(3); + await push(4); /* c8 ignore start */ + }); /* c8 ignore stop */ + const spy = spyOn(r, 'return'); + const result: Array = []; + await expectPromise( + (async () => { + for await (const num of r) { + result.push(num); + if (num === 3) { + throw error; + } /* c8 ignore start */ + } + })() /* c8 ignore stop */, + ).toRejectWith('throw in for await'); + expect(result).to.deep.equal([1, 2, 3]); + expectSpy(spy).toHaveBeenCalledTimes(1); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return method', async () => { + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(3); + await push(4); /* c8 ignore start */ + return -1; + }); /* c8 ignore stop */ + const result: Array = []; + for await (const num of r) { + result.push(num); + if (num === 3) { + expect(await r.return()).to.deep.equal({ + done: true, + value: undefined, + }); + } + } + expect(result).to.deep.equal([1, 2, 3]); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return method before execution', async () => { + const mock = fn(); + const r = new Repeater(() => mock()); + expect(await r.return(-1)).to.deep.equal({ + value: -1, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expectMock(mock).toHaveBeenCalledTimes(0); + }); + + it('return method with argument', async () => { + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); + await push(3); /* c8 ignore start */ + return -1; + }); /* c8 ignore stop */ + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + expect(await r.return(-2)).to.deep.equal({ + value: -2, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.return()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.return(-3)).to.deep.equal({ + value: -3, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return method with promise argument', async () => { + const r = new Repeater(async ({ push }) => { + await push(1); + await push(2); /* c8 ignore start */ + return -1; + }); /* c8 ignore stop */ + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.return(Promise.resolve(-2))).to.deep.equal({ + value: -2, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.return()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.return(Promise.resolve(-3))).to.deep.equal({ + value: -3, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return method with argument and pending next', async () => { + const mock = fn(); + const r = new Repeater(async ({ push, stopped }) => { + for (let i = 1; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(i); + } + + await stopped; + mock(); + return -1; + }); + const next1 = r.next(-1); + const next2 = r.next(-2); + const next3 = r.next(-3); + const next4 = r.next(-4); + const returned = r.return(0); + expect(await next1).to.deep.equal({ value: 1, done: false }); + expect(await next2).to.deep.equal({ value: 2, done: false }); + expect(await next3).to.deep.equal({ value: 3, done: false }); + expect(await next4).to.deep.equal({ value: 4, done: false }); + expect(await returned).to.deep.equal({ value: 0, done: true }); + expectMock(mock).toHaveBeenCalledTimes(1); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return method with pushed rejection', async () => { + const error = new Error('return method with pushed rejection'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error)); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.return(-2)).to.deep.equal({ + value: -2, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('return method with async pushed rejection', async () => { + const error = new Error('return method with async pushed rejection'); + const r = new Repeater(async ({ push }) => { + await push(1); + await push(Promise.reject(error)); /* c8 ignore start */ + return -1; + }); /* c8 ignore stop */ + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.return(-2)).to.deep.equal({ + value: -2, + done: true, + }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw method', async () => { + const error = new Error('throw method'); + const mock = fn(); + const r = new Repeater(async ({ push, stopped }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + await stopped; + mock(); + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + await expectPromise(r.throw(error)).toRejectWith('throw method'); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expectMock(mock).toHaveBeenCalledTimes(1); + }); + + it('throw method before execution', async () => { + const error = new Error('throw method before execution'); + const mock = fn(); + const r = new Repeater(() => mock()); + await expectPromise(r.throw(error)).toRejectWith( + 'throw method before execution', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expectMock(mock).toHaveBeenCalledTimes(0); + }); + + it('throw method caught in async function', async () => { + const error = new Error('throw method caught in async function'); + const errors: Array = []; + const r = new Repeater(async ({ push, stop }) => { + for (let i = 0; i < 8; i++) { + try { + // eslint-disable-next-line no-await-in-loop + await push(i); + } catch (err) { + errors.push(err); + } + } + + stop(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 0, done: false }); + expect(await r.throw(error)).to.deep.equal({ + value: 1, + done: false, + }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.throw(error)).to.deep.equal({ + value: 3, + done: false, + }); + expect(await r.next()).to.deep.equal({ value: 4, done: false }); + expect(await r.throw(error)).to.deep.equal({ + value: 5, + done: false, + }); + expect(await r.next()).to.deep.equal({ value: 6, done: false }); + expect(await r.throw(error)).to.deep.equal({ + value: 7, + done: false, + }); + expect(await r.next()).to.deep.equal({ value: -1, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(errors).to.deep.equal(Array(4).fill(error)); + }); + + it('throw method with Promise.prototype.catch', async () => { + const error = new Error('throw method with Promise.prototype.catch'); + const mock = fn(); + const r = new Repeater(({ push }) => { + push(1).catch(mock); + push(2).catch(mock); + push(3).catch(mock); + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.throw(error)).to.deep.equal({ + value: 2, + done: false, + }); + expect(await r.throw(error)).to.deep.equal({ + value: 3, + done: false, + }); + expectMock(mock).toHaveBeenCalledTimes(2); + }); + + it('throw method after stop with error', async () => { + const error1 = new Error('throw method after stop with error 1'); + const error2 = new Error('throw method after stop with error 2'); + const r = new Repeater(({ push, stop }) => { + for (let i = 1; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(i); + } + stop(error1); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + await expectPromise(r.throw(error2)).toRejectWith( + 'throw method after stop with error 1', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw method with throw in executor', async () => { + const error1 = new Error('throw method with throw in executor 1'); + const error2 = new Error('throw method with throw in executor 2'); + const r = new Repeater(({ push }) => { + for (let i = 1; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(i); + } + + throw error1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + expect(await r.next()).to.deep.equal({ value: 2, done: false }); + expect(await r.next()).to.deep.equal({ value: 3, done: false }); + await expectPromise(r.throw(error2)).toRejectWith( + 'throw method with throw in executor 1', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw method with pending next', async () => { + const error = new Error('throw method with pending next'); + const mock = fn(); + const r = new Repeater(async ({ push, stopped }) => { + for (let i = 1; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(i); + } + + await stopped; + mock(); + return -1; + }); + const next1 = r.next(-1); + const next2 = r.next(-2); + const next3 = r.next(-3); + const next4 = r.next(-4); + const thrown = r.throw(error); + expect(await next1).to.deep.equal({ value: 1, done: false }); + expect(await next2).to.deep.equal({ value: 2, done: false }); + expect(await next3).to.deep.equal({ value: 3, done: false }); + expect(await next4).to.deep.equal({ value: 4, done: false }); + await expectPromise(thrown).toRejectWith('throw method with pending next'); + expectMock(mock).toHaveBeenCalledTimes(1); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw method before stop', async () => { + const error = new Error('throw method before stop'); + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + stop(); + return -1; + }); + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + await expectPromise(r.throw(error)).toRejectWith( + 'throw method before stop', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw method before async stop', async () => { + const error = new Error('throw method before async stop'); + const r = new Repeater(async ({ push, stopped }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + await stopped; + return -1; + }); + + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + await expectPromise(r.throw(error)).toRejectWith( + 'throw method before async stop', + ); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('throw method before return method', async () => { + const error = new Error('throw method before return method'); + const mock = fn(); + const r = new Repeater(async ({ push, stopped }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + await stopped; + mock(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + const thrown = r.throw(error); + const returned = r.return(-2); + await expectPromise(thrown).toRejectWith( + 'throw method before return method', + ); + expect(await returned).to.deep.equal({ value: -2, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expectMock(mock).toHaveBeenCalledTimes(1); + }); + + it('throw method after return method', async () => { + const error = new Error('throw method after return method'); + const mock = fn(); + const r = new Repeater(async ({ push, stopped }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(1); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(2); + await stopped; + mock(); + return -1; + }); + expect(await r.next()).to.deep.equal({ value: 1, done: false }); + const returned = r.return(-2); + const thrown = r.throw(error); + await expectPromise(thrown).toRejectWith( + 'throw method after return method', + ); + expect(await returned).to.deep.equal({ value: -2, done: true }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expectMock(mock).toHaveBeenCalledTimes(1); + }); + + it('results settle in order', async () => { + const r = new Repeater(({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(10, 1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.resolve(2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(3); + stop(); + return -1; + }); + const r1 = r.next(); + const r2 = r.next(); + const r3 = r.next(); + const r4 = r.next(); + const r5 = r.next(); + expect( + await Promise.all([ + Promise.race([r5, r4, r3, r2, r1]), + Promise.race([r5, r4, r3, r2]), + Promise.race([r5, r4, r3]), + Promise.race([r5, r4]), + r5, + ]), + ).to.deep.equal([ + { value: 1, done: false }, + { value: 2, done: false }, + { value: 3, done: false }, + { value: -1, done: true }, + { value: undefined, done: true }, + ]); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('results settle in order with rejection', async () => { + const error = new Error('results settle in order with rejection'); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(100, 1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(10, 2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + return 5; + }); + const r1 = r.next(); + const r2 = r.next(); + const r3 = r.next(); + const r4 = r.next(); + const r5 = r.next(); + expect( + await Promise.allSettled([ + Promise.race([r5, r4, r3, r2, r1]), + Promise.race([r5, r4, r3, r2]), + Promise.race([r5, r4, r3]), + Promise.race([r5, r4]), + r5, + ]), + ).to.deep.equal([ + { status: 'fulfilled', value: { value: 1, done: false } }, + { status: 'fulfilled', value: { value: 2, done: false } }, + { status: 'rejected', reason: error }, + { status: 'fulfilled', value: { value: undefined, done: true } }, + { status: 'fulfilled', value: { value: undefined, done: true } }, + ]); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('results settle in order with return method and rejection', async () => { + const error = new Error( + 'results settle in order with return method and rejection', + ); + const r = new Repeater(({ push }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(100, 1)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(delayPromise(10, 2)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(Promise.reject(error)); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push(4); + return 5; + }); + const r1 = r.next(); + const r2 = r.next(); + const r3 = r.return(-1); + const r4 = r.next(); + const r5 = r.return(-2); + expect( + await Promise.all([ + Promise.race([r5, r4, r3, r2, r1]), + Promise.race([r5, r4, r3, r2]), + Promise.race([r5, r4, r3]), + Promise.race([r5, r4]), + r5, + ]), + ).to.deep.equal([ + { value: 1, done: false }, + { value: 2, done: false }, + { value: -1, done: true }, + { value: undefined, done: true }, + { value: -2, done: true }, + ]); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + expect(await r.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('WeakMap errors', () => { + const r = new Repeater(() => undefined); + const nextFn = r.next; + const boundNextFn = r.next.bind(r); + const returnFn = r.return; + const boundReturnFn = r.return.bind(r); + const throwFn = r.throw; + const boundThrowFn = r.throw.bind(r); + expect(() => nextFn()).to.throw('WeakMap error'); + expect(() => boundNextFn()).not.to.throw(); + expect(() => returnFn()).to.throw('WeakMap error'); + expect(() => boundReturnFn()).not.to.throw(); + expect(() => throwFn(1)).to.throw('WeakMap error'); + expect(() => boundThrowFn(1)).not.to.throw(); + }); +}); From 376d192adb063630762a1f372a36d2aca9c7de7f Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 12 Aug 2022 18:59:49 +0300 Subject: [PATCH 3/3] replace mapAsyncIterable with repeater-based implementation --- .../__tests__/mapAsyncIterable-test.ts | 4 +- src/execution/__tests__/subscribe-test.ts | 29 +++-- src/execution/mapAsyncIterable.ts | 107 +++++++++++------- 3 files changed, 85 insertions(+), 55 deletions(-) diff --git a/src/execution/__tests__/mapAsyncIterable-test.ts b/src/execution/__tests__/mapAsyncIterable-test.ts index d70714fa74..a5e794f420 100644 --- a/src/execution/__tests__/mapAsyncIterable-test.ts +++ b/src/execution/__tests__/mapAsyncIterable-test.ts @@ -276,8 +276,8 @@ describe('mapAsyncIterable', () => { yield 3; // Shouldn't be reached. } finally { didVisitFinally = true; - yield 1000; - } + yield 1000; /* c8 ignore start */ + } /* c8 ignore stop */ } const throwOver1 = mapAsyncIterable(source(), mapper); diff --git a/src/execution/__tests__/subscribe-test.ts b/src/execution/__tests__/subscribe-test.ts index 793a53f59f..f51761b89a 100644 --- a/src/execution/__tests__/subscribe-test.ts +++ b/src/execution/__tests__/subscribe-test.ts @@ -739,9 +739,8 @@ describe('Subscription Publish Phase', () => { }, }); + const returned = await subscription.return(); payload = subscription.next(); - await subscription.return(); - // A new email arrives! expect( pubsub.emit({ @@ -752,6 +751,10 @@ describe('Subscription Publish Phase', () => { }), ).to.equal(false); + expect(returned).to.deep.equal({ + done: true, + value: undefined, + }); expect(await payload).to.deep.equal({ done: true, value: undefined, @@ -793,17 +796,21 @@ describe('Subscription Publish Phase', () => { }, }); + const error = new Error('should not trigger when subscription is thrown'); + const caughtError = subscription.throw(error).catch((e) => e); payload = subscription.next(); - // Throw error - let caughtError; - try { - /* c8 ignore next 2 */ - await subscription.throw('ouch'); - } catch (e) { - caughtError = e; - } - expect(caughtError).to.equal('ouch'); + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + message: 'Tests are good 2', + unread: true, + }), + ).to.equal(true); + + expect(await caughtError).to.equal(error); expect(await payload).to.deep.equal({ done: true, diff --git a/src/execution/mapAsyncIterable.ts b/src/execution/mapAsyncIterable.ts index f61855c8cf..f7e4a2e044 100644 --- a/src/execution/mapAsyncIterable.ts +++ b/src/execution/mapAsyncIterable.ts @@ -1,57 +1,80 @@ +/* eslint-disable no-await-in-loop */ + import type { PromiseOrValue } from '../jsutils/PromiseOrValue'; +import { Repeater, RepeaterClosureSignal } from '../jsutils/Repeater'; /** - * Given an AsyncIterable and a callback function, return an AsyncIterator + * Given an AsyncIterable and a callback function, return an AsyncGenerator * which produces values mapped via calling the callback function. */ export function mapAsyncIterable( iterable: AsyncGenerator | AsyncIterable, - callback: (value: T) => PromiseOrValue, + fn: (value: T) => PromiseOrValue, ): AsyncGenerator { - const iterator = iterable[Symbol.asyncIterator](); + return new Repeater(async ({ push, stop }) => { + const iterator: AsyncIterator = + iterable[Symbol.asyncIterator](); - async function mapResult( - result: IteratorResult, - ): Promise> { - if (result.done) { - return result; - } + let next = iterator.next(); + // eslint-disable-next-line no-constant-condition + while (true) { + let iteration: IteratorResult; + try { + iteration = await next; + } catch (err) { + await abruptClose(iterator); + throw err; + } - try { - return { value: await callback(result.value), done: false }; - } catch (error) { - /* c8 ignore start */ - // FIXME: add test case - if (typeof iterator.return === 'function') { - try { - await iterator.return(); - } catch (_e) { - /* ignore error */ + const { done, value } = iteration; + + if (done) { + stop(); + return value; + } + + let mapped: U; + try { + mapped = await fn(value); + } catch (err) { + await abruptClose(iterator); + throw err; + } + + try { + await push(mapped); + } catch (err) { + if (err instanceof RepeaterClosureSignal) { + if (typeof iterator.return !== 'function') { + stop(); + return undefined as unknown as R; // void :( + } + + next = iterator.return(err.returnValue); + continue; } + + if (typeof iterator.throw !== 'function') { + await abruptClose(iterator); + throw err; + } + + next = iterator.throw(err); + continue; } - throw error; - /* c8 ignore stop */ + + next = iterator.next(); } - } + }); +} - return { - async next() { - return mapResult(await iterator.next()); - }, - async return(): Promise> { - // If iterator.return() does not exist, then type R must be undefined. - return typeof iterator.return === 'function' - ? mapResult(await iterator.return()) - : { value: undefined as any, done: true }; - }, - async throw(error?: unknown) { - if (typeof iterator.throw === 'function') { - return mapResult(await iterator.throw(error)); - } - throw error; - }, - [Symbol.asyncIterator]() { - return this; - }, - }; +async function abruptClose(iterator: AsyncIterator): Promise { + if (typeof iterator.return === 'function') { + try { + await iterator.return(); /* c8 ignore start */ + } catch (_err) { + // FIXME: add test case + /* ignore error */ + } /* c8 ignore stop */ + } }