1
18
19 package grpc
20
21 import (
22 "encoding/json"
23 "errors"
24 "fmt"
25 "reflect"
26 "time"
27
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/internal"
31 "google.golang.org/grpc/internal/balancer/gracefulswitch"
32 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
33 "google.golang.org/grpc/serviceconfig"
34 )
35
// maxInt is the largest value representable by the platform's int type: all
// bits of a uint set, shifted right once to clear what would be the sign bit.
// parseServiceConfig uses it to saturate int64 message-size limits.
const maxInt = int(^uint(0) >> 1)
37
38
39
40
41
42
43
// MethodConfig is an alias for internalserviceconfig.MethodConfig, the
// per-method portion of a service config. parseServiceConfig fills in its
// WaitForReady, Timeout, RetryPolicy, MaxReqSize and MaxRespSize fields.
type MethodConfig = internalserviceconfig.MethodConfig
45
46
47
48
49
50
51
// ServiceConfig is the parsed form of a JSON service config, as produced by
// parseServiceConfig.
type ServiceConfig struct {
	serviceconfig.Config

	// lbConfig is the load balancing config returned by
	// gracefulswitch.ParseConfig while parsing the service config.
	lbConfig serviceconfig.LoadBalancingConfig

	// Methods maps a method path to the MethodConfig that applies to it.
	// Keys are the paths built by jsonName.generatePath:
	//   - "/service/method" for a name with both service and method set,
	//   - "/service/" for a name with only the service set,
	//   - "" for a name with neither service nor method set.
	Methods map[string]MethodConfig

	// retryThrottling is the "retryThrottling" section of the service
	// config, or nil when the section is absent.
	retryThrottling *retryThrottlingPolicy

	// healthCheckConfig is the "healthCheckConfig" section of the service
	// config, or nil when the section is absent.
	healthCheckConfig *healthCheckConfig

	// rawJSONString is the JSON text this config was parsed from.
	// equalServiceConfig ignores it when comparing two configs.
	rawJSONString string
}
90
91
// healthCheckConfig is the JSON shape of the "healthCheckConfig" section of a
// service config.
type healthCheckConfig struct {
	// ServiceName is decoded from the "serviceName" JSON key.
	// NOTE(review): presumably the service name used for health checking;
	// its consumer is not visible in this file.
	ServiceName string
}
96
// jsonRetryPolicy is the JSON shape of a method config's "retryPolicy"
// section. convertRetryPolicy validates it and turns it into an
// internalserviceconfig.RetryPolicy.
type jsonRetryPolicy struct {
	MaxAttempts          int                            // must be > 1; values above 5 are capped at 5 by convertRetryPolicy
	InitialBackoff       internalserviceconfig.Duration // must be > 0
	MaxBackoff           internalserviceconfig.Duration // must be > 0
	BackoffMultiplier    float64                        // must be > 0
	RetryableStatusCodes []codes.Code                   // must be non-empty
}
104
105
106
107
// retryThrottlingPolicy is the JSON shape of the "retryThrottling" section of
// a service config. parseServiceConfig validates both fields.
type retryThrottlingPolicy struct {
	// MaxTokens is decoded from the "maxTokens" JSON key.
	// parseServiceConfig requires it to be in the range (0, 1000].
	MaxTokens float64

	// TokenRatio is decoded from the "tokenRatio" JSON key.
	// parseServiceConfig requires it to be strictly greater than zero.
	TokenRatio float64
}
121
// jsonName is the JSON shape of one entry in a method config's "name" list.
// generatePath turns it into the key used in ServiceConfig.Methods.
type jsonName struct {
	Service string // service part of the name; may be empty only if Method is empty too
	Method  string // method part of the name; empty selects every method of Service
}
126
var (
	// errDuplicatedName is returned by parseServiceConfig when two name
	// entries across the method configs generate the same path.
	errDuplicatedName = errors.New("duplicated name")
	// errEmptyServiceNonEmptyMethod is returned by jsonName.generatePath for
	// a name that sets a method but leaves the service empty.
	errEmptyServiceNonEmptyMethod = errors.New("cannot combine empty 'service' and non-empty 'method'")
)
131
132 func (j jsonName) generatePath() (string, error) {
133 if j.Service == "" {
134 if j.Method != "" {
135 return "", errEmptyServiceNonEmptyMethod
136 }
137 return "", nil
138 }
139 res := "/" + j.Service + "/"
140 if j.Method != "" {
141 res += j.Method
142 }
143 return res, nil
144 }
145
146
// jsonMC is the JSON shape of one entry in the service config's
// "methodConfig" list. Every field is a pointer so that an absent key (nil)
// can be told apart from an explicit zero value.
type jsonMC struct {
	Name                    *[]jsonName                     // names this config applies to; entries with a nil Name are skipped by parseServiceConfig
	WaitForReady            *bool                           // copied to MethodConfig.WaitForReady
	Timeout                 *internalserviceconfig.Duration // converted to *time.Duration for MethodConfig.Timeout
	MaxRequestMessageBytes  *int64                          // becomes MethodConfig.MaxReqSize, capped at maxInt
	MaxResponseMessageBytes *int64                          // becomes MethodConfig.MaxRespSize, capped at maxInt
	RetryPolicy             *jsonRetryPolicy                // converted by convertRetryPolicy
}
155
156
// jsonSC is the JSON shape of a whole service config; parseServiceConfig
// unmarshals its input into this type. Every field is a pointer so that an
// absent key (nil) can be told apart from an explicit zero value.
type jsonSC struct {
	LoadBalancingPolicy *string                // balancer name; consulted only when LoadBalancingConfig is absent
	LoadBalancingConfig *json.RawMessage       // raw LB config, handed to gracefulswitch.ParseConfig
	MethodConfig        *[]jsonMC              // per-method configuration entries
	RetryThrottling     *retryThrottlingPolicy // stored as ServiceConfig.retryThrottling
	HealthCheckConfig   *healthCheckConfig     // stored as ServiceConfig.healthCheckConfig
}
164
// init installs parseServiceConfig as internal.ParseServiceConfig so that
// code reaching it through the internal package can parse service configs.
func init() {
	internal.ParseServiceConfig = parseServiceConfig
}
168 func parseServiceConfig(js string) *serviceconfig.ParseResult {
169 if len(js) == 0 {
170 return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
171 }
172 var rsc jsonSC
173 err := json.Unmarshal([]byte(js), &rsc)
174 if err != nil {
175 logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
176 return &serviceconfig.ParseResult{Err: err}
177 }
178 sc := ServiceConfig{
179 Methods: make(map[string]MethodConfig),
180 retryThrottling: rsc.RetryThrottling,
181 healthCheckConfig: rsc.HealthCheckConfig,
182 rawJSONString: js,
183 }
184 c := rsc.LoadBalancingConfig
185 if c == nil {
186 name := PickFirstBalancerName
187 if rsc.LoadBalancingPolicy != nil {
188 name = *rsc.LoadBalancingPolicy
189 }
190 if balancer.Get(name) == nil {
191 name = PickFirstBalancerName
192 }
193 cfg := []map[string]any{{name: struct{}{}}}
194 strCfg, err := json.Marshal(cfg)
195 if err != nil {
196 return &serviceconfig.ParseResult{Err: fmt.Errorf("unexpected error marshaling simple LB config: %w", err)}
197 }
198 r := json.RawMessage(strCfg)
199 c = &r
200 }
201 cfg, err := gracefulswitch.ParseConfig(*c)
202 if err != nil {
203 return &serviceconfig.ParseResult{Err: err}
204 }
205 sc.lbConfig = cfg
206
207 if rsc.MethodConfig == nil {
208 return &serviceconfig.ParseResult{Config: &sc}
209 }
210
211 paths := map[string]struct{}{}
212 for _, m := range *rsc.MethodConfig {
213 if m.Name == nil {
214 continue
215 }
216
217 mc := MethodConfig{
218 WaitForReady: m.WaitForReady,
219 Timeout: (*time.Duration)(m.Timeout),
220 }
221 if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
222 logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
223 return &serviceconfig.ParseResult{Err: err}
224 }
225 if m.MaxRequestMessageBytes != nil {
226 if *m.MaxRequestMessageBytes > int64(maxInt) {
227 mc.MaxReqSize = newInt(maxInt)
228 } else {
229 mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
230 }
231 }
232 if m.MaxResponseMessageBytes != nil {
233 if *m.MaxResponseMessageBytes > int64(maxInt) {
234 mc.MaxRespSize = newInt(maxInt)
235 } else {
236 mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
237 }
238 }
239 for i, n := range *m.Name {
240 path, err := n.generatePath()
241 if err != nil {
242 logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
243 return &serviceconfig.ParseResult{Err: err}
244 }
245
246 if _, ok := paths[path]; ok {
247 err = errDuplicatedName
248 logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
249 return &serviceconfig.ParseResult{Err: err}
250 }
251 paths[path] = struct{}{}
252 sc.Methods[path] = mc
253 }
254 }
255
256 if sc.retryThrottling != nil {
257 if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
258 return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
259 }
260 if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
261 return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
262 }
263 }
264 return &serviceconfig.ParseResult{Config: &sc}
265 }
266
267 func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPolicy, err error) {
268 if jrp == nil {
269 return nil, nil
270 }
271
272 if jrp.MaxAttempts <= 1 ||
273 jrp.InitialBackoff <= 0 ||
274 jrp.MaxBackoff <= 0 ||
275 jrp.BackoffMultiplier <= 0 ||
276 len(jrp.RetryableStatusCodes) == 0 {
277 logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
278 return nil, nil
279 }
280
281 rp := &internalserviceconfig.RetryPolicy{
282 MaxAttempts: jrp.MaxAttempts,
283 InitialBackoff: time.Duration(jrp.InitialBackoff),
284 MaxBackoff: time.Duration(jrp.MaxBackoff),
285 BackoffMultiplier: jrp.BackoffMultiplier,
286 RetryableStatusCodes: make(map[codes.Code]bool),
287 }
288 if rp.MaxAttempts > 5 {
289
290 rp.MaxAttempts = 5
291 }
292 for _, code := range jrp.RetryableStatusCodes {
293 rp.RetryableStatusCodes[code] = true
294 }
295 return rp, nil
296 }
297
// min returns whichever of a and b points to the smaller int. When both point
// to equal values, b is returned. Both pointers must be non-nil.
func min(a, b *int) *int {
	if *b <= *a {
		return b
	}
	return a
}
304
305 func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
306 if mcMax == nil && doptMax == nil {
307 return &defaultVal
308 }
309 if mcMax != nil && doptMax != nil {
310 return min(mcMax, doptMax)
311 }
312 if mcMax != nil {
313 return mcMax
314 }
315 return doptMax
316 }
317
// newInt returns a pointer to a freshly allocated int holding b.
func newInt(b int) *int {
	p := new(int)
	*p = b
	return p
}
321
// init installs equalServiceConfig as internal.EqualServiceConfigForTesting
// so that tests can compare service configs through the internal package.
func init() {
	internal.EqualServiceConfigForTesting = equalServiceConfig
}
325
326
327
328
329
330 func equalServiceConfig(a, b serviceconfig.Config) bool {
331 if a == nil && b == nil {
332 return true
333 }
334 aa, ok := a.(*ServiceConfig)
335 if !ok {
336 return false
337 }
338 bb, ok := b.(*ServiceConfig)
339 if !ok {
340 return false
341 }
342 aaRaw := aa.rawJSONString
343 aa.rawJSONString = ""
344 bbRaw := bb.rawJSONString
345 bb.rawJSONString = ""
346 defer func() {
347 aa.rawJSONString = aaRaw
348 bb.rawJSONString = bbRaw
349 }()
350
351
352
353 return reflect.DeepEqual(aa, bb)
354 }
355
View as plain text