1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package resourcewatcher
16
17 import (
18 "context"
19 "fmt"
20
21 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
22
23 "github.com/go-logr/logr"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26 "k8s.io/apimachinery/pkg/fields"
27 "k8s.io/apimachinery/pkg/runtime/schema"
28 "k8s.io/apimachinery/pkg/types"
29 "k8s.io/apimachinery/pkg/watch"
30 "k8s.io/client-go/dynamic"
31 "k8s.io/client-go/rest"
32 )
33
// ResourceWatcher opens watches on individual Kubernetes resources through a
// dynamic client and can block until a watched resource is reported ready.
type ResourceWatcher struct {
	// dynamicClient is the client used to create watches on resources.
	dynamicClient dynamic.Interface
	// logger is the logger used by the watcher's methods; the constructor in
	// this file names it "resourcewatcher".
	logger logr.Logger
}
38
39
40
41 func New(clientConfig *rest.Config, logger logr.Logger) (*ResourceWatcher, error) {
42 dynamicClient, err := dynamic.NewForConfig(clientConfig)
43 if err != nil {
44 return nil, err
45 }
46 return NewWithClient(dynamicClient, logger), nil
47 }
48
49 func NewWithClient(dynamicClient dynamic.Interface, logger logr.Logger) *ResourceWatcher {
50 return &ResourceWatcher{
51 dynamicClient: dynamicClient,
52 logger: logger.WithName("resourcewatcher"),
53 }
54 }
55
56
57
58
59
60 func (r *ResourceWatcher) WaitForResourceToBeReady(ctx context.Context, nn types.NamespacedName, gvk schema.GroupVersionKind) error {
61 logger := r.logger.WithValues("resource", nn, "resourceGVK", gvk)
62 watch, err := r.WatchResource(ctx, nn, gvk)
63 if err != nil {
64 return err
65 }
66 defer watch.Stop()
67 logger.Info("successfully created watch on resource")
68 return WaitForResourceToBeReadyViaWatch(ctx, watch, logger)
69 }
70
71
72 func (r *ResourceWatcher) WatchResource(ctx context.Context, nn types.NamespacedName, gvk schema.GroupVersionKind) (watch.Interface, error) {
73 client := r.dynamicClient.Resource(k8s.ToGVR(gvk)).Namespace(nn.Namespace)
74 nameSelector := fields.OneTermEqualSelector("metadata.name", nn.Name).String()
75 watch, err := client.Watch(ctx, metav1.ListOptions{FieldSelector: nameSelector})
76 if err != nil {
77 return nil, fmt.Errorf("error creating watch on resource: %w", err)
78 }
79 return watch, nil
80 }
81
82
83
84
85
86 func WaitForResourceToBeReadyViaWatch(ctx context.Context, watch watch.Interface, logger logr.Logger) error {
87 for {
88 select {
89 case <-ctx.Done():
90 return fmt.Errorf("context was cancelled: %w", ctx.Err())
91 case event, ok := <-watch.ResultChan():
92 if !ok {
93 return fmt.Errorf("watch channel was closed")
94 }
95 ok, reason, err := isResourceReady(event)
96 if err != nil {
97 return fmt.Errorf("error checking if resource is ready: %w", err)
98 }
99 if !ok {
100 logger.Info("resource not ready", "reason", reason)
101 continue
102 }
103 logger.Info("resource is ready")
104 return nil
105 }
106 }
107 }
108
109
110
111
112 func isResourceReady(event watch.Event) (ok bool, reason string, err error) {
113 if event.Type != watch.Modified && event.Type != watch.Added {
114 return false, fmt.Sprintf("got watch event of type '%v', want event type '%v' or '%v'", event.Type, watch.Modified, watch.Added), nil
115 }
116 u, ok := event.Object.(*unstructured.Unstructured)
117 if !ok {
118 return false, "", fmt.Errorf("error casting event object '%v' of kind '%v' to unstructured", event.Object, event.Object.GetObjectKind())
119 }
120 resource, err := k8s.NewResource(u)
121 if err != nil {
122 return false, "", fmt.Errorf("error converting unstructured to resource: %w", err)
123 }
124
125
126 if resource.Kind == "Secret" {
127 return true, "", nil
128 }
129 if !k8s.IsResourceReady(resource) {
130 return false, "resource not ready", nil
131 }
132 return true, "", nil
133 }
134
View as plain text