1
18
19 package rls
20
21 import (
22 "bytes"
23 "encoding/json"
24 "fmt"
25 "net/url"
26 "time"
27
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/balancer/rls/internal/keys"
30 "google.golang.org/grpc/internal"
31 "google.golang.org/grpc/internal/pretty"
32 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
33 "google.golang.org/grpc/resolver"
34 "google.golang.org/grpc/serviceconfig"
35 "google.golang.org/protobuf/encoding/protojson"
36 "google.golang.org/protobuf/types/known/durationpb"
37 )
38
// Limits and defaults applied while validating the RLS load balancing config.
const (
	// dummyChildPolicyTarget is the placeholder written into the child policy's
	// target field while the child policy config is validated at parse time.
	dummyChildPolicyTarget = "target_name_to_be_filled_in_later"

	// defaultLookupServiceTimeout replaces a lookup_service_timeout of zero.
	defaultLookupServiceTimeout = 10 * time.Second

	// maxMaxAge is the ceiling for max_age. It is also the value used when
	// max_age is zero.
	maxMaxAge = 5 * time.Minute

	// maxCacheSize is the ceiling for cache_size_bytes.
	// NOTE(review): this evaluates to 41943040 (40 MiB). Confirm that the
	// trailing factor of 8 is intended if the limit is meant to be in bytes.
	maxCacheSize = 8 * 5 * 1024 * 1024
)
51
52
// lbConfig is the parsed and validated form of the RLS LB policy's service
// config. It is what ParseConfig returns: parseRLSProto fills in the route
// lookup fields, and ParseConfig fills in the child policy and control channel
// fields afterwards.
type lbConfig struct {
	serviceconfig.LoadBalancingConfig

	// cacheSizeBytes is cache_size_bytes from the route lookup config, capped
	// at maxCacheSize.
	// NOTE(review): this is the first data field; if it is accessed atomically
	// elsewhere it must stay 64-bit aligned. Confirm before reordering fields.
	cacheSizeBytes int64
	// kbMap holds the key builders produced by keys.MakeBuilderMap from the
	// route lookup config.
	kbMap keys.BuilderMap
	// lookupService is the lookup_service string exactly as configured. It has
	// been checked to be non-empty and to use a registered resolver scheme.
	lookupService string
	// lookupServiceTimeout is lookup_service_timeout, or
	// defaultLookupServiceTimeout when the configured value is zero.
	lookupServiceTimeout time.Duration
	// maxAge is max_age, replaced by maxMaxAge when zero or larger than
	// maxMaxAge.
	maxAge time.Duration
	// staleAge is stale_age, reset to zero when it is not less than the
	// configured max_age.
	staleAge time.Duration
	// defaultTarget is default_target from the route lookup config.
	defaultTarget string

	// childPolicyName is the name of the first entry in childPolicy that has a
	// registered balancer builder.
	childPolicyName string
	// childPolicyConfig is that policy's config, keyed by top-level JSON field,
	// with the target field set to dummyChildPolicyTarget.
	childPolicyConfig map[string]json.RawMessage
	// childPolicyTargetField is childPolicyConfigTargetFieldName from the
	// service config.
	childPolicyTargetField string
	// controlChannelServiceConfig is the raw routeLookupChannelServiceConfig
	// JSON. It is set only when that field is present and parses successfully.
	controlChannelServiceConfig string
}
69
70 func (lbCfg *lbConfig) Equal(other *lbConfig) bool {
71 return lbCfg.kbMap.Equal(other.kbMap) &&
72 lbCfg.lookupService == other.lookupService &&
73 lbCfg.lookupServiceTimeout == other.lookupServiceTimeout &&
74 lbCfg.maxAge == other.maxAge &&
75 lbCfg.staleAge == other.staleAge &&
76 lbCfg.cacheSizeBytes == other.cacheSizeBytes &&
77 lbCfg.defaultTarget == other.defaultTarget &&
78 lbCfg.childPolicyName == other.childPolicyName &&
79 lbCfg.childPolicyTargetField == other.childPolicyTargetField &&
80 lbCfg.controlChannelServiceConfig == other.controlChannelServiceConfig &&
81 childPolicyConfigEqual(lbCfg.childPolicyConfig, other.childPolicyConfig)
82 }
83
// childPolicyConfigEqual reports whether a and b contain the same keys mapped
// to byte-identical raw JSON values.
//
// A nil map is equal only to another nil map; nil and empty are different.
// Values are compared as bytes, so two JSON documents that differ only in
// whitespace or key order are reported as different.
func childPolicyConfigEqual(a, b map[string]json.RawMessage) bool {
	switch {
	case (a == nil) != (b == nil):
		return false
	case len(a) != len(b):
		return false
	}
	// The sizes match, so checking every key of a against b is sufficient.
	for key, left := range a {
		if right, found := b[key]; !found || !bytes.Equal(left, right) {
			return false
		}
	}
	return true
}
102
103
104
// lbConfigJSON is the target of the first json.Unmarshal pass in ParseConfig
// and mirrors the top-level object of the RLS LB policy config. The fields
// carry no struct tags, so encoding/json matches JSON keys against the field
// names, accepting a case-insensitive match.
type lbConfigJSON struct {
	// RouteLookupConfig is the raw JSON of the RouteLookupConfig proto. It is
	// decoded separately with protojson.
	RouteLookupConfig json.RawMessage
	// RouteLookupChannelServiceConfig is the raw service config for the control
	// channel. It is optional and is validated only when present.
	RouteLookupChannelServiceConfig json.RawMessage
	// ChildPolicy is the list of candidate child policies. Each entry is
	// expected to hold exactly one policy-name to config pair.
	ChildPolicy []map[string]json.RawMessage
	// ChildPolicyConfigTargetFieldName names the field of the child policy
	// config that carries the target. ParseConfig rejects an empty value.
	ChildPolicyConfigTargetFieldName string
}
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 func (rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
146 logger.Infof("Received JSON service config: %v", pretty.ToJSON(c))
147 cfgJSON := &lbConfigJSON{}
148 if err := json.Unmarshal(c, cfgJSON); err != nil {
149 return nil, fmt.Errorf("rls: json unmarshal failed for service config %+v: %v", string(c), err)
150 }
151
152 m := protojson.UnmarshalOptions{DiscardUnknown: true}
153 rlsProto := &rlspb.RouteLookupConfig{}
154 if err := m.Unmarshal(cfgJSON.RouteLookupConfig, rlsProto); err != nil {
155 return nil, fmt.Errorf("rls: bad RouteLookupConfig proto %+v: %v", string(cfgJSON.RouteLookupConfig), err)
156 }
157 lbCfg, err := parseRLSProto(rlsProto)
158 if err != nil {
159 return nil, err
160 }
161
162 if sc := string(cfgJSON.RouteLookupChannelServiceConfig); sc != "" {
163 parsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(sc)
164 if parsed.Err != nil {
165 return nil, fmt.Errorf("rls: bad control channel service config %q: %v", sc, parsed.Err)
166 }
167 lbCfg.controlChannelServiceConfig = sc
168 }
169
170 if cfgJSON.ChildPolicyConfigTargetFieldName == "" {
171 return nil, fmt.Errorf("rls: childPolicyConfigTargetFieldName field is not set in service config %+v", string(c))
172 }
173 name, config, err := parseChildPolicyConfigs(cfgJSON.ChildPolicy, cfgJSON.ChildPolicyConfigTargetFieldName)
174 if err != nil {
175 return nil, err
176 }
177 lbCfg.childPolicyName = name
178 lbCfg.childPolicyConfig = config
179 lbCfg.childPolicyTargetField = cfgJSON.ChildPolicyConfigTargetFieldName
180 return lbCfg, nil
181 }
182
183 func parseRLSProto(rlsProto *rlspb.RouteLookupConfig) (*lbConfig, error) {
184
185 kbMap, err := keys.MakeBuilderMap(rlsProto)
186 if err != nil {
187 return nil, err
188 }
189
190
191 lookupService := rlsProto.GetLookupService()
192 if lookupService == "" {
193 return nil, fmt.Errorf("rls: empty lookup_service in route lookup config %+v", rlsProto)
194 }
195 parsedTarget, err := url.Parse(lookupService)
196 if err != nil {
197
198 parsedTarget, err = url.Parse(resolver.GetDefaultScheme() + ":///" + lookupService)
199 if err != nil {
200 return nil, fmt.Errorf("rls: invalid target URI in lookup_service %s", lookupService)
201 }
202 }
203 if parsedTarget.Scheme == "" {
204 parsedTarget.Scheme = resolver.GetDefaultScheme()
205 }
206 if resolver.Get(parsedTarget.Scheme) == nil {
207 return nil, fmt.Errorf("rls: unregistered scheme in lookup_service %s", lookupService)
208 }
209
210 lookupServiceTimeout, err := convertDuration(rlsProto.GetLookupServiceTimeout())
211 if err != nil {
212 return nil, fmt.Errorf("rls: failed to parse lookup_service_timeout in route lookup config %+v: %v", rlsProto, err)
213 }
214 if lookupServiceTimeout == 0 {
215 lookupServiceTimeout = defaultLookupServiceTimeout
216 }
217
218
219
220
221
222 maxAge, err := convertDuration(rlsProto.GetMaxAge())
223 if err != nil {
224 return nil, fmt.Errorf("rls: failed to parse max_age in route lookup config %+v: %v", rlsProto, err)
225 }
226 staleAge, err := convertDuration(rlsProto.GetStaleAge())
227 if err != nil {
228 return nil, fmt.Errorf("rls: failed to parse staleAge in route lookup config %+v: %v", rlsProto, err)
229 }
230 if staleAge != 0 && maxAge == 0 {
231 return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in route lookup config %+v", rlsProto)
232 }
233 if staleAge >= maxAge {
234 logger.Infof("rls: stale_age %v is not less than max_age %v, ignoring it", staleAge, maxAge)
235 staleAge = 0
236 }
237 if maxAge == 0 || maxAge > maxMaxAge {
238 logger.Infof("rls: max_age in route lookup config is %v, using %v", maxAge, maxMaxAge)
239 maxAge = maxMaxAge
240 }
241
242
243
244 cacheSizeBytes := rlsProto.GetCacheSizeBytes()
245 if cacheSizeBytes <= 0 {
246 return nil, fmt.Errorf("rls: cache_size_bytes must be set to a non-zero value: %+v", rlsProto)
247 }
248 if cacheSizeBytes > maxCacheSize {
249 logger.Info("rls: cache_size_bytes %v is too large, setting it to: %v", cacheSizeBytes, maxCacheSize)
250 cacheSizeBytes = maxCacheSize
251 }
252 return &lbConfig{
253 kbMap: kbMap,
254 lookupService: lookupService,
255 lookupServiceTimeout: lookupServiceTimeout,
256 maxAge: maxAge,
257 staleAge: staleAge,
258 cacheSizeBytes: cacheSizeBytes,
259 defaultTarget: rlsProto.GetDefaultTarget(),
260 }, nil
261 }
262
263
264
265 func parseChildPolicyConfigs(childPolicies []map[string]json.RawMessage, targetFieldName string) (string, map[string]json.RawMessage, error) {
266 for i, config := range childPolicies {
267 if len(config) != 1 {
268 return "", nil, fmt.Errorf("rls: invalid childPolicy: entry %v does not contain exactly 1 policy/config pair: %q", i, config)
269 }
270
271 var name string
272 var rawCfg json.RawMessage
273 for name, rawCfg = range config {
274 }
275 builder := balancer.Get(name)
276 if builder == nil {
277 continue
278 }
279 parser, ok := builder.(balancer.ConfigParser)
280 if !ok {
281 return "", nil, fmt.Errorf("rls: childPolicy %q with config %q does not support config parsing", name, string(rawCfg))
282 }
283
284
285
286
287
288
289 var childConfig map[string]json.RawMessage
290 if err := json.Unmarshal(rawCfg, &childConfig); err != nil {
291 return "", nil, fmt.Errorf("rls: json unmarshal failed for child policy config %q: %v", string(rawCfg), err)
292 }
293 childConfig[targetFieldName], _ = json.Marshal(dummyChildPolicyTarget)
294 jsonCfg, err := json.Marshal(childConfig)
295 if err != nil {
296 return "", nil, fmt.Errorf("rls: json marshal failed for child policy config {%+v}: %v", childConfig, err)
297 }
298 if _, err := parser.ParseConfig(jsonCfg); err != nil {
299 return "", nil, fmt.Errorf("rls: childPolicy config validation failed: %v", err)
300 }
301 return name, childConfig, nil
302 }
303 return "", nil, fmt.Errorf("rls: invalid childPolicy config: no supported policies found in %+v", childPolicies)
304 }
305
306 func convertDuration(d *durationpb.Duration) (time.Duration, error) {
307 if d == nil {
308 return 0, nil
309 }
310 return d.AsDuration(), d.CheckValid()
311 }
312
View as plain text