Skip to content

Commit 436704f

Browse files
authored
Json serializer (#183)
- use JSON for encoding/decoding - lift encode/decode out of the system database - distinguish nil values (stored as NULL) in Postgres - return json strings on the ListWorkflows GetWorkflowSteps paths - Add comprehensive testing suite On the get wf steps / list wf path, the contract is that if the value is nil, you know it was storing a nil value. Else, the value is a JSON string of your workflow's or step's `T`.
1 parent 31315f4 commit 436704f

File tree

13 files changed

+1341
-816
lines changed

13 files changed

+1341
-816
lines changed

dbos/admin_server.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -152,20 +152,23 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
152152
result["StartedAt"] = nil
153153
}
154154

155-
if ws.Input != nil && ws.Input != "" {
156-
bytes, err := json.Marshal(ws.Input)
157-
if err != nil {
158-
return nil, fmt.Errorf("failed to marshal input: %w", err)
155+
if ws.Input != nil {
156+
// If there is a value, it should be a JSON string
157+
jsonInput, ok := ws.Input.(string)
158+
if ok {
159+
result["Input"] = jsonInput
160+
} else {
161+
result["Input"] = ""
159162
}
160-
result["Input"] = string(bytes)
161163
}
162164

163-
if ws.Output != nil && ws.Output != "" {
164-
bytes, err := json.Marshal(ws.Output)
165-
if err != nil {
166-
return nil, fmt.Errorf("failed to marshal output: %w", err)
165+
if ws.Output != nil {
166+
jsonOutput, ok := ws.Output.(string)
167+
if ok {
168+
result["Output"] = jsonOutput
169+
} else {
170+
result["Output"] = ""
167171
}
168-
result["Output"] = string(bytes)
169172
}
170173

171174
if ws.Error != nil {
@@ -439,15 +442,16 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
439442
"child_workflow_id": step.ChildWorkflowID,
440443
}
441444

442-
// Marshal Output as JSON string if present
443-
if step.Output != nil && step.Output != "" {
444-
bytes, err := json.Marshal(step.Output)
445-
if err != nil {
446-
ctx.logger.Error("Failed to marshal step output", "error", err)
447-
http.Error(w, fmt.Sprintf("Failed to format step output: %v", err), http.StatusInternalServerError)
448-
return
445+
if step.Output != nil {
446+
// If there is a value, it should be a JSON string
447+
jsonOutput, ok := step.Output.(string)
448+
if ok {
449+
formattedStep["output"] = jsonOutput
450+
} else {
451+
formattedStep["output"] = ""
449452
}
450-
formattedStep["output"] = string(bytes)
453+
} else {
454+
formattedStep["output"] = ""
451455
}
452456

453457
// Marshal Error as JSON string if present

dbos/admin_server_test.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,12 @@ func TestAdminServer(t *testing.T) {
347347
// Empty string workflow: both input and output are empty strings
348348
// According to the logic, empty strings should not have Input/Output fields
349349
input, hasInput := wf["Input"]
350-
require.Equal(t, "", input)
350+
require.Equal(t, "\"\"", input)
351351
require.True(t, hasInput, "Empty string workflow should have Input field")
352352

353353
output, hasOutput := wf["Output"]
354354
require.True(t, hasOutput, "Empty string workflow should have Output field")
355-
require.Equal(t, "", output)
355+
require.Equal(t, "\"\"", output)
356356

357357
} else if wfID == structHandle.GetWorkflowID() {
358358
// Struct workflow: input and output should be marshaled as JSON strings
@@ -874,22 +874,9 @@ func TestAdminServer(t *testing.T) {
874874
assert.Contains(t, unmarshaledError, "deliberate error for testing", "Error message should be preserved")
875875

876876
case "emptyStep":
877-
// Empty string might be returned as nil or as an empty JSON string
877+
// Empty string is returned as an empty JSON string
878878
output := step["output"]
879-
if output == nil {
880-
// Empty string was not included in response (which is fine)
881-
t.Logf("Empty step output was nil (not included)")
882-
} else {
883-
// If it was included, it should be marshaled as JSON string `""`
884-
outputStr, ok := output.(string)
885-
require.True(t, ok, "If present, empty step output should be a JSON string")
886-
887-
var unmarshaledOutput string
888-
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
889-
require.NoError(t, err, "Failed to unmarshal empty step output")
890-
assert.Equal(t, "", unmarshaledOutput, "Empty step output should be empty string")
891-
}
892-
879+
require.Equal(t, "\"\"", output, "Empty step output should be an empty string")
893880
assert.Nil(t, step["error"], "Empty step should have no error")
894881
}
895882
}

dbos/client.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
147147
if params.priority > uint(math.MaxInt) {
148148
return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt)
149149
}
150+
150151
status := WorkflowStatus{
151152
Name: params.workflowName,
152153
ApplicationVersion: params.applicationVersion,
@@ -155,7 +156,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
155156
CreatedAt: time.Now(),
156157
Deadline: deadline,
157158
Timeout: params.workflowTimeout,
158-
Input: params.workflowInput,
159+
Input: input,
159160
QueueName: queueName,
160161
DeduplicationID: params.deduplicationID,
161162
Priority: int(params.priority),
@@ -240,20 +241,15 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op
240241
return nil, errors.New("client cannot be nil")
241242
}
242243

243-
// Register the input and outputs for gob encoding
244-
var logger *slog.Logger
245-
if cl, ok := c.(*client); ok {
246-
if ctx, ok := cl.dbosCtx.(*dbosContext); ok {
247-
logger = ctx.logger
248-
}
244+
// Serialize input
245+
serializer := newJSONSerializer[P]()
246+
encodedInput, err := serializer.Encode(input)
247+
if err != nil {
248+
return nil, fmt.Errorf("failed to serialize workflow input: %w", err)
249249
}
250-
var typedInput P
251-
safeGobRegister(typedInput, logger)
252-
var typedOutput R
253-
safeGobRegister(typedOutput, logger)
254250

255251
// Call the interface method with the same signature
256-
handle, err := c.Enqueue(queueName, workflowName, input, opts...)
252+
handle, err := c.Enqueue(queueName, workflowName, encodedInput, opts...)
257253
if err != nil {
258254
return nil, err
259255
}

dbos/client_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,11 @@ func TestCancelResume(t *testing.T) {
369369

370370
// Wait for workflow completion
371371
proceedSignal.Set() // Allow the workflow to proceed to step two
372-
result, err := resumeHandle.GetResult()
372+
resultAny, err := resumeHandle.GetResult()
373373
require.NoError(t, err, "failed to get result from resumed workflow")
374374

375-
// Verify the result
376-
assert.Equal(t, input, result, "expected result to match input")
375+
// Will be a float64 from json decode
376+
require.Equal(t, input, int(resultAny.(float64)), "expected result to match input")
377377

378378
// Verify both steps completed
379379
assert.Equal(t, 2, stepsCompleted, "expected steps completed to be 2")
@@ -391,10 +391,11 @@ func TestCancelResume(t *testing.T) {
391391
resumeAgainHandle, err := client.ResumeWorkflow(workflowID)
392392
require.NoError(t, err, "failed to resume workflow again")
393393

394-
resultAgain, err := resumeAgainHandle.GetResult()
394+
resultAgainAny, err := resumeAgainHandle.GetResult()
395395
require.NoError(t, err, "failed to get result from second resume")
396396

397-
assert.Equal(t, input, resultAgain, "expected second resume result to match input")
397+
// Will be a float64 from json decode
398+
require.Equal(t, input, int(resultAgainAny.(float64)), "expected result to match input")
398399

399400
// Verify steps didn't run again
400401
assert.Equal(t, 2, stepsCompleted, "expected steps completed to remain 2 after second resume")

dbos/dbos.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (c *dbosContext) Value(key any) any {
185185
return c.ctx.Value(key)
186186
}
187187

188+
188189
// WithValue returns a copy of the DBOS context with the given key-value pair.
189190
// This is similar to context.WithValue but maintains DBOS context capabilities.
190191
// No-op if the provided context is not a concrete dbos.dbosContext.
@@ -354,14 +355,6 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
354355
initExecutor.logger = config.Logger
355356
initExecutor.logger.Info("Initializing DBOS context", "app_name", config.AppName, "dbos_version", getDBOSVersion())
356357

357-
// Register types we serialize with gob
358-
var t time.Time
359-
safeGobRegister(t, initExecutor.logger)
360-
var ws []WorkflowStatus
361-
safeGobRegister(ws, initExecutor.logger)
362-
var si []StepInfo
363-
safeGobRegister(si, initExecutor.logger)
364-
365358
// Initialize global variables from processed config (already handles env vars and defaults)
366359
initExecutor.applicationVersion = config.ApplicationVersion
367360
initExecutor.executorID = config.ExecutorID

dbos/queue.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package dbos
22

33
import (
4-
"bytes"
54
"context"
6-
"encoding/base64"
7-
"encoding/gob"
85
"log/slog"
96
"math"
107
"math/rand"
@@ -227,23 +224,8 @@ func (qr *queueRunner) run(ctx *dbosContext) {
227224
continue
228225
}
229226

230-
// Deserialize input
231-
var input any
232-
if len(workflow.input) > 0 {
233-
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
234-
if err != nil {
235-
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
236-
continue
237-
}
238-
buf := bytes.NewBuffer(inputBytes)
239-
dec := gob.NewDecoder(buf)
240-
if err := dec.Decode(&input); err != nil {
241-
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
242-
continue
243-
}
244-
}
245-
246-
_, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
227+
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
228+
_, err = registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
247229
if err != nil {
248230
qr.logger.Error("Error running queued workflow", "error", err)
249231
}

dbos/queues_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"os"
@@ -538,10 +539,16 @@ func TestQueueRecovery(t *testing.T) {
538539
for _, h := range recoveryHandles {
539540
if h.GetWorkflowID() == wfid {
540541
// Root workflow case
541-
result, err := h.GetResult()
542+
resultAny, err := h.GetResult()
542543
require.NoError(t, err, "failed to get result from recovered root workflow handle")
543-
castedResult, ok := result.([]int)
544-
require.True(t, ok, "expected result to be of type []int for root workflow, got %T", result)
544+
// re-encode and decode the result from []interface{} to []int
545+
encodedResult, ok := resultAny.([]any)
546+
require.True(t, ok, "expected result to be a []any")
547+
jsonBytes, err := json.Marshal(encodedResult)
548+
require.NoError(t, err, "failed to marshal result to JSON")
549+
var castedResult []int
550+
err = json.Unmarshal(jsonBytes, &castedResult)
551+
require.NoError(t, err, "failed to decode result to []int")
545552
expectedResult := []int{0, 1, 2, 3, 4}
546553
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)
547554
}

dbos/recovery.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package dbos
22

3-
import (
4-
"strings"
5-
)
6-
73
func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]WorkflowHandle[any], error) {
84
workflowHandles := make([]WorkflowHandle[any], 0)
95
// List pending workflows for the executors
@@ -18,13 +14,6 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
1814
}
1915

2016
for _, workflow := range pendingWorkflows {
21-
if inputStr, ok := workflow.Input.(string); ok {
22-
if strings.Contains(inputStr, "Failed to decode") {
23-
ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name)
24-
continue
25-
}
26-
}
27-
2817
if workflow.QueueName != "" {
2918
cleared, err := ctx.systemDB.clearQueueAssignment(ctx, workflow.ID)
3019
if err != nil {
@@ -59,6 +48,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
5948
WithWorkflowID(workflow.ID),
6049
}
6150
// Create a workflow context from the executor context
51+
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
6252
handle, err := registeredWorkflow.wrappedFunction(ctx, workflow.Input, opts...)
6353
if err != nil {
6454
return nil, err

0 commit comments

Comments
 (0)