1
18
19 package resolver
20
21 import (
22 "context"
23 "encoding/json"
24 "fmt"
25 "math/bits"
26 "strings"
27 "sync/atomic"
28 "time"
29
30 xxhash "github.com/cespare/xxhash/v2"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/internal/grpcrand"
33 "google.golang.org/grpc/internal/grpcutil"
34 iresolver "google.golang.org/grpc/internal/resolver"
35 "google.golang.org/grpc/internal/serviceconfig"
36 "google.golang.org/grpc/internal/wrr"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/status"
39 "google.golang.org/grpc/xds/internal/balancer/clustermanager"
40 "google.golang.org/grpc/xds/internal/balancer/ringhash"
41 "google.golang.org/grpc/xds/internal/httpfilter"
42 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
43 )
44
45 const (
46 cdsName = "cds_experimental"
47 xdsClusterManagerName = "xds_cluster_manager_experimental"
48 clusterPrefix = "cluster:"
49 clusterSpecifierPluginPrefix = "cluster_specifier_plugin:"
50 )
51
52 type serviceConfig struct {
53 LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"`
54 }
55
56 type balancerConfig []map[string]any
57
58 func newBalancerConfig(name string, config any) balancerConfig {
59 return []map[string]any{{name: config}}
60 }
61
62 type cdsBalancerConfig struct {
63 Cluster string `json:"cluster"`
64 }
65
66 type xdsChildConfig struct {
67 ChildPolicy balancerConfig `json:"childPolicy"`
68 }
69
70 type xdsClusterManagerConfig struct {
71 Children map[string]xdsChildConfig `json:"children"`
72 }
73
74
75
76
77 func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
78
79 children := make(map[string]xdsChildConfig)
80 for cluster, ci := range activeClusters {
81 children[cluster] = ci.cfg
82 }
83
84 sc := serviceConfig{
85 LoadBalancingConfig: newBalancerConfig(
86 xdsClusterManagerName, xdsClusterManagerConfig{Children: children},
87 ),
88 }
89
90 bs, err := json.Marshal(sc)
91 if err != nil {
92 return nil, fmt.Errorf("failed to marshal json: %v", err)
93 }
94 return bs, nil
95 }
96
97 type virtualHost struct {
98
99 httpFilterConfigOverride map[string]httpfilter.FilterConfig
100
101 retryConfig *xdsresource.RetryConfig
102 }
103
104
105 type routeCluster struct {
106 name string
107
108 httpFilterConfigOverride map[string]httpfilter.FilterConfig
109 }
110
111 type route struct {
112 m *xdsresource.CompositeMatcher
113 actionType xdsresource.RouteActionType
114 clusters wrr.WRR
115 maxStreamDuration time.Duration
116
117 httpFilterConfigOverride map[string]httpfilter.FilterConfig
118 retryConfig *xdsresource.RetryConfig
119 hashPolicies []*xdsresource.HashPolicy
120 }
121
122 func (r route) String() string {
123 return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
124 }
125
126 type configSelector struct {
127 r *xdsResolver
128 virtualHost virtualHost
129 routes []route
130 clusters map[string]*clusterInfo
131 httpFilterConfig []xdsresource.HTTPFilter
132 }
133
134 var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")
135 var errUnsupportedClientRouteAction = status.Errorf(codes.Unavailable, "matched route does not have a supported route action type")
136
137 func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
138 if cs == nil {
139 return nil, status.Errorf(codes.Unavailable, "no valid clusters")
140 }
141 var rt *route
142
143 for _, r := range cs.routes {
144 if r.m.Match(rpcInfo) {
145 rt = &r
146 break
147 }
148 }
149
150 if rt == nil || rt.clusters == nil {
151 return nil, errNoMatchedRouteFound
152 }
153
154 if rt.actionType != xdsresource.RouteActionRoute {
155 return nil, errUnsupportedClientRouteAction
156 }
157
158 cluster, ok := rt.clusters.Next().(*routeCluster)
159 if !ok {
160 return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
161 }
162
163
164
165 ref := &cs.clusters[cluster.name].refCount
166 atomic.AddInt32(ref, 1)
167
168 interceptor, err := cs.newInterceptor(rt, cluster)
169 if err != nil {
170 return nil, err
171 }
172
173 lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
174 lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))
175
176 config := &iresolver.RPCConfig{
177
178 Context: lbCtx,
179 OnCommitted: func() {
180
181
182 if v := atomic.AddInt32(ref, -1); v == 0 {
183
184
185 cs.r.serializer.Schedule(func(context.Context) {
186 cs.r.onClusterRefDownToZero()
187 })
188 }
189 },
190 Interceptor: interceptor,
191 }
192
193 if rt.maxStreamDuration != 0 {
194 config.MethodConfig.Timeout = &rt.maxStreamDuration
195 }
196 if rt.retryConfig != nil {
197 config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
198 } else if cs.virtualHost.retryConfig != nil {
199 config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
200 }
201
202 return config, nil
203 }
204
205 func retryConfigToPolicy(config *xdsresource.RetryConfig) *serviceconfig.RetryPolicy {
206 return &serviceconfig.RetryPolicy{
207 MaxAttempts: int(config.NumRetries) + 1,
208 InitialBackoff: config.RetryBackoff.BaseInterval,
209 MaxBackoff: config.RetryBackoff.MaxInterval,
210 BackoffMultiplier: 2,
211 RetryableStatusCodes: config.RetryOn,
212 }
213 }
214
215 func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsresource.HashPolicy) uint64 {
216 var hash uint64
217 var generatedHash bool
218 var md, emd metadata.MD
219 var mdRead bool
220 for _, policy := range hashPolicies {
221 var policyHash uint64
222 var generatedPolicyHash bool
223 switch policy.HashPolicyType {
224 case xdsresource.HashPolicyTypeHeader:
225 if strings.HasSuffix(policy.HeaderName, "-bin") {
226 continue
227 }
228 if !mdRead {
229 md, _ = metadata.FromOutgoingContext(rpcInfo.Context)
230 emd, _ = grpcutil.ExtraMetadata(rpcInfo.Context)
231 mdRead = true
232 }
233 values := emd.Get(policy.HeaderName)
234 if len(values) == 0 {
235
236
237 values = md.Get(policy.HeaderName)
238 if len(values) == 0 {
239
240 continue
241 }
242 }
243 joinedValues := strings.Join(values, ",")
244 if policy.Regex != nil {
245 joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution)
246 }
247 policyHash = xxhash.Sum64String(joinedValues)
248 generatedHash = true
249 generatedPolicyHash = true
250 case xdsresource.HashPolicyTypeChannelID:
251
252 policyHash = cs.r.channelID
253 generatedHash = true
254 generatedPolicyHash = true
255 }
256
257
258
259
260 if generatedPolicyHash {
261 hash = bits.RotateLeft64(hash, 1)
262 hash = hash ^ policyHash
263 }
264
265
266
267 if policy.Terminal && generatedHash {
268 break
269 }
270 }
271
272 if generatedHash {
273 return hash
274 }
275
276
277 return grpcrand.Uint64()
278 }
279
280 func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
281 if len(cs.httpFilterConfig) == 0 {
282 return nil, nil
283 }
284 interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
285 for _, filter := range cs.httpFilterConfig {
286 override := cluster.httpFilterConfigOverride[filter.Name]
287 if override == nil {
288 override = rt.httpFilterConfigOverride[filter.Name]
289 }
290 if override == nil {
291 override = cs.virtualHost.httpFilterConfigOverride[filter.Name]
292 }
293 ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
294 if !ok {
295
296 return nil, fmt.Errorf("filter does not support use in client")
297 }
298 i, err := ib.BuildClientInterceptor(filter.Config, override)
299 if err != nil {
300 return nil, fmt.Errorf("error constructing filter: %v", err)
301 }
302 if i != nil {
303 interceptors = append(interceptors, i)
304 }
305 }
306 return &interceptorList{interceptors: interceptors}, nil
307 }
308
309
310 func (cs *configSelector) stop() {
311
312 if cs == nil {
313 return
314 }
315
316
317 needUpdate := false
318
319
320 for _, ci := range cs.clusters {
321 if v := atomic.AddInt32(&ci.refCount, -1); v == 0 {
322 needUpdate = true
323 }
324 }
325
326
327
328 if needUpdate {
329 cs.r.serializer.Schedule(func(context.Context) {
330 cs.r.onClusterRefDownToZero()
331 })
332 }
333 }
334
335 type interceptorList struct {
336 interceptors []iresolver.ClientInterceptor
337 }
338
339 func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
340 for i := len(il.interceptors) - 1; i >= 0; i-- {
341 ns := newStream
342 interceptor := il.interceptors[i]
343 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
344 return interceptor.NewStream(ctx, ri, done, ns)
345 }
346 }
347 return newStream(ctx, func() {})
348 }
349
View as plain text