diff --git a/internal/xds/resolver/serviceconfig.go b/internal/xds/resolver/serviceconfig.go index f2ceabe7a8db..3754b14dd3e4 100644 --- a/internal/xds/resolver/serviceconfig.go +++ b/internal/xds/resolver/serviceconfig.go @@ -97,17 +97,14 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte { } type virtualHost struct { - // map from filter name to its config - httpFilterConfigOverride map[string]httpfilter.FilterConfig // retry policy present in virtual host retryConfig *xdsresource.RetryConfig } // routeCluster holds information about a cluster as referenced by a route. type routeCluster struct { - name string - // map from filter name to its config - httpFilterConfigOverride map[string]httpfilter.FilterConfig + name string // Name of the cluster. + interceptor iresolver.ClientInterceptor // HTTP filters to run for RPCs matching this route. } type route struct { @@ -115,10 +112,8 @@ type route struct { actionType xdsresource.RouteActionType // holds route action type clusters wrr.WRR // holds *routeCluster entries maxStreamDuration time.Duration - // map from filter name to its config - httpFilterConfigOverride map[string]httpfilter.FilterConfig - retryConfig *xdsresource.RetryConfig - hashPolicies []*xdsresource.HashPolicy + retryConfig *xdsresource.RetryConfig + hashPolicies []*xdsresource.HashPolicy } func (r route) String() string { @@ -200,11 +195,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP ref := &cs.clusters[cluster.name].refCount atomic.AddInt32(ref, 1) - interceptor, err := cs.newInterceptor(rt, cluster) - if err != nil { - return nil, annotateErrorWithNodeID(err, cs.xdsNodeID) - } - lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name) lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies)) @@ -220,7 +210,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP cs.sendNewServiceConfig() } }, - Interceptor: interceptor, + Interceptor: cluster.interceptor, } if rt.maxStreamDuration != 0 { @@ -310,35 +300,6 @@ func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies [ return rand.Uint64() } -func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) { - if len(cs.httpFilterConfig) == 0 { - return nil, nil - } - interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) - for _, filter := range cs.httpFilterConfig { - override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority - if override == nil { - override = rt.httpFilterConfigOverride[filter.Name] // route is second priority - } - if override == nil { - override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority - } - ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) - if !ok { - // Should not happen if it passed xdsClient validation. - return nil, fmt.Errorf("filter does not support use in client") - } - i, err := ib.BuildClientInterceptor(filter.Config, override) - if err != nil { - return nil, fmt.Errorf("error constructing filter: %v", err) - } - if i != nil { - interceptors = append(interceptors, i) - } - } - return &interceptorList{interceptors: interceptors}, nil -} - // stop decrements refs of all clusters referenced by this config selector. func (cs *configSelector) stop() { // The resolver's old configSelector may be nil. Handle that here. @@ -363,6 +324,38 @@ func (cs *configSelector) stop() { } } +// newInterceptor builds a chain of client interceptors for the given filters +// and override configuration. The cluster override has the highest priority, +// followed by the route override, and finally the virtual host override. +func newInterceptor(filters []xdsresource.HTTPFilter, clusterOverride, routeOverride, virtualHostOverride map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + if len(filters) == 0 { + return nil, nil + } + interceptors := make([]iresolver.ClientInterceptor, 0, len(filters)) + for _, filter := range filters { + override := clusterOverride[filter.Name] + if override == nil { + override = routeOverride[filter.Name] + } + if override == nil { + override = virtualHostOverride[filter.Name] + } + ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) + if !ok { + // Should not happen if it passed xdsClient validation. + return nil, fmt.Errorf("filter %q does not support use in client", filter.Name) + } + i, err := ib.BuildClientInterceptor(filter.Config, override) + if err != nil { + return nil, fmt.Errorf("error constructing filter: %v", err) + } + if i != nil { + interceptors = append(interceptors, i) + } + } + return &interceptorList{interceptors: interceptors}, nil +} + type interceptorList struct { interceptors []iresolver.ClientInterceptor } diff --git a/internal/xds/resolver/xds_http_filters_test.go b/internal/xds/resolver/xds_http_filters_test.go new file mode 100644 index 000000000000..9d2c5acf0b43 --- /dev/null +++ b/internal/xds/resolver/xds_http_filters_test.go @@ -0,0 +1,659 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver_test + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/httpfilter" + rinternal "google.golang.org/grpc/internal/xds/resolver/internal" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/wrapperspb" + + v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/xds" // Register all required xDS components +) + +const ( + filterCfgPathFieldName = "path" + filterCfgErrorFieldName = "new_stream_error" + filterCfgMetadataKey = "test-filter-config" +) + +// testFilterCfg is the internal representation of the filter config proto. It +// is returned by filter's config parsing methods. +type testFilterCfg struct { + httpfilter.FilterConfig + path string + newStreamErr string +} + +// filterConfigFromProto parses filter config specified as a v3.TypedStruct into +// a testFilterCfg. +func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { + ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) + if !ok { + return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) + } + + if ts.GetValue() == nil { + return testFilterCfg{}, nil + } + ret := testFilterCfg{} + if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { + ret.path = v.GetStringValue() + } + if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { + ret.newStreamErr = v.GetStringValue() + } + return ret, nil +} + +type logger interface { + Logf(format string, args ...any) +} + +// testHTTPFilterWithRPCMetadata is a HTTP filter used for testing purposes. +// +// This filter is used to verify that the xDS resolver and filter stack +// correctly propagate filter configuration (both base and override) to RPCs. It +// does this by injecting the config paths from its base and override configs as +// JSON-encoded metadata into outgoing RPCs. The metadata can then be observed +// by the backend, allowing tests to assert that the correct filter +// configuration was applied for each RPC. +type testHTTPFilterWithRPCMetadata struct { + logger logger + typeURL string + newStreamChan *testutils.Channel // If set, filter config is written to this field from NewStream() +} + +func (fb *testHTTPFilterWithRPCMetadata) TypeURLs() []string { return []string{fb.typeURL} } + +func (*testHTTPFilterWithRPCMetadata) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(cfg) +} + +func (*testHTTPFilterWithRPCMetadata) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(override) +} + +func (*testHTTPFilterWithRPCMetadata) IsTerminal() bool { return false } + +// ClientInterceptorBuilder is an optional interface for filters to implement. +// This compile time check ensures the test filter implements it. +var _ httpfilter.ClientInterceptorBuilder = &testHTTPFilterWithRPCMetadata{} + +func (fb *testHTTPFilterWithRPCMetadata) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + fb.logger.Logf("BuildClientInterceptor called with config: %+v, override: %+v", config, override) + + if config == nil { + return nil, fmt.Errorf("unexpected missing config") + } + + baseCfg := config.(testFilterCfg) + basePath := baseCfg.path + newStreamErr := baseCfg.newStreamErr + + var overridePath string + if override != nil { + overrideCfg := override.(testFilterCfg) + overridePath = overrideCfg.path + if overrideCfg.newStreamErr != "" { + newStreamErr = overrideCfg.newStreamErr + } + } + + return &testFilterInterceptor{ + logger: fb.logger, + cfg: overallFilterConfig{ + BasePath: basePath, + OverridePath: overridePath, + Error: newStreamErr, + }, + newStreamChan: fb.newStreamChan, + }, nil +} + +// overallFilterConfig is a JSON representation of the filter config. +// It is sent as RPC metadata and written to a channel for test verification. +type overallFilterConfig struct { + BasePath string `json:"base_path,omitempty"` + OverridePath string `json:"override_path,omitempty"` + Error string `json:"error,omitempty"` +} + +// testFilterInterceptor is a client interceptor that injects RPC metadata +// corresponding to its filter config. +type testFilterInterceptor struct { + logger logger + cfg overallFilterConfig + newStreamChan *testutils.Channel // If set, filter config is written to this field from NewStream() +} + +func (fi *testFilterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { + // Write the config to the channel, if set. This allows tests to verify that + // the filter was invoked at RPC time. This is useful for tests where the + // RPC is expected to fail, and therefore the RPC metadata cannot be + // observed from the backend. + if fi.newStreamChan != nil { + fi.newStreamChan.Send(fi.cfg) + } + + if fi.cfg.Error != "" { + return nil, status.Error(codes.Unavailable, fi.cfg.Error) + } + + // Marshal the filter config to JSON and inject it as metadata. + bytes, err := json.Marshal(fi.cfg) + if err != nil { + return nil, fmt.Errorf("failed to marshal filter config: %w", err) + } + cfg := string(bytes) + fi.logger.Logf("Injecting filter config metadata: %v", cfg) + + return newStream(metadata.AppendToOutgoingContext(ctx, filterCfgMetadataKey, fmt.Sprintf("%v", cfg)), done) +} + +func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { + return &v3httppb.HttpFilter{ + Name: name, + ConfigType: &v3httppb.HttpFilter_TypedConfig{ + TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: typeURL, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, + }, + }, + }), + }, + } +} + +// newStubServer returns a stub server that sends any filter config metadata +// received as part of incoming RPCs to the provided channel. +func newStubServer(metadataCh chan []string) *stubserver.StubServer { + return &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.Error(codes.InvalidArgument, "missing metadata") + } + select { + case metadataCh <- md.Get(filterCfgMetadataKey): + case <-ctx.Done(): + return nil, ctx.Err() + } + return &testpb.Empty{}, nil + }, + UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.Error(codes.InvalidArgument, "missing metadata") + } + select { + case metadataCh <- md.Get(filterCfgMetadataKey): + case <-ctx.Done(): + return nil, ctx.Err() + } + return &testpb.SimpleResponse{Payload: req.GetPayload()}, nil + }, + } +} + +// Tests HTTP filters with the xDS resolver. The test exercises various levels +// of filter config overrides (base, virtual host-level, route-level and +// cluster-level), and verifies that the correct config is applied for each RPC. +func (s) TestXDSResolverHTTPFilters_AllOverrides(t *testing.T) { + // Override default WRR with a deterministic test version. + origNewWRR := rinternal.NewWRR + rinternal.NewWRR = testutils.NewTestWRR + defer func() { rinternal.NewWRR = origNewWRR }() + + // Register a custom httpFilter builder for the test. + testFilterName := t.Name() + fb := &testHTTPFilterWithRPCMetadata{logger: t, typeURL: testFilterName} + httpfilter.Register(fb) + defer httpfilter.UnregisterForTesting(fb.typeURL) + + // Spin up an xDS management server + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer mgmtServer.Stop() + + // Create an xDS resolver with bootstrap configuration pointing to the above + // management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + if internal.NewXDSResolverWithConfigForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") + } + resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a couple of test backends. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + const chBufSize = 4 // Expecting 4 metadata entries (2 RPCs, each with 2 filters). + metadataCh := make(chan []string, chBufSize) + backend1 := stubserver.StartTestService(t, newStubServer(metadataCh)) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, newStubServer(metadataCh)) + defer backend2.Stop() + + // Configure resources on the management server. + // + // The route configuration contains two routes, matching two different RPCs. + // The route for the UnaryCall RPC does not contain any cluster-level or + // route-level per-filter config overrides. A virtual host-level per-filter + // config override exists and it should apply for RPCs matching this route. + // + // The route for the EmptyCall RPC contains a route-level per-filter config + // override that should apply for RPCs routed to cluster "A" since it does + // not have any cluster-level overrides. For RPCs matching cluster "B" + // though, a cluster-level per-filter config override should take + // precedence. + const testServiceName = "service-name" + const routeConfigName = "route-config" + listener := &v3listenerpb.Listener{ + Name: testServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: routeConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{testServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + {Name: "B", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, + }, + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "A", + Weight: wrapperspb.UInt32(1), + }, + { + Name: "B", + Weight: wrapperspb.UInt32(1), + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, + }, + }, + }), + }, + }, + }, + }, + }, + }, + }, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, + }, + }, + }), + }, + }, + }, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, + }, + }, + }), + }, + }}, + }, + }, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo", testFilterName, "foo1", ""), + newHTTPFilter(t, "bar", testFilterName, "bar1", ""), + e2e.RouterHTTPFilter, + }, + }), + }, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Clusters: []*v3clusterpb.Cluster{ + e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone), + e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{ + e2e.DefaultEndpoint("endpoint_A", "localhost", []uint32{testutils.ParsePort(t, backend1.Address)}), + e2e.DefaultEndpoint("endpoint_B", "localhost", []uint32{testutils.ParsePort(t, backend2.Address)}), + }, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a gRPC client using the xDS resolver. + cc, err := grpc.NewClient("xds:///"+testServiceName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to create a gRPC client: %v", err) + } + defer cc.Close() + + // Helper to make an RPC twice and collect filter configs from metadata. We + // make the RPC two times to ensure that we hit both clusters (because of + // the deterministic WRR). The returned filter configs are in the order in + // which the RPCs were made. + collectFilterConfigs := func(rpc func() error) []overallFilterConfig { + t.Helper() + var gotFilterCfgs []overallFilterConfig + for i := 0; i < 2; i++ { + if err := rpc(); err != nil { + t.Fatalf("Unexpected RPC error: %v", err) + } + select { + case cfg := <-metadataCh: + if len(cfg) != 2 { + t.Fatalf("Unexpected number of filter config metadata, got: %d, want: 2", len(cfg)) + } + for _, c := range cfg { + var ofc overallFilterConfig + if err := json.Unmarshal([]byte(c), &ofc); err != nil { + t.Fatalf("Failed to unmarshal filter config JSON %q: %v", c, err) + } + gotFilterCfgs = append(gotFilterCfgs, ofc) + } + case <-ctx.Done(): + t.Fatalf("Timeout waiting for metadata from backend") + } + } + return gotFilterCfgs + } + + // Test base filter config (UnaryCall). Because of the deterministic WRR, we + // know the expected order of clusters for the two RPCs. + wantFilterCfgs := []overallFilterConfig{ + {BasePath: "foo1", OverridePath: "foo2"}, // Routed to cluster A + {BasePath: "bar1", OverridePath: "bar2"}, // Routed to cluster A + {BasePath: "foo1", OverridePath: "foo2"}, // Routed to cluster B + {BasePath: "bar1", OverridePath: "bar2"}, // Routed to cluster B + } + client := testgrpc.NewTestServiceClient(cc) + gotFilterCfgs := collectFilterConfigs(func() error { + _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}) + return err + }) + if diff := cmp.Diff(wantFilterCfgs, gotFilterCfgs); diff != "" { + t.Fatalf("Unexpected filter configs (-want +got):\n%s", diff) + } + + // Test per-route and per-cluster overrides (EmptyCall). + wantFilterCfgs = []overallFilterConfig{ + {BasePath: "foo1", OverridePath: "foo3"}, // Routed to cluster A + {BasePath: "bar1", OverridePath: "bar3"}, // Routed to cluster A + {BasePath: "foo1", OverridePath: "foo4"}, // Routed to cluster B + {BasePath: "bar1", OverridePath: "bar4"}, // Routed to cluster B + } + gotFilterCfgs = collectFilterConfigs(func() error { + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + return err + }) + if diff := cmp.Diff(wantFilterCfgs, gotFilterCfgs); diff != "" { + t.Fatalf("Unexpected filter configs (-want +got):\n%s", diff) + } +} + +// Tests that if a filter returns an error from its NewStream method, the RPC +// fails with that error. It also verifies that subsequent filters in the chain +// are not run. +func (s) TestXDSResolverHTTPFilters_NewStreamError(t *testing.T) { + // Register a custom httpFilter builder for the test and use a channel to + // get notified when the interceptor is invoked. + testFilterName := t.Name() + fb := &testHTTPFilterWithRPCMetadata{ + logger: t, + typeURL: testFilterName, + newStreamChan: testutils.NewChannelWithSize(3), // We have three filters. + } + httpfilter.Register(fb) + defer httpfilter.UnregisterForTesting(fb.typeURL) + + // Spin up an xDS management server + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer mgmtServer.Stop() + + // Create an xDS resolver with bootstrap configuration pointing to the above + // management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + if internal.NewXDSResolverWithConfigForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") + } + resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + // Start a test backend, but we expect the filter to fail the RPC before it + // ever gets to the backend. The test is designed to fail if the RPC + // *succeeds* (i.e., if the backend is reached). A large channel buffer is + // used to prevent blocking in the unexpected case where the filter fails to + // reject the RPC. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + metadataCh := make(chan []string, 10) + backend := stubserver.StartTestService(t, newStubServer(metadataCh)) + defer backend.Stop() + + // Configure resources on the management server. + // + // The route configuration contains two routes, matching two different RPCs. + // The route for the UnaryCall RPC does not contain any cluster-level or + // route-level per-filter config overrides. A virtual host-level per-filter + // config override exists and it should apply for RPCs matching this route. + // + // The route for the EmptyCall RPC contains a route-level per-filter config + // override that should apply for RPCs routed to cluster "A" since it does + // not have any cluster-level overrides. For RPCs matching cluster "B" + // though, a cluster-level per-filter config override should take + // precedence. + const testServiceName = "service-name" + const routeConfigName = "route-config" + listener := &v3listenerpb.Listener{ + Name: testServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: routeConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{testServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, + }, + }, + }}, + }, + }, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo-good", testFilterName, "foo-good", ""), + newHTTPFilter(t, "foo-failing", testFilterName, "foo-failing", "filter interceptor error"), + newHTTPFilter(t, "bar-good", testFilterName, "bar-good", ""), + e2e.RouterHTTPFilter, + }, + }), + }, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint("endpoint_A", "localhost", []uint32{testutils.ParsePort(t, backend.Address)})}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a gRPC client using the xDS resolver. + cc, err := grpc.NewClient("xds:///"+testServiceName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to create a gRPC client: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatalf("EmptyCall() RPC succeeded when expected to fail") + } + if got, want := status.Code(err), codes.Unavailable; got != want { + t.Fatalf("EmptyCall() RPC error code, got: %v, want: %v", got, want) + } + if got, want := err.Error(), "filter interceptor error"; !strings.Contains(got, want) { + t.Fatalf("Unexpected RPC error, got: %v, want: %v", err, "rpc error: code = Unavailable desc = filter interceptor error") + } + + // Verify that the first good filter was invoked + cfg, err := fb.newStreamChan.Receive(ctx) + if err != nil { + t.Fatal("Timeout waiting for first filter to be invoked") + } + ofc := cfg.(overallFilterConfig) + wantCfg := overallFilterConfig{BasePath: "foo-good"} + if diff := cmp.Diff(wantCfg, ofc); diff != "" { + t.Fatalf("Unexpected first filter config (-want +got):\n%s", diff) + } + + // Verify that the failing filter was invoked too. + cfg, err = fb.newStreamChan.Receive(ctx) + if err != nil { + t.Fatal("Timeout waiting for second filter to be invoked") + } + ofc = cfg.(overallFilterConfig) + wantCfg = overallFilterConfig{BasePath: "foo-failing", Error: "filter interceptor error"} + if diff := cmp.Diff(wantCfg, ofc); diff != "" { + t.Fatalf("Unexpected second filter config (-want +got):\n%s", diff) + } + + // Verify that the last good filter was not invoked. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err = fb.newStreamChan.Receive(sCtx); err == nil { + t.Fatal("Last filter was invoked when expected not to be") + } +} diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 8bb960a6567f..6c1bd4ead1e2 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -328,7 +328,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { // r.activeClusters for previously-unseen clusters. // // Only executed in the context of a serializer callback. -func (r *xdsResolver) newConfigSelector() *configSelector { +func (r *xdsResolver) newConfigSelector() (*configSelector, error) { cs := &configSelector{ channelID: r.channelID, xdsNodeID: r.xdsClient.BootstrapConfig().Node().GetId(), @@ -338,8 +338,7 @@ func (r *xdsResolver) newConfigSelector() *configSelector { }) }, virtualHost: virtualHost{ - httpFilterConfigOverride: r.currentVirtualHost.HTTPFilterConfigOverride, - retryConfig: r.currentVirtualHost.RetryConfig, + retryConfig: r.currentVirtualHost.RetryConfig, }, routes: make([]route, len(r.currentVirtualHost.Routes)), clusters: make(map[string]*clusterInfo), @@ -350,18 +349,20 @@ func (r *xdsResolver) newConfigSelector() *configSelector { clusters := rinternal.NewWRR.(func() wrr.WRR)() if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin - clusters.Add(&routeCluster{ - name: clusterName, - }, 1) + clusters.Add(&routeCluster{name: clusterName}, 1) ci := r.addOrGetActiveClusterInfo(clusterName) ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} cs.clusters[clusterName] = ci } else { for _, wc := range rt.WeightedClusters { clusterName := clusterPrefix + wc.Name + interceptor, err := newInterceptor(r.currentListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.currentVirtualHost.HTTPFilterConfigOverride) + if err != nil { + return nil, err + } clusters.Add(&routeCluster{ - name: clusterName, - httpFilterConfigOverride: wc.HTTPFilterConfigOverride, + name: clusterName, + interceptor: interceptor, }, int64(wc.Weight)) ci := r.addOrGetActiveClusterInfo(clusterName) ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: wc.Name})} @@ -378,7 +379,6 @@ func (r *xdsResolver) newConfigSelector() *configSelector { cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration } - cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride cs.routes[i].retryConfig = rt.RetryConfig cs.routes[i].hashPolicies = rt.HashPolicies } @@ -390,7 +390,7 @@ func (r *xdsResolver) newConfigSelector() *configSelector { atomic.AddInt32(&ci.refCount, 1) } - return cs + return cs, nil } // pruneActiveClusters deletes entries in r.activeClusters with zero @@ -446,7 +446,12 @@ func (r *xdsResolver) onResolutionComplete() { return } - cs := r.newConfigSelector() + cs, err := r.newConfigSelector() + if err != nil { + // Send an erroring config selector in this case that fails RPCs. + r.onResourceError(fmt.Errorf("xds: failed to create config selector: %v", err)) + return + } if !r.sendNewServiceConfig(cs) { // Channel didn't like the update we provided (unexpected); erase // this config selector and ignore this update, continuing with diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index b28724d37383..eaf2252fd65b 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -41,20 +41,15 @@ import ( "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/balancer/clustermanager" "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/internal/xds/httpfilter" rinternal "google.golang.org/grpc/internal/xds/resolver/internal" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/wrapperspb" - v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -1158,106 +1153,6 @@ func (s) TestResolverWRR(t *testing.T) { } } -const filterCfgPathFieldName = "path" -const filterCfgErrorFieldName = "new_stream_error" - -type filterCfg struct { - httpfilter.FilterConfig - path string - newStreamErr error -} - -type filterBuilder struct { - paths []string - typeURL string -} - -func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} } - -func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { - ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) - if !ok { - return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) - } - - if ts.GetValue() == nil { - return filterCfg{}, nil - } - ret := filterCfg{} - if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { - ret.path = v.GetStringValue() - } - if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { - if v.GetStringValue() == "" { - ret.newStreamErr = nil - } else { - ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) - } - } - return ret, nil -} - -func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { - return filterConfigFromProto(cfg) -} - -func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { - return filterConfigFromProto(override) -} - -func (*filterBuilder) IsTerminal() bool { return false } - -var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} - -func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { - if config == nil { - panic("unexpected missing config") - } - - fi := &filterInterceptor{ - parent: fb, - pathCh: make(chan string, 10), - } - - fb.paths = append(fb.paths, "build:"+config.(filterCfg).path) - err := config.(filterCfg).newStreamErr - if override != nil { - fb.paths = append(fb.paths, "override:"+override.(filterCfg).path) - err = override.(filterCfg).newStreamErr - } - - fi.cfgPath = config.(filterCfg).path - fi.err = err - return fi, nil -} - -type filterInterceptor struct { - parent *filterBuilder - pathCh chan string - cfgPath string - err error -} - -func (fi *filterInterceptor) NewStream(ctx context.Context, _ iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { - fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath) - if fi.err != nil { - return nil, fi.err - } - d := func() { - fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath) - done() - } - cs, err := newStream(ctx, d) - if err != nil { - return nil, err - } - return &clientStream{ClientStream: cs}, nil -} - -type clientStream struct { - iresolver.ClientStream -} - func (s) TestConfigSelector_FailureCases(t *testing.T) { const methodName = "1" @@ -1345,298 +1240,6 @@ func (s) TestConfigSelector_FailureCases(t *testing.T) { } } -func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { - return &v3httppb.HttpFilter{ - Name: name, - ConfigType: &v3httppb.HttpFilter_TypedConfig{ - TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: typeURL, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, - }, - }, - }), - }, - } -} - -func (s) TestXDSResolverHTTPFilters(t *testing.T) { - const methodName1 = "1" - const methodName2 = "2" - testFilterName := t.Name() - - testCases := []struct { - name string - listener *v3listenerpb.Listener - rpcRes map[string][][]string - wantStreamErr string - }{ - { - name: "NewStream error - ensure earlier interceptor Done is still called", - listener: &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: &v3routepb.RouteConfiguration{ - Name: defaultTestRouteConfigName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{defaultTestServiceName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - {Name: "B", Weight: wrapperspb.UInt32(1)}, - }, - }, - }, - }, - }, - }}, - }}, - }}, - HttpFilters: []*v3httppb.HttpFilter{ - newHTTPFilter(t, "foo", testFilterName, "foo1", ""), - newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"), - e2e.RouterHTTPFilter, - }, - }), - }, - }, - rpcRes: map[string][][]string{ - methodName1: { - {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream() - }, - }, - wantStreamErr: "bar newstream err", - }, - { - name: "all overrides", - listener: &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: &v3routepb.RouteConfiguration{ - Name: defaultTestRouteConfigName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{defaultTestServiceName}, - Routes: []*v3routepb.Route{ - { - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - {Name: "B", Weight: wrapperspb.UInt32(1)}, - }, - }, - }, - }, - }, - }, - { - Match: &v3routepb.RouteMatch{ - PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2}, - }, - Action: &v3routepb.Route_Route{ - Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ - WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - {Name: "A", Weight: wrapperspb.UInt32(1)}, - { - Name: "B", - Weight: wrapperspb.UInt32(1), - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, - }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, - }, - }, - }), - }, - }, - }, - }, - }, - }, - }, - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, - }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, - }, - }, - }), - }, - }, - }, - TypedPerFilterConfig: map[string]*anypb.Any{ - "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, - filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, - }, - }, - }), - "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ - TypeUrl: testFilterName, - Value: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, - }, - }, - }), - }, - }}, - }}, - HttpFilters: []*v3httppb.HttpFilter{ - newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"), - newHTTPFilter(t, "bar", testFilterName, "bar1", ""), - e2e.RouterHTTPFilter, - }, - }), - }, - }, - rpcRes: map[string][][]string{ - methodName1: { - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - }, - methodName2: { - {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - origNewWRR := rinternal.NewWRR - rinternal.NewWRR = testutils.NewTestWRR - defer func() { rinternal.NewWRR = origNewWRR }() - - // Register a custom httpFilter builder for the test. - fb := &filterBuilder{typeURL: testFilterName} - httpfilter.Register(fb) - - // Spin up an xDS management server. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - nodeID := uuid.New().String() - mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) - - // Build an xDS resolver. - stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) - - cluster := []*v3clusterpb.Cluster{ - e2e.DefaultCluster("A", "endpoint_A", e2e.SecurityLevelNone), - e2e.DefaultCluster("B", "endpoint_B", e2e.SecurityLevelNone), - } - endpoints := []*v3endpointpb.ClusterLoadAssignment{ - e2e.DefaultEndpoint("endpoint_A", defaultTestHostname, defaultTestPort), - e2e.DefaultEndpoint("endpoint_B", defaultTestHostname, defaultTestPort), - } - // Update the management server with a listener resource that - // contains an inline route configuration. - configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil, cluster, endpoints) - - // Ensure that the resolver pushes a state update to the channel. - cs := verifyUpdateFromResolver(ctx, t, stateCh, "") - - for method, wants := range tc.rpcRes { - // Order of wants is non-deterministic. - remainingWant := make([][]string, len(wants)) - copy(remainingWant, wants) - for n := range wants { - res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx}) - if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) - } - - var doneFunc func() - _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{}, func() {}, func(_ context.Context, done func()) (iresolver.ClientStream, error) { - doneFunc = done - return nil, nil - }) - if tc.wantStreamErr != "" { - if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) { - t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr) - } - if err == nil { - res.OnCommitted() - doneFunc() - } - continue - } - if err != nil { - t.Fatalf("unexpected error from Interceptor.NewStream: %v", err) - - } - res.OnCommitted() - doneFunc() - - gotPaths := fb.paths - fb.paths = []string{} - - // Confirm the desired path is found in remainingWant, and remove it. - pass := false - for i := range remainingWant { - if cmp.Equal(gotPaths, remainingWant[i]) { - remainingWant[i] = remainingWant[len(remainingWant)-1] - remainingWant = remainingWant[:len(remainingWant)-1] - pass = true - break - } - } - if !pass { - t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant) - } - } - } - }) - } -} - func newDurationP(d time.Duration) *time.Duration { return &d }