1
16
17 package watch
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "net/http"
25 "time"
26
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/util/dump"
30 "k8s.io/apimachinery/pkg/util/net"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/apimachinery/pkg/watch"
33 "k8s.io/client-go/tools/cache"
34 "k8s.io/klog/v2"
35 )
36
37
38
// resourceVersionGetter is the minimal interface an object has to satisfy so
// that its resourceVersion can be read.
type resourceVersionGetter interface {
	// GetResourceVersion returns the object's resourceVersion as a string.
	GetResourceVersion() string
}
42
43
44
45
46
47
48
// RetryWatcher wraps a cache.Watcher and keeps re-establishing the underlying
// watch when it ends, resuming from the resourceVersion of the last event it
// observed. Events are delivered on the channel returned by ResultChan.
type RetryWatcher struct {
	// lastResourceVersion is the resourceVersion the next watch is started
	// from; it is advanced as events carrying a resourceVersion are processed.
	lastResourceVersion string
	// watcherClient opens the underlying watches.
	watcherClient cache.Watcher
	// resultChan carries events to the consumer; it is closed when the
	// receive loop exits.
	resultChan chan watch.Event
	// stopChan is closed by Stop to request shutdown.
	stopChan chan struct{}
	// doneChan is closed once the receive loop has returned.
	doneChan chan struct{}
	// minRestartDelay is the period handed to the retry loop between
	// consecutive watch attempts.
	minRestartDelay time.Duration
}
57
58
59
60
61 func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
62 return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
63 }
64
65 func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
66 switch initialResourceVersion {
67 case "", "0":
68
69
70 return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
71 default:
72 break
73 }
74
75 rw := &RetryWatcher{
76 lastResourceVersion: initialResourceVersion,
77 watcherClient: watcherClient,
78 stopChan: make(chan struct{}),
79 doneChan: make(chan struct{}),
80 resultChan: make(chan watch.Event, 0),
81 minRestartDelay: minRestartDelay,
82 }
83
84 go rw.receive()
85 return rw, nil
86 }
87
88 func (rw *RetryWatcher) send(event watch.Event) bool {
89
90
91 select {
92 case rw.resultChan <- event:
93 return true
94 case <-rw.stopChan:
95 return false
96 }
97 }
98
99
100
101 func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
102 watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
103 ResourceVersion: rw.lastResourceVersion,
104 AllowWatchBookmarks: true,
105 })
106
107
108
109 switch err {
110 case nil:
111 break
112
113 case io.EOF:
114
115 return false, 0
116
117 case io.ErrUnexpectedEOF:
118 klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
119 return false, 0
120
121 default:
122 msg := "Watch failed"
123 if net.IsProbableEOF(err) || net.IsTimeout(err) {
124 klog.V(5).InfoS(msg, "err", err)
125
126 return false, 0
127 }
128
129 klog.ErrorS(err, msg)
130
131 return false, 0
132 }
133
134 if watcher == nil {
135 klog.ErrorS(nil, "Watch returned nil watcher")
136
137 return false, 0
138 }
139
140 ch := watcher.ResultChan()
141 defer watcher.Stop()
142
143 for {
144 select {
145 case <-rw.stopChan:
146 klog.V(4).InfoS("Stopping RetryWatcher.")
147 return true, 0
148 case event, ok := <-ch:
149 if !ok {
150 klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
151 return false, 0
152 }
153
154
155 switch event.Type {
156 case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
157 metaObject, ok := event.Object.(resourceVersionGetter)
158 if !ok {
159 _ = rw.send(watch.Event{
160 Type: watch.Error,
161 Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
162 })
163
164 return true, 0
165 }
166
167 resourceVersion := metaObject.GetResourceVersion()
168 if resourceVersion == "" {
169 _ = rw.send(watch.Event{
170 Type: watch.Error,
171 Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
172 })
173
174 return true, 0
175 }
176
177
178 if event.Type != watch.Bookmark {
179 ok = rw.send(event)
180 if !ok {
181 return true, 0
182 }
183 }
184 rw.lastResourceVersion = resourceVersion
185
186 continue
187
188 case watch.Error:
189
190 errObject := apierrors.FromObject(event.Object)
191 statusErr, ok := errObject.(*apierrors.StatusError)
192 if !ok {
193 klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object)))
194
195 return false, 0
196 }
197
198 status := statusErr.ErrStatus
199
200 statusDelay := time.Duration(0)
201 if status.Details != nil {
202 statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
203 }
204
205 switch status.Code {
206 case http.StatusGone:
207
208 _ = rw.send(event)
209 return true, 0
210
211 case http.StatusGatewayTimeout, http.StatusInternalServerError:
212
213 return false, statusDelay
214
215 default:
216
217
218
219
220
221
222 klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object)))
223
224
225 return false, statusDelay
226 }
227
228 default:
229 klog.Errorf("Failed to recognize Event type %q", event.Type)
230 _ = rw.send(watch.Event{
231 Type: watch.Error,
232 Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
233 })
234
235 return true, 0
236 }
237 }
238 }
239 }
240
241
242 func (rw *RetryWatcher) receive() {
243 defer close(rw.doneChan)
244 defer close(rw.resultChan)
245
246 klog.V(4).Info("Starting RetryWatcher.")
247 defer klog.V(4).Info("Stopping RetryWatcher.")
248
249 ctx, cancel := context.WithCancel(context.Background())
250 defer cancel()
251 go func() {
252 select {
253 case <-rw.stopChan:
254 cancel()
255 return
256 case <-ctx.Done():
257 return
258 }
259 }()
260
261
262
263 wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
264 done, retryAfter := rw.doReceive()
265 if done {
266 cancel()
267 return
268 }
269
270 timer := time.NewTimer(retryAfter)
271 select {
272 case <-ctx.Done():
273 timer.Stop()
274 return
275 case <-timer.C:
276 }
277
278 klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
279 }, rw.minRestartDelay)
280 }
281
282
// ResultChan returns the receive-only channel on which the RetryWatcher
// delivers events.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
	return rw.resultChan
}
286
287
288 func (rw *RetryWatcher) Stop() {
289 close(rw.stopChan)
290 }
291
292
// Done returns the RetryWatcher's done channel, which lets callers wait for
// the watcher to finish.
func (rw *RetryWatcher) Done() <-chan struct{} {
	return rw.doneChan
}
296
View as plain text