Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
318bb8e
just call deserialized during dequeue
maxdml Oct 29, 2025
0259dd6
decode during recovery (instead of getting it decoded from list work…
maxdml Oct 29, 2025
bf35e78
lift encoding outside of the system db
maxdml Oct 29, 2025
9852279
encode in enqueue, client
maxdml Oct 29, 2025
f945e3b
fix
maxdml Oct 29, 2025
ecad764
decode on the concurrent execution fallback path
maxdml Oct 29, 2025
16f6aea
system db expects pointers to string, correctly set in serialize()
maxdml Oct 29, 2025
f05d0c9
not now
maxdml Oct 30, 2025
a3bd6f7
fix + awaitWorkflowResult returns *string
maxdml Oct 30, 2025
7c8211a
fixes
maxdml Oct 30, 2025
e87d0e6
nit
maxdml Oct 30, 2025
ab20c6a
custom serializer
maxdml Oct 27, 2025
2ae4129
wip
maxdml Oct 28, 2025
687842e
comment
maxdml Oct 28, 2025
0646fcf
udpate test
maxdml Oct 28, 2025
9e5fb44
test recovery path. Add JSON recast in wrapped functions and in step …
maxdml Oct 28, 2025
0700203
qeueue decode + test
maxdml Oct 28, 2025
7b96377
wip interfaces
maxdml Oct 29, 2025
550cd76
wip
maxdml Oct 29, 2025
8c08963
remove Gob encoder
maxdml Oct 30, 2025
e5844c9
simplify tests and extend coverage
maxdml Oct 30, 2025
7950464
test concrete signatures, test more send/recv set/get messages
maxdml Oct 30, 2025
c48821f
update tests
maxdml Oct 30, 2025
8bd98fc
fix
maxdml Oct 30, 2025
ae0f005
comments
maxdml Oct 30, 2025
0d9df4e
comment
maxdml Oct 30, 2025
03cb9a4
cleanup
maxdml Oct 30, 2025
9e7be97
fix
maxdml Oct 30, 2025
ad5fe3c
comments
maxdml Oct 30, 2025
38c4ff7
Merge remote-tracking branch 'origin/lift-serialization-to-typed-laye…
maxdml Oct 30, 2025
a2b0872
cleanup
maxdml Oct 31, 2025
4d7f09b
remove unwanted files
maxdml Oct 31, 2025
a9e2b85
remove unwanted files
maxdml Oct 31, 2025
69869d2
fix merge weirdness
maxdml Oct 31, 2025
7cc6ffb
cleanup
maxdml Oct 31, 2025
bcd86f3
cleanup
maxdml Oct 31, 2025
dd8d725
fix
maxdml Nov 1, 2025
033e794
fix for mocking
maxdml Nov 1, 2025
8576172
simplify
maxdml Nov 1, 2025
344149c
simplify
maxdml Nov 1, 2025
39568ee
simplify
maxdml Nov 1, 2025
d749433
typo
maxdml Nov 1, 2025
ae8af6a
WIP: test user provided gob encoder
maxdml Nov 3, 2025
ff6c8c6
cleanup
maxdml Nov 3, 2025
888e700
cleanup
maxdml Nov 3, 2025
fa59aa8
remove json serializer, no custom serializer
maxdml Nov 3, 2025
fb8c685
fix corner case with pointers and ListWorkflowSteps/GetWorkflowSteps
maxdml Nov 3, 2025
00ac18b
add empty string test
maxdml Nov 3, 2025
189b201
nit
maxdml Nov 3, 2025
2aff44e
cleanup
maxdml Nov 3, 2025
dec1a76
cleanup
maxdml Nov 3, 2025
2679e55
private
maxdml Nov 3, 2025
cf12908
more private
maxdml Nov 3, 2025
e81c208
simplify
maxdml Nov 3, 2025
8d74304
cleanup
maxdml Nov 3, 2025
800c0ed
more complex struct
maxdml Nov 4, 2025
9676054
shouldn't require user gob registration
maxdml Nov 4, 2025
14f73be
no interface signature
maxdml Nov 4, 2025
31ebaee
cleanup
maxdml Nov 4, 2025
ccb51f6
cleanup
maxdml Nov 4, 2025
a05aead
make the serializer generic to tighten the code, add a test with manu…
maxdml Nov 4, 2025
439a92c
cleanup + add pointer fields to test struct
maxdml Nov 4, 2025
61f727d
consolidate tests and always exercise recovery path
maxdml Nov 4, 2025
1a6a950
cannot store child 'steps' as nil in the db to run as step
maxdml Nov 4, 2025
370d33e
test some more signatures
maxdml Nov 4, 2025
de6488a
fix
maxdml Nov 4, 2025
5443e4f
nits
maxdml Nov 4, 2025
219a647
internal stepInfo with *string step output
maxdml Nov 5, 2025
eb4371a
detect nested pointers during wf registration and running steps
maxdml Nov 5, 2025
85b37cf
fix flaky test that more than flaky
maxdml Nov 5, 2025
e163953
fix another race in test
maxdml Nov 5, 2025
a1a6e47
json serializer
maxdml Nov 7, 2025
33e444c
remove nested pointer check, update tests
maxdml Nov 9, 2025
cdf7642
cleanup
maxdml Nov 10, 2025
b6bef08
return input/outputs as a raw json string from list workflows and get…
maxdml Nov 10, 2025
927cc87
fix
maxdml Nov 10, 2025
6fe311c
fix admin server responses, fix zero values handling
maxdml Nov 10, 2025
4b400c5
differentiate nil values and zero values
maxdml Nov 10, 2025
8f840c9
fix
maxdml Nov 10, 2025
d293bac
update test
maxdml Nov 10, 2025
2bd394a
nits
maxdml Nov 10, 2025
87a1e4e
simpler
maxdml Nov 10, 2025
c458251
fix
maxdml Nov 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,23 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
result["StartedAt"] = nil
}

