Skip to content

Commit 73cbb9b

Browse files
committed
Add delayed call
1 parent b41e1b9 commit 73cbb9b

File tree

7 files changed

+187
-31
lines changed

7 files changed

+187
-31
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
"release": "release-it"
3535
},
3636
"files": [
37-
"dist", "src"
37+
"dist",
38+
"src"
3839
],
3940
"dependencies": {
4041
"protobufjs": "^7.2.2",

proto/protocol.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ message BackgroundInvokeEntryMessage {
123123
string method_name = 2;
124124

125125
bytes parameter = 3;
126+
127+
// Time when this BackgroundInvoke should be executed.
128+
// The time is set as duration since UNIX Epoch.
129+
// If this value is not set, equal to 0, or past in time,
130+
// the runtime will execute this BackgroundInvoke as soon as possible.
131+
int64 invoke_time = 4;
126132
}
127133

128134
// Kind: Completable JournalEntry

src/restate_context.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ export interface RestateContext {
6565
* client.greet(Request.create({ name: "Peter" }))
6666
* )
6767
*/
68-
inBackground<T>(call: () => Promise<T>): Promise<void>;
68+
inBackground<T>(call: () => Promise<T>, delayMillis?: number): void;
6969

7070
/**
7171
* Execute a side effect and store the result in the Restate runtime.

src/state_machine.ts

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
125125
// So to be able to know if a request is a background request or not, the user first sets this flag:
126126
// e.g.: ctx.inBackground(() => client.greet(request))
127127
private inBackgroundCallFlag = false;
128+
private inBackgroundCallDelay = 0;
128129

129130
// This flag is set to true when we are executing code that is inside a side effect.
130131
// We use this flag to prevent the user from doing operations on the context from within a side effect.
@@ -424,11 +425,19 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
424425
// Validation check that we are not in a sideEffect is done in inBackground() already.
425426
this.incrementJournalIndex();
426427

427-
const msg = BackgroundInvokeEntryMessage.create({
428-
serviceName: service,
429-
methodName: method,
430-
parameter: Buffer.from(data),
431-
});
428+
const msg =
429+
this.inBackgroundCallDelay > 0
430+
? BackgroundInvokeEntryMessage.create({
431+
serviceName: service,
432+
methodName: method,
433+
parameter: Buffer.from(data),
434+
invokeTime: Date.now() + this.inBackgroundCallDelay,
435+
})
436+
: BackgroundInvokeEntryMessage.create({
437+
serviceName: service,
438+
methodName: method,
439+
parameter: Buffer.from(data),
440+
});
432441

433442
if (this.state === ExecutionState.REPLAYING) {
434443
// In replay mode: background invoke will not be forwarded to the runtime.
@@ -522,26 +531,28 @@ export class DurableExecutionStateMachine<I, O> implements RestateContext {
522531
});
523532
}
524533

525-
/**
526-
* When you call inBackground, a flag is set that you want the nested call to be executed in the background.
527-
* Then you use the client to do the call to the other Restate service.
528-
* When you do the call, the overridden request method gets called.
529-
* That one checks if the inBackgroundFlag is set.
530-
* If so, it doesn't care about a response and just returns back an empty UInt8Array, and otherwise it waits for the response.
531-
* The reason for this is that we use the generated clients of proto-ts to do invokes.
532-
* And we override the request method that is called by that client to do the Restate related things.
533-
* The request method of the proto-ts client requires returning a Promise.
534-
* So until we find a cleaner solution for this, in which we can still use the generated clients but are not required to return a promise,
535-
* this will return a void Promise.
536-
* @param call
537-
*/
538-
async inBackground<T>(call: () => Promise<T>): Promise<void> {
534+
// When you call inBackground, a flag is set that you want the nested call to be executed in the background.
535+
// Then you use the client to do the call to the other Restate service.
536+
// When you do the call, the overridden request method gets called.
537+
// That one checks if the inBackgroundFlag is set.
538+
// If so, it doesn't care about a response and just returns back an empty UInt8Array, and otherwise it waits for the response.
539+
// The reason for this is that we use the generated clients of proto-ts to do invokes.
540+
// And we override the request method that is called by that client to do the Restate related things.
541+
// The request method of the proto-ts client requires returning a Promise.
542+
// So until we find a cleaner solution for this, in which we can still use the generated clients but are not required to return a promise,
543+
// this will return a void Promise.
544+
async inBackground<T>(
545+
call: () => Promise<T>,
546+
delayMillis?: number
547+
): Promise<void> {
539548
if (!this.isValidState("inBackground")) {
540549
return Promise.reject();
541550
}
542551

543552
this.inBackgroundCallFlag = true;
553+
this.inBackgroundCallDelay = delayMillis || 0;
544554
await call();
555+
this.inBackgroundCallDelay = 0;
545556
this.inBackgroundCallFlag = false;
546557
}
547558

test/protoutils.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -223,16 +223,27 @@ export function invokeMessage(
223223
export function backgroundInvokeMessage(
224224
serviceName: string,
225225
methodName: string,
226-
parameter: Uint8Array
226+
parameter: Uint8Array,
227+
invokeTime?: number
227228
): Message {
228-
return new Message(
229-
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
230-
BackgroundInvokeEntryMessage.create({
231-
serviceName: serviceName,
232-
methodName: methodName,
233-
parameter: Buffer.from(parameter),
234-
})
235-
);
229+
return invokeTime
230+
? new Message(
231+
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
232+
BackgroundInvokeEntryMessage.create({
233+
serviceName: serviceName,
234+
methodName: methodName,
235+
parameter: Buffer.from(parameter),
236+
invokeTime: invokeTime,
237+
})
238+
)
239+
: new Message(
240+
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
241+
BackgroundInvokeEntryMessage.create({
242+
serviceName: serviceName,
243+
methodName: methodName,
244+
parameter: Buffer.from(parameter),
245+
})
246+
);
236247
}
237248

238249
export function decodeSideEffectFromResult(msg: Uint8Array | ProtocolMessage) {

test/send_request.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,39 @@ class FailingForwardGreetingService implements TestGreeter {
123123
}
124124
}
125125

126+
const delayedCallTime = 1835661783000;
127+
class DelayedInBackgroundInvokeGreeter implements TestGreeter {
128+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
129+
async greet(request: TestRequest): Promise<TestResponse> {
130+
const ctx = restate.useContext(this);
131+
132+
const client = new TestGreeterClientImpl(ctx);
133+
await ctx.inBackground(
134+
() => client.greet(TestRequest.create({ name: "Francesco" })),
135+
delayedCallTime - Date.now()
136+
);
137+
138+
return TestResponse.create({ greeting: `Hello` });
139+
}
140+
}
141+
class DelayedAndNormalInBackgroundInvokesGreeter implements TestGreeter {
142+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
143+
async greet(request: TestRequest): Promise<TestResponse> {
144+
const ctx = restate.useContext(this);
145+
146+
const client = new TestGreeterClientImpl(ctx);
147+
await ctx.inBackground(
148+
() => client.greet(TestRequest.create({ name: "Francesco" })),
149+
delayedCallTime - Date.now()
150+
);
151+
await ctx.inBackground(() =>
152+
client.greet(TestRequest.create({ name: "Francesco" }))
153+
);
154+
155+
return TestResponse.create({ greeting: `Hello` });
156+
}
157+
}
158+
126159
describe("ReverseAwaitOrder: None completed", () => {
127160
it("should call greet", async () => {
128161
const result = await new TestDriver(
@@ -669,4 +702,98 @@ describe("FailingSideEffectInBackgroundInvokeGreeter: failing background call ",
669702
});
670703
});
671704

705+
describe("DelayedInBackgroundInvokeGreeter: delayed in back ground call without completion", () => {
706+
it("should call greet", async () => {
707+
const result = await new TestDriver(
708+
protoMetadata,
709+
"TestGreeter",
710+
new DelayedInBackgroundInvokeGreeter(),
711+
"/test.TestGreeter/Greet",
712+
[startMessage(1), inputMessage(greetRequest("Till"))]
713+
).run();
714+
715+
expect(result).toStrictEqual([
716+
backgroundInvokeMessage(
717+
"test.TestGreeter",
718+
"Greet",
719+
greetRequest("Francesco"),
720+
delayedCallTime
721+
),
722+
outputMessage(greetResponse("Hello")),
723+
]);
724+
});
725+
});
726+
727+
describe("DelayedInBackgroundInvokeGreeter: delayed in background call with replay", () => {
728+
it("should call greet", async () => {
729+
const result = await new TestDriver(
730+
protoMetadata,
731+
"TestGreeter",
732+
new DelayedInBackgroundInvokeGreeter(),
733+
"/test.TestGreeter/Greet",
734+
[
735+
startMessage(2),
736+
inputMessage(greetRequest("Till")),
737+
backgroundInvokeMessage(
738+
"test.TestGreeter",
739+
"Greet",
740+
greetRequest("Francesco"),
741+
delayedCallTime
742+
),
743+
]
744+
).run();
745+
746+
expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]);
747+
});
748+
});
749+
750+
describe("DelayedInBackgroundInvokeGreeter: delayed in background call with journal mismatch", () => {
751+
it("should call greet", async () => {
752+
const result = await new TestDriver(
753+
protoMetadata,
754+
"TestGreeter",
755+
new DelayedInBackgroundInvokeGreeter(),
756+
"/test.TestGreeter/Greet",
757+
[
758+
startMessage(2),
759+
inputMessage(greetRequest("Till")),
760+
invokeMessage("test.TestGreeter", "Greet", greetRequest("Francesco")),
761+
]
762+
).run();
763+
764+
expect(result.length).toStrictEqual(1);
765+
checkError(
766+
result[0],
767+
"Replayed journal entries did not correspond to the user code. The user code has to be deterministic!"
768+
);
769+
});
770+
});
771+
772+
describe("DelayedAndNormalInBackgroundInvokesGreeter: two async calls. One with delay, one normal.", () => {
773+
it("should call greet", async () => {
774+
const result = await new TestDriver(
775+
protoMetadata,
776+
"TestGreeter",
777+
new DelayedAndNormalInBackgroundInvokesGreeter(),
778+
"/test.TestGreeter/Greet",
779+
[startMessage(1), inputMessage(greetRequest("Till"))]
780+
).run();
781+
782+
expect(result).toStrictEqual([
783+
backgroundInvokeMessage(
784+
"test.TestGreeter",
785+
"Greet",
786+
greetRequest("Francesco"),
787+
delayedCallTime
788+
),
789+
backgroundInvokeMessage(
790+
"test.TestGreeter",
791+
"Greet",
792+
greetRequest("Francesco")
793+
),
794+
outputMessage(greetResponse("Hello")),
795+
]);
796+
});
797+
});
798+
672799
// TODO also implement the other tests of the Java SDK.

test/test_context.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class TestingContext implements RestateContext {
4343
clear(name: string): void {
4444
throw new Error("Method not implemented.");
4545
}
46-
inBackground<T>(call: () => Promise<T>): Promise<void> {
46+
inBackground<T>(call: () => Promise<T>, delayMillis?: number): Promise<void> {
4747
throw new Error("Method not implemented.");
4848
}
4949
sideEffect<T>(fn: () => Promise<T>): Promise<T> {

0 commit comments

Comments
 (0)