...
1
18
19 package server
20
21 import (
22 "sync"
23
24 igrpclog "google.golang.org/grpc/internal/grpclog"
25 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
26 )
27
28
29
30
31
32 type rdsHandler struct {
33 xdsC XDSClient
34 logger *igrpclog.PrefixLogger
35
36 callback func(string, rdsWatcherUpdate)
37
38
39
40
41
42
43 updates map[string]rdsWatcherUpdate
44
45 mu sync.Mutex
46 cancels map[string]func()
47 }
48
49
50
51
52 func newRDSHandler(cb func(string, rdsWatcherUpdate), xdsC XDSClient, logger *igrpclog.PrefixLogger) *rdsHandler {
53 return &rdsHandler{
54 xdsC: xdsC,
55 logger: logger,
56 callback: cb,
57 updates: make(map[string]rdsWatcherUpdate),
58 cancels: make(map[string]func()),
59 }
60 }
61
62
63
64
65
66
67 func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
68 rh.mu.Lock()
69 defer rh.mu.Unlock()
70
71 for routeName := range routeNamesToWatch {
72 if _, ok := rh.cancels[routeName]; !ok {
73
74
75
76 w := &rdsWatcher{parent: rh, routeName: routeName}
77 cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
78
79
80 rh.cancels[routeName] = func() {
81 w.mu.Lock()
82 w.canceled = true
83 w.mu.Unlock()
84 cancel()
85 }
86 }
87 }
88
89
90
91 for routeName := range rh.cancels {
92 if _, ok := routeNamesToWatch[routeName]; !ok {
93 rh.cancels[routeName]()
94 delete(rh.cancels, routeName)
95 delete(rh.updates, routeName)
96 }
97 }
98 }
99
100
101
102 func (rh *rdsHandler) determineRouteConfigurationReady() bool {
103
104
105 return len(rh.updates) == len(rh.cancels)
106 }
107
108
109 func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) {
110 rwu := rh.updates[routeName]
111
112
113
114
115
116 if rwu.data == nil || update.err == nil || xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
117 rwu = update
118 }
119 rh.updates[routeName] = rwu
120 rh.callback(routeName, rwu)
121 }
122
123
124
125
126 func (rh *rdsHandler) close() {
127 rh.mu.Lock()
128 defer rh.mu.Unlock()
129 for _, cancel := range rh.cancels {
130 cancel()
131 }
132 }
133
134 type rdsWatcherUpdate struct {
135 data *xdsresource.RouteConfigUpdate
136 err error
137 }
138
139
140
141 type rdsWatcher struct {
142 parent *rdsHandler
143 logger *igrpclog.PrefixLogger
144 routeName string
145
146 mu sync.Mutex
147 canceled bool
148 }
149
150 func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
151 rw.mu.Lock()
152 if rw.canceled {
153 rw.mu.Unlock()
154 return
155 }
156 rw.mu.Unlock()
157 if rw.logger.V(2) {
158 rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
159 }
160 rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
161 }
162
163 func (rw *rdsWatcher) OnError(err error) {
164 rw.mu.Lock()
165 if rw.canceled {
166 rw.mu.Unlock()
167 return
168 }
169 rw.mu.Unlock()
170 if rw.logger.V(2) {
171 rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
172 }
173 rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
174 }
175
176 func (rw *rdsWatcher) OnResourceDoesNotExist() {
177 rw.mu.Lock()
178 if rw.canceled {
179 rw.mu.Unlock()
180 return
181 }
182 rw.mu.Unlock()
183 if rw.logger.V(2) {
184 rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
185 }
186 err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
187 rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
188 }
189
View as plain text