17 package watch
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "reflect"
24 "strings"
25 "testing"
26 "time"
28 corev1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/runtime"
31 "k8s.io/apimachinery/pkg/runtime/schema"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apimachinery/pkg/watch"
34 fakeclient "k8s.io/client-go/kubernetes/fake"
35 "k8s.io/client-go/tools/cache"
36 )
38 type fakePod struct {
39 }
41 func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
42 func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") }
44 func TestUntil(t *testing.T) {
45 fw := watch.NewFake()
46 go func() {
47 var obj *fakePod
48 fw.Add(obj)
49 fw.Modify(obj)
50 }()
51 conditions := []ConditionFunc{
52 func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
53 func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil },
54 }
56 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
57 defer cancel()
59 lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
60 if err != nil {
61 t.Fatalf("expected nil error, got %#v", err)
62 }
63 if lastEvent == nil {
64 t.Fatal("expected an event")
65 }
66 if lastEvent.Type != watch.Modified {
67 t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
68 }
69 if got, isPod := lastEvent.Object.(*fakePod); !isPod {
70 t.Fatalf("expected a pod event, got %#v", got)
71 }
72 }
74 func TestUntilMultipleConditions(t *testing.T) {
75 fw := watch.NewFake()
76 go func() {
77 var obj *fakePod
78 fw.Add(obj)
79 }()
80 conditions := []ConditionFunc{
81 func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
82 func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
83 }
85 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
86 defer cancel()
88 lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
89 if err != nil {
90 t.Fatalf("expected nil error, got %#v", err)
91 }
92 if lastEvent == nil {
93 t.Fatal("expected an event")
94 }
95 if lastEvent.Type != watch.Added {
96 t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
97 }
98 if got, isPod := lastEvent.Object.(*fakePod); !isPod {
99 t.Fatalf("expected a pod event, got %#v", got)
100 }
101 }
103 func TestUntilMultipleConditionsFail(t *testing.T) {
104 fw := watch.NewFake()
105 go func() {
106 var obj *fakePod
107 fw.Add(obj)
108 }()
109 conditions := []ConditionFunc{
110 func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
111 func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
112 func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil },
113 }
115 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
116 defer cancel()
118 lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
119 if err != wait.ErrWaitTimeout {
120 t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
121 }
122 if lastEvent == nil {
123 t.Fatal("expected an event")
124 }
125 if lastEvent.Type != watch.Added {
126 t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
127 }
128 if got, isPod := lastEvent.Object.(*fakePod); !isPod {
129 t.Fatalf("expected a pod event, got %#v", got)
130 }
131 }
133 func TestUntilTimeout(t *testing.T) {
134 fw := watch.NewFake()
135 go func() {
136 var obj *fakePod
137 fw.Add(obj)
138 fw.Modify(obj)
139 }()
140 conditions := []ConditionFunc{
141 func(event watch.Event) (bool, error) {
142 return event.Type == watch.Added, nil
143 },
144 func(event watch.Event) (bool, error) {
145 return event.Type == watch.Modified, nil
146 },
147 }
149 lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...)
150 if err != nil {
151 t.Fatalf("expected nil error, got %#v", err)
152 }
153 if lastEvent == nil {
154 t.Fatal("expected an event")
155 }
156 if lastEvent.Type != watch.Modified {
157 t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
158 }
159 if got, isPod := lastEvent.Object.(*fakePod); !isPod {
160 t.Fatalf("expected a pod event, got %#v", got)
161 }
162 }
164 func TestUntilErrorCondition(t *testing.T) {
165 fw := watch.NewFake()
166 go func() {
167 var obj *fakePod
168 fw.Add(obj)
169 }()
170 expected := "something bad"
171 conditions := []ConditionFunc{
172 func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
173 func(event watch.Event) (bool, error) { return false, errors.New(expected) },
174 }
176 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
177 defer cancel()
179 _, err := UntilWithoutRetry(ctx, fw, conditions...)
180 if err == nil {
181 t.Fatal("expected an error")
182 }
183 if !strings.Contains(err.Error(), expected) {
184 t.Fatalf("expected %q in error string, got %q", expected, err.Error())
185 }
186 }
188 func TestUntilWithSync(t *testing.T) {
190 tt := []struct {
191 name string
192 lw *cache.ListWatch
193 preconditionFunc PreconditionFunc
194 conditionFunc ConditionFunc
195 expectedErr error
196 expectedEvent *watch.Event
197 }{
198 {
199 name: "doesn't wait for sync with no precondition",
200 lw: &cache.ListWatch{
201 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
202 select {}
203 },
204 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
205 select {}
206 },
207 },
208 preconditionFunc: nil,
209 conditionFunc: func(e watch.Event) (bool, error) {
210 return true, nil
211 },
212 expectedErr: wait.ErrWaitTimeout,
213 expectedEvent: nil,
214 },
215 {
216 name: "waits indefinitely with precondition if it can't sync",
217 lw: &cache.ListWatch{
218 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
219 select {}
220 },
221 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
222 select {}
223 },
224 },
225 preconditionFunc: func(store cache.Store) (bool, error) {
226 return true, nil
227 },
228 conditionFunc: func(e watch.Event) (bool, error) {
229 return true, nil
230 },
231 expectedErr: fmt.Errorf("UntilWithSync: unable to sync caches: %w", context.DeadlineExceeded),
232 expectedEvent: nil,
233 },
234 {
235 name: "precondition can stop the loop",
236 lw: func() *cache.ListWatch {
237 fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
239 return &cache.ListWatch{
240 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
241 return fakeclient.CoreV1().Secrets("").List(context.TODO(), options)
242 },
243 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
244 return fakeclient.CoreV1().Secrets("").Watch(context.TODO(), options)
245 },
246 }
247 }(),
248 preconditionFunc: func(store cache.Store) (bool, error) {
249 _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"})
250 if err != nil {
251 return true, err
252 }
253 if exists {
254 return true, nil
255 }
256 return false, nil
257 },
258 conditionFunc: func(e watch.Event) (bool, error) {
259 return true, errors.New("should never reach this")
260 },
261 expectedErr: nil,
262 expectedEvent: nil,
263 },
264 {
265 name: "precondition lets it proceed to regular condition",
266 lw: func() *cache.ListWatch {
267 fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
269 return &cache.ListWatch{
270 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
271 return fakeclient.CoreV1().Secrets("").List(context.TODO(), options)
272 },
273 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
274 return fakeclient.CoreV1().Secrets("").Watch(context.TODO(), options)
275 },
276 }
277 }(),
278 preconditionFunc: func(store cache.Store) (bool, error) {
279 return false, nil
280 },
281 conditionFunc: func(e watch.Event) (bool, error) {
282 if e.Type == watch.Added {
283 return true, nil
284 }
285 panic("no other events are expected")
286 },
287 expectedErr: nil,
288 expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
289 },
290 }
292 for _, tc := range tt {
293 t.Run(tc.name, func(t *testing.T) {
296 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
297 defer cancel()
299 event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc)
301 if !reflect.DeepEqual(err, tc.expectedErr) {
302 t.Errorf("expected error %#v, got %#v", tc.expectedErr, err)
303 }
305 if !reflect.DeepEqual(event, tc.expectedEvent) {
306 t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event)
307 }
308 })
309 }
310 }
View as plain text