1 package kates
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "reflect"
8 "sync"
9 "time"
10
11 "k8s.io/apimachinery/pkg/api/meta"
12 )
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 type Accumulator struct {
53 client *Client
54 fields map[string]*field
55
56 excluded map[string]bool
57 synced int
58 changed chan struct{}
59 mutex sync.Mutex
60 }
61
62 type field struct {
63 query Query
64 selector Selector
65 mapping *meta.RESTMapping
66
67
68 values map[string]*Unstructured
69
70 deltas map[string]*Delta
71
72 synced bool
73 firstUpdate bool
74 }
75
// DeltaType classifies the kind of change a Delta describes.
type DeltaType int

// The recognised delta kinds. Their JSON forms are "add", "update" and
// "delete" respectively.
const (
	ObjectAdd DeltaType = iota
	ObjectUpdate
	ObjectDelete
)

// changeStatus tracks, inside Accumulator.Listen, whether the latest stored
// change has already been announced on the changed channel.
type changeStatus int

const (
	// awaitingDispatch means a change is stored but not yet announced.
	awaitingDispatch changeStatus = iota
	// dispatched means the most recent change has been announced.
	dispatched
)

// MarshalJSON encodes dt as one of the JSON strings "add", "update" or
// "delete". Any other value yields an error.
func (dt DeltaType) MarshalJSON() ([]byte, error) {
	var name string
	switch dt {
	case ObjectAdd:
		name = "add"
	case ObjectUpdate:
		name = "update"
	case ObjectDelete:
		name = "delete"
	default:
		return nil, fmt.Errorf("invalid DeltaType enum: %d", dt)
	}
	return []byte(`"` + name + `"`), nil
}

