1 package k8s
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12 "k8s.io/apimachinery/pkg/runtime"
13 "k8s.io/apimachinery/pkg/runtime/schema"
14 pwatch "k8s.io/apimachinery/pkg/watch"
15
16 "k8s.io/client-go/dynamic"
17 _ "k8s.io/client-go/plugin/pkg/client/auth"
18 "k8s.io/client-go/tools/cache"
19 )
20
21 type listWatchAdapter struct {
22 resource dynamic.ResourceInterface
23 fieldSelector string
24 labelSelector string
25 }
26
27 func (lw listWatchAdapter) List(options v1.ListOptions) (runtime.Object, error) {
28 options.FieldSelector = lw.fieldSelector
29 options.LabelSelector = lw.labelSelector
30
31
32 return lw.resource.List(context.TODO(), options)
33 }
34
35 func (lw listWatchAdapter) Watch(options v1.ListOptions) (pwatch.Interface, error) {
36 options.FieldSelector = lw.fieldSelector
37 options.LabelSelector = lw.labelSelector
38 return lw.resource.Watch(context.TODO(), options)
39 }
40
41
42 type Watcher struct {
43 Client *Client
44 watches map[ResourceType]watch
45 stop chan struct{}
46 wg sync.WaitGroup
47 mutex sync.Mutex
48 stopMu sync.Mutex
49 started bool
50 stopped bool
51 }
52
53 type watch struct {
54 query Query
55 resource dynamic.NamespaceableResourceInterface
56 store cache.Store
57 invoke func()
58 runner func()
59 }
60
61
62 func NewWatcher(info *KubeInfo) (*Watcher, error) {
63 cli, err := NewClient(info)
64 if err != nil {
65 return nil, err
66 }
67 return cli.Watcher(), nil
68 }
69
70
71 func (c *Client) Watcher() *Watcher {
72 w := &Watcher{
73 Client: c,
74 watches: make(map[ResourceType]watch),
75 stop: make(chan struct{}),
76 }
77
78 return w
79 }
80
81
82
83 func (w *Watcher) WatchQuery(query Query, listener func(*Watcher) error) error {
84 err := query.resolve(w.Client)
85 if err != nil {
86 return err
87 }
88 ri := query.resourceType
89
90 dyn, err := dynamic.NewForConfig(w.Client.config)
91 if err != nil {
92 return err
93 }
94
95 resource := dyn.Resource(schema.GroupVersionResource{
96 Group: ri.Group,
97 Version: ri.Version,
98 Resource: ri.Name,
99 })
100
101 var watched dynamic.ResourceInterface
102 if ri.Namespaced && query.Namespace != "" {
103 watched = resource.Namespace(query.Namespace)
104 } else {
105 watched = resource
106 }
107
108 invoke := func() {
109 w.mutex.Lock()
110 defer w.mutex.Unlock()
111 if err := listener(w); err != nil {
112 panic(fmt.Errorf("I'm sorry, the pkg/k8s API really painted us in to a hole and I couldn't handle this error properly: %w", err))
113 }
114 }
115
116 store, controller := cache.NewInformer(
117 listWatchAdapter{watched, query.FieldSelector, query.LabelSelector},
118 nil,
119 5*time.Minute,
120 cache.ResourceEventHandlerFuncs{
121 AddFunc: func(obj interface{}) {
122 invoke()
123 },
124 UpdateFunc: func(oldObj, newObj interface{}) {
125 oldUn := oldObj.(*unstructured.Unstructured)
126 newUn := newObj.(*unstructured.Unstructured)
127
128
129
130
131 if oldUn.GetResourceVersion() != newUn.GetResourceVersion() {
132
133
134
135
136
137
138 if oldUn.GetKind() == "Endpoints" &&
139 newUn.GetKind() == "Endpoints" &&
140 oldUn.GetNamespace() == "kube-system" &&
141 newUn.GetNamespace() == "kube-system" {
142 return
143 }
144 invoke()
145 }
146 },
147 DeleteFunc: func(obj interface{}) {
148 invoke()
149 },
150 },
151 )
152
153 runner := func() {
154 controller.Run(w.stop)
155 w.wg.Done()
156 }
157
158 w.watches[ri] = watch{
159 query: query,
160 resource: resource,
161 store: store,
162 invoke: invoke,
163 runner: runner,
164 }
165
166 return nil
167 }
168
169
170 func (w *Watcher) Start(ctx context.Context) error {
171 w.mutex.Lock()
172 if w.started {
173 w.mutex.Unlock()
174 return nil
175 } else {
176 w.started = true
177 w.mutex.Unlock()
178 }
179 for kind := range w.watches {
180 if err := w.sync(ctx, kind); err != nil {
181 return err
182 }
183 }
184
185 for _, watch := range w.watches {
186 watch.invoke()
187 }
188
189 w.wg.Add(len(w.watches))
190 for _, watch := range w.watches {
191 go watch.runner()
192 }
193 return nil
194 }
195
196 func (w *Watcher) sync(ctx context.Context, kind ResourceType) error {
197 watch := w.watches[kind]
198 resources, err := w.Client.ListQuery(ctx, watch.query)
199 if err != nil {
200 return err
201 }
202 for _, rsrc := range resources {
203 var uns unstructured.Unstructured
204 uns.SetUnstructuredContent(rsrc)
205 err = watch.store.Update(&uns)
206 if err != nil {
207 return err
208 }
209 }
210 return nil
211 }
212
213
214 func (w *Watcher) List(kind string) ([]Resource, error) {
215 ri, err := w.Client.ResolveResourceType(kind)
216 if err != nil {
217 return nil, err
218 }
219 watch, ok := w.watches[ri]
220 if ok {
221 objs := watch.store.List()
222 result := make([]Resource, len(objs))
223 for idx, obj := range objs {
224 result[idx] = obj.(*unstructured.Unstructured).UnstructuredContent()
225 }
226 return result, nil
227 }
228 return nil, nil
229 }
230
231
232 func (w *Watcher) UpdateStatus(ctx context.Context, resource Resource) (Resource, error) {
233 ri, err := w.Client.ResolveResourceType(resource.QKind())
234 if err != nil {
235 return nil, err
236 }
237 watch, ok := w.watches[ri]
238 if !ok {
239 return nil, fmt.Errorf("no watch: %v, %v", ri, w.watches)
240 }
241
242 var uns unstructured.Unstructured
243 uns.SetUnstructuredContent(resource)
244
245 var cli dynamic.ResourceInterface
246 if ri.Namespaced {
247 cli = watch.resource.Namespace(uns.GetNamespace())
248 } else {
249 cli = watch.resource
250 }
251
252 result, err := cli.UpdateStatus(ctx, &uns, v1.UpdateOptions{})
253 if err != nil {
254 return nil, err
255 }
256 if err := watch.store.Update(result); err != nil {
257 return nil, err
258 }
259 return result.UnstructuredContent(), nil
260 }
261
262
263 func (w *Watcher) Get(kind, qname string) (Resource, error) {
264 resources, err := w.List(kind)
265 if err != nil {
266 return Resource{}, err
267 }
268 for _, res := range resources {
269 if strings.EqualFold(res.QName(), qname) {
270 return res, nil
271 }
272 }
273 return Resource{}, nil
274 }
275
276
277 func (w *Watcher) Exists(kind, qname string) (bool, error) {
278 resource, err := w.Get(kind, qname)
279 if err != nil {
280 return false, err
281 }
282 return resource.Name() != "", nil
283 }
284
285
286
287
288
289
290
291
292 func (w *Watcher) Stop() {
293
294 w.stopMu.Lock()
295 defer w.stopMu.Unlock()
296 if !w.stopped {
297 close(w.stop)
298 w.stopped = true
299 }
300 }
301
302 func (w *Watcher) Wait(ctx context.Context) error {
303 if err := w.Start(ctx); err != nil {
304 return err
305 }
306 w.wg.Wait()
307 return nil
308 }
309
View as plain text