...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/source

     1  package internal
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"reflect"
     8  	"time"
     9  
    10  	"k8s.io/apimachinery/pkg/api/meta"
    11  	"k8s.io/apimachinery/pkg/runtime"
    12  	"k8s.io/apimachinery/pkg/util/wait"
    13  	"k8s.io/client-go/util/workqueue"
    14  
    15  	"sigs.k8s.io/controller-runtime/pkg/cache"
    16  	"sigs.k8s.io/controller-runtime/pkg/client"
    17  	"sigs.k8s.io/controller-runtime/pkg/handler"
    18  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    19  )
    20  
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind[T client.Object] struct {
	// Type is the type of object to watch.  e.g. &v1.Pod{}
	Type T

	// Cache used to watch APIs
	Cache cache.Cache

	// Handler is the event handler that Start wraps and registers on the informer obtained for Type.
	Handler handler.TypedEventHandler[T]

	// Predicates are handed to the event handler wrapper together with Handler when Start registers it.
	// NOTE(review): presumably they filter events before Handler is invoked — confirm in NewEventHandler.
	Predicates []predicate.TypedPredicate[T]

	// startedErr may contain an error if one was encountered during startup. If it's closed and does not
	// contain an error, startup and syncing finished.
	startedErr chan error
	// startCancel cancels the context that Start derives for its background goroutine. WaitForSync calls it
	// when its own context is done.
	startCancel func()
}
    38  
    39  // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
    40  // to enqueue reconcile.Requests.
    41  func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
    42  	if isNil(ks.Type) {
    43  		return fmt.Errorf("must create Kind with a non-nil object")
    44  	}
    45  	if isNil(ks.Cache) {
    46  		return fmt.Errorf("must create Kind with a non-nil cache")
    47  	}
    48  	if isNil(ks.Handler) {
    49  		return errors.New("must create Kind with non-nil handler")
    50  	}
    51  
    52  	// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
    53  	// sync that informer (most commonly due to RBAC issues).
    54  	ctx, ks.startCancel = context.WithCancel(ctx)
    55  	ks.startedErr = make(chan error)
    56  	go func() {
    57  		var (
    58  			i       cache.Informer
    59  			lastErr error
    60  		)
    61  
    62  		// Tries to get an informer until it returns true,
    63  		// an error or the specified context is cancelled or expired.
    64  		if err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
    65  			// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
    66  			i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
    67  			if lastErr != nil {
    68  				kindMatchErr := &meta.NoKindMatchError{}
    69  				switch {
    70  				case errors.As(lastErr, &kindMatchErr):
    71  					log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
    72  						"kind", kindMatchErr.GroupKind)
    73  				case runtime.IsNotRegisteredError(lastErr):
    74  					log.Error(lastErr, "kind must be registered to the Scheme")
    75  				default:
    76  					log.Error(lastErr, "failed to get informer from cache")
    77  				}
    78  				return false, nil // Retry.
    79  			}
    80  			return true, nil
    81  		}); err != nil {
    82  			if lastErr != nil {
    83  				ks.startedErr <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
    84  				return
    85  			}
    86  			ks.startedErr <- err
    87  			return
    88  		}
    89  
    90  		_, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs())
    91  		if err != nil {
    92  			ks.startedErr <- err
    93  			return
    94  		}
    95  		if !ks.Cache.WaitForCacheSync(ctx) {
    96  			// Would be great to return something more informative here
    97  			ks.startedErr <- errors.New("cache did not sync")
    98  		}
    99  		close(ks.startedErr)
   100  	}()
   101  
   102  	return nil
   103  }
   104  
   105  func (ks *Kind[T]) String() string {
   106  	if !isNil(ks.Type) {
   107  		return fmt.Sprintf("kind source: %T", ks.Type)
   108  	}
   109  	return "kind source: unknown type"
   110  }
   111  
   112  // WaitForSync implements SyncingSource to allow controllers to wait with starting
   113  // workers until the cache is synced.
   114  func (ks *Kind[T]) WaitForSync(ctx context.Context) error {
   115  	select {
   116  	case err := <-ks.startedErr:
   117  		return err
   118  	case <-ctx.Done():
   119  		ks.startCancel()
   120  		if errors.Is(ctx.Err(), context.Canceled) {
   121  			return nil
   122  		}
   123  		return fmt.Errorf("timed out waiting for cache to be synced for Kind %T", ks.Type)
   124  	}
   125  }
   126  
   127  func isNil(arg any) bool {
   128  	if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
   129  		v.Kind() == reflect.Interface ||
   130  		v.Kind() == reflect.Slice ||
   131  		v.Kind() == reflect.Map ||
   132  		v.Kind() == reflect.Chan ||
   133  		v.Kind() == reflect.Func) && v.IsNil()) {
   134  		return true
   135  	}
   136  	return false
   137  }
   138  

View as plain text