Skip to content

Commit 8e50cda

Browse files
committed
Fix closing of state machine before the replay has finished in success case.
1 parent 4a0a593 commit 8e50cda

File tree

1 file changed

+54
-75
lines changed

1 file changed

+54
-75
lines changed

src/state_machine.ts

Lines changed: 54 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ enum ExecutionState {
7777
export class PendingMessage {
7878
constructor(
7979
readonly messageType: bigint,
80-
readonly message?: ProtocolMessage | Uint8Array,
81-
readonly resolve?: (value: unknown) => void,
82-
readonly reject?: (reason: Failure | Error) => void
80+
readonly promise: Promise<unknown>,
81+
readonly resolve: (value: unknown) => void,
82+
readonly reject: (reason: Failure | Error) => void,
83+
readonly message?: ProtocolMessage | Uint8Array
8384
) {}
8485
}
8586

@@ -89,7 +90,6 @@ export class PendingMessage {
8990
* The CompletionResult is the data of those out-of-order messages that we store in the map.
9091
*/
9192
type CompletionResult = {
92-
journalIndex: number;
9393
messageType: bigint;
9494
/* eslint-disable @typescript-eslint/no-explicit-any */
9595
message: any;
@@ -173,15 +173,12 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
173173
this.incrementJournalIndex();
174174

175175
const msg = GetStateEntryMessage.create({ key: Buffer.from(name) });
176-
const promise = new Promise<Buffer>((resolve, reject) => {
177-
this.storePendingMsg(
178-
this.currentJournalIndex,
179-
GET_STATE_ENTRY_MESSAGE_TYPE,
180-
msg,
181-
resolve,
182-
reject
183-
);
184-
});
176+
177+
const promise = this.storePendingMsg(
178+
this.currentJournalIndex,
179+
GET_STATE_ENTRY_MESSAGE_TYPE,
180+
msg
181+
);
185182

186183
if (this.state !== ExecutionState.REPLAYING) {
187184
// Not in replay mode: GetState message will be forwarded to the runtime
@@ -290,16 +287,12 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
290287

291288
const msg = AwakeableEntryMessage.create();
292289

293-
const awakeablePromise = new Promise<Buffer>((resolve, reject) => {
294-
this.storePendingMsg(
290+
const awakeablePromise = this.storePendingMsg<Buffer>(
295291
this.currentJournalIndex,
296292
AWAKEABLE_ENTRY_MESSAGE_TYPE,
297-
msg,
298-
resolve,
299-
reject
300-
);
301-
}).then<T>((result: Buffer) => {
302-
return JSON.parse(result.toString()) as T;
293+
msg
294+
).then<T>((result: Buffer) => {
295+
return JSON.parse(result.toString()) as T;
303296
});
304297

305298
if (this.state !== ExecutionState.REPLAYING) {
@@ -446,15 +439,11 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
446439
parameter: Buffer.from(data),
447440
});
448441

449-
const promise = new Promise((resolve, reject) => {
450-
this.storePendingMsg(
442+
const promise = this.storePendingMsg(
451443
this.currentJournalIndex,
452444
INVOKE_ENTRY_MESSAGE_TYPE,
453-
msg,
454-
resolve,
455-
reject
445+
msg
456446
);
457-
});
458447
if (this.state !== ExecutionState.REPLAYING) {
459448
// Not in replay mode: invoke will be forwarded to the runtime
460449
rlog.debugJournalMessage(
@@ -532,17 +521,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
532521
// If we are replaying, it needs to be resolved by the value of the replayed SideEffectEntryMessage.
533522
// For journal mismatch checks during replay,
534523
// we only check the message type to avoid having to re-execute the user code.
535-
const promiseToResolve = new Promise<T | undefined>(
536-
(resolveWithCompletion, rejectWithCompletion) => {
537-
this.storePendingMsg(
524+
const promiseToResolve = this.storePendingMsg<T | undefined>(
538525
this.currentJournalIndex,
539-
SIDE_EFFECT_ENTRY_MESSAGE_TYPE,
540-
undefined,
541-
resolveWithCompletion,
542-
rejectWithCompletion
526+
SIDE_EFFECT_ENTRY_MESSAGE_TYPE
543527
);
544-
}
545-
);
546528

547529
if (this.state === ExecutionState.REPLAYING) {
548530
// In replay mode: side effect will be ignored. Expecting completion.
@@ -588,12 +570,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
588570

589571
// When the runtime has acked the sideEffect with an empty completion,
590572
// then we resolve the promise with the result of the user-defined function.
591-
this.flush()
592-
.then(() => promiseToResolve)
593-
.then(
594-
() => resolve(value),
595-
(failure) => reject(failure)
596-
);
573+
this.flush().then(() => promiseToResolve).then(
574+
() => resolve(value),
575+
(failure) => reject(failure)
576+
);
597577
})
598578
.catch((reason) => {
599579
// Reason is either a failure or an Error
@@ -624,12 +604,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
624604
this.inSideEffectFlag = false;
625605

626606
// When something went wrong, then we resolve the promise with a failure.
627-
this.flush()
628-
.then(() => promiseToResolve)
629-
.then(
630-
() => reject(failure),
631-
(failureFromRuntime) => reject(failureFromRuntime)
632-
);
607+
this.flush().then(() => promiseToResolve).then(
608+
() => reject(failure),
609+
(failureFromRuntime) => reject(failureFromRuntime)
610+
);
633611
});
634612
});
635613
}
@@ -642,15 +620,11 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
642620
this.incrementJournalIndex();
643621

644622
const msg = SleepEntryMessage.create({ wakeUpTime: Date.now() + millis });
645-
const promise = new Promise<void>((resolve, reject) => {
646-
this.storePendingMsg(
623+
const promise = this.storePendingMsg<void>(
647624
this.currentJournalIndex,
648625
SLEEP_ENTRY_MESSAGE_TYPE,
649-
msg,
650-
resolve,
651-
reject
626+
msg
652627
);
653-
});
654628

655629
if (this.state !== ExecutionState.REPLAYING) {
656630
// Not in replay mode: SleepEntryMessage will be forwarded to the runtime
@@ -696,7 +670,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
696670
}
697671

698672
async flush(): Promise<void> {
699-
await this.connection.flush();
673+
await this.connection.flush()
700674
}
701675

702676
scheduleSuspensionTimeout(): void {
@@ -827,13 +801,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
827801

828802
handleInputMessage(m: PollInputStreamEntryMessage) {
829803
this.invocationIdString = uuidV7FromBuffer(this.invocationId);
830-
this.logPrefix = `[${this.method.packge}.${
831-
this.method.service
832-
}-${this.instanceKey.toString("base64")}-${this.invocationIdString}] [${
833-
this.method.method.name
834-
}]`;
804+
this.logPrefix = `[${this.method.packge}.${this.method.service}-${this.instanceKey.toString('base64')}-${this.invocationIdString}] [${this.method.method.name}]`;
805+
rlog.debugJournalMessage(this.logPrefix, "Received input message.", m);
835806

836-
this.method.invoke(this, m.value, this.logPrefix).then(
807+
this.method.invoke(this, m.value).then(
837808
(value) => this.onCallSuccess(value),
838809
(failure) => this.onCallFailure(failure)
839810
);
@@ -1020,21 +991,24 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
1020991
}
1021992
}
1022993

1023-
storePendingMsg(
994+
storePendingMsg<T>(
1024995
journalIndex: number,
1025996
messageType: bigint,
1026-
message?: ProtocolMessage | Uint8Array,
1027-
/* eslint-disable @typescript-eslint/no-explicit-any */
1028-
resolve?: (value: any) => void,
1029-
/* eslint-disable @typescript-eslint/no-explicit-any */
1030-
reject?: (value: any) => void
1031-
) {
997+
message?: ProtocolMessage | Uint8Array
998+
): Promise<T> {
1032999
// If we are replaying, the replayed message may have arrived before the user code got there.
10331000
// Otherwise, add to map.
10341001
// TODO make this more efficient and only add it to the map if we don't have the result ready
1002+
let resolvePendingMsg: (value: any) => void;
1003+
let rejectPendingMsg: (reason?: any) => void;
1004+
const promise = new Promise<T>((resolve, reject) => {
1005+
resolvePendingMsg = resolve;
1006+
rejectPendingMsg = reject;
1007+
})
10351008
this.indexToPendingMsgMap.set(
10361009
journalIndex,
1037-
new PendingMessage(messageType, message, resolve, reject)
1010+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1011+
new PendingMessage(messageType, promise, resolvePendingMsg!, rejectPendingMsg!, message)
10381012
);
10391013

10401014
if (SUSPENSION_TRIGGERS.includes(messageType)) {
@@ -1048,7 +1022,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
10481022
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
10491023
const completionResult = this.outOfOrderReplayMessages.get(journalIndex)!;
10501024
this.handlePendingMessage(
1051-
completionResult.journalIndex,
1025+
journalIndex,
10521026
completionResult.messageType,
10531027
completionResult.message,
10541028
completionResult.comparisonFct,
@@ -1057,6 +1031,8 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
10571031
);
10581032
this.outOfOrderReplayMessages.delete(journalIndex);
10591033
}
1034+
1035+
return promise;
10601036
}
10611037

10621038
/**
@@ -1110,7 +1086,6 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
11101086
);
11111087
}
11121088
this.outOfOrderReplayMessages.set(journalIndex, {
1113-
journalIndex: journalIndex,
11141089
messageType: replayMsg.messageType,
11151090
message: replayMsg.message,
11161091
comparisonFct: replayMsg.comparisonFct,
@@ -1119,7 +1094,6 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
11191094
});
11201095
} else {
11211096
this.outOfOrderReplayMessages.set(journalIndex, {
1122-
journalIndex: journalIndex,
11231097
messageType: resultMessageType,
11241098
message: resultMessage,
11251099
comparisonFct: comparisonFct,
@@ -1174,19 +1148,18 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
11741148
pendingMessage.messageType === COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE ||
11751149
pendingMessage.messageType === BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE
11761150
) {
1151+
pendingMessage.resolve(undefined);
11771152
this.indexToPendingMsgMap.delete(journalIndex);
11781153
return;
11791154
}
11801155

11811156
// 4. Handle side effect acks
1182-
11831157
// In case of a side effect completion, we don't get a value or failure back but still need to ack the completion.
11841158
if (
11851159
resultMessageType === COMPLETION_MESSAGE_TYPE &&
11861160
pendingMessage.messageType === SIDE_EFFECT_ENTRY_MESSAGE_TYPE
11871161
) {
1188-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1189-
pendingMessage.resolve!(undefined);
1162+
pendingMessage.resolve(undefined);
11901163
this.indexToPendingMsgMap.delete(journalIndex);
11911164
}
11921165

@@ -1289,6 +1262,12 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
12891262
"Call ended successful with output message.",
12901263
msg
12911264
);
1265+
1266+
if(this.indexToPendingMsgMap.size !== 0){
1267+
// wait till all messages have been resolved
1268+
await Promise.all([...this.indexToPendingMsgMap.values()].map(el => el.promise))
1269+
}
1270+
12921271
// We send the message straight over the connection
12931272
this.connection.buffer(
12941273
new Message(OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, msg)

0 commit comments

Comments
 (0)