Skip to content

Commit f01ca09

Browse files
committed
refactor: reorganize test code for step execution and enhance determinism checks in Go workflows
1 parent 9d497bd commit f01ca09

File tree

2 files changed

+105
-18
lines changed

2 files changed

+105
-18
lines changed

dbos/workflow.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1236,7 +1236,6 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12361236
// result := resultChan.result
12371237
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) {
12381238
if ctx == nil {
1239-
// is this the correct return here?
12401239
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil")
12411240
}
12421241

dbos/workflows_test.go

Lines changed: 105 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,11 @@ func simpleStepError(_ context.Context) (string, error) {
4343
return "", fmt.Errorf("step failure")
4444
}
4545

46-
type stepWithSleepOutput struct {
47-
StepID int
48-
Result string
49-
Error error
50-
}
51-
5246
func stepWithSleep(_ context.Context, duration time.Duration) (string, error) {
5347
time.Sleep(duration)
5448
return fmt.Sprintf("from step that slept for %s", duration), nil
5549
}
5650

57-
func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) {
58-
time.Sleep(duration)
59-
return stepWithSleepOutput{
60-
StepID: stepID,
61-
Result: fmt.Sprintf("from step that slept for %s", duration),
62-
Error: nil,
63-
}, nil
64-
}
65-
6651
func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) {
6752
return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
6853
return simpleStepError(ctx)
@@ -876,7 +861,37 @@ func TestSteps(t *testing.T) {
876861
})
877862
}
878863

864+
type stepWithSleepOutput struct {
865+
StepID int
866+
Result string
867+
Error error
868+
}
869+
870+
var (
871+
stepDeterminismStartEvent *Event
872+
stepDeterminismEvent *Event
873+
)
874+
875+
func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) {
876+
time.Sleep(duration)
877+
return stepWithSleepOutput{
878+
StepID: stepID,
879+
Result: fmt.Sprintf("from step that slept for %s", duration),
880+
Error: nil,
881+
}, nil
882+
}
883+
884+
// blocks indefinitely
885+
func stepThatBlocks(_ context.Context) (string, error) {
886+
stepDeterminismStartEvent.Set()
887+
fmt.Println("stepThatBlocks: started to block")
888+
stepDeterminismEvent.Wait()
889+
fmt.Println("stepThatBlocks: unblocked")
890+
return "from step that blocked", nil
891+
}
892+
879893
func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
894+
880895
dbosCtx := setupDBOS(t, true, true)
881896

882897
// Register custom types for Gob encoding
@@ -917,15 +932,15 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
917932
require.Equal(t, "step error", err.Error())
918933
})
919934

920-
t.Run("Go must execute 100 steps simultaneously", func(t *testing.T) {
935+
t.Run("Go must execute 100 steps simultaneously then return the stepIDs in the correct sequence", func(t *testing.T) {
921936
// run 100 steps simultaneously
922937
const numSteps = 100
923938
results := make(chan string, numSteps)
924939
errors := make(chan error, numSteps)
925940
var resultChans []<-chan StepOutcome[stepWithSleepOutput]
926941

927942
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
928-
for i := 0; i < numSteps; i++ {
943+
for i := range numSteps {
929944
resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) {
930945
return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i)
931946
})
@@ -956,7 +971,80 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
956971
require.NoError(t, err, "failed to get result from go workflow")
957972
assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results))
958973
assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors))
974+
})
975+
976+
t.Run("Go executes the same workflow twice, whilst blocking the first workflow, to test for deterministic execution when using Go routines", func(t *testing.T) {
977+
978+
stepDeterminismStartEvent = NewEvent()
979+
stepDeterminismEvent = NewEvent()
959980

981+
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
982+
_, err := Go(dbosCtx, func(ctx context.Context) (string, error) {
983+
return stepWithSleep(ctx, 1*time.Second)
984+
})
985+
986+
if err != nil {
987+
return "", err
988+
}
989+
990+
_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
991+
return stepWithSleep(ctx, 1*time.Second)
992+
})
993+
994+
if err != nil {
995+
return "", err
996+
}
997+
998+
_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
999+
return stepWithSleep(ctx, 1*time.Second)
1000+
})
1001+
1002+
if err != nil {
1003+
return "", err
1004+
}
1005+
1006+
_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
1007+
return stepThatBlocks(ctx)
1008+
})
1009+
1010+
if err != nil {
1011+
return "", err
1012+
}
1013+
1014+
return "WORKFLOW EXECUTED DETERMINISTICALLY", nil
1015+
}
1016+
1017+
// Run the first workflow
1018+
RegisterWorkflow(dbosCtx, goWorkflow)
1019+
handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input")
1020+
require.NoError(t, err, "failed to run go workflow")
1021+
1022+
// Wait for the first workflow to reach the blocking step
1023+
stepDeterminismStartEvent.Wait()
1024+
stepDeterminismStartEvent.Clear()
1025+
1026+
// Run the second workflow
1027+
handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID()))
1028+
1029+
// If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow
1030+
require.NoError(t, err, "failed to run go workflow")
1031+
1032+
// Complete the blocked workflow
1033+
stepDeterminismEvent.Set()
1034+
1035+
_, err = handle2.GetResult()
1036+
require.NoError(t, err, "failed to get result from go workflow")
1037+
1038+
// Verify workflow status is SUCCESS
1039+
status, err := handle.GetStatus()
1040+
require.NoError(t, err, "failed to get workflow status")
1041+
require.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be WorkflowStatusSuccess")
1042+
1043+
1044+
// Verify workflow result is "WORKFLOW EXECUTED DETERMINISTICALLY"
1045+
result, err := handle.GetResult()
1046+
require.NoError(t, err, "failed to get result from go workflow")
1047+
require.Equal(t, "WORKFLOW EXECUTED DETERMINISTICALLY", result, "expected result to be 'WORKFLOW EXECUTED DETERMINISTICALLY'")
9601048
})
9611049
}
9621050

0 commit comments

Comments
 (0)