Composing Functions into Workflows
Dispatch functions can be composed into workflows of arbitrary complexity.
Function calls can be composed using regular control flow (loops and branches), enabling patterns such as fan-out / fan-in.
Dispatch functions can call other Dispatch functions and wait for their results to be available.
To wait for the results of one function call, use the Await method that's available on each Dispatch function.
To wait for the results of many function calls, use the Gather method that's available on each Dispatch function.
It's also possible to gather results from calls to different functions, or to await the first result from one or more function calls. See the dispatchcoro.Await function for more information.
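To make the "first result" idea concrete, here is a minimal sketch in plain Go. It does not use the Dispatch API: awaitFirst, after, and firstQuote are hypothetical names invented for this illustration, and the sketch assumes "first result" means the first call to finish, whether it succeeded or failed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result pairs a call's return value with its error.
type result[O any] struct {
	value O
	err   error
}

// awaitFirst (hypothetical) runs every call concurrently and returns the
// first result to arrive. The remaining calls are cancelled via ctx.
func awaitFirst[O any](ctx context.Context, calls ...func(context.Context) (O, error)) (O, error) {
	var zero O
	if len(calls) == 0 {
		return zero, errors.New("awaitFirst: no calls")
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so that slower calls can still deliver and exit.
	results := make(chan result[O], len(calls))
	for _, call := range calls {
		go func(call func(context.Context) (O, error)) {
			v, err := call(ctx)
			results <- result[O]{v, err}
		}(call)
	}
	select {
	case r := <-results:
		return r.value, r.err
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

// after (hypothetical) simulates a call that takes d to return v.
func after(d time.Duration, v string) func(context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		select {
		case <-time.After(d):
			return v, nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// firstQuote races a slow and a fast simulated call.
func firstQuote() (string, error) {
	return awaitFirst(context.Background(),
		after(500*time.Millisecond, "slow"),
		after(time.Millisecond, "fast"))
}

func main() {
	v, err := firstQuote()
	fmt.Println(v, err)
}
```

With the delays chosen here, the fast call should win the race and the slow one is cancelled.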
Here's an example workflow, showing both Await and Gather in action:
findCustomersWithPendingCharge := dispatch.Func("findCustomersWithPendingCharge",
	func(ctx context.Context, ts time.Time) (customerIDs []string, err error) {
		// ... connect to database and find customers with pending charges ...
	})

chargeCreditCard := dispatch.Func("chargeCreditCard",
	func(ctx context.Context, customerID string) (receiptID string, err error) {
		// ... charge credit card via Stripe API ...
	})

workflow := dispatch.Func("workflow", func(ctx context.Context, ts time.Time) (receiptIDs []string, err error) {
	// Wait for a single call to complete.
	customerIDs, err := findCustomersWithPendingCharge.Await(ctx, ts)
	if err != nil {
		return nil, fmt.Errorf("failed to list customers with pending charge: %w", err)
	}
	// Fan out one call per customer, then wait for all of the results.
	return chargeCreditCard.Gather(ctx, customerIDs)
})
In this example, the workflow finds customers that are due to be charged, then charges their credit cards independently.
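For comparison, the fan-out / fan-in step can be sketched by hand in plain Go with goroutines. This is only a local illustration of the pattern, not how Dispatch implements Gather: the gather and chargeAll helpers and the stubbed chargeCreditCard are hypothetical, and the sketch has no retries, rate limiting, or durability.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// gather (hypothetical) calls fn once per input, concurrently, and returns
// the outputs in input order. It returns the first error found, if any.
func gather[I, O any](ctx context.Context, inputs []I, fn func(context.Context, I) (O, error)) ([]O, error) {
	outputs := make([]O, len(inputs))
	errs := make([]error, len(inputs))

	var wg sync.WaitGroup
	for i, input := range inputs {
		wg.Add(1)
		// Fan-out: one goroutine per input, each writing to its own slot.
		go func(i int, input I) {
			defer wg.Done()
			outputs[i], errs[i] = fn(ctx, input)
		}(i, input)
	}
	wg.Wait() // Fan-in: block until every call has returned.

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return outputs, nil
}

// chargeCreditCard is a stub standing in for a real payment call.
func chargeCreditCard(ctx context.Context, customerID string) (string, error) {
	if customerID == "" {
		return "", errors.New("empty customer ID")
	}
	return "receipt-" + customerID, nil
}

// chargeAll charges every customer and collects the receipt IDs.
func chargeAll(customerIDs []string) ([]string, error) {
	return gather(context.Background(), customerIDs, chargeCreditCard)
}

func main() {
	receipts, err := chargeAll([]string{"a", "b", "c"})
	fmt.Println(receipts, err) // prints "[receipt-a receipt-b receipt-c] <nil>"
}
```

In this hand-written version, a crash midway loses every in-flight result, and a single failed call fails the whole batch.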
Dispatch handles the hard parts:
- executing a dynamic number of function calls (e.g. the number of customers to charge is unknown and may be large)
- fan-out of independent function calls and fan-in of their results
- retrying function calls that fail temporarily
- preventing function calls that succeeded or failed permanently from being retried
- staying within rate limits of third-party services (e.g. Stripe API)
- adapting workflow and function execution to changes downstream (e.g. databases or APIs that temporarily go down or have degraded performance)
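The distinction between temporary and permanent failures in the list above can be sketched as a simple retry loop. This is an assumption-laden illustration in plain Go, not Dispatch's retry logic: callWithRetry and the errTemporary and errPermanent sentinels are hypothetical, and a real implementation would also back off between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors used to classify failures in this sketch.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// callWithRetry (hypothetical) calls fn until it succeeds, fails with an
// error that is not temporary, or maxAttempts is reached. It returns the
// output, the number of attempts made, and the last error.
func callWithRetry(maxAttempts int, fn func() (string, error)) (string, int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var out string
		out, err = fn()
		if err == nil {
			return out, attempt, nil // success: never retried again
		}
		if !errors.Is(err, errTemporary) {
			return "", attempt, err // permanent failure: never retried
		}
	}
	return "", maxAttempts, err // still failing after the last attempt
}

func main() {
	calls := 0
	flaky := func() (string, error) {
		calls++
		if calls < 3 {
			return "", fmt.Errorf("attempt %d: %w", calls, errTemporary)
		}
		return "receipt-42", nil
	}
	out, attempts, err := callWithRetry(5, flaky)
	fmt.Println(out, attempts, err) // prints "receipt-42 3 <nil>"
}
```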
In the example above, the workflow function is a coroutine that can be suspended and resumed. When calling Await or Gather, the coroutine is suspended. When the results of the function call(s) are available, the coroutine is resumed from where it was suspended.
By default, Dispatch coroutine functions are volatile. Their state is held in memory while they're suspended, and progress may be lost if the Dispatch endpoint crashes or restarts.
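One way to picture a volatile coroutine is a goroutine that blocks at each await point until a scheduler hands it a result. The sketch below is a rough analogy in plain Go, not Dispatch's implementation: the coroutine type and the start, drive, and chargeBoth functions are hypothetical. Its state lives only on the goroutine's stack, so it is lost if the process exits.

```go
package main

import "fmt"

// coroutine (hypothetical) is an in-memory stand-in for a suspendable function.
type coroutine struct {
	requests chan string   // calls the coroutine is suspended on
	results  chan string   // results delivered when it is resumed
	done     chan []string // final return value
}

// start runs body in a goroutine. Each await call suspends the body until
// a result is sent back on the results channel.
func start(body func(await func(string) string) []string) *coroutine {
	c := &coroutine{
		requests: make(chan string),
		results:  make(chan string),
		done:     make(chan []string, 1),
	}
	go func() {
		await := func(call string) string {
			c.requests <- call // suspend: hand the call to the scheduler
			return <-c.results // resume: continue with its result
		}
		c.done <- body(await)
		close(c.requests)
	}()
	return c
}

// drive plays the scheduler: it handles each call the coroutine is
// suspended on, resumes it with the result, and returns the final output.
func drive(c *coroutine, handle func(call string) string) []string {
	for call := range c.requests {
		c.results <- handle(call)
	}
	return <-c.done
}

// chargeBoth is a toy workflow body that awaits two calls in sequence.
func chargeBoth(await func(string) string) []string {
	var receipts []string
	for _, id := range []string{"a", "b"} {
		receipts = append(receipts, await("charge:"+id))
	}
	return receipts
}

func main() {
	out := drive(start(chargeBoth), func(call string) string {
		fmt.Println("suspended on", call)
		return "receipt-for-" + call
	})
	fmt.Println(out)
}
```

The loop variable and the partially built receipts slice survive each suspension because the goroutine stays alive in memory. Nothing is written anywhere else.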
Dispatch coroutine functions can also be compiled into distributed coroutines. In this mode, the await points act as durability checkpoints. The state of the coroutine is sent to the Dispatch cloud service, which is then able to resume execution of the coroutine on any Dispatch endpoint that's hosting the same function. If a Dispatch endpoint crashes or restarts, progress is not lost; Dispatch will resume the coroutine from its last checkpoint, potentially in a different process or on a different host.
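The checkpoint idea can be sketched by hand as a state machine whose progress is serialized after every step. This is a conceptual illustration only: the checkpoint struct and step function are hypothetical, JSON is an arbitrary choice of encoding, and a hand-written state machine like this is exactly what compiling to distributed coroutines is meant to spare you from writing.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint (hypothetical) is a snapshot of the workflow's progress.
type checkpoint struct {
	CustomerIDs []string `json:"customer_ids"`
	Next        int      `json:"next"` // index of the next customer to charge
	ReceiptIDs  []string `json:"receipt_ids"`
}

// step restores the workflow from serialized state, charges at most one
// customer, and returns the new serialized state plus a completion flag.
// Because it depends only on the bytes passed in, any process could run it.
func step(state []byte, charge func(customerID string) string) ([]byte, bool, error) {
	var cp checkpoint
	if err := json.Unmarshal(state, &cp); err != nil {
		return nil, false, err
	}
	if cp.Next < len(cp.CustomerIDs) {
		cp.ReceiptIDs = append(cp.ReceiptIDs, charge(cp.CustomerIDs[cp.Next]))
		cp.Next++
	}
	next, err := json.Marshal(cp)
	if err != nil {
		return nil, false, err
	}
	return next, cp.Next >= len(cp.CustomerIDs), nil
}

func main() {
	charge := func(id string) string { return "receipt-" + id }

	state := []byte(`{"customer_ids":["a","b"]}`)
	for done := false; !done; {
		var err error
		// Each iteration could run in a different process or on a
		// different host: only the serialized state carries over.
		state, done, err = step(state, charge)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(state))
	}
}
```

If the process crashed between two steps, restarting from the last printed state would charge only the customers that remain.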
To compile a Dispatch endpoint and its coroutines into distributed coroutines, use the coroc compiler from https://github.com/dispatchrun/coroutine.
Install coroc:
go install github.com/dispatchrun/coroutine/compiler/cmd/coroc@latest
Compile coroutines hosted by a Dispatch endpoint:
coroc ./path/to/go/package
Run or build the Dispatch endpoint as normal, but include the -tags durable build tag:
# to run
go run -tags durable ./path/to/go/package
# to build
go build -tags durable -o my-endpoint ./path/to/go/package
Please report any compile-time issues with coroc, or runtime serialization issues with the dispatchrun/coroutine library, to the GitHub issues page.
See how to test functions and workflows.