...
1 package internal
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "reflect"
8 "time"
9
10 "k8s.io/apimachinery/pkg/api/meta"
11 "k8s.io/apimachinery/pkg/runtime"
12 "k8s.io/apimachinery/pkg/util/wait"
13 "k8s.io/client-go/util/workqueue"
14
15 "sigs.k8s.io/controller-runtime/pkg/cache"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 "sigs.k8s.io/controller-runtime/pkg/handler"
18 "sigs.k8s.io/controller-runtime/pkg/predicate"
19 )
20
21
22 type Kind[T client.Object] struct {
23
24 Type T
25
26
27 Cache cache.Cache
28
29 Handler handler.TypedEventHandler[T]
30
31 Predicates []predicate.TypedPredicate[T]
32
33
34
35 startedErr chan error
36 startCancel func()
37 }
38
39
40
41 func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
42 if isNil(ks.Type) {
43 return fmt.Errorf("must create Kind with a non-nil object")
44 }
45 if isNil(ks.Cache) {
46 return fmt.Errorf("must create Kind with a non-nil cache")
47 }
48 if isNil(ks.Handler) {
49 return errors.New("must create Kind with non-nil handler")
50 }
51
52
53
54 ctx, ks.startCancel = context.WithCancel(ctx)
55 ks.startedErr = make(chan error)
56 go func() {
57 var (
58 i cache.Informer
59 lastErr error
60 )
61
62
63
64 if err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
65
66 i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
67 if lastErr != nil {
68 kindMatchErr := &meta.NoKindMatchError{}
69 switch {
70 case errors.As(lastErr, &kindMatchErr):
71 log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
72 "kind", kindMatchErr.GroupKind)
73 case runtime.IsNotRegisteredError(lastErr):
74 log.Error(lastErr, "kind must be registered to the Scheme")
75 default:
76 log.Error(lastErr, "failed to get informer from cache")
77 }
78 return false, nil
79 }
80 return true, nil
81 }); err != nil {
82 if lastErr != nil {
83 ks.startedErr <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
84 return
85 }
86 ks.startedErr <- err
87 return
88 }
89
90 _, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs())
91 if err != nil {
92 ks.startedErr <- err
93 return
94 }
95 if !ks.Cache.WaitForCacheSync(ctx) {
96
97 ks.startedErr <- errors.New("cache did not sync")
98 }
99 close(ks.startedErr)
100 }()
101
102 return nil
103 }
104
105 func (ks *Kind[T]) String() string {
106 if !isNil(ks.Type) {
107 return fmt.Sprintf("kind source: %T", ks.Type)
108 }
109 return "kind source: unknown type"
110 }
111
112
113
114 func (ks *Kind[T]) WaitForSync(ctx context.Context) error {
115 select {
116 case err := <-ks.startedErr:
117 return err
118 case <-ctx.Done():
119 ks.startCancel()
120 if errors.Is(ctx.Err(), context.Canceled) {
121 return nil
122 }
123 return fmt.Errorf("timed out waiting for cache to be synced for Kind %T", ks.Type)
124 }
125 }
126
// isNil reports whether arg is an untyped nil or holds a nil value of a
// nillable kind (pointer, interface, slice, map, channel or function).
// Values of any other kind are never reported as nil.
func isNil(arg any) bool {
	val := reflect.ValueOf(arg)
	if !val.IsValid() {
		// An untyped nil produces the zero reflect.Value.
		return true
	}
	switch val.Kind() {
	case reflect.Pointer, reflect.Interface, reflect.Slice,
		reflect.Map, reflect.Chan, reflect.Func:
		return val.IsNil()
	default:
		return false
	}
}
138
View as plain text