if ws.Input != nil && ws.Input != "" {
bytes, err := json.Marshal(ws.Input)
if err != nil {
return nil, fmt.Errorf("failed to marshal input: %w", err)
if ws.Input != nil {
// If there is a value, it should be a JSON string
jsonInput, ok := ws.Input.(string)
if ok {
result["Input"] = jsonInput
} else {
result["Input"] = ""
}
result["Input"] = string(bytes)
}

if ws.Output != nil && ws.Output != "" {
bytes, err := json.Marshal(ws.Output)
if err != nil {
return nil, fmt.Errorf("failed to marshal output: %w", err)
if ws.Output != nil {
jsonOutput, ok := ws.Output.(string)
if ok {
result["Output"] = jsonOutput
} else {
result["Output"] = ""
}
result["Output"] = string(bytes)
}

if ws.Error != nil {
Expand Down Expand Up @@ -439,15 +442,16 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
"child_workflow_id": step.ChildWorkflowID,
}

// Marshal Output as JSON string if present
if step.Output != nil && step.Output != "" {
bytes, err := json.Marshal(step.Output)
if err != nil {
ctx.logger.Error("Failed to marshal step output", "error", err)
http.Error(w, fmt.Sprintf("Failed to format step output: %v", err), http.StatusInternalServerError)
return
if step.Output != nil {
// If there is a value, it should be a JSON string
jsonOutput, ok := step.Output.(string)
if ok {
formattedStep["output"] = jsonOutput
} else {
formattedStep["output"] = ""
}
formattedStep["output"] = string(bytes)
} else {
formattedStep["output"] = ""
}

// Marshal Error as JSON string if present
Expand Down
21 changes: 4 additions & 17 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,12 @@ func TestAdminServer(t *testing.T) {
// Empty string workflow: both input and output are empty strings
// According to the logic, empty strings should not have Input/Output fields
input, hasInput := wf["Input"]
require.Equal(t, "", input)
require.Equal(t, "\"\"", input)
require.True(t, hasInput, "Empty string workflow should have Input field")

output, hasOutput := wf["Output"]
require.True(t, hasOutput, "Empty string workflow should have Output field")
require.Equal(t, "", output)
require.Equal(t, "\"\"", output)

} else if wfID == structHandle.GetWorkflowID() {
// Struct workflow: input and output should be marshaled as JSON strings
Expand Down Expand Up @@ -874,22 +874,9 @@ func TestAdminServer(t *testing.T) {
assert.Contains(t, unmarshaledError, "deliberate error for testing", "Error message should be preserved")

case "emptyStep":
// Empty string might be returned as nil or as an empty JSON string
// Empty string is returned as an empty JSON string
output := step["output"]
if output == nil {
// Empty string was not included in response (which is fine)
t.Logf("Empty step output was nil (not included)")
} else {
// If it was included, it should be marshaled as JSON string `""`
outputStr, ok := output.(string)
require.True(t, ok, "If present, empty step output should be a JSON string")

var unmarshaledOutput string
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
require.NoError(t, err, "Failed to unmarshal empty step output")
assert.Equal(t, "", unmarshaledOutput, "Empty step output should be empty string")
}

require.Equal(t, "\"\"", output, "Empty step output should be an empty string")
assert.Nil(t, step["error"], "Empty step should have no error")
}
}
Expand Down
20 changes: 8 additions & 12 deletions dbos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
if params.priority > uint(math.MaxInt) {
return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt)
}

status := WorkflowStatus{
Name: params.workflowName,
ApplicationVersion: params.applicationVersion,
Expand All @@ -155,7 +156,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
CreatedAt: time.Now(),
Deadline: deadline,
Timeout: params.workflowTimeout,
Input: params.workflowInput,
Input: input,
QueueName: queueName,
DeduplicationID: params.deduplicationID,
Priority: int(params.priority),
Expand Down Expand Up @@ -240,20 +241,15 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op
return nil, errors.New("client cannot be nil")
}

// Register the input and outputs for gob encoding
var logger *slog.Logger
if cl, ok := c.(*client); ok {
if ctx, ok := cl.dbosCtx.(*dbosContext); ok {
logger = ctx.logger
}
// Serialize input
serializer := newJSONSerializer[P]()
encodedInput, err := serializer.Encode(input)
if err != nil {
return nil, fmt.Errorf("failed to serialize workflow input: %w", err)
}
var typedInput P
safeGobRegister(typedInput, logger)
var typedOutput R
safeGobRegister(typedOutput, logger)

// Call the interface method with the same signature
handle, err := c.Enqueue(queueName, workflowName, input, opts...)
handle, err := c.Enqueue(queueName, workflowName, encodedInput, opts...)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ func TestCancelResume(t *testing.T) {

// Wait for workflow completion
proceedSignal.Set() // Allow the workflow to proceed to step two
result, err := resumeHandle.GetResult()
resultAny, err := resumeHandle.GetResult()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make the resume handle typed so the returned polling handle can be of type T as well. (Right now on this path, the user gets a polling handle typed any, and the result is already json decoded into an any value, losing its type information)

require.NoError(t, err, "failed to get result from resumed workflow")

// Verify the result
assert.Equal(t, input, result, "expected result to match input")
// Will be a float64 from json decode
require.Equal(t, input, int(resultAny.(float64)), "expected result to match input")

// Verify both steps completed
assert.Equal(t, 2, stepsCompleted, "expected steps completed to be 2")
Expand All @@ -391,10 +391,11 @@ func TestCancelResume(t *testing.T) {
resumeAgainHandle, err := client.ResumeWorkflow(workflowID)
require.NoError(t, err, "failed to resume workflow again")

resultAgain, err := resumeAgainHandle.GetResult()
resultAgainAny, err := resumeAgainHandle.GetResult()
require.NoError(t, err, "failed to get result from second resume")

assert.Equal(t, input, resultAgain, "expected second resume result to match input")
// Will be a float64 from json decode
require.Equal(t, input, int(resultAgainAny.(float64)), "expected result to match input")

// Verify steps didn't run again
assert.Equal(t, 2, stepsCompleted, "expected steps completed to remain 2 after second resume")
Expand Down
9 changes: 1 addition & 8 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (c *dbosContext) Value(key any) any {
return c.ctx.Value(key)
}


// WithValue returns a copy of the DBOS context with the given key-value pair.
// This is similar to context.WithValue but maintains DBOS context capabilities.
// No-op if the provided context is not a concrete dbos.dbosContext.
Expand Down Expand Up @@ -354,14 +355,6 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
initExecutor.logger = config.Logger
initExecutor.logger.Info("Initializing DBOS context", "app_name", config.AppName, "dbos_version", getDBOSVersion())

// Register types we serialize with gob
var t time.Time
safeGobRegister(t, initExecutor.logger)
var ws []WorkflowStatus
safeGobRegister(ws, initExecutor.logger)
var si []StepInfo
safeGobRegister(si, initExecutor.logger)

// Initialize global variables from processed config (already handles env vars and defaults)
initExecutor.applicationVersion = config.ApplicationVersion
initExecutor.executorID = config.ExecutorID
Expand Down
22 changes: 2 additions & 20 deletions dbos/queue.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package dbos

import (
"bytes"
"context"
"encoding/base64"
"encoding/gob"
"log/slog"
"math"
"math/rand"
Expand Down Expand Up @@ -227,23 +224,8 @@ func (qr *queueRunner) run(ctx *dbosContext) {
continue
}

// Deserialize input
var input any
if len(workflow.input) > 0 {
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
if err != nil {
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
continue
}
buf := bytes.NewBuffer(inputBytes)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&input); err != nil {
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
continue
}
}

_, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
_, err = registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
if err != nil {
qr.logger.Error("Error running queued workflow", "error", err)
}
Expand Down
13 changes: 10 additions & 3 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dbos

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -538,10 +539,16 @@ func TestQueueRecovery(t *testing.T) {
for _, h := range recoveryHandles {
if h.GetWorkflowID() == wfid {
// Root workflow case
result, err := h.GetResult()
resultAny, err := h.GetResult()
require.NoError(t, err, "failed to get result from recovered root workflow handle")
castedResult, ok := result.([]int)
require.True(t, ok, "expected result to be of type []int for root workflow, got %T", result)
// re-encode and decode the result from []interface{} to []int
encodedResult, ok := resultAny.([]any)
require.True(t, ok, "expected result to be a []any")
jsonBytes, err := json.Marshal(encodedResult)
require.NoError(t, err, "failed to marshal result to JSON")
var castedResult []int
err = json.Unmarshal(jsonBytes, &castedResult)
require.NoError(t, err, "failed to decode result to []int")
Comment on lines +542 to +551
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on this path the result was already decoded into any, which lost type info. This is because recovery handles are untyped. Just needed for testing as recovery handles are not public.

expectedResult := []int{0, 1, 2, 3, 4}
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)
}
Expand Down
12 changes: 1 addition & 11 deletions dbos/recovery.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package dbos

import (
"strings"
)

func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]WorkflowHandle[any], error) {
workflowHandles := make([]WorkflowHandle[any], 0)
// List pending workflows for the executors
Expand All @@ -18,13 +14,6 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
}

for _, workflow := range pendingWorkflows {
if inputStr, ok := workflow.Input.(string); ok {
if strings.Contains(inputStr, "Failed to decode") {
ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name)
continue
}
}

if workflow.QueueName != "" {
cleared, err := ctx.systemDB.clearQueueAssignment(ctx, workflow.ID)
if err != nil {
Expand Down Expand Up @@ -59,6 +48,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
WithWorkflowID(workflow.ID),
}
// Create a workflow context from the executor context
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
handle, err := registeredWorkflow.wrappedFunction(ctx, workflow.Input, opts...)
if err != nil {
return nil, err
Expand Down
Loading
Loading