1
18
19 package clusterresolver
20
21 import (
22 "encoding/json"
23 "fmt"
24 "sort"
25
26 "google.golang.org/grpc/balancer/weightedroundrobin"
27 "google.golang.org/grpc/internal/hierarchy"
28 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
29 "google.golang.org/grpc/resolver"
30 "google.golang.org/grpc/xds/internal"
31 "google.golang.org/grpc/xds/internal/balancer/clusterimpl"
32 "google.golang.org/grpc/xds/internal/balancer/outlierdetection"
33 "google.golang.org/grpc/xds/internal/balancer/priority"
34 "google.golang.org/grpc/xds/internal/balancer/wrrlocality"
35 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
36 )
37
// million is the scale used when a drop rate is expressed as a number of
// requests per million.
const million = 1_000_000
39
40
41
42
43
44
45
46
47 type priorityConfig struct {
48 mechanism DiscoveryMechanism
49
50 edsResp xdsresource.EndpointsUpdate
51
52 addresses []string
53
54
55 childNameGen *nameGenerator
56 }
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
75 pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
76 if err != nil {
77 return nil, nil, fmt.Errorf("failed to build priority config: %v", err)
78 }
79 ret, err := json.Marshal(pc)
80 if err != nil {
81 return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
82 }
83 return ret, addrs, nil
84 }
85
86 func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) {
87 var (
88 retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
89 retAddrs []resolver.Address
90 )
91 for _, p := range priorities {
92 switch p.mechanism.Type {
93 case DiscoveryMechanismTypeEDS:
94 names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
95 if err != nil {
96 return nil, nil, err
97 }
98 retConfig.Priorities = append(retConfig.Priorities, names...)
99 retAddrs = append(retAddrs, addrs...)
100 odCfgs := convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection)
101 for n, c := range odCfgs {
102 retConfig.Children[n] = &priority.Child{
103 Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: c},
104
105 IgnoreReresolutionRequests: true,
106 }
107 }
108 continue
109 case DiscoveryMechanismTypeLogicalDNS:
110 name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism)
111 retConfig.Priorities = append(retConfig.Priorities, name)
112 retAddrs = append(retAddrs, addrs...)
113 odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
114 retConfig.Children[name] = &priority.Child{
115 Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg},
116
117
118 IgnoreReresolutionRequests: false,
119 }
120 continue
121 }
122 }
123 return retConfig, retAddrs, nil
124 }
125
126 func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig {
127 odCfgs := make(map[string]*outlierdetection.LBConfig, len(ciCfgs))
128 for n, c := range ciCfgs {
129 odCfgs[n] = makeClusterImplOutlierDetectionChild(c, odCfg)
130 }
131 return odCfgs
132 }
133
134 func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) *outlierdetection.LBConfig {
135 odCfgRet := odCfg
136 odCfgRet.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: ciCfg}
137 return &odCfgRet
138 }
139
140 func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
141
142 const childPolicy = "pick_first"
143 retAddrs := make([]resolver.Address, 0, len(addrStrs))
144 pName := fmt.Sprintf("priority-%v", g.prefix)
145 for _, addrStr := range addrStrs {
146 retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
147 }
148 return pName, &clusterimpl.LBConfig{
149 Cluster: mechanism.Cluster,
150 TelemetryLabels: mechanism.TelemetryLabels,
151 ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
152 }, retAddrs
153 }
154
155
156
157
158
159
160
161
162
163
164 func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
165 drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
166 for _, d := range edsResp.Drops {
167 drops = append(drops, clusterimpl.DropConfig{
168 Category: d.Category,
169 RequestsPerMillion: d.Numerator * million / d.Denominator,
170 })
171 }
172
173
174
175
176
177
178
179
180 priorities := [][]xdsresource.Locality{{}}
181 if len(edsResp.Localities) != 0 {
182 priorities = groupLocalitiesByPriority(edsResp.Localities)
183 }
184 retNames := g.generate(priorities)
185 retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
186 var retAddrs []resolver.Address
187 for i, pName := range retNames {
188 priorityLocalities := priorities[i]
189 cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
190 if err != nil {
191 return nil, nil, nil, err
192 }
193 retConfigs[pName] = cfg
194 retAddrs = append(retAddrs, addrs...)
195 }
196 return retNames, retConfigs, retAddrs, nil
197 }
198
199
200
201
202
203
204
205
206 func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality {
207 var priorityIntSlice []int
208 priorities := make(map[int][]xdsresource.Locality)
209 for _, locality := range localities {
210 priority := int(locality.Priority)
211 priorities[priority] = append(priorities[priority], locality)
212 priorityIntSlice = append(priorityIntSlice, priority)
213 }
214
215
216
217 sort.Ints(priorityIntSlice)
218 priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
219 ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped))
220 for _, p := range priorityIntSliceDeduped {
221 ret = append(ret, priorities[p])
222 }
223 return ret
224 }
225
// dedupSortedIntSlice removes adjacent duplicates from a, which is expected to
// be sorted. It compacts a in place and returns the leading sub-slice holding
// the distinct values; an empty or nil input is returned unchanged.
func dedupSortedIntSlice(a []int) []int {
	if len(a) == 0 {
		return a
	}
	// last is the index of the most recently kept value.
	last := 0
	for _, v := range a[1:] {
		if v == a[last] {
			continue
		}
		last++
		a[last] = v
	}
	return a[:last+1]
}
242
243
244
245
246
247 func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
248 var addrs []resolver.Address
249 for _, locality := range localities {
250 var lw uint32 = 1
251 if locality.Weight != 0 {
252 lw = locality.Weight
253 }
254 localityStr, err := locality.ID.ToString()
255 if err != nil {
256 localityStr = fmt.Sprintf("%+v", locality.ID)
257 }
258 for _, endpoint := range locality.Endpoints {
259
260
261
262 if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
263 continue
264 }
265 addr := resolver.Address{Addr: endpoint.Address}
266 addr = hierarchy.Set(addr, []string{priorityName, localityStr})
267 addr = internal.SetLocalityID(addr, locality.ID)
268
269
270
271
272
273 addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw})
274 var ew uint32 = 1
275 if endpoint.Weight != 0 {
276 ew = endpoint.Weight
277 }
278 addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
279 addrs = append(addrs, addr)
280 }
281 }
282 return &clusterimpl.LBConfig{
283 Cluster: mechanism.Cluster,
284 EDSServiceName: mechanism.EDSServiceName,
285 LoadReportingServer: mechanism.LoadReportingServer,
286 MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
287 TelemetryLabels: mechanism.TelemetryLabels,
288 DropCategories: drops,
289 ChildPolicy: xdsLBPolicy,
290 }, addrs, nil
291 }
292
View as plain text