...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher/resourcewatcher.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package resourcewatcher
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  
    21  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    22  
    23  	"github.com/go-logr/logr"
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    26  	"k8s.io/apimachinery/pkg/fields"
    27  	"k8s.io/apimachinery/pkg/runtime/schema"
    28  	"k8s.io/apimachinery/pkg/types"
    29  	"k8s.io/apimachinery/pkg/watch"
    30  	"k8s.io/client-go/dynamic"
    31  	"k8s.io/client-go/rest"
    32  )
    33  
    34  type ResourceWatcher struct {
    35  	dynamicClient dynamic.Interface
    36  	logger        logr.Logger
    37  }
    38  
    39  // New creates a new ResourceWatcher that uses a dynamic client
    40  // to monitor the status of requested resources
    41  func New(clientConfig *rest.Config, logger logr.Logger) (*ResourceWatcher, error) {
    42  	dynamicClient, err := dynamic.NewForConfig(clientConfig)
    43  	if err != nil {
    44  		return nil, err
    45  	}
    46  	return NewWithClient(dynamicClient, logger), nil
    47  }
    48  
    49  func NewWithClient(dynamicClient dynamic.Interface, logger logr.Logger) *ResourceWatcher {
    50  	return &ResourceWatcher{
    51  		dynamicClient: dynamicClient,
    52  		logger:        logger.WithName("resourcewatcher"),
    53  	}
    54  }
    55  
    56  // WaitForResourceToBeReady waits for the resource identified by the given GVK
    57  // and NamespacedName. It blocks until the resource is ready, an error occurs, or a context
    58  // cancellation occurs. Note that a nil return value signifies that the resource is ready and
    59  // no errors have occurred.
    60  func (r *ResourceWatcher) WaitForResourceToBeReady(ctx context.Context, nn types.NamespacedName, gvk schema.GroupVersionKind) error {
    61  	logger := r.logger.WithValues("resource", nn, "resourceGVK", gvk)
    62  	watch, err := r.WatchResource(ctx, nn, gvk)
    63  	if err != nil {
    64  		return err
    65  	}
    66  	defer watch.Stop()
    67  	logger.Info("successfully created watch on resource")
    68  	return WaitForResourceToBeReadyViaWatch(ctx, watch, logger)
    69  }
    70  
    71  // WatchResource creates a watch on a resource identified by the given GVK and NamespacedName.
    72  func (r *ResourceWatcher) WatchResource(ctx context.Context, nn types.NamespacedName, gvk schema.GroupVersionKind) (watch.Interface, error) {
    73  	client := r.dynamicClient.Resource(k8s.ToGVR(gvk)).Namespace(nn.Namespace)
    74  	nameSelector := fields.OneTermEqualSelector("metadata.name", nn.Name).String()
    75  	watch, err := client.Watch(ctx, metav1.ListOptions{FieldSelector: nameSelector})
    76  	if err != nil {
    77  		return nil, fmt.Errorf("error creating watch on resource: %w", err)
    78  	}
    79  	return watch, nil
    80  }
    81  
    82  // WaitForResourceToBeReadyViaWatch monitors a given 'Watch' for any
    83  // updates to the resource that the given 'Watch' is targeting. Note that
    84  // an error is returned to signify a failure during the 'Watch' process,
    85  // while nil is returned to signify the watched resource is ready.
    86  func WaitForResourceToBeReadyViaWatch(ctx context.Context, watch watch.Interface, logger logr.Logger) error {
    87  	for {
    88  		select {
    89  		case <-ctx.Done():
    90  			return fmt.Errorf("context was cancelled: %w", ctx.Err())
    91  		case event, ok := <-watch.ResultChan():
    92  			if !ok {
    93  				return fmt.Errorf("watch channel was closed")
    94  			}
    95  			ok, reason, err := isResourceReady(event)
    96  			if err != nil {
    97  				return fmt.Errorf("error checking if resource is ready: %w", err)
    98  			}
    99  			if !ok {
   100  				logger.Info("resource not ready", "reason", reason)
   101  				continue
   102  			}
   103  			logger.Info("resource is ready")
   104  			return nil
   105  		}
   106  	}
   107  }
   108  
   109  // isResourceReady returns whether a resource identified by the given GVK
   110  // and NamespacedName is ready. Note that a 'reason' for failure is returned only
   111  // when the resource is not ready and no fatal error has occurred.
   112  func isResourceReady(event watch.Event) (ok bool, reason string, err error) {
   113  	if event.Type != watch.Modified && event.Type != watch.Added {
   114  		return false, fmt.Sprintf("got watch event of type '%v', want event type '%v' or '%v'", event.Type, watch.Modified, watch.Added), nil
   115  	}
   116  	u, ok := event.Object.(*unstructured.Unstructured)
   117  	if !ok {
   118  		return false, "", fmt.Errorf("error casting event object '%v' of kind '%v' to unstructured", event.Object, event.Object.GetObjectKind())
   119  	}
   120  	resource, err := k8s.NewResource(u)
   121  	if err != nil {
   122  		return false, "", fmt.Errorf("error converting unstructured to resource: %w", err)
   123  	}
   124  	// Secrets don't have a 'ready' condition. As long as they can be
   125  	// found on the API server, we consider them ready as resources.
   126  	if resource.Kind == "Secret" {
   127  		return true, "", nil
   128  	}
   129  	if !k8s.IsResourceReady(resource) {
   130  		return false, "resource not ready", nil
   131  	}
   132  	return true, "", nil
   133  }
   134  

View as plain text