...

Source file src/github.com/datawire/ambassador/v2/pkg/agent/k8s.go

Documentation: github.com/datawire/ambassador/v2/pkg/agent

     1  package agent
     2  
     3  import (
     4  	"context"
     5  	"sync"
     6  
     7  	"github.com/datawire/dlib/dlog"
     8  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     9  	"k8s.io/apimachinery/pkg/runtime/schema"
    10  	"k8s.io/client-go/dynamic"
    11  	"k8s.io/client-go/dynamic/dynamicinformer"
    12  	"k8s.io/client-go/tools/cache"
    13  )
    14  
    15  // CallbackEventType defines the possible callback types of events.
    16  type CallbackEventType string
    17  
    18  const (
    19  	CallbackEventAdded   CallbackEventType = "ADDED"
    20  	CallbackEventDeleted CallbackEventType = "DELETED"
    21  	CallbackEventUpdated CallbackEventType = "UPDATED"
    22  )
    23  
    24  // GenericCallback is used to be returned in the channel managed by the WatchGeneric method.
    25  type GenericCallback struct {
    26  	// EventType is the event type that originated this callback.
    27  	EventType CallbackEventType
    28  
    29  	// Obj has the new resource state for this event type. If event type is CallbackEventDeleted
    30  	// it will contain the last resource state before being deleted.
    31  	Obj *unstructured.Unstructured
    32  
    33  	// Sotw has the state of the world for all resources of the type being watched.
    34  	Sotw []interface{}
    35  }
    36  
    37  // DynamicClient is the struct that provides the main functionality of watching
    38  // generic Kubernetes resources that may of may not be available (installed) in
    39  // the cluster.
    40  type DynamicClient struct {
    41  	newInformer InformerFunc
    42  	di          dynamic.Interface
    43  	done        bool
    44  	mux         sync.Mutex
    45  }
    46  
    47  // NewDynamicClient is the main contructor of DynamicClient
    48  func NewDynamicClient(di dynamic.Interface, informerFn InformerFunc) *DynamicClient {
    49  	return &DynamicClient{
    50  		newInformer: informerFn,
    51  		di:          di,
    52  	}
    53  }
    54  
    55  // Informer holds the operations necessary from a k8s informer in order to
    56  // provide the functionality to watch a generic resource.
    57  type Informer interface {
    58  	AddEventHandler(handler cache.ResourceEventHandler)
    59  	Run(stopCh <-chan struct{})
    60  	ListCache() []interface{}
    61  }
    62  
    63  type InformerFunc func(dynamic.Interface, string, *schema.GroupVersionResource) Informer
    64  
    65  // K8sInformer is a real Informer implementation.
    66  type K8sInformer struct {
    67  	cache.SharedIndexInformer
    68  }
    69  
    70  // ListCache will return the current state of the cache store from the Kubernetes
    71  // informer.
    72  func (i *K8sInformer) ListCache() []interface{} {
    73  	return i.GetStore().List()
    74  }
    75  
    76  // NewK8sInformer builds and returns a real Kubernetes Informer implementation.
    77  func NewK8sInformer(cli dynamic.Interface, ns string, gvr *schema.GroupVersionResource) Informer {
    78  	f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cli, 0, ns, nil)
    79  	i := f.ForResource(*gvr).Informer()
    80  	return &K8sInformer{
    81  		SharedIndexInformer: i,
    82  	}
    83  }
    84  
    85  func (dc *DynamicClient) sendCallback(callbackChan chan<- *GenericCallback, callback *GenericCallback) {
    86  	dc.mux.Lock()
    87  	defer dc.mux.Unlock()
    88  	if dc.done {
    89  		return
    90  	}
    91  	callbackChan <- callback
    92  }
    93  
    94  // WatchGeneric will watch any resource existing in the cluster or not. This is usefull for
    95  // watching CRDs that may or may not be available in the cluster.
    96  func (dc *DynamicClient) WatchGeneric(ctx context.Context, ns string, gvr *schema.GroupVersionResource) <-chan *GenericCallback {
    97  	callbackChan := make(chan *GenericCallback)
    98  	go func() {
    99  		<-ctx.Done()
   100  		dc.mux.Lock()
   101  		defer dc.mux.Unlock()
   102  		dc.done = true
   103  		close(callbackChan)
   104  	}()
   105  	i := dc.newInformer(dc.di, ns, gvr)
   106  	i.AddEventHandler(
   107  		cache.ResourceEventHandlerFuncs{
   108  			AddFunc: func(obj interface{}) {
   109  				dlog.Debugf(ctx, "WatchGeneric: AddFunc called for resource %q", gvr.String())
   110  				new := obj.(*unstructured.Unstructured)
   111  				sotw := i.ListCache()
   112  				callback := &GenericCallback{EventType: CallbackEventAdded, Obj: new, Sotw: sotw}
   113  				dc.sendCallback(callbackChan, callback)
   114  			},
   115  			UpdateFunc: func(oldObj, newObj interface{}) {
   116  				dlog.Debugf(ctx, "WatchGeneric: UpdateFunc called for resource %q", gvr.String())
   117  				new := newObj.(*unstructured.Unstructured)
   118  				sotw := i.ListCache()
   119  				callback := &GenericCallback{EventType: CallbackEventUpdated, Obj: new, Sotw: sotw}
   120  				dc.sendCallback(callbackChan, callback)
   121  			},
   122  			DeleteFunc: func(obj interface{}) {
   123  				dlog.Debugf(ctx, "WatchGeneric: DeleteFunc called for resource %q", gvr.String())
   124  				var old *unstructured.Unstructured
   125  				switch o := obj.(type) {
   126  				case cache.DeletedFinalStateUnknown:
   127  					old = o.Obj.(*unstructured.Unstructured)
   128  				case *unstructured.Unstructured:
   129  					old = o
   130  				}
   131  				sotw := i.ListCache()
   132  				callback := &GenericCallback{EventType: CallbackEventDeleted, Obj: old, Sotw: sotw}
   133  				dc.sendCallback(callbackChan, callback)
   134  			},
   135  		},
   136  	)
   137  	go i.Run(ctx.Done())
   138  	dlog.Infof(ctx, "WatchGeneric: Listening for events from resouce %q", gvr.String())
   139  	return callbackChan
   140  }
   141  

View as plain text