...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/controllers/customizationwatcher.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/controllers

     1  // Copyright 2023 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package controllers
    16  
import (
	"context"
	"sync"

	customizev1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/apis/core/customize/v1alpha1"
	corekcck8s "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"

	"github.com/go-logr/logr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"sigs.k8s.io/controller-runtime/pkg/event"
)
    32  
    33  var (
    34  	// CustomizationCRDsToWatch contains all the customization CRDs to watch
    35  	CustomizationCRDsToWatch = []schema.GroupVersionResource{
    36  		corekcck8s.ToGVR(customizev1alpha1.ControllerResourceGroupVersionKind),
    37  	}
    38  )
    39  
    40  // CustomizationWatcher setup watches on 'triggerGVRs', raises events on 'target'.
    41  type CustomizationWatcher struct {
    42  	// triggerGVRs contains all the GVRs to watch. An event on triggerGVRs will raise an event on the target.
    43  	triggerGVRs []schema.GroupVersionResource
    44  	targetNN    types.NamespacedName
    45  	// watchRegistered tracks the GVRs we are currently watching, avoid duplicate watches.
    46  	watchRegistered map[string]struct{}
    47  	// events is the channel that an event is raised and send to when CustomizationWatcher
    48  	// receives a watch event on the triggerGVRs it is watching.
    49  	events        chan event.GenericEvent
    50  	dynamicClient dynamic.Interface
    51  	log           logr.Logger
    52  }
    53  
    54  func NewWithDynamicClient(dc dynamic.Interface, gvrs []schema.GroupVersionResource, logger logr.Logger) *CustomizationWatcher {
    55  	return &CustomizationWatcher{
    56  		dynamicClient:   dc,
    57  		triggerGVRs:     gvrs,
    58  		watchRegistered: make(map[string]struct{}),
    59  		events:          make(chan event.GenericEvent),
    60  		log:             logger.WithName("customization-watcher"),
    61  	}
    62  }
    63  
    64  // Events returns a channel with events raised on target.
    65  func (w *CustomizationWatcher) Events() chan event.GenericEvent {
    66  	return w.events
    67  }
    68  
    69  // EnsureWatchStarted starts watches on triggerGVRs if not already done so.
    70  func (w *CustomizationWatcher) EnsureWatchStarted(ctx context.Context, nn types.NamespacedName) error {
    71  	w.targetNN = nn
    72  	for _, gvr := range w.triggerGVRs {
    73  		if _, started := w.watchRegistered[gvr.String()]; started {
    74  			continue
    75  		}
    76  		w.log.Info("starting watch", "trigger GVR", gvr.String(), "target NamespacedName", nn)
    77  		go w.startWatch(ctx, gvr)
    78  	}
    79  	return nil
    80  }
    81  
    82  // startWatch starts a watch for changes to "triggerGVR" and raises events on "targetNN".
    83  func (w *CustomizationWatcher) startWatch(ctx context.Context, triggerGVR schema.GroupVersionResource) {
    84  	log := w.log.WithValues("trigger GVR", triggerGVR, "target NamespacedName", w.targetNN)
    85  	opts := metav1.ListOptions{AllowWatchBookmarks: true}
    86  	triggerEvents, err := w.dynamicClient.Resource(triggerGVR).Namespace(w.targetNN.Namespace).Watch(ctx, opts)
    87  	if err != nil {
    88  		log.Error(err, "failed to start watch")
    89  		return
    90  	}
    91  	log.Info("watch started")
    92  	w.watchRegistered[triggerGVR.String()] = struct{}{}
    93  	defer func() {
    94  		triggerEvents.Stop()
    95  		delete(w.watchRegistered, triggerGVR.String())
    96  	}()
    97  	for {
    98  		select {
    99  		case <-ctx.Done():
   100  			log.Info("watch context cancelled")
   101  			return
   102  		case triggerEvent, ok := <-triggerEvents.ResultChan():
   103  			if !ok {
   104  				log.Info("watch channel closed")
   105  				return
   106  			}
   107  			switch triggerEvent.Type {
   108  			case watch.Bookmark:
   109  				continue
   110  			case watch.Error:
   111  				log.Error(err, "unexpected error from watch")
   112  			}
   113  			genEvent := event.GenericEvent{}
   114  			genEvent.Object = &unstructured.Unstructured{}
   115  			genEvent.Object.SetNamespace(w.targetNN.Namespace)
   116  			genEvent.Object.SetName(w.targetNN.Name)
   117  			w.events <- genEvent
   118  		}
   119  	}
   120  }
   121  

View as plain text