diff --git a/package.json b/package.json index 51ff2bb8..ffdd4c9d 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,8 @@ "release": "release-it" }, "files": [ - "dist", "src" + "dist", + "src" ], "dependencies": { "protobufjs": "^7.2.2", diff --git a/proto/protocol.proto b/proto/protocol.proto index d720dcdd..586ea68c 100644 --- a/proto/protocol.proto +++ b/proto/protocol.proto @@ -123,6 +123,12 @@ message BackgroundInvokeEntryMessage { string method_name = 2; bytes parameter = 3; + + // Time when this BackgroundInvoke should be executed. + // The time is set as duration since UNIX Epoch. + // If this value is not set, equal to 0, or past in time, + // the runtime will execute this BackgroundInvoke as soon as possible. + int64 invoke_time = 4; } // Kind: Completable JournalEntry diff --git a/src/restate_context.ts b/src/restate_context.ts index 983b89a7..c88e5228 100644 --- a/src/restate_context.ts +++ b/src/restate_context.ts @@ -65,7 +65,7 @@ export interface RestateContext { * client.greet(Request.create({ name: "Peter" })) * ) */ - inBackground(call: () => Promise): Promise; + inBackground(call: () => Promise, delayMillis?: number): void; /** * Execute a side effect and store the result in the Restate runtime. diff --git a/src/state_machine.ts b/src/state_machine.ts index 2318c8a4..bfa58302 100644 --- a/src/state_machine.ts +++ b/src/state_machine.ts @@ -125,6 +125,7 @@ export class DurableExecutionStateMachine implements RestateContext { // So to be able to know if a request is a background request or not, the user first sets this flag: // e.g.: ctx.inBackground(() => client.greet(request)) private inBackgroundCallFlag = false; + private inBackgroundCallDelay = 0; // This flag is set to true when we are executing code that is inside a side effect. // We use this flag to prevent the user from doing operations on the context from within a side effect. @@ -424,11 +425,19 @@ export class DurableExecutionStateMachine implements RestateContext { // Validation check that we are not in a sideEffect is done in inBackground() already. this.incrementJournalIndex(); - const msg = BackgroundInvokeEntryMessage.create({ - serviceName: service, - methodName: method, - parameter: Buffer.from(data), - }); + const msg = + this.inBackgroundCallDelay > 0 + ? BackgroundInvokeEntryMessage.create({ + serviceName: service, + methodName: method, + parameter: Buffer.from(data), + invokeTime: Date.now() + this.inBackgroundCallDelay, + }) + : BackgroundInvokeEntryMessage.create({ + serviceName: service, + methodName: method, + parameter: Buffer.from(data), + }); if (this.state === ExecutionState.REPLAYING) { // In replay mode: background invoke will not be forwarded to the runtime. @@ -522,26 +531,28 @@ export class DurableExecutionStateMachine implements RestateContext { }); } - /** - * When you call inBackground, a flag is set that you want the nested call to be executed in the background. - * Then you use the client to do the call to the other Restate service. - * When you do the call, the overridden request method gets called. - * That one checks if the inBackgroundFlag is set. - * If so, it doesn't care about a response and just returns back an empty UInt8Array, and otherwise it waits for the response. - * The reason for this is that we use the generated clients of proto-ts to do invokes. - * And we override the request method that is called by that client to do the Restate related things. - * The request method of the proto-ts client requires returning a Promise. - * So until we find a cleaner solution for this, in which we can still use the generated clients but are not required to return a promise, - * this will return a void Promise. - * @param call - */ - async inBackground(call: () => Promise): Promise { + // When you call inBackground, a flag is set that you want the nested call to be executed in the background. + // Then you use the client to do the call to the other Restate service. + // When you do the call, the overridden request method gets called. + // That one checks if the inBackgroundFlag is set. + // If so, it doesn't care about a response and just returns back an empty UInt8Array, and otherwise it waits for the response. + // The reason for this is that we use the generated clients of proto-ts to do invokes. + // And we override the request method that is called by that client to do the Restate related things. + // The request method of the proto-ts client requires returning a Promise. + // So until we find a cleaner solution for this, in which we can still use the generated clients but are not required to return a promise, + // this will return a void Promise. + async inBackground( + call: () => Promise, + delayMillis?: number + ): Promise { if (!this.isValidState("inBackground")) { return Promise.reject(); } this.inBackgroundCallFlag = true; + this.inBackgroundCallDelay = delayMillis || 0; await call(); + this.inBackgroundCallDelay = 0; this.inBackgroundCallFlag = false; } diff --git a/test/protoutils.ts b/test/protoutils.ts index 09a3c019..dcafb214 100644 --- a/test/protoutils.ts +++ b/test/protoutils.ts @@ -223,16 +223,27 @@ export function invokeMessage( export function backgroundInvokeMessage( serviceName: string, methodName: string, - parameter: Uint8Array + parameter: Uint8Array, + invokeTime?: number ): Message { - return new Message( - BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, - BackgroundInvokeEntryMessage.create({ - serviceName: serviceName, - methodName: methodName, - parameter: Buffer.from(parameter), - }) - ); + return invokeTime + ? new Message( + BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, + BackgroundInvokeEntryMessage.create({ + serviceName: serviceName, + methodName: methodName, + parameter: Buffer.from(parameter), + invokeTime: invokeTime, + }) + ) + : new Message( + BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, + BackgroundInvokeEntryMessage.create({ + serviceName: serviceName, + methodName: methodName, + parameter: Buffer.from(parameter), + }) + ); } export function decodeSideEffectFromResult(msg: Uint8Array | ProtocolMessage) { diff --git a/test/send_request.test.ts b/test/send_request.test.ts index 8089b5c4..db42b9ba 100644 --- a/test/send_request.test.ts +++ b/test/send_request.test.ts @@ -123,6 +123,39 @@ class FailingForwardGreetingService implements TestGreeter { } } +const delayedCallTime = 1835661783000; +class DelayedInBackgroundInvokeGreeter implements TestGreeter { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async greet(request: TestRequest): Promise { + const ctx = restate.useContext(this); + + const client = new TestGreeterClientImpl(ctx); + await ctx.inBackground( + () => client.greet(TestRequest.create({ name: "Francesco" })), + delayedCallTime - Date.now() + ); + + return TestResponse.create({ greeting: `Hello` }); + } +} +class DelayedAndNormalInBackgroundInvokesGreeter implements TestGreeter { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async greet(request: TestRequest): Promise { + const ctx = restate.useContext(this); + + const client = new TestGreeterClientImpl(ctx); + await ctx.inBackground( + () => client.greet(TestRequest.create({ name: "Francesco" })), + delayedCallTime - Date.now() + ); + await ctx.inBackground(() => + client.greet(TestRequest.create({ name: "Francesco" })) + ); + + return TestResponse.create({ greeting: `Hello` }); + } +} + describe("ReverseAwaitOrder: None completed", () => { it("should call greet", async () => { const result = await new TestDriver( @@ -669,4 +702,98 @@ describe("FailingSideEffectInBackgroundInvokeGreeter: failing background call ", }); }); +describe("DelayedInBackgroundInvokeGreeter: delayed in back ground call without completion", () => { + it("should call greet", async () => { + const result = await new TestDriver( + protoMetadata, + "TestGreeter", + new DelayedInBackgroundInvokeGreeter(), + "/test.TestGreeter/Greet", + [startMessage(1), inputMessage(greetRequest("Till"))] + ).run(); + + expect(result).toStrictEqual([ + backgroundInvokeMessage( + "test.TestGreeter", + "Greet", + greetRequest("Francesco"), + delayedCallTime + ), + outputMessage(greetResponse("Hello")), + ]); + }); +}); + +describe("DelayedInBackgroundInvokeGreeter: delayed in background call with replay", () => { + it("should call greet", async () => { + const result = await new TestDriver( + protoMetadata, + "TestGreeter", + new DelayedInBackgroundInvokeGreeter(), + "/test.TestGreeter/Greet", + [ + startMessage(2), + inputMessage(greetRequest("Till")), + backgroundInvokeMessage( + "test.TestGreeter", + "Greet", + greetRequest("Francesco"), + delayedCallTime + ), + ] + ).run(); + + expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]); + }); +}); + +describe("DelayedInBackgroundInvokeGreeter: delayed in background call with journal mismatch", () => { + it("should call greet", async () => { + const result = await new TestDriver( + protoMetadata, + "TestGreeter", + new DelayedInBackgroundInvokeGreeter(), + "/test.TestGreeter/Greet", + [ + startMessage(2), + inputMessage(greetRequest("Till")), + invokeMessage("test.TestGreeter", "Greet", greetRequest("Francesco")), + ] + ).run(); + + expect(result.length).toStrictEqual(1); + checkError( + result[0], + "Replayed journal entries did not correspond to the user code. The user code has to be deterministic!" + ); + }); +}); + +describe("DelayedAndNormalInBackgroundInvokesGreeter: two async calls. One with delay, one normal.", () => { + it("should call greet", async () => { + const result = await new TestDriver( + protoMetadata, + "TestGreeter", + new DelayedAndNormalInBackgroundInvokesGreeter(), + "/test.TestGreeter/Greet", + [startMessage(1), inputMessage(greetRequest("Till"))] + ).run(); + + expect(result).toStrictEqual([ + backgroundInvokeMessage( + "test.TestGreeter", + "Greet", + greetRequest("Francesco"), + delayedCallTime + ), + backgroundInvokeMessage( + "test.TestGreeter", + "Greet", + greetRequest("Francesco") + ), + outputMessage(greetResponse("Hello")), + ]); + }); +}); + // TODO also implement the other tests of the Java SDK. diff --git a/test/test_context.ts b/test/test_context.ts index c48aa998..51d206ac 100644 --- a/test/test_context.ts +++ b/test/test_context.ts @@ -43,7 +43,7 @@ export class TestingContext implements RestateContext { clear(name: string): void { throw new Error("Method not implemented."); } - inBackground(call: () => Promise): Promise { + inBackground(call: () => Promise, delayMillis?: number): Promise { throw new Error("Method not implemented."); } sideEffect(fn: () => Promise): Promise {