...
1
16
17 package source
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "sync"
24
25 "k8s.io/client-go/util/workqueue"
26 "k8s.io/utils/ptr"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28 "sigs.k8s.io/controller-runtime/pkg/event"
29 "sigs.k8s.io/controller-runtime/pkg/handler"
30 internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
31
32 "sigs.k8s.io/controller-runtime/pkg/cache"
33 "sigs.k8s.io/controller-runtime/pkg/predicate"
34 )
35
36
37
38
39
40
41
42
43
44 type Source interface {
45
46
47 Start(context.Context, workqueue.RateLimitingInterface) error
48 }
49
50
51
52 type SyncingSource interface {
53 Source
54 WaitForSync(ctx context.Context) error
55 }
56
57
58 func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource {
59 return &internal.Kind[T]{
60 Type: object,
61 Cache: cache,
62 Handler: handler,
63 Predicates: predicates,
64 }
65 }
66
67 var _ Source = &channel[string]{}
68
69
70 type ChannelOpt[T any] func(*channel[T])
71
72
73 func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] {
74 return func(c *channel[T]) {
75 c.predicates = append(c.predicates, p...)
76 }
77 }
78
79
80
81 func WithBufferSize[T any](bufferSize int) ChannelOpt[T] {
82 return func(c *channel[T]) {
83 c.bufferSize = &bufferSize
84 }
85 }
86
87
88
89
90 func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], opts ...ChannelOpt[T]) Source {
91 c := &channel[T]{
92 source: source,
93 handler: handler,
94 }
95 for _, opt := range opts {
96 opt(c)
97 }
98
99 return c
100 }
101
102 type channel[T any] struct {
103
104 once sync.Once
105
106
107 source <-chan event.TypedGenericEvent[T]
108
109 handler handler.TypedEventHandler[T]
110
111 predicates []predicate.TypedPredicate[T]
112
113 bufferSize *int
114
115
116 dest []chan event.TypedGenericEvent[T]
117
118
119 destLock sync.Mutex
120 }
121
122 func (cs *channel[T]) String() string {
123 return fmt.Sprintf("channel source: %p", cs)
124 }
125
126
127 func (cs *channel[T]) Start(
128 ctx context.Context,
129 queue workqueue.RateLimitingInterface,
130 ) error {
131
132 if cs.source == nil {
133 return fmt.Errorf("must specify Channel.Source")
134 }
135 if cs.handler == nil {
136 return errors.New("must specify Channel.Handler")
137 }
138
139 if cs.bufferSize == nil {
140 cs.bufferSize = ptr.To(1024)
141 }
142
143 dst := make(chan event.TypedGenericEvent[T], *cs.bufferSize)
144
145 cs.destLock.Lock()
146 cs.dest = append(cs.dest, dst)
147 cs.destLock.Unlock()
148
149 cs.once.Do(func() {
150
151 go cs.syncLoop(ctx)
152 })
153
154 go func() {
155 for evt := range dst {
156 shouldHandle := true
157 for _, p := range cs.predicates {
158 if !p.Generic(evt) {
159 shouldHandle = false
160 break
161 }
162 }
163
164 if shouldHandle {
165 func() {
166 ctx, cancel := context.WithCancel(ctx)
167 defer cancel()
168 cs.handler.Generic(ctx, evt, queue)
169 }()
170 }
171 }
172 }()
173
174 return nil
175 }
176
177 func (cs *channel[T]) doStop() {
178 cs.destLock.Lock()
179 defer cs.destLock.Unlock()
180
181 for _, dst := range cs.dest {
182 close(dst)
183 }
184 }
185
186 func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) {
187 cs.destLock.Lock()
188 defer cs.destLock.Unlock()
189
190 for _, dst := range cs.dest {
191
192
193
194
195
196 dst <- evt
197 }
198 }
199
200 func (cs *channel[T]) syncLoop(ctx context.Context) {
201 for {
202 select {
203 case <-ctx.Done():
204
205 cs.doStop()
206 return
207 case evt, stillOpen := <-cs.source:
208 if !stillOpen {
209
210
211 cs.doStop()
212 return
213 }
214 cs.distribute(evt)
215 }
216 }
217 }
218
219
220 type Informer struct {
221
222 Informer cache.Informer
223 Handler handler.EventHandler
224 Predicates []predicate.Predicate
225 }
226
227 var _ Source = &Informer{}
228
229
230
231 func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
232
233 if is.Informer == nil {
234 return fmt.Errorf("must specify Informer.Informer")
235 }
236 if is.Handler == nil {
237 return errors.New("must specify Informer.Handler")
238 }
239
240 _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs())
241 if err != nil {
242 return err
243 }
244 return nil
245 }
246
247 func (is *Informer) String() string {
248 return fmt.Sprintf("informer source: %p", is.Informer)
249 }
250
251 var _ Source = Func(nil)
252
253
254 type Func func(context.Context, workqueue.RateLimitingInterface) error
255
256
257 func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
258 return f(ctx, queue)
259 }
260
261 func (f Func) String() string {
262 return fmt.Sprintf("func source: %p", f)
263 }
264
View as plain text