...

Source file src/github.com/datawire/ambassador/v2/pkg/k8s/watcher.go

Documentation: github.com/datawire/ambassador/v2/pkg/k8s

     1  package k8s
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  	"sync"
     8  	"time"
     9  
    10  	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    11  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    12  	"k8s.io/apimachinery/pkg/runtime"
    13  	"k8s.io/apimachinery/pkg/runtime/schema"
    14  	pwatch "k8s.io/apimachinery/pkg/watch"
    15  
    16  	"k8s.io/client-go/dynamic"
    17  	_ "k8s.io/client-go/plugin/pkg/client/auth"
    18  	"k8s.io/client-go/tools/cache"
    19  )
    20  
    21  type listWatchAdapter struct {
    22  	resource      dynamic.ResourceInterface
    23  	fieldSelector string
    24  	labelSelector string
    25  }
    26  
    27  func (lw listWatchAdapter) List(options v1.ListOptions) (runtime.Object, error) {
    28  	options.FieldSelector = lw.fieldSelector
    29  	options.LabelSelector = lw.labelSelector
    30  	// silently coerce the returned *unstructured.UnstructuredList
    31  	// struct to a runtime.Object interface.
    32  	return lw.resource.List(context.TODO(), options)
    33  }
    34  
    35  func (lw listWatchAdapter) Watch(options v1.ListOptions) (pwatch.Interface, error) {
    36  	options.FieldSelector = lw.fieldSelector
    37  	options.LabelSelector = lw.labelSelector
    38  	return lw.resource.Watch(context.TODO(), options)
    39  }
    40  
    41  // Watcher is a kubernetes watcher that can watch multiple queries simultaneously
    42  type Watcher struct {
    43  	Client  *Client
    44  	watches map[ResourceType]watch
    45  	stop    chan struct{}
    46  	wg      sync.WaitGroup
    47  	mutex   sync.Mutex
    48  	stopMu  sync.Mutex
    49  	started bool
    50  	stopped bool
    51  }
    52  
    53  type watch struct {
    54  	query    Query
    55  	resource dynamic.NamespaceableResourceInterface
    56  	store    cache.Store
    57  	invoke   func()
    58  	runner   func()
    59  }
    60  
    61  // NewWatcher returns a Kubernetes watcher for the specified cluster.
    62  func NewWatcher(info *KubeInfo) (*Watcher, error) {
    63  	cli, err := NewClient(info)
    64  	if err != nil {
    65  		return nil, err
    66  	}
    67  	return cli.Watcher(), nil
    68  }
    69  
    70  // Watcher returns a Kubernetes Watcher for the specified client.
    71  func (c *Client) Watcher() *Watcher {
    72  	w := &Watcher{
    73  		Client:  c,
    74  		watches: make(map[ResourceType]watch),
    75  		stop:    make(chan struct{}),
    76  	}
    77  
    78  	return w
    79  }
    80  
    81  // WatchQuery watches the set of resources identified by the supplied
    82  // query and invokes the supplied listener whenever they change.
    83  func (w *Watcher) WatchQuery(query Query, listener func(*Watcher) error) error {
    84  	err := query.resolve(w.Client)
    85  	if err != nil {
    86  		return err
    87  	}
    88  	ri := query.resourceType
    89  
    90  	dyn, err := dynamic.NewForConfig(w.Client.config)
    91  	if err != nil {
    92  		return err
    93  	}
    94  
    95  	resource := dyn.Resource(schema.GroupVersionResource{
    96  		Group:    ri.Group,
    97  		Version:  ri.Version,
    98  		Resource: ri.Name,
    99  	})
   100  
   101  	var watched dynamic.ResourceInterface
   102  	if ri.Namespaced && query.Namespace != "" {
   103  		watched = resource.Namespace(query.Namespace)
   104  	} else {
   105  		watched = resource
   106  	}
   107  
   108  	invoke := func() {
   109  		w.mutex.Lock()
   110  		defer w.mutex.Unlock()
   111  		if err := listener(w); err != nil {
   112  			panic(fmt.Errorf("I'm sorry, the pkg/k8s API really painted us in to a hole and I couldn't handle this error properly: %w", err))
   113  		}
   114  	}
   115  
   116  	store, controller := cache.NewInformer(
   117  		listWatchAdapter{watched, query.FieldSelector, query.LabelSelector},
   118  		nil,
   119  		5*time.Minute,
   120  		cache.ResourceEventHandlerFuncs{
   121  			AddFunc: func(obj interface{}) {
   122  				invoke()
   123  			},
   124  			UpdateFunc: func(oldObj, newObj interface{}) {
   125  				oldUn := oldObj.(*unstructured.Unstructured)
   126  				newUn := newObj.(*unstructured.Unstructured)
   127  				// we ignore updates for objects
   128  				// already in our store because we
   129  				// assume this means we made the
   130  				// change to them
   131  				if oldUn.GetResourceVersion() != newUn.GetResourceVersion() {
   132  					// kube-scheduler and kube-controller-manager endpoints are
   133  					// updated almost every second, leading to terrible noise,
   134  					// and hence constant listener invokation. So, here we
   135  					// ignore endpoint updates from kube-system namespace. More:
   136  					// https://github.com/kubernetes/kubernetes/issues/41635
   137  					// https://github.com/kubernetes/kubernetes/issues/34627
   138  					if oldUn.GetKind() == "Endpoints" &&
   139  						newUn.GetKind() == "Endpoints" &&
   140  						oldUn.GetNamespace() == "kube-system" &&
   141  						newUn.GetNamespace() == "kube-system" {
   142  						return
   143  					}
   144  					invoke()
   145  				}
   146  			},
   147  			DeleteFunc: func(obj interface{}) {
   148  				invoke()
   149  			},
   150  		},
   151  	)
   152  
   153  	runner := func() {
   154  		controller.Run(w.stop)
   155  		w.wg.Done()
   156  	}
   157  
   158  	w.watches[ri] = watch{
   159  		query:    query,
   160  		resource: resource,
   161  		store:    store,
   162  		invoke:   invoke,
   163  		runner:   runner,
   164  	}
   165  
   166  	return nil
   167  }
   168  
   169  // Start starts the watcher
   170  func (w *Watcher) Start(ctx context.Context) error {
   171  	w.mutex.Lock()
   172  	if w.started {
   173  		w.mutex.Unlock()
   174  		return nil
   175  	} else {
   176  		w.started = true
   177  		w.mutex.Unlock()
   178  	}
   179  	for kind := range w.watches {
   180  		if err := w.sync(ctx, kind); err != nil {
   181  			return err
   182  		}
   183  	}
   184  
   185  	for _, watch := range w.watches {
   186  		watch.invoke()
   187  	}
   188  
   189  	w.wg.Add(len(w.watches))
   190  	for _, watch := range w.watches {
   191  		go watch.runner()
   192  	}
   193  	return nil
   194  }
   195  
   196  func (w *Watcher) sync(ctx context.Context, kind ResourceType) error {
   197  	watch := w.watches[kind]
   198  	resources, err := w.Client.ListQuery(ctx, watch.query)
   199  	if err != nil {
   200  		return err
   201  	}
   202  	for _, rsrc := range resources {
   203  		var uns unstructured.Unstructured
   204  		uns.SetUnstructuredContent(rsrc)
   205  		err = watch.store.Update(&uns)
   206  		if err != nil {
   207  			return err
   208  		}
   209  	}
   210  	return nil
   211  }
   212  
   213  // List lists all the resources with kind `kind`
   214  func (w *Watcher) List(kind string) ([]Resource, error) {
   215  	ri, err := w.Client.ResolveResourceType(kind)
   216  	if err != nil {
   217  		return nil, err
   218  	}
   219  	watch, ok := w.watches[ri]
   220  	if ok {
   221  		objs := watch.store.List()
   222  		result := make([]Resource, len(objs))
   223  		for idx, obj := range objs {
   224  			result[idx] = obj.(*unstructured.Unstructured).UnstructuredContent()
   225  		}
   226  		return result, nil
   227  	}
   228  	return nil, nil
   229  }
   230  
   231  // UpdateStatus updates the status of the `resource` provided
   232  func (w *Watcher) UpdateStatus(ctx context.Context, resource Resource) (Resource, error) {
   233  	ri, err := w.Client.ResolveResourceType(resource.QKind())
   234  	if err != nil {
   235  		return nil, err
   236  	}
   237  	watch, ok := w.watches[ri]
   238  	if !ok {
   239  		return nil, fmt.Errorf("no watch: %v, %v", ri, w.watches)
   240  	}
   241  
   242  	var uns unstructured.Unstructured
   243  	uns.SetUnstructuredContent(resource)
   244  
   245  	var cli dynamic.ResourceInterface
   246  	if ri.Namespaced {
   247  		cli = watch.resource.Namespace(uns.GetNamespace())
   248  	} else {
   249  		cli = watch.resource
   250  	}
   251  
   252  	result, err := cli.UpdateStatus(ctx, &uns, v1.UpdateOptions{})
   253  	if err != nil {
   254  		return nil, err
   255  	}
   256  	if err := watch.store.Update(result); err != nil {
   257  		return nil, err
   258  	}
   259  	return result.UnstructuredContent(), nil
   260  }
   261  
   262  // Get gets the `qname` resource (of kind `kind`)
   263  func (w *Watcher) Get(kind, qname string) (Resource, error) {
   264  	resources, err := w.List(kind)
   265  	if err != nil {
   266  		return Resource{}, err
   267  	}
   268  	for _, res := range resources {
   269  		if strings.EqualFold(res.QName(), qname) {
   270  			return res, nil
   271  		}
   272  	}
   273  	return Resource{}, nil
   274  }
   275  
   276  // Exists returns true if the `qname` resource (of kind `kind`) exists
   277  func (w *Watcher) Exists(kind, qname string) (bool, error) {
   278  	resource, err := w.Get(kind, qname)
   279  	if err != nil {
   280  		return false, err
   281  	}
   282  	return resource.Name() != "", nil
   283  }
   284  
   285  // Stop stops a watch. It is safe to call Stop from multiple
   286  // goroutines and call it multiple times. This is useful, e.g. for
   287  // implementing a timed wait pattern. You can have your watch callback
   288  // test for a condition and invoke Stop() when that condition is met,
   289  // while simultaneously havin a background goroutine call Stop() when
   290  // a timeout is exceeded and not worry about these two things racing
   291  // each other (at least with respect to invoking Stop()).
   292  func (w *Watcher) Stop() {
   293  	// Use a separate lock for Stop so it is safe to call from a watch callback.
   294  	w.stopMu.Lock()
   295  	defer w.stopMu.Unlock()
   296  	if !w.stopped {
   297  		close(w.stop)
   298  		w.stopped = true
   299  	}
   300  }
   301  
   302  func (w *Watcher) Wait(ctx context.Context) error {
   303  	if err := w.Start(ctx); err != nil {
   304  		return err
   305  	}
   306  	w.wg.Wait()
   307  	return nil
   308  }
   309  

View as plain text