Skip to content

Another asynchronous resolution PR. But using JS-like promises instead of goroutines. #357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,16 @@ type ResolveInfo struct {
VariableValues map[string]interface{}
}

type ResolveResult struct {
Value interface{}
Error error
}

// When returned from a resolve function, ResolvePromise indicates that the resolution will be done
// asynchronously. When used, an IdleHandler should be specified. This handler must fulfill one or
// more promises each time it is invoked.
type ResolvePromise chan *ResolveResult

type Fields map[string]*Field

type Field struct {
Expand Down
180 changes: 157 additions & 23 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/graphql-go/graphql/gqlerrors"
"github.com/graphql-go/graphql/language/ast"
"github.com/graphql-go/graphql/promise"
)

type ExecuteParams struct {
Expand All @@ -18,6 +19,8 @@ type ExecuteParams struct {
OperationName string
Args map[string]interface{}

IdleHandler func()

// Context may be provided to pass application-specific per-request
// information to resolve functions.
Context context.Context
Expand Down Expand Up @@ -74,6 +77,7 @@ func Execute(p ExecuteParams) (result *Result) {
ExecutionContext: exeContext,
Root: p.Root,
Operation: exeContext.Operation,
IdleHandler: p.IdleHandler,
})
select {
case out <- result:
Expand Down Expand Up @@ -164,6 +168,28 @@ type executeOperationParams struct {
ExecutionContext *executionContext
Root interface{}
Operation ast.Definition
IdleHandler func()
}

func waitForPromise(p executeOperationParams, promise *promise.Promise) interface{} {
var result interface{}
done := false
promise.Then(func(value interface{}) interface{} {
result = value
done = true
return nil
})
promise.Schedule()
for !done {
if p.IdleHandler == nil {
panic(errors.New("Asynchronous resolution attempted with no idle handler defined"))
}
p.IdleHandler()
if !promise.Schedule() {
panic(errors.New("No progress was made after idle handler was invoked"))
}
}
return result
}

func executeOperation(p executeOperationParams) *Result {
Expand All @@ -185,11 +211,22 @@ func executeOperation(p executeOperationParams) *Result {
Fields: fields,
}

var data interface{}

if p.Operation.GetOperation() == ast.OperationTypeMutation {
return executeFieldsSerially(executeFieldsParams)
data = executeFieldsSerially(executeFieldsParams)
} else {
data = executeFields(executeFieldsParams)
}
return executeFields(executeFieldsParams)

result := &Result{}
if isPromise(data) {
result.Data = waitForPromise(p, data.(*promise.Promise))
} else {
result.Data = data
}
result.Errors = p.ExecutionContext.Errors
return result
}

// Extracts the root type of the operation from the schema.
Expand Down Expand Up @@ -247,7 +284,7 @@ type executeFieldsParams struct {
}

// Implements the "Evaluating selection sets" section of the spec for "write" mode.
func executeFieldsSerially(p executeFieldsParams) *Result {
func executeFieldsSerially(p executeFieldsParams) interface{} {
if p.Source == nil {
p.Source = map[string]interface{}{}
}
Expand All @@ -256,42 +293,75 @@ func executeFieldsSerially(p executeFieldsParams) *Result {
}

finalResults := make(map[string]interface{}, len(p.Fields))
chain := promise.Resolve(nil)
for responseName, fieldASTs := range p.Fields {
resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs)
if state.hasNoFieldDefs {
continue
}
finalResults[responseName] = resolved
fieldASTs := fieldASTs
responseName := responseName
chain = chain.Then(func(interface{}) interface{} {
resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs)
if state.hasNoFieldDefs {
return nil
}
if isPromise(resolved) {
return resolved.(*promise.Promise).Then(func(value interface{}) interface{} {
finalResults[responseName] = value
return nil
})
}
finalResults[responseName] = resolved
return nil
})
}

return &Result{
Data: finalResults,
Errors: p.ExecutionContext.Errors,
}
return chain.Then(func(interface{}) interface{} {
return finalResults
})
}

func promiseForObject(object map[string]interface{}) *promise.Promise {
keys := make([]string, 0, len(object))
values := make([]interface{}, 0, len(object))
for key, value := range object {
keys = append(keys, key)
values = append(values, value)
}
return promise.All(values).Then(func(values interface{}) interface{} {
list := values.([]interface{})
result := make(map[string]interface{}, len(list))
for i, value := range list {
result[keys[i]] = value
}
return result
})
}

// Implements the "Evaluating selection sets" section of the spec for "read" mode.
func executeFields(p executeFieldsParams) *Result {
func executeFields(p executeFieldsParams) interface{} {
if p.Source == nil {
p.Source = map[string]interface{}{}
}
if p.Fields == nil {
p.Fields = map[string][]*ast.Field{}
}

containsPromise := false
finalResults := make(map[string]interface{}, len(p.Fields))
for responseName, fieldASTs := range p.Fields {
resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs)
if state.hasNoFieldDefs {
continue
}
if isPromise(resolved) {
containsPromise = true
}
finalResults[responseName] = resolved
}

return &Result{
Data: finalResults,
Errors: p.ExecutionContext.Errors,
if !containsPromise {
return finalResults
}

return promiseForObject(finalResults)
}

type collectFieldsParams struct {
Expand Down Expand Up @@ -504,6 +574,50 @@ type resolveFieldResultState struct {
hasNoFieldDefs bool
}

func promiseForResolvePromise(ch ResolvePromise) *promise.Promise {
return promise.New(func(resolve func(interface{}), reject func(error)) {
select {
case r := <-ch:
if r.Error != nil {
reject(r.Error)
} else {
resolve(r.Value)
}
default:
}
})
}

// If the given value is a ResolvePromise or an iterable containing one or more ResolvePromises, a
// promise is returned for it.
func maybePromise(v interface{}) *promise.Promise {
if isIterable(v) {
v := reflect.ValueOf(v)
containsPromise := false
for i := 0; i < v.Len(); i++ {
if _, ok := v.Index(i).Interface().(ResolvePromise); ok {
containsPromise = true
break
}
}
if containsPromise {
elements := make([]interface{}, v.Len())
for i := 0; i < v.Len(); i++ {
v := v.Index(i).Interface()
if rp, ok := v.(ResolvePromise); ok {
elements[i] = promiseForResolvePromise(rp)
} else {
elements[i] = v
}
}
return promise.All(elements)
}
} else if ch, ok := v.(ResolvePromise); ok {
return promiseForResolvePromise(ch)
}
return nil
}

// Resolves the field on the given source object. In particular, this
// figures out the value that the field returns by calling its resolve function,
// then calls completeValue to complete promises, serialize scalars, or execute
Expand Down Expand Up @@ -570,7 +684,7 @@ func resolveField(eCtx *executionContext, parentType *Object, source interface{}

var resolveFnError error

result, resolveFnError = resolveFn(ResolveParams{
resolveFnValue, resolveFnError := resolveFn(ResolveParams{
Source: source,
Args: args,
Info: info,
Expand All @@ -581,8 +695,23 @@ func resolveField(eCtx *executionContext, parentType *Object, source interface{}
panic(gqlerrors.FormatError(resolveFnError))
}

completed := completeValueCatchingError(eCtx, returnType, fieldASTs, info, result)
return completed, resultState
// If the value is a promise, chain value completion.
if promise := maybePromise(resolveFnValue); promise != nil {
promise = promise.Then(func(value interface{}) interface{} {
return completeValueCatchingError(eCtx, returnType, fieldASTs, info, value)
})
// If the value isn't non-null, catch errors.
if _, ok := returnType.(*NonNull); !ok {
promise = promise.Catch(func(err error) interface{} {
eCtx.Errors = append(eCtx.Errors, gqlerrors.FormatError(err))
return nil
})
}
return promise, resultState
}

result = completeValueCatchingError(eCtx, returnType, fieldASTs, info, resolveFnValue)
return result, resultState
}

func completeValueCatchingError(eCtx *executionContext, returnType Type, fieldASTs []*ast.Field, info ResolveInfo, result interface{}) (completed interface{}) {
Expand Down Expand Up @@ -759,10 +888,7 @@ func completeObjectValue(eCtx *executionContext, returnType *Object, fieldASTs [
Source: result,
Fields: subFieldASTs,
}
results := executeFields(executeFieldsParams)

return results.Data

return executeFields(executeFieldsParams)
}

// completeLeafValue complete a leaf value (Scalar / Enum) by serializing to a valid value, returning nil if serialization is not possible.
Expand Down Expand Up @@ -792,11 +918,19 @@ func completeListValue(eCtx *executionContext, returnType *List, fieldASTs []*as

itemType := returnType.OfType
completedResults := make([]interface{}, 0, resultVal.Len())
containsPromise := false
for i := 0; i < resultVal.Len(); i++ {
val := resultVal.Index(i).Interface()
completedItem := completeValueCatchingError(eCtx, itemType, fieldASTs, info, val)
if isPromise(completedItem) {
containsPromise = true
}
completedResults = append(completedResults, completedItem)
}

if containsPromise {
return promise.All(completedResults)
}
return completedResults
}

Expand Down
Loading