...
1
18
19 package clusterresolver
20
21 import (
22 "sync"
23
24 "google.golang.org/grpc/internal/grpclog"
25 "google.golang.org/grpc/internal/grpcsync"
26 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
27 )
28
29 type edsDiscoveryMechanism struct {
30 nameToWatch string
31 cancelWatch func()
32 topLevelResolver topLevelResolver
33 stopped *grpcsync.Event
34 logger *grpclog.PrefixLogger
35
36 mu sync.Mutex
37 update *xdsresource.EndpointsUpdate
38 }
39
40 func (er *edsDiscoveryMechanism) lastUpdate() (any, bool) {
41 er.mu.Lock()
42 defer er.mu.Unlock()
43
44 if er.update == nil {
45 return nil, false
46 }
47 return *er.update, true
48 }
49
50 func (er *edsDiscoveryMechanism) resolveNow() {
51 }
52
53
54
55 func (er *edsDiscoveryMechanism) stop() {
56
57
58
59
60
61 er.stopped.Fire()
62 er.cancelWatch()
63 }
64
65
66
67 func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *edsDiscoveryMechanism {
68 ret := &edsDiscoveryMechanism{
69 nameToWatch: nameToWatch,
70 topLevelResolver: topLevelResolver,
71 logger: logger,
72 stopped: grpcsync.NewEvent(),
73 }
74 ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret)
75 return ret
76 }
77
78
79 func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
80 if er.stopped.HasFired() {
81 return
82 }
83
84 er.mu.Lock()
85 er.update = &update.Resource
86 er.mu.Unlock()
87
88 er.topLevelResolver.onUpdate()
89 }
90
91 func (er *edsDiscoveryMechanism) OnError(err error) {
92 if er.stopped.HasFired() {
93 return
94 }
95
96 if er.logger.V(2) {
97 er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
98 }
99
100 er.mu.Lock()
101 if er.update != nil {
102
103
104 er.mu.Unlock()
105 return
106 }
107
108
109
110
111
112
113
114 er.update = &xdsresource.EndpointsUpdate{}
115 er.mu.Unlock()
116
117 er.topLevelResolver.onUpdate()
118 }
119
120 func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
121 if er.stopped.HasFired() {
122 return
123 }
124
125 if er.logger.V(2) {
126 er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
127 }
128
129
130
131
132
133
134
135 er.mu.Lock()
136 er.update = &xdsresource.EndpointsUpdate{}
137 er.mu.Unlock()
138
139 er.topLevelResolver.onUpdate()
140 }
141
View as plain text