Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 52 additions & 74 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ enum ExecutionState {
export class PendingMessage {
constructor(
readonly messageType: bigint,
readonly message?: ProtocolMessage | Uint8Array,
readonly resolve?: (value: unknown) => void,
readonly reject?: (reason: Failure | Error) => void
readonly promise: Promise<unknown>,
readonly resolve: (value: unknown) => void,
readonly reject: (reason: Failure | Error) => void,
readonly message?: ProtocolMessage | Uint8Array
) {}
}

Expand All @@ -89,7 +90,6 @@ export class PendingMessage {
* The CompletionResult is the data of those out-of-order messages that we store in the map.
*/
type CompletionResult = {
journalIndex: number;
messageType: bigint;
/* eslint-disable @typescript-eslint/no-explicit-any */
message: any;
Expand Down Expand Up @@ -173,15 +173,12 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
this.incrementJournalIndex();

const msg = GetStateEntryMessage.create({ key: Buffer.from(name) });
const promise = new Promise<Buffer>((resolve, reject) => {
this.storePendingMsg(
this.currentJournalIndex,
GET_STATE_ENTRY_MESSAGE_TYPE,
msg,
resolve,
reject
);
});

const promise = this.storePendingMsg(
this.currentJournalIndex,
GET_STATE_ENTRY_MESSAGE_TYPE,
msg
);

if (this.state !== ExecutionState.REPLAYING) {
// Not in replay mode: GetState message will be forwarded to the runtime
Expand Down Expand Up @@ -290,16 +287,12 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {

const msg = AwakeableEntryMessage.create();

const awakeablePromise = new Promise<Buffer>((resolve, reject) => {
this.storePendingMsg(
const awakeablePromise = this.storePendingMsg<Buffer>(
this.currentJournalIndex,
AWAKEABLE_ENTRY_MESSAGE_TYPE,
msg,
resolve,
reject
);
}).then<T>((result: Buffer) => {
return JSON.parse(result.toString()) as T;
msg
).then<T>((result: Buffer) => {
return JSON.parse(result.toString()) as T;
});

if (this.state !== ExecutionState.REPLAYING) {
Expand Down Expand Up @@ -446,15 +439,11 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
parameter: Buffer.from(data),
});

const promise = new Promise((resolve, reject) => {
this.storePendingMsg(
const promise = this.storePendingMsg(
this.currentJournalIndex,
INVOKE_ENTRY_MESSAGE_TYPE,
msg,
resolve,
reject
msg
);
});
if (this.state !== ExecutionState.REPLAYING) {
// Not in replay mode: invoke will be forwarded to the runtime
rlog.debugJournalMessage(
Expand Down Expand Up @@ -532,17 +521,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
// If we are replaying, it needs to be resolved by the value of the replayed SideEffectEntryMessage.
// For journal mismatch checks during replay,
// we only check the message type to avoid having to re-execute the user code.
const promiseToResolve = new Promise<T | undefined>(
(resolveWithCompletion, rejectWithCompletion) => {
this.storePendingMsg(
const promiseToResolve = this.storePendingMsg<T | undefined>(
this.currentJournalIndex,
SIDE_EFFECT_ENTRY_MESSAGE_TYPE,
undefined,
resolveWithCompletion,
rejectWithCompletion
SIDE_EFFECT_ENTRY_MESSAGE_TYPE
);
}
);

if (this.state === ExecutionState.REPLAYING) {
// In replay mode: side effect will be ignored. Expecting completion.
Expand Down Expand Up @@ -588,12 +570,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {

// When the runtime has acked the sideEffect with an empty completion,
// then we resolve the promise with the result of the user-defined function.
this.flush()
.then(() => promiseToResolve)
.then(
() => resolve(value),
(failure) => reject(failure)
);
this.flush().then(() => promiseToResolve).then(
() => resolve(value),
(failure) => reject(failure)
);
})
.catch((reason) => {
// Reason is either a failure or an Error
Expand Down Expand Up @@ -624,12 +604,10 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
this.inSideEffectFlag = false;

// When something went wrong, then we resolve the promise with a failure.
this.flush()
.then(() => promiseToResolve)
.then(
() => reject(failure),
(failureFromRuntime) => reject(failureFromRuntime)
);
this.flush().then(() => promiseToResolve).then(
() => reject(failure),
(failureFromRuntime) => reject(failureFromRuntime)
);
});
});
}
Expand All @@ -642,15 +620,11 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
this.incrementJournalIndex();

const msg = SleepEntryMessage.create({ wakeUpTime: Date.now() + millis });
const promise = new Promise<void>((resolve, reject) => {
this.storePendingMsg(
const promise = this.storePendingMsg<void>(
this.currentJournalIndex,
SLEEP_ENTRY_MESSAGE_TYPE,
msg,
resolve,
reject
msg
);
});

if (this.state !== ExecutionState.REPLAYING) {
// Not in replay mode: SleepEntryMessage will be forwarded to the runtime
Expand Down Expand Up @@ -696,7 +670,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
}

async flush(): Promise<void> {
await this.connection.flush();
await this.connection.flush()
}

scheduleSuspensionTimeout(): void {
Expand Down Expand Up @@ -827,11 +801,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {

handleInputMessage(m: PollInputStreamEntryMessage) {
this.invocationIdString = uuidV7FromBuffer(this.invocationId);
this.logPrefix = `[${this.method.packge}.${
this.method.service
}-${this.instanceKey.toString("base64")}-${this.invocationIdString}] [${
this.method.method.name
}]`;
this.logPrefix = `[${this.method.packge}.${this.method.service}-${this.instanceKey.toString('base64')}-${this.invocationIdString}] [${this.method.method.name}]`;

this.method.invoke(this, m.value, this.logPrefix).then(
(value) => this.onCallSuccess(value),
Expand Down Expand Up @@ -1020,21 +990,24 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
}
}

storePendingMsg(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a really good idea!

storePendingMsg<T>(
journalIndex: number,
messageType: bigint,
message?: ProtocolMessage | Uint8Array,
/* eslint-disable @typescript-eslint/no-explicit-any */
resolve?: (value: any) => void,
/* eslint-disable @typescript-eslint/no-explicit-any */
reject?: (value: any) => void
) {
message?: ProtocolMessage | Uint8Array
): Promise<T> {
// If we are replaying, the replayed message may have arrived before the user code got there.
// Otherwise, add to map.
// TODO make this more efficient and only add it to the map if we don't have the result ready
let resolvePendingMsg: (value: any) => void;
let rejectPendingMsg: (reason?: any) => void;
const promise = new Promise<T>((resolve, reject) => {
resolvePendingMsg = resolve;
rejectPendingMsg = reject;
})
this.indexToPendingMsgMap.set(
journalIndex,
new PendingMessage(messageType, message, resolve, reject)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new PendingMessage(messageType, promise, resolvePendingMsg!, rejectPendingMsg!, message)
);

if (SUSPENSION_TRIGGERS.includes(messageType)) {
Expand All @@ -1048,7 +1021,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const completionResult = this.outOfOrderReplayMessages.get(journalIndex)!;
this.handlePendingMessage(
completionResult.journalIndex,
journalIndex,
completionResult.messageType,
completionResult.message,
completionResult.comparisonFct,
Expand All @@ -1057,6 +1030,8 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
);
this.outOfOrderReplayMessages.delete(journalIndex);
}

return promise;
}

/**
Expand Down Expand Up @@ -1110,7 +1085,6 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
);
}
this.outOfOrderReplayMessages.set(journalIndex, {
journalIndex: journalIndex,
messageType: replayMsg.messageType,
message: replayMsg.message,
comparisonFct: replayMsg.comparisonFct,
Expand All @@ -1119,7 +1093,6 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
});
} else {
this.outOfOrderReplayMessages.set(journalIndex, {
journalIndex: journalIndex,
messageType: resultMessageType,
message: resultMessage,
comparisonFct: comparisonFct,
Expand Down Expand Up @@ -1174,19 +1147,18 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
pendingMessage.messageType === COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE ||
pendingMessage.messageType === BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE
) {
pendingMessage.resolve(undefined);
this.indexToPendingMsgMap.delete(journalIndex);
return;
}

// 4. Handle side effect acks

// In case of a side effect completion, we don't get a value or failure back but still need to ack the completion.
if (
resultMessageType === COMPLETION_MESSAGE_TYPE &&
pendingMessage.messageType === SIDE_EFFECT_ENTRY_MESSAGE_TYPE
) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pendingMessage.resolve!(undefined);
pendingMessage.resolve(undefined);
this.indexToPendingMsgMap.delete(journalIndex);
}

Expand Down Expand Up @@ -1289,6 +1261,12 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
"Call ended successful with output message.",
msg
);

if(this.indexToPendingMsgMap.size !== 0){
// wait till all messages have been resolved
await Promise.all([...this.indexToPendingMsgMap.values()].map(el => el.promise))
}

// We send the message straight over the connection
this.connection.buffer(
new Message(OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, msg)
Expand Down