// UnmarshalJSON decodes a JSON string produced by MarshalJSON back into dt.
// The receiver is left untouched when b is not a JSON string or names an
// unknown delta type.
func (dt *DeltaType) UnmarshalJSON(b []byte) error {
	var name string
	if err := json.Unmarshal(b, &name); err != nil {
		return err
	}

	var parsed DeltaType
	switch name {
	case "add":
		parsed = ObjectAdd
	case "update":
		parsed = ObjectUpdate
	case "delete":
		parsed = ObjectDelete
	default:
		return fmt.Errorf("unrecognized delta type: %s", name)
	}

	*dt = parsed
	return nil
}
124
125 type Delta struct {
126 TypeMeta `json:""`
127 ObjectMeta `json:"metadata,omitempty"`
128 DeltaType DeltaType `json:"deltaType"`
129 }
130
131 func NewDelta(deltaType DeltaType, obj *Unstructured) *Delta {
132 return newDelta(deltaType, obj)
133 }
134
135 func NewDeltaFromObject(deltaType DeltaType, obj Object) (*Delta, error) {
136 var un *Unstructured
137 err := convert(obj, &un)
138 if err != nil {
139 return nil, err
140 }
141 return NewDelta(deltaType, un), nil
142 }
143
144 func newDelta(deltaType DeltaType, obj *Unstructured) *Delta {
145
146 return &Delta{
147 TypeMeta: TypeMeta{
148 APIVersion: obj.GetAPIVersion(),
149 Kind: obj.GetKind(),
150 },
151 ObjectMeta: ObjectMeta{
152 Name: obj.GetName(),
153 Namespace: obj.GetNamespace(),
154
155 CreationTimestamp: obj.GetCreationTimestamp(),
156 },
157 DeltaType: deltaType,
158 }
159 }
160
161 func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Accumulator, error) {
162 changed := make(chan struct{})
163
164 fields := make(map[string]*field)
165 rawUpdateCh := make(chan rawUpdate)
166
167 for _, q := range queries {
168 field, err := client.newField(q)
169 if err != nil {
170 return nil, err
171 }
172 fields[q.Name] = field
173 client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
174 }
175
176 acc := &Accumulator{
177 client: client,
178 fields: fields,
179 excluded: map[string]bool{},
180 synced: 0,
181 changed: changed,
182 mutex: sync.Mutex{},
183 }
184
185 go acc.Listen(ctx, rawUpdateCh, client.maxAccumulatorInterval)
186
187 return acc, nil
188 }
189
190
191
192
193
194
195 func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh <-chan rawUpdate, interval time.Duration) {
196 ticker := time.NewTicker(interval)
197 defer ticker.Stop()
198 var changeStatus changeStatus
199 var lastChangeSent time.Time
200 var synced bool
201
202 sendUpdate := func() {
203 a.changed <- struct{}{}
204 changeStatus = dispatched
205 lastChangeSent = time.Now()
206 }
207
208 for {
209 select {
210
211
212
213
214
215
216 case rawUp := <-rawUpdateCh:
217 synced = a.storeUpdate(rawUp)
218 since := rawUp.ts.Sub(lastChangeSent)
219 if synced && since >= interval {
220 sendUpdate()
221 } else {
222 changeStatus = awaitingDispatch
223 }
224 case <-ticker.C:
225 if synced && changeStatus == awaitingDispatch {
226 sendUpdate()
227 }
228 case <-ctx.Done():
229 return
230 }
231 }
232 }
233
234 func (a *Accumulator) Changed() <-chan struct{} {
235 return a.changed
236 }
237
238 func (a *Accumulator) Update(ctx context.Context, target interface{}) (bool, error) {
239 return a.UpdateWithDeltas(ctx, target, nil)
240 }
241
242 func (a *Accumulator) UpdateWithDeltas(ctx context.Context, target interface{}, deltas *[]*Delta) (bool, error) {
243 return a.FilteredUpdate(ctx, target, deltas, nil)
244 }
245
246
247
248
249 func (a *Accumulator) FilteredUpdate(ctx context.Context, target interface{}, deltas *[]*Delta, predicate func(*Unstructured) bool) (bool, error) {
250 a.mutex.Lock()
251 defer a.mutex.Unlock()
252 return a.update(ctx, reflect.ValueOf(target), deltas, predicate)
253 }
254
255 func (a *Accumulator) storeUpdate(update rawUpdate) bool {
256 a.mutex.Lock()
257 defer a.mutex.Unlock()
258 field := a.fields[update.name]
259 if update.new != nil {
260 key := unKey(update.new)
261 oldValue, oldExists := field.values[key]
262 field.values[key] = update.new
263
264 if oldExists && oldValue.GetResourceVersion() == update.new.GetResourceVersion() {
265
266
267 } else {
268 if update.old == nil {
269 field.deltas[key] = newDelta(ObjectAdd, update.new)
270 } else {
271 field.deltas[key] = newDelta(ObjectUpdate, update.new)
272 }
273 }
274 } else if update.old != nil {
275 key := unKey(update.old)
276 _, oldExists := field.values[key]
277 delete(field.values, key)
278
279 if !oldExists {
280
281
282 } else {
283 field.deltas[key] = newDelta(ObjectDelete, update.old)
284 }
285 }
286 if update.synced && !field.synced {
287 field.synced = true
288 a.synced += 1
289 }
290 return a.synced >= len(a.fields)
291 }
292
293 func (a *Accumulator) updateField(
294 ctx context.Context,
295 target reflect.Value,
296 name string,
297 field *field,
298 deltas *[]*Delta,
299 predicate func(*Unstructured) bool,
300 ) (bool, error) {
301 if err := a.client.patchWatch(ctx, field); err != nil {
302 return false, err
303 }
304
305 if field.firstUpdate && len(field.deltas) == 0 {
306 return false, nil
307 }
308
309 field.firstUpdate = true
310 for key, delta := range field.deltas {
311 delete(field.deltas, key)
312 if deltas != nil {
313 *deltas = append(*deltas, delta)
314 }
315
316 if predicate != nil {
317 if delta.DeltaType == ObjectDelete {
318 delete(a.excluded, key)
319 } else {
320 un := field.values[key]
321 if predicate(un) {
322 delete(a.excluded, key)
323 } else {
324 a.excluded[key] = true
325 }
326 }
327 }
328 }
329
330 var items []*Unstructured
331 for key, un := range field.values {
332 if a.excluded[key] {
333 continue
334 }
335 items = append(items, un)
336 }
337
338 jsonBytes, err := json.Marshal(items)
339 if err != nil {
340 return false, err
341 }
342
343 fieldEntry, ok := target.Type().Elem().FieldByName(name)
344 if !ok {
345 return false, fmt.Errorf("no such field: %q", name)
346 }
347
348 var val reflect.Value
349 if fieldEntry.Type.Kind() == reflect.Slice {
350 val = reflect.New(fieldEntry.Type)
351 err := json.Unmarshal(jsonBytes, val.Interface())
352 if err != nil {
353 return false, err
354 }
355 } else if fieldEntry.Type.Kind() == reflect.Map {
356 val = reflect.MakeMap(fieldEntry.Type)
357 for _, item := range items {
358 innerVal := reflect.New(fieldEntry.Type.Elem())
359 err := convert(item, innerVal.Interface())
360 if err != nil {
361 return false, err
362 }
363 val.SetMapIndex(reflect.ValueOf(item.GetName()), reflect.Indirect(innerVal))
364 }
365 } else {
366 return false, fmt.Errorf("don't know how to unmarshal to: %v", fieldEntry.Type)
367 }
368
369 target.Elem().FieldByName(name).Set(reflect.Indirect(val))
370
371 return true, nil
372 }
373
374 func (a *Accumulator) update(ctx context.Context, target reflect.Value, deltas *[]*Delta, predicate func(*Unstructured) bool) (bool, error) {
375 if deltas != nil {
376 *deltas = nil
377 }
378
379 updated := false
380 for name, field := range a.fields {
381 _updated, err := a.updateField(ctx, target, name, field, deltas, predicate)
382 if _updated {
383 updated = true
384 }
385 if err != nil {
386 return updated, err
387 }
388 }
389
390 return updated, nil
391 }
392
View as plain text