Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"release": "release-it"
},
"files": [
"dist", "src"
"dist",
"src"
],
"dependencies": {
"protobufjs": "^7.2.2",
Expand Down
6 changes: 6 additions & 0 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ message BackgroundInvokeEntryMessage {
string method_name = 2;

bytes parameter = 3;

// Time when this BackgroundInvoke should be executed.
// The time is set as duration since UNIX Epoch.
// If this value is not set, equal to 0, or past in time,
// the runtime will execute this BackgroundInvoke as soon as possible.
int64 invoke_time = 4;
}

// Kind: Completable JournalEntry
Expand Down
2 changes: 1 addition & 1 deletion src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export interface RestateContext {
* client.greet(Request.create({ name: "Peter" }))
* )
*/
inBackground<T>(call: () => Promise<T>): Promise<void>;
inBackground<T>(call: () => Promise<T>, delayMillis?: number): void;

/**
* Execute a side effect and store the result in the Restate runtime.
Expand Down
49 changes: 30 additions & 19 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
// So to be able to know if a request is a background request or not, the user first sets this flag:
// e.g.: ctx.inBackground(() => client.greet(request))
private inBackgroundCallFlag = false;
private inBackgroundCallDelay = 0;

// This flag is set to true when we are executing code that is inside a side effect.
// We use this flag to prevent the user from doing operations on the context from within a side effect.
Expand Down Expand Up @@ -424,11 +425,19 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
// Validation check that we are not in a sideEffect is done in inBackground() already.
this.incrementJournalIndex();

const msg = BackgroundInvokeEntryMessage.create({
serviceName: service,
methodName: method,
parameter: Buffer.from(data),
});
const msg =
this.inBackgroundCallDelay > 0
? BackgroundInvokeEntryMessage.create({
serviceName: service,
methodName: method,
parameter: Buffer.from(data),
invokeTime: Date.now() + this.inBackgroundCallDelay,
})
: BackgroundInvokeEntryMessage.create({
serviceName: service,
methodName: method,
parameter: Buffer.from(data),
});

if (this.state === ExecutionState.REPLAYING) {
// In replay mode: background invoke will not be forwarded to the runtime.
Expand Down Expand Up @@ -522,26 +531,28 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
});
}

/**
* When you call inBackground, a flag is set that you want the nested call to be executed in the background.
* Then you use the client to do the call to the other Restate service.
* When you do the call, the overridden request method gets called.
* That one checks if the inBackgroundFlag is set.
* If so, it doesn't care about a response and just returns back an empty UInt8Array, and otherwise it waits for the response.
* The reason for this is that we use the generated clients of proto-ts to do invokes.
* And we override the request method that is called by that client to do the Restate related things.
* The request method of the proto-ts client requires returning a Promise.
* So until we find a cleaner solution for this, in which we can still use the generated clients but are not required to return a promise,
* this will return a void Promise.
* @param call
*/
async inBackground<T>(call: () => Promise<T>): Promise<void> {
// When you call inBackground, a flag is set that you want the nested call to be executed in the background.
// Then you use the client to do the call to the other Restate service.
// When you do the call, the overridden request method gets called.
// That one checks if the inBackgroundFlag is set.
// If so, it doesn't care about a response and just returns back an empty UInt8Array, and otherwise it waits for the response.
// The reason for this is that we use the generated clients of proto-ts to do invokes.
// And we override the request method that is called by that client to do the Restate related things.
// The request method of the proto-ts client requires returning a Promise.
// So until we find a cleaner solution for this, in which we can still use the generated clients but are not required to return a promise,
// this will return a void Promise.
async inBackground<T>(
call: () => Promise<T>,
delayMillis?: number
): Promise<void> {
if (!this.isValidState("inBackground")) {
return Promise.reject();
}

this.inBackgroundCallFlag = true;
this.inBackgroundCallDelay = delayMillis || 0;
await call();
this.inBackgroundCallDelay = 0;
this.inBackgroundCallFlag = false;
}

Expand Down
29 changes: 20 additions & 9 deletions test/protoutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,27 @@ export function invokeMessage(
export function backgroundInvokeMessage(
serviceName: string,
methodName: string,
parameter: Uint8Array
parameter: Uint8Array,
invokeTime?: number
): Message {
return new Message(
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
BackgroundInvokeEntryMessage.create({
serviceName: serviceName,
methodName: methodName,
parameter: Buffer.from(parameter),
})
);
return invokeTime
? new Message(
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
BackgroundInvokeEntryMessage.create({
serviceName: serviceName,
methodName: methodName,
parameter: Buffer.from(parameter),
invokeTime: invokeTime,
})
)
: new Message(
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
BackgroundInvokeEntryMessage.create({
serviceName: serviceName,
methodName: methodName,
parameter: Buffer.from(parameter),
})
);
}

export function decodeSideEffectFromResult(msg: Uint8Array | ProtocolMessage) {
Expand Down
127 changes: 127 additions & 0 deletions test/send_request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,39 @@ class FailingForwardGreetingService implements TestGreeter {
}
}

const delayedCallTime = 1835661783000;
class DelayedInBackgroundInvokeGreeter implements TestGreeter {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async greet(request: TestRequest): Promise<TestResponse> {
const ctx = restate.useContext(this);

const client = new TestGreeterClientImpl(ctx);
await ctx.inBackground(
() => client.greet(TestRequest.create({ name: "Francesco" })),
delayedCallTime - Date.now()
);

return TestResponse.create({ greeting: `Hello` });
}
}
class DelayedAndNormalInBackgroundInvokesGreeter implements TestGreeter {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async greet(request: TestRequest): Promise<TestResponse> {
const ctx = restate.useContext(this);

const client = new TestGreeterClientImpl(ctx);
await ctx.inBackground(
() => client.greet(TestRequest.create({ name: "Francesco" })),
delayedCallTime - Date.now()
);
await ctx.inBackground(() =>
client.greet(TestRequest.create({ name: "Francesco" }))
);

return TestResponse.create({ greeting: `Hello` });
}
}

describe("ReverseAwaitOrder: None completed", () => {
it("should call greet", async () => {
const result = await new TestDriver(
Expand Down Expand Up @@ -669,4 +702,98 @@ describe("FailingSideEffectInBackgroundInvokeGreeter: failing background call ",
});
});

describe("DelayedInBackgroundInvokeGreeter: delayed in back ground call without completion", () => {
it("should call greet", async () => {
const result = await new TestDriver(
protoMetadata,
"TestGreeter",
new DelayedInBackgroundInvokeGreeter(),
"/test.TestGreeter/Greet",
[startMessage(1), inputMessage(greetRequest("Till"))]
).run();

expect(result).toStrictEqual([
backgroundInvokeMessage(
"test.TestGreeter",
"Greet",
greetRequest("Francesco"),
delayedCallTime
),
outputMessage(greetResponse("Hello")),
]);
});
});

describe("DelayedInBackgroundInvokeGreeter: delayed in background call with replay", () => {
it("should call greet", async () => {
const result = await new TestDriver(
protoMetadata,
"TestGreeter",
new DelayedInBackgroundInvokeGreeter(),
"/test.TestGreeter/Greet",
[
startMessage(2),
inputMessage(greetRequest("Till")),
backgroundInvokeMessage(
"test.TestGreeter",
"Greet",
greetRequest("Francesco"),
delayedCallTime
),
]
).run();

expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]);
});
});

describe("DelayedInBackgroundInvokeGreeter: delayed in background call with journal mismatch", () => {
it("should call greet", async () => {
const result = await new TestDriver(
protoMetadata,
"TestGreeter",
new DelayedInBackgroundInvokeGreeter(),
"/test.TestGreeter/Greet",
[
startMessage(2),
inputMessage(greetRequest("Till")),
invokeMessage("test.TestGreeter", "Greet", greetRequest("Francesco")),
]
).run();

expect(result.length).toStrictEqual(1);
checkError(
result[0],
"Replayed journal entries did not correspond to the user code. The user code has to be deterministic!"
);
});
});

describe("DelayedAndNormalInBackgroundInvokesGreeter: two async calls. One with delay, one normal.", () => {
it("should call greet", async () => {
const result = await new TestDriver(
protoMetadata,
"TestGreeter",
new DelayedAndNormalInBackgroundInvokesGreeter(),
"/test.TestGreeter/Greet",
[startMessage(1), inputMessage(greetRequest("Till"))]
).run();

expect(result).toStrictEqual([
backgroundInvokeMessage(
"test.TestGreeter",
"Greet",
greetRequest("Francesco"),
delayedCallTime
),
backgroundInvokeMessage(
"test.TestGreeter",
"Greet",
greetRequest("Francesco")
),
outputMessage(greetResponse("Hello")),
]);
});
});

// TODO also implement the other tests of the Java SDK.
2 changes: 1 addition & 1 deletion test/test_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class TestingContext implements RestateContext {
clear(name: string): void {
throw new Error("Method not implemented.");
}
inBackground<T>(call: () => Promise<T>): Promise<void> {
inBackground<T>(call: () => Promise<T>, delayMillis?: number): Promise<void> {
throw new Error("Method not implemented.");
}
sideEffect<T>(fn: () => Promise<T>): Promise<T> {
Expand Down