...
1 package agent
2
3 import (
4 "context"
5 "sync"
6
7 "github.com/datawire/dlib/dlog"
8 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9 "k8s.io/apimachinery/pkg/runtime/schema"
10 "k8s.io/client-go/dynamic"
11 "k8s.io/client-go/dynamic/dynamicinformer"
12 "k8s.io/client-go/tools/cache"
13 )
14
15
// CallbackEventType names the kind of change that a GenericCallback reports.
type CallbackEventType string

// Event kinds carried in GenericCallback.EventType. WatchGeneric uses one
// value per informer handler: add, update and delete.
const (
	// CallbackEventAdded marks a callback produced by the informer's AddFunc.
	CallbackEventAdded = CallbackEventType("ADDED")
	// CallbackEventUpdated marks a callback produced by the informer's UpdateFunc.
	CallbackEventUpdated = CallbackEventType("UPDATED")
	// CallbackEventDeleted marks a callback produced by the informer's DeleteFunc.
	CallbackEventDeleted = CallbackEventType("DELETED")
)
23
24
// GenericCallback is the message delivered on the channel returned by
// DynamicClient.WatchGeneric, one per informer event.
type GenericCallback struct {
	// EventType tells whether Obj was added, updated or deleted.
	EventType CallbackEventType

	// Obj is the resource the event refers to. For update events it is the
	// new version of the object; for delete events it is the object that was
	// removed. It may be nil for a delete event whose payload is not of a
	// recognized type.
	Obj *unstructured.Unstructured

	// Sotw holds the result of Informer.ListCache taken while the event was
	// being handled (presumably "state of the world").
	Sotw []interface{}
}
36
37
38
39
// DynamicClient builds informers for arbitrary resources on top of a
// dynamic.Interface and exposes their events as channels (see WatchGeneric).
// It embeds a sync.Mutex by value, so it must be used through a pointer and
// never copied.
type DynamicClient struct {
	// newInformer constructs the Informer used by each watch.
	newInformer InformerFunc
	// di is the dynamic client passed to newInformer.
	di dynamic.Interface
	// done, once true, makes sendCallback drop callbacks instead of sending them.
	done bool
	// mux guards done; sendCallback holds it for the duration of its channel send.
	mux sync.Mutex
}
46
47
48 func NewDynamicClient(di dynamic.Interface, informerFn InformerFunc) *DynamicClient {
49 return &DynamicClient{
50 newInformer: informerFn,
51 di: di,
52 }
53 }
54
55
56
// Informer is the subset of informer behaviour that DynamicClient relies on.
// Keeping it behind an interface lets callers plug in their own
// implementation through an InformerFunc.
type Informer interface {
	// AddEventHandler registers the handler that receives the informer's
	// add, update and delete notifications.
	AddEventHandler(handler cache.ResourceEventHandler)
	// Run starts the informer. WatchGeneric calls it in its own goroutine and
	// passes the watch context's Done channel as stopCh.
	Run(stopCh <-chan struct{})
	// ListCache returns the objects currently held by the informer.
	ListCache() []interface{}
}
62
// InformerFunc constructs an Informer. WatchGeneric calls it with the client's
// dynamic.Interface, the namespace to watch, and the resource to watch.
type InformerFunc func(dynamic.Interface, string, *schema.GroupVersionResource) Informer
64
65
// K8sInformer adapts a client-go cache.SharedIndexInformer to the Informer
// interface. The embedded informer supplies AddEventHandler and Run; ListCache
// is added on top.
type K8sInformer struct {
	cache.SharedIndexInformer
}
69
70
71
72 func (i *K8sInformer) ListCache() []interface{} {
73 return i.GetStore().List()
74 }
75
76
77 func NewK8sInformer(cli dynamic.Interface, ns string, gvr *schema.GroupVersionResource) Informer {
78 f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cli, 0, ns, nil)
79 i := f.ForResource(*gvr).Informer()
80 return &K8sInformer{
81 SharedIndexInformer: i,
82 }
83 }
84
85 func (dc *DynamicClient) sendCallback(callbackChan chan<- *GenericCallback, callback *GenericCallback) {
86 dc.mux.Lock()
87 defer dc.mux.Unlock()
88 if dc.done {
89 return
90 }
91 callbackChan <- callback
92 }
93
94
95
96 func (dc *DynamicClient) WatchGeneric(ctx context.Context, ns string, gvr *schema.GroupVersionResource) <-chan *GenericCallback {
97 callbackChan := make(chan *GenericCallback)
98 go func() {
99 <-ctx.Done()
100 dc.mux.Lock()
101 defer dc.mux.Unlock()
102 dc.done = true
103 close(callbackChan)
104 }()
105 i := dc.newInformer(dc.di, ns, gvr)
106 i.AddEventHandler(
107 cache.ResourceEventHandlerFuncs{
108 AddFunc: func(obj interface{}) {
109 dlog.Debugf(ctx, "WatchGeneric: AddFunc called for resource %q", gvr.String())
110 new := obj.(*unstructured.Unstructured)
111 sotw := i.ListCache()
112 callback := &GenericCallback{EventType: CallbackEventAdded, Obj: new, Sotw: sotw}
113 dc.sendCallback(callbackChan, callback)
114 },
115 UpdateFunc: func(oldObj, newObj interface{}) {
116 dlog.Debugf(ctx, "WatchGeneric: UpdateFunc called for resource %q", gvr.String())
117 new := newObj.(*unstructured.Unstructured)
118 sotw := i.ListCache()
119 callback := &GenericCallback{EventType: CallbackEventUpdated, Obj: new, Sotw: sotw}
120 dc.sendCallback(callbackChan, callback)
121 },
122 DeleteFunc: func(obj interface{}) {
123 dlog.Debugf(ctx, "WatchGeneric: DeleteFunc called for resource %q", gvr.String())
124 var old *unstructured.Unstructured
125 switch o := obj.(type) {
126 case cache.DeletedFinalStateUnknown:
127 old = o.Obj.(*unstructured.Unstructured)
128 case *unstructured.Unstructured:
129 old = o
130 }
131 sotw := i.ListCache()
132 callback := &GenericCallback{EventType: CallbackEventDeleted, Obj: old, Sotw: sotw}
133 dc.sendCallback(callbackChan, callback)
134 },
135 },
136 )
137 go i.Run(ctx.Done())
138 dlog.Infof(ctx, "WatchGeneric: Listening for events from resouce %q", gvr.String())
139 return callbackChan
140 }
141
View as plain text