From fda4b3208796c1a4faa77942dfc15b0732eea7bc Mon Sep 17 00:00:00 2001 From: ssttevee Date: Tue, 15 Aug 2017 11:25:13 -0700 Subject: [PATCH 1/6] Resolve fields concurrently Using "deferred resolve functions" --- executor.go | 95 ++++++++++++++++++++++++++++++++-------- executor_resolve_test.go | 49 +++++++++++++++++++++ executor_test.go | 42 ++++++++++++++++++ 3 files changed, 167 insertions(+), 19 deletions(-) diff --git a/executor.go b/executor.go index 7ca4532e..62be40d4 100644 --- a/executor.go +++ b/executor.go @@ -9,6 +9,7 @@ import ( "github.com/graphql-go/graphql/gqlerrors" "github.com/graphql-go/graphql/language/ast" + "sync" ) type ExecuteParams struct { @@ -110,6 +111,14 @@ type ExecutionContext struct { VariableValues map[string]interface{} Errors []gqlerrors.FormattedError Context context.Context + + errorsMutex sync.Mutex +} + +func (eCtx *ExecutionContext) addError(err gqlerrors.FormattedError) { + eCtx.errorsMutex.Lock() + defer eCtx.errorsMutex.Unlock() + eCtx.Errors = append(eCtx.Errors, err) } func buildExecutionContext(p BuildExecutionCtxParams) (*ExecutionContext, error) { @@ -278,13 +287,38 @@ func executeFields(p ExecuteFieldsParams) *Result { p.Fields = map[string][]*ast.Field{} } + var numberOfDeferredFunctions int + recoverChan := make(chan interface{}, len(p.Fields)) + + var resultsMutex sync.Mutex finalResults := map[string]interface{}{} for responseName, fieldASTs := range p.Fields { resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs) if state.hasNoFieldDefs { continue } - finalResults[responseName] = resolved + if resolve, ok := resolved.(deferredResolveFunction); ok { + numberOfDeferredFunctions += 1 + go func() { + defer func() { + recoverChan <- recover() + }() + + resultsMutex.Lock() + defer resultsMutex.Unlock() + finalResults[responseName] = resolve() + }() + } else { + resultsMutex.Lock() + finalResults[responseName] = resolved + resultsMutex.Unlock() + } + } + + for i := 0; i < numberOfDeferredFunctions; i++ { + if r := <-recoverChan; r != nil { + panic(r) + } } return &Result{ @@ -503,6 +537,8 @@ type resolveFieldResultState struct { hasNoFieldDefs bool } +type deferredResolveFunction func() interface{} + // Resolves the field on the given source object. In particular, this // figures out the value that the field returns by calling its resolve function, // then calls completeValue to complete promises, serialize scalars, or execute @@ -510,25 +546,27 @@ type resolveFieldResultState struct { func resolveField(eCtx *ExecutionContext, parentType *Object, source interface{}, fieldASTs []*ast.Field) (result interface{}, resultState resolveFieldResultState) { // catch panic from resolveFn var returnType Output + handleRecover := func(r interface{}) { + var err error + if r, ok := r.(string); ok { + err = NewLocatedError( + fmt.Sprintf("%v", r), + FieldASTsToNodeASTs(fieldASTs), + ) + } + if r, ok := r.(error); ok { + err = gqlerrors.FormatError(r) + } + // send panic upstream + if _, ok := returnType.(*NonNull); ok { + panic(gqlerrors.FormatError(err)) + } + eCtx.addError(gqlerrors.FormatError(err)) + } + defer func() (interface{}, resolveFieldResultState) { if r := recover(); r != nil { - - var err error - if r, ok := r.(string); ok { - err = NewLocatedError( - fmt.Sprintf("%v", r), - FieldASTsToNodeASTs(fieldASTs), - ) - } - if r, ok := r.(error); ok { - err = gqlerrors.FormatError(r) - } - // send panic upstream - if _, ok := returnType.(*NonNull); ok { - panic(gqlerrors.FormatError(err)) - } - eCtx.Errors = append(eCtx.Errors, gqlerrors.FormatError(err)) - return result, resultState + handleRecover(r) } return result, resultState }() @@ -580,6 +618,25 @@ func resolveField(eCtx *ExecutionContext, parentType *Object, source interface{} panic(gqlerrors.FormatError(resolveFnError)) } + if deferredResolveFn, ok := result.(func() (interface{}, error)); ok { + return deferredResolveFunction(func() (result interface{}) { + defer func() interface{} { + if r := recover(); r != nil { + handleRecover(r) + } + + return result + }() + + result, resolveFnError = deferredResolveFn() + if resolveFnError != nil { + panic(gqlerrors.FormatError(resolveFnError)) + } + + return completeValueCatchingError(eCtx, returnType, fieldASTs, info, result) + }), resultState + } + completed := completeValueCatchingError(eCtx, returnType, fieldASTs, info, result) return completed, resultState } @@ -593,7 +650,7 @@ func completeValueCatchingError(eCtx *ExecutionContext, returnType Type, fieldAS panic(r) } if err, ok := r.(gqlerrors.FormattedError); ok { - eCtx.Errors = append(eCtx.Errors, err) + eCtx.addError(err) } return completed } diff --git a/executor_resolve_test.go b/executor_resolve_test.go index 7430cd86..59b063fe 100644 --- a/executor_resolve_test.go +++ b/executor_resolve_test.go @@ -114,6 +114,55 @@ func TestExecutesResolveFunction_UsesProvidedResolveFunction(t *testing.T) { } } +func TestExecutesResolveFunction_UsesProvidedResolveFunction_ResolveFunctionIsDeferred(t *testing.T) { + schema := testSchema(t, &graphql.Field{ + Type: graphql.String, + Args: graphql.FieldConfigArgument{ + "aStr": &graphql.ArgumentConfig{Type: graphql.String}, + "aInt": &graphql.ArgumentConfig{Type: graphql.Int}, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return func() (interface{}, error) { + b, err := json.Marshal(p.Args) + return string(b), err + }, nil + }, + }) + + expected := map[string]interface{}{ + "test": "{}", + } + result := graphql.Do(graphql.Params{ + Schema: schema, + RequestString: `{ test }`, + }) + if !reflect.DeepEqual(expected, result.Data) { + t.Fatalf("Unexpected result, Diff: %v", testutil.Diff(expected, result.Data)) + } + + expected = map[string]interface{}{ + "test": `{"aStr":"String!"}`, + } + result = graphql.Do(graphql.Params{ + Schema: schema, + RequestString: `{ test(aStr: "String!") }`, + }) + if !reflect.DeepEqual(expected, result.Data) { + t.Fatalf("Unexpected result, Diff: %v", testutil.Diff(expected, result.Data)) + } + + expected = map[string]interface{}{ + "test": `{"aInt":-123,"aStr":"String!"}`, + } + result = graphql.Do(graphql.Params{ + Schema: schema, + RequestString: `{ test(aInt: -123, aStr: "String!") }`, + }) + if !reflect.DeepEqual(expected, result.Data) { + t.Fatalf("Unexpected result, Diff: %v", testutil.Diff(expected, result.Data)) + } +} + func TestExecutesResolveFunction_UsesProvidedResolveFunction_SourceIsStruct_WithoutJSONTags(t *testing.T) { // For structs without JSON tags, it will map to upper-cased exported field names diff --git a/executor_test.go b/executor_test.go index 2c898f88..a56e6790 100644 --- a/executor_test.go +++ b/executor_test.go @@ -1483,6 +1483,48 @@ func TestQuery_ExecutionDoesNotAddErrorsFromFieldResolveFn(t *testing.T) { } } +func TestQuery_DeferredResolveFn_ExecutionAddsErrorsFromFieldResolveFn(t *testing.T) { + qError := errors.New("queryError") + q := graphql.NewObject(graphql.ObjectConfig{ + Name: "Query", + Fields: graphql.Fields{ + "a": &graphql.Field{ + Type: graphql.String, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return func() (interface{}, error) { + return nil, qError + }, nil + }, + }, + "b": &graphql.Field{ + Type: graphql.String, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return func() (interface{}, error) { + return "ok", nil + }, nil + }, + }, + }, + }) + blogSchema, err := graphql.NewSchema(graphql.SchemaConfig{ + Query: q, + }) + if err != nil { + t.Fatalf("unexpected error, got: %v", err) + } + query := "{ a }" + result := graphql.Do(graphql.Params{ + Schema: blogSchema, + RequestString: query, + }) + if len(result.Errors) == 0 { + t.Fatal("wrong result, expected errors, got no errors") + } + if result.Errors[0].Error() != qError.Error() { + t.Fatalf("wrong result, unexpected error, got: %v, expected: %v", result.Errors[0], qError) + } +} + func TestQuery_InputObjectUsesFieldDefaultValueFn(t *testing.T) { inputType := graphql.NewInputObject(graphql.InputObjectConfig{ Name: "Input", From fccbb6e532678702604f542b72558e45e14a12af Mon Sep 17 00:00:00 2001 From: deoxxa Date: Mon, 11 Dec 2017 23:11:47 +1100 Subject: [PATCH 2/6] fix bug with slow async resolve functions overwriting later loop iterations --- executor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor.go b/executor.go index 62be40d4..1f8981fa 100644 --- a/executor.go +++ b/executor.go @@ -299,7 +299,7 @@ func executeFields(p ExecuteFieldsParams) *Result { } if resolve, ok := resolved.(deferredResolveFunction); ok { numberOfDeferredFunctions += 1 - go func() { + go func(responseName string) { defer func() { recoverChan <- recover() }() @@ -307,7 +307,7 @@ func executeFields(p ExecuteFieldsParams) *Result { resultsMutex.Lock() defer resultsMutex.Unlock() finalResults[responseName] = resolve() - }() + }(responseName) } else { resultsMutex.Lock() finalResults[responseName] = resolved From d18c4b3304468ee0d6fd552c3a50598624371e12 Mon Sep 17 00:00:00 2001 From: deoxxa Date: Mon, 11 Dec 2017 23:13:08 +1100 Subject: [PATCH 3/6] run async resolvers at the same time rather than blocking on resultsMutex --- executor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executor.go b/executor.go index 1f8981fa..2796d8f0 100644 --- a/executor.go +++ b/executor.go @@ -304,9 +304,11 @@ func executeFields(p ExecuteFieldsParams) *Result { recoverChan <- recover() }() + res := resolve() + resultsMutex.Lock() defer resultsMutex.Unlock() - finalResults[responseName] = resolve() + finalResults[responseName] = res }(responseName) } else { resultsMutex.Lock() From d6fa83314d7155bb8d4459552eb4ca3e847c0fdd Mon Sep 17 00:00:00 2001 From: ssttevee Date: Fri, 27 Apr 2018 15:11:17 -0700 Subject: [PATCH 4/6] fix addError receiver type --- executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor.go b/executor.go index a167dbef..4e44667d 100644 --- a/executor.go +++ b/executor.go @@ -116,7 +116,7 @@ type executionContext struct { errorsMutex sync.Mutex } -func (eCtx *ExecutionContext) addError(err gqlerrors.FormattedError) { +func (eCtx *executionContext) addError(err gqlerrors.FormattedError) { eCtx.errorsMutex.Lock() defer eCtx.errorsMutex.Unlock() eCtx.Errors = append(eCtx.Errors, err) From 8d83388570fab78fa3c30874f1be5e8874b33218 Mon Sep 17 00:00:00 2001 From: Steve Lam Date: Tue, 1 May 2018 12:02:17 -0700 Subject: [PATCH 5/6] change when to run deferred resolve function This ensures that the `deferredResolveFunction` is returned by the executor and not a resolve function. i.e. ensure that a resolved value of type `function() interface{}` will not be run concurrently. --- executor.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executor.go b/executor.go index 4e44667d..5717f6d0 100644 --- a/executor.go +++ b/executor.go @@ -298,14 +298,14 @@ func executeFields(p executeFieldsParams) *Result { if state.hasNoFieldDefs { continue } - if resolve, ok := resolved.(deferredResolveFunction); ok { + if state.isDeferred { numberOfDeferredFunctions += 1 go func(responseName string) { defer func() { recoverChan <- recover() }() - res := resolve() + res := resolved.(deferredResolveFunction)() resultsMutex.Lock() defer resultsMutex.Unlock() @@ -538,6 +538,7 @@ func getFieldEntryKey(node *ast.Field) string { // Internal resolveField state type resolveFieldResultState struct { hasNoFieldDefs bool + isDeferred bool } type deferredResolveFunction func() interface{} @@ -622,6 +623,7 @@ func resolveField(eCtx *executionContext, parentType *Object, source interface{} } if deferredResolveFn, ok := result.(func() (interface{}, error)); ok { + resultState.isDeferred = true return deferredResolveFunction(func() (result interface{}) { defer func() interface{} { if r := recover(); r != nil { From 5bbb693819591933c334f5099f1f4327525afd79 Mon Sep 17 00:00:00 2001 From: ssttevee Date: Thu, 14 Jun 2018 20:25:51 -0700 Subject: [PATCH 6/6] allow untyped thunks in definitions --- definition.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/definition.go b/definition.go index 2a1779f5..b387e72e 100644 --- a/definition.go +++ b/definition.go @@ -424,11 +424,13 @@ func (gt *Object) Fields() FieldDefinitionMap { } var configureFields Fields - switch gt.typeConfig.Fields.(type) { + switch fields := gt.typeConfig.Fields.(type) { case Fields: - configureFields = gt.typeConfig.Fields.(Fields) + configureFields = fields case FieldsThunk: - configureFields = gt.typeConfig.Fields.(FieldsThunk)() + configureFields = fields() + case func() Fields: + configureFields = fields() } fields, err := defineFieldMap(gt, configureFields) @@ -444,11 +446,13 @@ func (gt *Object) Interfaces() []*Interface { } var configInterfaces []*Interface - switch gt.typeConfig.Interfaces.(type) { + switch ifaces := gt.typeConfig.Interfaces.(type) { case InterfacesThunk: - configInterfaces = gt.typeConfig.Interfaces.(InterfacesThunk)() + configInterfaces = ifaces() + case func() []*Interface: + configInterfaces = ifaces() case []*Interface: - configInterfaces = gt.typeConfig.Interfaces.([]*Interface) + configInterfaces = ifaces case nil: default: gt.err = fmt.Errorf("Unknown Object.Interfaces type: %T", gt.typeConfig.Interfaces) @@ -754,11 +758,13 @@ func (it *Interface) Fields() (fields FieldDefinitionMap) { } var configureFields Fields - switch it.typeConfig.Fields.(type) { + switch fields := it.typeConfig.Fields.(type) { case Fields: - configureFields = it.typeConfig.Fields.(Fields) + configureFields = fields case FieldsThunk: - configureFields = it.typeConfig.Fields.(FieldsThunk)() + configureFields = fields() + case func() Fields: + configureFields = fields() } fields, err := defineFieldMap(it, configureFields) @@ -1140,11 +1146,13 @@ func NewInputObject(config InputObjectConfig) *InputObject { func (gt *InputObject) defineFieldMap() InputObjectFieldMap { var fieldMap InputObjectConfigFieldMap - switch gt.typeConfig.Fields.(type) { + switch fields := gt.typeConfig.Fields.(type) { case InputObjectConfigFieldMap: - fieldMap = gt.typeConfig.Fields.(InputObjectConfigFieldMap) + fieldMap = fields + case func() InputObjectConfigFieldMap: + fieldMap = fields() case InputObjectConfigFieldMapThunk: - fieldMap = gt.typeConfig.Fields.(InputObjectConfigFieldMapThunk)() + fieldMap = fields() } resultFieldMap := InputObjectFieldMap{}