...
1
18
19 package clusterresolver
20
21 import (
22 "context"
23 "sync"
24
25 "google.golang.org/grpc/internal/grpclog"
26 "google.golang.org/grpc/internal/grpcsync"
27 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
28 )
29
30
31
32 type resourceUpdate struct {
33 priorities []priorityConfig
34 err error
35 }
36
37
38
39
40
// topLevelResolver is the notification surface handed to the per-mechanism
// resolvers when they are constructed.
//
// NOTE(review): presumably a child resolver calls onUpdate whenever it has a
// new result to report - confirm against the EDS/DNS resolver implementations.
type topLevelResolver interface {
	// onUpdate signals that the aggregated update should be recomputed.
	onUpdate()
}
44
45
46
47
// endpointsResolver is the behavior resourceResolver needs from the resolver
// that backs a single discovery mechanism.
type endpointsResolver interface {
	// lastUpdate returns the most recent result held by the resolver. The
	// boolean is false when there is nothing to report yet, in which case the
	// aggregated update is withheld. The value is consumed through a type
	// switch that recognizes xdsresource.EndpointsUpdate and []string; any
	// other dynamic type is skipped.
	lastUpdate() (any, bool)

	// resolveNow is forwarded to every child resolver when the parent is
	// asked to re-resolve.
	resolveNow()

	// stop shuts the resolver down. It is called when the mechanism is
	// dropped from the configuration or when the parent is stopped.
	stop()
}
66
67
68
69
70
71 type discoveryMechanismKey struct {
72 typ DiscoveryMechanismType
73 name string
74 }
75
76
77
78
79
80 type discoveryMechanismAndResolver struct {
81 dm DiscoveryMechanism
82 r endpointsResolver
83
84 childNameGen *nameGenerator
85 }
86
87 type resourceResolver struct {
88 parent *clusterResolverBalancer
89 logger *grpclog.PrefixLogger
90 updateChannel chan *resourceUpdate
91 serializer *grpcsync.CallbackSerializer
92 serializerCancel context.CancelFunc
93
94
95 mu sync.Mutex
96 mechanisms []DiscoveryMechanism
97 children []discoveryMechanismAndResolver
98
99
100
101
102
103
104 childrenMap map[discoveryMechanismKey]discoveryMechanismAndResolver
105
106
107
108
109 childNameGeneratorSeqID uint64
110 }
111
112 func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver {
113 rr := &resourceResolver{
114 parent: parent,
115 logger: logger,
116 updateChannel: make(chan *resourceUpdate, 1),
117 childrenMap: make(map[discoveryMechanismKey]discoveryMechanismAndResolver),
118 }
119 ctx, cancel := context.WithCancel(context.Background())
120 rr.serializer = grpcsync.NewCallbackSerializer(ctx)
121 rr.serializerCancel = cancel
122 return rr
123 }
124
125 func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool {
126 if len(a) != len(b) {
127 return false
128 }
129 for i, aa := range a {
130 bb := b[i]
131 if !aa.Equal(bb) {
132 return false
133 }
134 }
135 return true
136 }
137
138 func discoveryMechanismToKey(dm DiscoveryMechanism) discoveryMechanismKey {
139 switch dm.Type {
140 case DiscoveryMechanismTypeEDS:
141 nameToWatch := dm.EDSServiceName
142 if nameToWatch == "" {
143 nameToWatch = dm.Cluster
144 }
145 return discoveryMechanismKey{typ: dm.Type, name: nameToWatch}
146 case DiscoveryMechanismTypeLogicalDNS:
147 return discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname}
148 default:
149 return discoveryMechanismKey{}
150 }
151 }
152
153 func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
154 rr.mu.Lock()
155 defer rr.mu.Unlock()
156 if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) {
157 return
158 }
159 rr.mechanisms = mechanisms
160 rr.children = make([]discoveryMechanismAndResolver, len(mechanisms))
161 newDMs := make(map[discoveryMechanismKey]bool)
162
163
164 for i, dm := range mechanisms {
165 dmKey := discoveryMechanismToKey(dm)
166 newDMs[dmKey] = true
167 dmAndResolver, ok := rr.childrenMap[dmKey]
168 if ok {
169
170
171
172
173
174
175
176 dmAndResolver.dm = dm
177 rr.children[i] = dmAndResolver
178 continue
179 }
180
181
182 var resolver endpointsResolver
183 switch dm.Type {
184 case DiscoveryMechanismTypeEDS:
185 resolver = newEDSResolver(dmKey.name, rr.parent.xdsClient, rr, rr.logger)
186 case DiscoveryMechanismTypeLogicalDNS:
187 resolver = newDNSResolver(dmKey.name, rr, rr.logger)
188 }
189 dmAndResolver = discoveryMechanismAndResolver{
190 dm: dm,
191 r: resolver,
192 childNameGen: newNameGenerator(rr.childNameGeneratorSeqID),
193 }
194 rr.childrenMap[dmKey] = dmAndResolver
195 rr.children[i] = dmAndResolver
196 rr.childNameGeneratorSeqID++
197 }
198
199
200 for dm, r := range rr.childrenMap {
201 if !newDMs[dm] {
202 delete(rr.childrenMap, dm)
203 go r.r.stop()
204 }
205 }
206
207
208 rr.generateLocked()
209 }
210
211
212
213 func (rr *resourceResolver) resolveNow() {
214 rr.mu.Lock()
215 defer rr.mu.Unlock()
216 for _, r := range rr.childrenMap {
217 r.r.resolveNow()
218 }
219 }
220
221 func (rr *resourceResolver) stop(closing bool) {
222 rr.mu.Lock()
223
224
225
226
227
228
229
230 cm := rr.childrenMap
231 rr.childrenMap = make(map[discoveryMechanismKey]discoveryMechanismAndResolver)
232 rr.mechanisms = nil
233 rr.children = nil
234
235 rr.mu.Unlock()
236
237 for _, r := range cm {
238 r.r.stop()
239 }
240
241 if closing {
242 rr.serializerCancel()
243 <-rr.serializer.Done()
244 }
245
246
247
248
249
250
251
252
253
254 select {
255 case <-rr.updateChannel:
256 default:
257 }
258 rr.updateChannel <- &resourceUpdate{}
259 }
260
261
262
263
264
265
266 func (rr *resourceResolver) generateLocked() {
267 var ret []priorityConfig
268 for _, rDM := range rr.children {
269 u, ok := rDM.r.lastUpdate()
270 if !ok {
271
272
273 return
274 }
275 switch uu := u.(type) {
276 case xdsresource.EndpointsUpdate:
277 ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen})
278 case []string:
279 ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu, childNameGen: rDM.childNameGen})
280 }
281 }
282 select {
283 case <-rr.updateChannel:
284 default:
285 }
286 rr.updateChannel <- &resourceUpdate{priorities: ret}
287 }
288
289 func (rr *resourceResolver) onUpdate() {
290 rr.serializer.Schedule(func(context.Context) {
291 rr.mu.Lock()
292 rr.generateLocked()
293 rr.mu.Unlock()
294 })
295 }
296
View as plain text