...
1 package utils
2
3 import (
4 "context"
5 "errors"
6 "time"
7
8 "k8s.io/apimachinery/pkg/fields"
9 "k8s.io/apimachinery/pkg/runtime"
10 kwatch "k8s.io/apimachinery/pkg/watch"
11 "sigs.k8s.io/controller-runtime/pkg/client"
12 )
13
// Sentinel errors used by Watch and by MatchFn implementations. Compare
// against them with errors.Is.
var (
	// ErrNotAMatch is the error a MatchFn returns to signal that the observed
	// object does not satisfy the condition yet. Watch treats it as "keep
	// waiting" rather than as a failure.
	ErrNotAMatch = errors.New("updated object is not a match")

	// ErrWatchTimeout is returned by Watch when its timeout elapses before
	// any observed event satisfied the MatchFn.
	ErrWatchTimeout = errors.New("timeout getting data")
)
18
19
// MatchFn decides whether an object delivered by a watch event satisfies the
// condition a caller of Watch is waiting for.
//
// Watch interprets the result as follows: nil means the object matches and
// the wait is over; an error for which errors.Is(err, ErrNotAMatch) holds
// means the object does not match yet and Watch keeps waiting; any other
// error aborts the wait and is returned to the caller.
type MatchFn func(obj runtime.Object) error
21
22 func Watch(cl client.WithWatch, list client.ObjectList, objectKey client.ObjectKey, timeout time.Duration, matches MatchFn) error {
23 ctx, cancel := context.WithTimeout(context.Background(), timeout)
24 defer cancel()
25 watch, err := cl.Watch(ctx, list, &client.ListOptions{
26 FieldSelector: fields.OneTermEqualSelector("metadata.name", objectKey.Name),
27 Namespace: objectKey.Namespace,
28 })
29 if err != nil {
30 return err
31 }
32 defer watch.Stop()
33
34 watchChannel := watch.ResultChan()
35 for {
36 select {
37 case event := <-watchChannel:
38 switch event.Type {
39 case kwatch.Added, kwatch.Modified, kwatch.Deleted:
40 err = matches(event.Object)
41 if err == nil {
42 return nil
43 }
44 if !errors.Is(err, ErrNotAMatch) {
45 return err
46 }
47 }
48 case <-ctx.Done():
49 return ErrWatchTimeout
50 }
51 }
52 }
53
View as plain text