diff --git a/src/state_machine.ts b/src/state_machine.ts index e584a713..70bc4ea4 100644 --- a/src/state_machine.ts +++ b/src/state_machine.ts @@ -77,9 +77,10 @@ enum ExecutionState { export class PendingMessage { constructor( readonly messageType: bigint, - readonly message?: ProtocolMessage | Uint8Array, - readonly resolve?: (value: unknown) => void, - readonly reject?: (reason: Failure | Error) => void + readonly promise: Promise, + readonly resolve: (value: unknown) => void, + readonly reject: (reason: Failure | Error) => void, + readonly message?: ProtocolMessage | Uint8Array ) {} } @@ -89,7 +90,6 @@ export class PendingMessage { * The CompletionResult is the data of those out-of-order messages that we store in the map. */ type CompletionResult = { - journalIndex: number; messageType: bigint; /* eslint-disable @typescript-eslint/no-explicit-any */ message: any; @@ -173,15 +173,12 @@ export class DurableExecutionStateMachine implements RestateContext { this.incrementJournalIndex(); const msg = GetStateEntryMessage.create({ key: Buffer.from(name) }); - const promise = new Promise((resolve, reject) => { - this.storePendingMsg( - this.currentJournalIndex, - GET_STATE_ENTRY_MESSAGE_TYPE, - msg, - resolve, - reject - ); - }); + + const promise = this.storePendingMsg( + this.currentJournalIndex, + GET_STATE_ENTRY_MESSAGE_TYPE, + msg + ); if (this.state !== ExecutionState.REPLAYING) { // Not in replay mode: GetState message will be forwarded to the runtime @@ -290,16 +287,12 @@ export class DurableExecutionStateMachine implements RestateContext { const msg = AwakeableEntryMessage.create(); - const awakeablePromise = new Promise((resolve, reject) => { - this.storePendingMsg( + const awakeablePromise = this.storePendingMsg( this.currentJournalIndex, AWAKEABLE_ENTRY_MESSAGE_TYPE, - msg, - resolve, - reject - ); - }).then((result: Buffer) => { - return JSON.parse(result.toString()) as T; + msg + ).then((result: Buffer) => { + return JSON.parse(result.toString()) as T; }); if (this.state !== ExecutionState.REPLAYING) { @@ -446,15 +439,11 @@ export class DurableExecutionStateMachine implements RestateContext { parameter: Buffer.from(data), }); - const promise = new Promise((resolve, reject) => { - this.storePendingMsg( + const promise = this.storePendingMsg( this.currentJournalIndex, INVOKE_ENTRY_MESSAGE_TYPE, - msg, - resolve, - reject + msg ); - }); if (this.state !== ExecutionState.REPLAYING) { // Not in replay mode: invoke will be forwarded to the runtime rlog.debugJournalMessage( @@ -532,17 +521,10 @@ export class DurableExecutionStateMachine implements RestateContext { // If we are replaying, it needs to be resolved by the value of the replayed SideEffectEntryMessage. // For journal mismatch checks during replay, // we only check the message type to avoid having to re-execute the user code. - const promiseToResolve = new Promise( - (resolveWithCompletion, rejectWithCompletion) => { - this.storePendingMsg( + const promiseToResolve = this.storePendingMsg( this.currentJournalIndex, - SIDE_EFFECT_ENTRY_MESSAGE_TYPE, - undefined, - resolveWithCompletion, - rejectWithCompletion + SIDE_EFFECT_ENTRY_MESSAGE_TYPE ); - } - ); if (this.state === ExecutionState.REPLAYING) { // In replay mode: side effect will be ignored. Expecting completion. @@ -588,12 +570,10 @@ export class DurableExecutionStateMachine implements RestateContext { // When the runtime has acked the sideEffect with an empty completion, // then we resolve the promise with the result of the user-defined function. - this.flush() - .then(() => promiseToResolve) - .then( - () => resolve(value), - (failure) => reject(failure) - ); + this.flush().then(() => promiseToResolve).then( + () => resolve(value), + (failure) => reject(failure) + ); }) .catch((reason) => { // Reason is either a failure or an Error @@ -624,12 +604,10 @@ export class DurableExecutionStateMachine implements RestateContext { this.inSideEffectFlag = false; // When something went wrong, then we resolve the promise with a failure. - this.flush() - .then(() => promiseToResolve) - .then( - () => reject(failure), - (failureFromRuntime) => reject(failureFromRuntime) - ); + this.flush().then(() => promiseToResolve).then( + () => reject(failure), + (failureFromRuntime) => reject(failureFromRuntime) + ); }); }); } @@ -642,15 +620,11 @@ export class DurableExecutionStateMachine implements RestateContext { this.incrementJournalIndex(); const msg = SleepEntryMessage.create({ wakeUpTime: Date.now() + millis }); - const promise = new Promise((resolve, reject) => { - this.storePendingMsg( + const promise = this.storePendingMsg( this.currentJournalIndex, SLEEP_ENTRY_MESSAGE_TYPE, - msg, - resolve, - reject + msg ); - }); if (this.state !== ExecutionState.REPLAYING) { // Not in replay mode: SleepEntryMessage will be forwarded to the runtime @@ -696,7 +670,7 @@ export class DurableExecutionStateMachine implements RestateContext { } async flush(): Promise { - await this.connection.flush(); + await this.connection.flush() } scheduleSuspensionTimeout(): void { @@ -827,11 +801,7 @@ export class DurableExecutionStateMachine implements RestateContext { handleInputMessage(m: PollInputStreamEntryMessage) { this.invocationIdString = uuidV7FromBuffer(this.invocationId); - this.logPrefix = `[${this.method.packge}.${ - this.method.service - }-${this.instanceKey.toString("base64")}-${this.invocationIdString}] [${ - this.method.method.name - }]`; + this.logPrefix = `[${this.method.packge}.${this.method.service}-${this.instanceKey.toString('base64')}-${this.invocationIdString}] [${this.method.method.name}]`; this.method.invoke(this, m.value, this.logPrefix).then( (value) => this.onCallSuccess(value), @@ -1020,21 +990,24 @@ export class DurableExecutionStateMachine implements RestateContext { } } - storePendingMsg( + storePendingMsg( journalIndex: number, messageType: bigint, - message?: ProtocolMessage | Uint8Array, - /* eslint-disable @typescript-eslint/no-explicit-any */ - resolve?: (value: any) => void, - /* eslint-disable @typescript-eslint/no-explicit-any */ - reject?: (value: any) => void - ) { + message?: ProtocolMessage | Uint8Array + ): Promise { // If we are replaying, the replayed message may have arrived before the user code got there. // Otherwise, add to map. // TODO make this more efficient and only add it to the map if we don't have the result ready + let resolvePendingMsg: (value: any) => void; + let rejectPendingMsg: (reason?: any) => void; + const promise = new Promise((resolve, reject) => { + resolvePendingMsg = resolve; + rejectPendingMsg = reject; + }) this.indexToPendingMsgMap.set( journalIndex, - new PendingMessage(messageType, message, resolve, reject) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + new PendingMessage(messageType, promise, resolvePendingMsg!, rejectPendingMsg!, message) ); if (SUSPENSION_TRIGGERS.includes(messageType)) { @@ -1048,7 +1021,7 @@ export class DurableExecutionStateMachine implements RestateContext { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const completionResult = this.outOfOrderReplayMessages.get(journalIndex)!; this.handlePendingMessage( - completionResult.journalIndex, + journalIndex, completionResult.messageType, completionResult.message, completionResult.comparisonFct, @@ -1057,6 +1030,8 @@ export class DurableExecutionStateMachine implements RestateContext { ); this.outOfOrderReplayMessages.delete(journalIndex); } + + return promise; } /** @@ -1110,7 +1085,6 @@ export class DurableExecutionStateMachine implements RestateContext { ); } this.outOfOrderReplayMessages.set(journalIndex, { - journalIndex: journalIndex, messageType: replayMsg.messageType, message: replayMsg.message, comparisonFct: replayMsg.comparisonFct, @@ -1119,7 +1093,6 @@ export class DurableExecutionStateMachine implements RestateContext { }); } else { this.outOfOrderReplayMessages.set(journalIndex, { - journalIndex: journalIndex, messageType: resultMessageType, message: resultMessage, comparisonFct: comparisonFct, @@ -1174,19 +1147,18 @@ export class DurableExecutionStateMachine implements RestateContext { pendingMessage.messageType === COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE || pendingMessage.messageType === BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE ) { + pendingMessage.resolve(undefined); this.indexToPendingMsgMap.delete(journalIndex); return; } // 4. Handle side effect acks - // In case of a side effect completion, we don't get a value or failure back but still need to ack the completion. if ( resultMessageType === COMPLETION_MESSAGE_TYPE && pendingMessage.messageType === SIDE_EFFECT_ENTRY_MESSAGE_TYPE ) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - pendingMessage.resolve!(undefined); + pendingMessage.resolve(undefined); this.indexToPendingMsgMap.delete(journalIndex); } @@ -1289,6 +1261,12 @@ export class DurableExecutionStateMachine implements RestateContext { "Call ended successful with output message.", msg ); + + if(this.indexToPendingMsgMap.size !== 0){ + // wait till all messages have been resolved + await Promise.all([...this.indexToPendingMsgMap.values()].map(el => el.promise)) + } + // We send the message straight over the connection this.connection.buffer( new Message(OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, msg)