1
18
19 package resolver_test
20
21 import (
22 "context"
23 "encoding/json"
24 "fmt"
25 "testing"
26
27 "github.com/google/uuid"
28 "google.golang.org/grpc/balancer"
29 iresolver "google.golang.org/grpc/internal/resolver"
30 "google.golang.org/grpc/internal/testutils"
31 "google.golang.org/grpc/internal/testutils/xds/e2e"
32 "google.golang.org/grpc/resolver"
33 "google.golang.org/grpc/serviceconfig"
34 "google.golang.org/grpc/xds/internal/balancer/clustermanager"
35 "google.golang.org/grpc/xds/internal/clusterspecifier"
36 "google.golang.org/protobuf/proto"
37 "google.golang.org/protobuf/types/known/anypb"
38 "google.golang.org/protobuf/types/known/wrapperspb"
39
40 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
41 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
42 )
43
44 func init() {
45 balancer.Register(cspBalancerBuilder{})
46 clusterspecifier.Register(testClusterSpecifierPlugin{})
47 }
48
49
50
51 type cspBalancerBuilder struct{}
52
53 func (cspBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
54 return nil
55 }
56
57 func (cspBalancerBuilder) Name() string {
58 return "csp_experimental"
59 }
60
61 type cspBalancerConfig struct {
62 serviceconfig.LoadBalancingConfig
63 ArbitraryField string `json:"arbitrary_field"`
64 }
65
66 func (cspBalancerBuilder) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
67 cfg := &cspBalancerConfig{}
68 if err := json.Unmarshal(lbCfg, cfg); err != nil {
69 return nil, err
70 }
71 return cfg, nil
72
73 }
74
75
76
// testClusterSpecifierPlugin is a stateless cluster specifier plugin used by
// the tests in this file. Its configuration is a wrapperspb.StringValue
// wrapped in an anypb.Any.
type testClusterSpecifierPlugin struct{}
79
80 func (testClusterSpecifierPlugin) TypeURLs() []string {
81
82
83
84
85
86
87 return []string{"type.googleapis.com/google.protobuf.StringValue"}
88 }
89
90 func (testClusterSpecifierPlugin) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
91 if cfg == nil {
92 return nil, fmt.Errorf("testClusterSpecifierPlugin: nil configuration message provided")
93 }
94 anyp, ok := cfg.(*anypb.Any)
95 if !ok {
96 return nil, fmt.Errorf("testClusterSpecifierPlugin: error parsing config %v: got type %T, want *anypb.Any", cfg, cfg)
97 }
98 lbCfg := new(wrapperspb.StringValue)
99 if err := anypb.UnmarshalTo(anyp, lbCfg, proto.UnmarshalOptions{}); err != nil {
100 return nil, fmt.Errorf("testClusterSpecifierPlugin: error parsing config %v: %v", cfg, err)
101 }
102 return []map[string]any{{"csp_experimental": cspBalancerConfig{ArbitraryField: lbCfg.GetValue()}}}, nil
103 }
104
105
106
107
108
109
110
111
112
113 func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
114
115 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
116 defer cancel()
117 nodeID := uuid.New().String()
118 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
119
120
121 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
122 routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
123 RouteConfigName: defaultTestRouteConfigName,
124 ListenerName: defaultTestServiceName,
125 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
126 ClusterSpecifierPluginName: "cspA",
127 ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
128 })}
129 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
130
131 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
132
133
134 wantSC := `
135 {
136 "loadBalancingConfig": [
137 {
138 "xds_cluster_manager_experimental": {
139 "children": {
140 "cluster_specifier_plugin:cspA": {
141 "childPolicy": [
142 {
143 "csp_experimental": {
144 "arbitrary_field": "anything"
145 }
146 }
147 ]
148 }
149 }
150 }
151 }
152 ]
153 }`
154 cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
155 res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
156 if err != nil {
157 t.Fatalf("cs.SelectConfig(): %v", err)
158 }
159
160 gotCluster := clustermanager.GetPickedClusterForTesting(res.Context)
161 wantCluster := "cluster_specifier_plugin:cspA"
162 if gotCluster != wantCluster {
163 t.Fatalf("config selector returned cluster: %v, want: %v", gotCluster, wantCluster)
164 }
165
166
167 routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
168 RouteConfigName: defaultTestRouteConfigName,
169 ListenerName: defaultTestServiceName,
170 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
171 ClusterSpecifierPluginName: "cspA",
172 ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
173 })}
174 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
175
176
177 wantSC = `
178 {
179 "loadBalancingConfig": [
180 {
181 "xds_cluster_manager_experimental": {
182 "children": {
183 "cluster_specifier_plugin:cspA": {
184 "childPolicy": [
185 {
186 "csp_experimental": {
187 "arbitrary_field": "changed"
188 }
189 }
190 ]
191 }
192 }
193 }
194 }
195 ]
196 }`
197 verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
198 }
199
200
201
202
203 func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
204
205 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
206 defer cancel()
207 nodeID := uuid.New().String()
208 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
209
210
211 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
212 routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
213 RouteConfigName: defaultTestRouteConfigName,
214 ListenerName: defaultTestServiceName,
215 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
216 ClusterSpecifierPluginName: "cspA",
217 ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
218 })}
219 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
220
221 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
222
223
224 wantSC := `
225 {
226 "loadBalancingConfig": [
227 {
228 "xds_cluster_manager_experimental": {
229 "children": {
230 "cluster_specifier_plugin:cspA": {
231 "childPolicy": [
232 {
233 "csp_experimental": {
234 "arbitrary_field": "anythingA"
235 }
236 }
237 ]
238 }
239 }
240 }
241 }
242 ]
243 }`
244 cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
245
246 resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
247 if err != nil {
248 t.Fatalf("cs.SelectConfig(): %v", err)
249 }
250
251 gotCluster := clustermanager.GetPickedClusterForTesting(resOld.Context)
252 wantCluster := "cluster_specifier_plugin:cspA"
253 if gotCluster != wantCluster {
254 t.Fatalf("config selector returned cluster: %v, want: %v", gotCluster, wantCluster)
255 }
256
257
258
259
260
261 routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
262 RouteConfigName: defaultTestRouteConfigName,
263 ListenerName: defaultTestServiceName,
264 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
265 ClusterSpecifierPluginName: "cspB",
266 ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
267 })}
268 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
269
270
271 wantSC = `
272 {
273 "loadBalancingConfig": [
274 {
275 "xds_cluster_manager_experimental": {
276 "children": {
277 "cluster_specifier_plugin:cspA": {
278 "childPolicy": [
279 {
280 "csp_experimental": {
281 "arbitrary_field": "anythingA"
282 }
283 }
284 ]
285 },
286 "cluster_specifier_plugin:cspB": {
287 "childPolicy": [
288 {
289 "csp_experimental": {
290 "arbitrary_field": "anythingB"
291 }
292 }
293 ]
294 }
295 }
296 }
297 }
298 ]
299 }`
300 cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
301
302
303 resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
304 if err != nil {
305 t.Fatalf("cs.SelectConfig(): %v", err)
306 }
307
308 gotCluster = clustermanager.GetPickedClusterForTesting(resNew.Context)
309 wantCluster = "cluster_specifier_plugin:cspB"
310 if gotCluster != wantCluster {
311 t.Fatalf("config selector returned cluster: %v, want: %v", gotCluster, wantCluster)
312 }
313
314
315
316 resOld.OnCommitted()
317
318 wantSC = `
319 {
320 "loadBalancingConfig": [
321 {
322 "xds_cluster_manager_experimental": {
323 "children": {
324 "cluster_specifier_plugin:cspB": {
325 "childPolicy": [
326 {
327 "csp_experimental": {
328 "arbitrary_field": "anythingB"
329 }
330 }
331 ]
332 }
333 }
334 }
335 }
336 ]
337 }`
338 verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
339 }
340
View as plain text