...
1
16
17 package client
18
19 import (
20 "context"
21 "strings"
22
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 "k8s.io/apimachinery/pkg/runtime"
25 "k8s.io/apimachinery/pkg/watch"
26 "k8s.io/client-go/rest"
27 )
28
29
30 func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) {
31 client, err := newClient(config, options)
32 if err != nil {
33 return nil, err
34 }
35 return &watchingClient{client: client}, nil
36 }
37
// watchingClient embeds *client, so it exposes everything the plain client
// does, and adds a Watch method on top of it.
type watchingClient struct {
	*client
}
41
42 func (w *watchingClient) Watch(ctx context.Context, list ObjectList, opts ...ListOption) (watch.Interface, error) {
43 switch l := list.(type) {
44 case runtime.Unstructured:
45 return w.unstructuredWatch(ctx, l, opts...)
46 case *metav1.PartialObjectMetadataList:
47 return w.metadataWatch(ctx, l, opts...)
48 default:
49 return w.typedWatch(ctx, l, opts...)
50 }
51 }
52
53 func (w *watchingClient) listOpts(opts ...ListOption) ListOptions {
54 listOpts := ListOptions{}
55 listOpts.ApplyOptions(opts)
56 if listOpts.Raw == nil {
57 listOpts.Raw = &metav1.ListOptions{}
58 }
59 listOpts.Raw.Watch = true
60
61 return listOpts
62 }
63
64 func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) {
65 gvk := obj.GroupVersionKind()
66 gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
67
68 listOpts := w.listOpts(opts...)
69
70 resInt, err := w.client.metadataClient.getResourceInterface(gvk, listOpts.Namespace)
71 if err != nil {
72 return nil, err
73 }
74
75 return resInt.Watch(ctx, *listOpts.AsListOptions())
76 }
77
78 func (w *watchingClient) unstructuredWatch(ctx context.Context, obj runtime.Unstructured, opts ...ListOption) (watch.Interface, error) {
79 r, err := w.client.unstructuredClient.resources.getResource(obj)
80 if err != nil {
81 return nil, err
82 }
83
84 listOpts := w.listOpts(opts...)
85
86 return r.Get().
87 NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
88 Resource(r.resource()).
89 VersionedParams(listOpts.AsListOptions(), w.client.unstructuredClient.paramCodec).
90 Watch(ctx)
91 }
92
93 func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) {
94 r, err := w.client.typedClient.resources.getResource(obj)
95 if err != nil {
96 return nil, err
97 }
98
99 listOpts := w.listOpts(opts...)
100
101 return r.Get().
102 NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
103 Resource(r.resource()).
104 VersionedParams(listOpts.AsListOptions(), w.client.typedClient.paramCodec).
105 Watch(ctx)
106 }
107
View as plain text