Source file src/github.com/emissary-ingress/emissary/v3/pkg/kates/client.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/kates

package kates

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/spf13/pflag"

	// k8s libraries
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/discovery/cached/disk"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubectl/pkg/polymorphichelpers"

	// k8s types
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	// k8s plugins
	_ "k8s.io/client-go/plugin/pkg/client/auth"

	"github.com/datawire/dlib/dlog"
	kates_internal "github.com/emissary-ingress/emissary/v3/pkg/kates_internal"
)

// The Client struct provides an interface to interact with the kubernetes api-server. You can think
// of it like a programmatic version of the familiar kubectl command line tool. In fact, a goal of
// these APIs is that where possible, your knowledge of kubectl should translate well into using
// these APIs. It provides a golang-friendly way to perform basic CRUD and Watch operations on
// kubernetes resources, as well as providing some additional capabilities.
//
// Differences from kubectl:
//
//   - You can also use a Client to update the status of a resource.
//   - The Client struct cannot perform an apply operation.
//   - The Client provides read/write coherence (more about this below).
//   - The Client provides load shedding via event coalescing for watches.
//   - The Client provides bootstrapping of multiple watches.
//
// The biggest difference from kubectl (and also from using client-go directly) is the read/write
// coherence it provides. Kubernetes watches are inherently asynchronous. This means that if a
// kubernetes resource is modified at time T0, a client won't find out about it until some later
// time T1. It is normally difficult to notice this since the delay may be quite small, however if
// you are writing a controller that uses watches in combination with modifying the resources it is
// watching, the delay is big enough that a program will often be "notified" with versions of
// resources that do not include updates made by the program itself. This even happens when a
// program has a lock and is guaranteed to be the only process modifying a given resource. Needless
// to say, programming against an API like this can make for some brain-twisting logic. The Client
// struct allows for much simpler code by tracking what changes have been made locally and updating
// all Watch results with the most recent version of an object, thereby providing the guarantee that
// your Watch results will *always* include any changes you have made via the Client performing the
// watch.
//
// Additionally, the Accumulator API provides two key pieces of watch-related functionality:
//
//  1. By coalescing multiple updates behind the scenes, the Accumulator API provides a natural
//     form of load shedding if a user of the API cannot keep up with every single update.
//
//  2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on
//     all watches prior to notifying the user that resources are available to process.
type Client struct {
	config                 *ConfigFlags
	cli                    dynamic.Interface
	mapper                 meta.RESTMapper
	disco                  discovery.CachedDiscoveryInterface
	mutex                  sync.Mutex
	canonical              map[string]*Unstructured
	maxAccumulatorInterval time.Duration

	// This is an internal interface for testing. It lets us deliberately introduce delays into the
	// implementation, e.g. effectively increasing the latency to the api server in a controllable
	// way and letting us reproduce and test for race conditions far more efficiently than
	// otherwise.
	watchAdded   func(*Unstructured, *Unstructured)
	watchUpdated func(*Unstructured, *Unstructured)
	watchDeleted func(*Unstructured, *Unstructured)
}

// The ClientConfig struct holds all the parameters and configuration
// that can be passed upon construction of a new Client.
type ClientConfig struct {
	Kubeconfig string
	Context    string
	Namespace  string
}

// The NewClient function constructs a new client with the supplied ClientConfig.
func NewClient(options ClientConfig) (*Client, error) {
	return NewClientFromConfigFlags(options.toConfigFlags())
}
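
// As a usage sketch (the kubeconfig path and context name below are
// hypothetical), a Client running outside a cluster might be constructed like
// this; empty fields fall back to the normal defaulting done by the
// underlying ConfigFlags:
//
//	client, err := NewClient(ClientConfig{
//		Kubeconfig: "/home/user/.kube/config", // hypothetical path
//		Context:    "my-context",              // hypothetical context
//		Namespace:  "default",
//	})
//	if err != nil {
//		// handle the error
//	}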

// NewClientFactory adds flags to a flagset (i.e. before flagset.Parse()), and returns a function to
// be called after flagset.Parse() that uses the parsed flags to construct a *Client.
func NewClientFactory(flags *pflag.FlagSet) func() (*Client, error) {
	if flags.Parsed() {
		// panic is OK because this is a programming error.
		panic("kates.NewClientFactory(flagset) must be called before flagset.Parse()")
	}

	config := NewConfigFlags(false)

	// We can disable or enable flags by setting them to
	// nil/non-nil prior to calling .AddFlags().
	//
	// .Username and .Password are already disabled by default in
	// genericclioptions.NewConfigFlags().

	config.AddFlags(flags)

	return func() (*Client, error) {
		if !flags.Parsed() {
			return nil, fmt.Errorf("kates client factory must be called after flagset.Parse()")
		}
		return NewClientFromConfigFlags(config)
	}
}
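
// A sketch of the intended call pattern; the flag names registered come from
// genericclioptions.ConfigFlags (e.g. --kubeconfig, --context, --namespace):
//
//	flags := pflag.NewFlagSet("example", pflag.ContinueOnError)
//	newClient := NewClientFactory(flags) // must run before Parse
//	if err := flags.Parse(os.Args[1:]); err != nil {
//		// handle the error
//	}
//	client, err := newClient() // must run after Parse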

func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
	restconfig, err := config.ToRESTConfig()
	if err != nil {
		return nil, err
	}

	cli, err := dynamic.NewForConfig(restconfig)
	if err != nil {
		return nil, err
	}

	mapper, disco, err := NewRESTMapper(config)
	if err != nil {
		return nil, err
	}

	return &Client{
		config:                 config,
		cli:                    cli,
		mapper:                 mapper,
		disco:                  disco,
		canonical:              make(map[string]*Unstructured),
		maxAccumulatorInterval: 1 * time.Second,
		watchAdded:             func(oldObj, newObj *Unstructured) {},
		watchUpdated:           func(oldObj, newObj *Unstructured) {},
		watchDeleted:           func(oldObj, newObj *Unstructured) {},
	}, nil
}

func NewRESTMapper(config *ConfigFlags) (meta.RESTMapper, discovery.CachedDiscoveryInterface, error) {
	// Throttling is scoped to rest.Config, so we use a dedicated
	// rest.Config for discovery. That lets us disable throttling for
	// discovery while leaving it in place for normal requests. This
	// is largely the same thing that ConfigFlags.ToRESTMapper()
	// does, hence the same thing that kubectl does. There are two
	// differences we introduce here: (1) if no cache dir is
	// supplied, we fall back to in-memory caching rather than not
	// caching discovery requests at all; and (2) unlike kubectl, we
	// do not cache non-discovery requests.
	restconfig, err := config.ToRESTConfig()
	if err != nil {
		return nil, nil, err
	}
	restconfig.QPS = 1000000
	restconfig.Burst = 1000000

	var cachedDiscoveryClient discovery.CachedDiscoveryInterface
	if config.CacheDir != nil {
		cachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(restconfig, *config.CacheDir, "",
			10*time.Minute)
		if err != nil {
			return nil, nil, err
		}
	} else {
		discoveryClient, err := discovery.NewDiscoveryClientForConfig(restconfig)
		if err != nil {
			return nil, nil, err
		}
		cachedDiscoveryClient = memory.NewMemCacheClient(discoveryClient)
	}

	mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
	expander := restmapper.NewShortcutExpander(mapper, cachedDiscoveryClient)

	return expander, cachedDiscoveryClient, nil
}

// The InCluster function returns true if the process is running inside a kubernetes cluster, and
// false if it is running outside the cluster. This is determined by heuristics, however it uses the
// exact same heuristics as client-go does. This is copied from
// (client-go/tools/clientcmd/client_config.go), as it is not publicly invocable in its original
// place. This should be re-copied if the original code changes.
func InCluster() bool {
	fi, err := os.Stat("/var/run/secrets/kubernetes.io/serviceaccount/token")
	return os.Getenv("KUBERNETES_SERVICE_HOST") != "" &&
		os.Getenv("KUBERNETES_SERVICE_PORT") != "" &&
		err == nil && !fi.IsDir()
}
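
// A small sketch of typical use: demand an explicit kubeconfig only when
// running outside a cluster (the kubeconfig variable here is hypothetical):
//
//	if !InCluster() && kubeconfig == "" {
//		return fmt.Errorf("a kubeconfig is required when running outside a cluster")
//	}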

// MaxAccumulatorInterval sets the maximum interval to wait before sending changes for snapshot
// updates. The interval must be positive; otherwise an error is returned.
func (c *Client) MaxAccumulatorInterval(interval time.Duration) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if interval <= 0 {
		return fmt.Errorf("interval must be positive")
	}
	c.maxAccumulatorInterval = interval
	return nil
}

// DynamicInterface is an accessor method for the underlying k8s dynamic client.
func (c *Client) DynamicInterface() dynamic.Interface {
	return c.cli
}

// WaitFor blocks until the given kind or resource is known to the cluster, re-checking the
// discovery cache once per second until it appears or the context is done.
func (c *Client) WaitFor(ctx context.Context, kindOrResource string) error {
	for {
		_, err := c.mappingFor(kindOrResource)
		if err != nil {
			_, ok := err.(*unknownResource)
			if ok {
				select {
				case <-time.After(1 * time.Second):
					if err := c.InvalidateCache(); err != nil {
						return err
					}
					continue
				case <-ctx.Done():
					return nil
				}
			}
		}
		return nil
	}
}
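
// For example, a controller that depends on a CRD can wait for the CRD to be
// registered before watching it (the "Mapping" Kind here is hypothetical):
//
//	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
//	defer cancel()
//	if err := client.WaitFor(ctx, "Mapping"); err != nil {
//		// handle the error
//	}
//
// Note that WaitFor returns nil when the context is done, so callers that need
// to distinguish "found" from "timed out" should check ctx.Err().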

// InvalidateCache rebuilds the RESTMapper and discovery client, discarding any cached discovery
// information.
func (c *Client) InvalidateCache() error {
	// TODO: it's possible that invalidate could be smarter now
	// and use the methods on CachedDiscoveryInterface
	mapper, disco, err := NewRESTMapper(c.config)
	if err != nil {
		return err
	}
	c.mapper = mapper
	c.disco = disco
	return nil
}

// The ServerVersion() method returns a struct with information about
// the kubernetes api-server version.
func (c *Client) ServerVersion() (*VersionInfo, error) {
	return c.disco.ServerVersion()
}

// processAPIResourceLists takes a `[]*metav1.APIResourceList` as returned by any of several calls
// to a DiscoveryInterface, and transforms it into a straightforward `[]metav1.APIResource`.
//
// If you weren't paying close-enough attention, you might have thought I said it takes a
// `*metav1.APIResourceList` object, and now you're wondering why this needs to be anything more
// than `return input.APIResources`.  Well:
//
//  1. The various DiscoveryInterface calls don't return a List, they actually return an array of
//     Lists, where the Lists are grouped by the group/version of the resources.  So we need to
//     flatten those out.
//  2. I guess the reason they group them that way is to avoid repeating the group and version in
//     each resource, because the List objects themselves have .Group and .Version set, but the
//     APIResource objects don't.  This lets them save 10s of bytes on an infrequently used API
//     call!  Anyway, we'll need to fill those in on the returned objects because we're discarding
//     the grouping.
func processAPIResourceLists(listsByGV []*metav1.APIResourceList) []APIResource {
	// Do some book-keeping to allow us to pre-allocate the entire list.
	count := 0
	for _, list := range listsByGV {
		if list != nil {
			count += len(list.APIResources)
		}
	}
	if count == 0 {
		return nil
	}

	// Build the processed list to return.
	ret := make([]APIResource, 0, count)
	for _, list := range listsByGV {
		if list != nil {
			gv, err := schema.ParseGroupVersion(list.GroupVersion)
			if err != nil {
				continue
			}
			for _, typeinfo := range list.APIResources {
				// I'm not 100% sure that none of the DiscoveryInterface calls fill
				// in .Group and .Version, so just in case one of the calls does
				// fill them in, we'll only fill them in if they're not already set.
				if typeinfo.Group == "" {
					typeinfo.Group = gv.Group
				}
				if typeinfo.Version == "" {
					typeinfo.Version = gv.Version
				}
				ret = append(ret, typeinfo)
			}
		}
	}

	return ret
}

// ServerPreferredResources returns the list of resource types that the server supports.
//
// If a resource type supports multiple versions, then *only* the preferred version is returned.
// Use ServerResources to return a list that includes all versions.
func (c *Client) ServerPreferredResources() ([]APIResource, error) {
	// It's possible that an error prevented listing some apigroups, but not all; so process the
	// output even if there is an error.
	listsByGV, err := c.disco.ServerPreferredResources()
	return processAPIResourceLists(listsByGV), err
}

// ServerResources returns the list of resource types that the server supports.
//
// If a resource type supports multiple versions, then a list entry for *each* version is returned.
// Use ServerPreferredResources to return a list that includes just the preferred version.
func (c *Client) ServerResources() ([]APIResource, error) {
	// It's possible that an error prevented listing some apigroups, but not all; so process the
	// output even if there is an error.
	_, listsByGV, err := c.disco.ServerGroupsAndResources()
	return processAPIResourceLists(listsByGV), err
}
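
// As a sketch, printing every advertised resource type; because a partial
// result may accompany a non-nil error, the list is processed first:
//
//	resources, err := client.ServerResources()
//	for _, r := range resources {
//		fmt.Printf("%s/%s: %s\n", r.Group, r.Version, r.Name)
//	}
//	if err != nil {
//		// some API groups could not be listed
//	}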

// ==

// TODO: Query is interpreted a bit differently for List and
// Watch. Should either reconcile this or perhaps split Query into two
// separate types.

// A Query holds all the information needed to List or Watch a set of
// kubernetes resources.
type Query struct {
	// The Name field holds the name of the Query. This is used by
	// Watch to determine how multiple queries are unmarshaled by
	// Accumulator.Update(). This is ignored for List.
	Name string
	// The Kind field indicates what sort of resource is being queried.
	Kind string
	// The Namespace field holds the namespace to Query.
	Namespace string
	// The FieldSelector field holds a string in selector syntax
	// that is used to filter results based on field values. The
	// only field values supported are metadata.name and
	// metadata.namespace. This is only supported for List.
	FieldSelector string
	// The LabelSelector field holds a string in selector syntax
	// that is used to filter results based on label values.
	LabelSelector string
}

// Watch begins watching the given queries and returns an Accumulator that coalesces their results.
func (c *Client) Watch(ctx context.Context, queries ...Query) (*Accumulator, error) {
	return newAccumulator(ctx, c, queries...)
}
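
// A schematic Watch example. Each Query's Name must match the field of the
// snapshot struct that Accumulator.Update() unmarshals into (see
// accumulator.go for the exact Accumulator API; the field types below assume
// this package's usual aliases for the core k8s types):
//
//	type Snapshot struct {
//		Services    []*Service
//		Deployments []*Deployment
//	}
//
//	acc, err := client.Watch(ctx,
//		Query{Name: "Services", Kind: "Service"},
//		Query{Name: "Deployments", Kind: "Deployment"})
//	if err != nil {
//		// handle the error
//	}
//	var snap Snapshot
//	// Wait for the Accumulator to signal a (coalesced) change, then call
//	// acc.Update(...) to copy the current state into snap.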

// ==

func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdate, cli dynamic.ResourceInterface) {
	var informer cache.SharedInformer

	// we override Watch to let us signal when our initial List is
	// complete so we can send an update() even when there are no
	// resource instances of the kind being watched
	lw := newListWatcher(ctx, cli, query, func(lw *lw) {
		if lw.hasSynced() {
			target <- rawUpdate{query.Name, true, nil, nil, time.Now()}
		}
	})
	informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)
	// TODO: uncomment this when we get to kubernetes 1.19. Right now errors will get logged by
	// klog. With this error handler in place we will log them to our own logger and provide a
	// more useful error message:
	/*
		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
			// This is from client-go/tools/cache/reflector.go:563
			isExpiredError := func(err error) bool {
				// In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
				// apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
				// and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
				// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
				return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
			}

			switch {
			case isExpiredError(err):
				log.Printf("Watch of %s closed with: %v", query.Kind, err)
			case err == io.EOF:
				// watch closed normally
			case err == io.ErrUnexpectedEOF:
				log.Printf("Watch for %s closed with unexpected EOF: %v", query.Kind, err)
			default:
				log.Printf("Failed to watch %s: %v", query.Kind, err)
			}
		})
	*/
	informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				// This is for testing. It allows us to deliberately increase the probability of
				// race conditions by e.g. introducing sleeps. At some point I'm sure we will want a
				// nicer prettier set of hooks, but for now all we need is this hack for
				// better/faster tests.
				c.watchAdded(nil, obj.(*Unstructured))
				lw.countAddEvent()
				target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured), time.Now()}
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				old := oldObj.(*Unstructured)
				new := newObj.(*Unstructured)

				// This is for testing. It allows us to deliberately increase the probability of
				// race conditions by e.g. introducing sleeps. At some point I'm sure we will want a
				// nicer prettier set of hooks, but for now all we need is this hack for
				// better/faster tests.
				c.watchUpdated(old, new)
				target <- rawUpdate{query.Name, lw.hasSynced(), old, new, time.Now()}
			},
			DeleteFunc: func(obj interface{}) {
				var old *Unstructured
				switch o := obj.(type) {
				case cache.DeletedFinalStateUnknown:
					old = o.Obj.(*Unstructured)
				case *Unstructured:
					old = o
				}

				// This is for testing. It allows us to deliberately increase the probability of
				// race conditions by e.g. introducing sleeps. At some point I'm sure we will want a
				// nicer prettier set of hooks, but for now all we need is this hack for
				// better/faster tests.
				c.watchDeleted(old, nil)

				key := unKey(old)
				// For the Add and Update cases, we clean out c.canonical in
				// patchWatch.
				c.mutex.Lock()
				delete(c.canonical, key)
				c.mutex.Unlock()
				target <- rawUpdate{query.Name, lw.hasSynced(), old, nil, time.Now()}
			},
		},
	)

	go informer.Run(ctx.Done())
}

type rawUpdate struct {
	name   string
	synced bool
	old    *unstructured.Unstructured
	new    *unstructured.Unstructured
	ts     time.Time
}

type lw struct {
	// All these fields are read-only and initialized on construction.
	ctx    context.Context
	client dynamic.ResourceInterface
	query  Query
	synced func(*lw)
	once   sync.Once

	// The mutex protects all the read-write fields.
	mutex            sync.Mutex
	initialListDone  bool
	initialListCount int
	addEventCount    int
	listForbidden    bool
}

func newListWatcher(ctx context.Context, client dynamic.ResourceInterface, query Query, synced func(*lw)) *lw {
	return &lw{ctx: ctx, client: client, query: query, synced: synced}
}

func (lw *lw) withMutex(f func()) {
	lw.mutex.Lock()
	defer lw.mutex.Unlock()
	f()
}

func (lw *lw) countAddEvent() {
	lw.withMutex(func() {
		lw.addEventCount++
	})
}

// This computes whether we have synced a given watch. We used to use SharedInformer.HasSynced() for
// this, but that seems to be a blatant lie that always returns true. My best guess as to why it lies
// is that it is actually reporting the synced state of an internal queue, but because the
// SharedInformer mechanism adds another layer of dispatch on top of that internal queue, the
// syncedness of that internal queue is irrelevant to whether enough layered events have been
// dispatched to consider things synced at the dispatch layer.
//
// So to track syncedness properly for our users, when we do our first List() we remember how many
// resources there are and we do not consider ourselves synced until we have dispatched at least as
// many Add events as there are resources.
func (lw *lw) hasSynced() (result bool) {
	lw.withMutex(func() {
		result = lw.initialListDone && lw.addEventCount >= lw.initialListCount
	})
	return
}

// List is used by a SharedInformer to get a baseline list of resources
// that can then be maintained by a watch.
func (lw *lw) List(opts ListOptions) (runtime.Object, error) {
	// Our SharedInformer will call us every so often. Every time through,
	// we'll decide whether we can be synchronized, and whether the list was
	// forbidden.
	synced := false
	forbidden := false

	opts.FieldSelector = lw.query.FieldSelector
	opts.LabelSelector = lw.query.LabelSelector
	result, err := lw.client.List(lw.ctx, opts)

	if err == nil {
		// No error, the list worked out fine. We can be synced now...
		synced = true
		// ...and the list was not forbidden.
		forbidden = false
	} else if apierrors.IsForbidden(err) {
		// Forbidden. We'll still consider ourselves synchronized, but
		// remember the forbidden error!
		// dlog.Debugf(lw.ctx, "couldn't list %s (forbidden)", lw.query.Kind)
		synced = true
		forbidden = true

		// Impedance matching for the SharedInformer interface: pretend
		// that we got an empty list and no error.
		result = &unstructured.UnstructuredList{}
		err = nil
	} else {
		// Any other error we'll consider transient, and try again later.
		// We're neither synced nor forbidden
		dlog.Infof(lw.ctx, "couldn't list %s (will retry): %s", lw.query.Kind, err)
	}

	lw.withMutex(func() {
		if synced {
			if !lw.initialListDone {
				lw.initialListDone = true
				lw.initialListCount = len(result.Items)
			}
		}

		lw.listForbidden = forbidden
	})

	return result, err
}

func (lw *lw) Watch(opts ListOptions) (watch.Interface, error) {
	lw.once.Do(func() { lw.synced(lw) })
	opts.FieldSelector = lw.query.FieldSelector
	opts.LabelSelector = lw.query.LabelSelector

	iface, err := lw.client.Watch(lw.ctx, opts)

	if err != nil {
		// If the list was forbidden, this error will likely just be "unknown", since we
		// returned an empty unstructured.UnstructuredList to fake out the lister, so in
		// that case just synthesize a slightly nicer error.
		if lw.listForbidden {
			err = errors.Errorf("can't watch %s: forbidden", lw.query.Kind)
		} else {
			// Not forbidden, so wrap the error to make sure the Kind we're
			// querying for is mentioned in it.
			err = errors.Wrapf(err, "can't watch %s", lw.query.Kind)
		}
	}

	return iface, err
}

// ==

func (c *Client) cliFor(mapping *meta.RESTMapping, namespace string) dynamic.ResourceInterface {
	cli := c.cli.Resource(mapping.Resource)
	if mapping.Scope.Name() == meta.RESTScopeNameNamespace && namespace != NamespaceAll {
		return cli.Namespace(namespace)
	}
	return cli
}

func (c *Client) cliForResource(resource *Unstructured) (dynamic.ResourceInterface, error) {
	mapping, err := c.mappingFor(resource.GroupVersionKind().GroupKind().String())
	if err != nil {
		return nil, err
	}

	// this will canonicalize the kind and version so any
	// shortcuts are expanded
	resource.SetGroupVersionKind(mapping.GroupVersionKind)

	ns := resource.GetNamespace()
	if ns == "" {
		ns = "default"
	}
	return c.cliFor(mapping, ns), nil
}

func (c *Client) newField(q Query) (*field, error) {
	mapping, err := c.mappingFor(q.Kind)
	if err != nil {
		return nil, err
	}
	sel, err := ParseSelector(q.LabelSelector)
	if err != nil {
		return nil, err
	}

	return &field{
		query:    q,
		mapping:  mapping,
		selector: sel,
		values:   make(map[string]*Unstructured),
		deltas:   make(map[string]*Delta),
	}, nil
}

// mappingFor returns the RESTMapping for the Kind given, or the Kind referenced by the resource.
// Prefers a fully specified GroupVersionResource match. If one is not found, we match on a fully
// specified GroupVersionKind, or fall back to a match on GroupKind.
//
// This is copy/pasted from k8s.io/cli-runtime/pkg/resource.Builder.mappingFor() (which is
// unfortunately private), with modified lines marked with "// MODIFIED".
func (c *Client) mappingFor(resourceOrKind string) (*meta.RESTMapping, error) { // MODIFIED: args
	fullySpecifiedGVR, groupResource := schema.ParseResourceArg(resourceOrKind)
	gvk := schema.GroupVersionKind{}
	// MODIFIED: Don't call b.restMapperFn(), use c.mapper instead.

	if fullySpecifiedGVR != nil {
		gvk, _ = c.mapper.KindFor(*fullySpecifiedGVR)
	}
	if gvk.Empty() {
		gvk, _ = c.mapper.KindFor(groupResource.WithVersion(""))
	}
	if !gvk.Empty() {
		return c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	}

	fullySpecifiedGVK, groupKind := schema.ParseKindArg(resourceOrKind)
	if fullySpecifiedGVK == nil {
		gvk := groupKind.WithVersion("")
		fullySpecifiedGVK = &gvk
	}

	if !fullySpecifiedGVK.Empty() {
		if mapping, err := c.mapper.RESTMapping(fullySpecifiedGVK.GroupKind(), fullySpecifiedGVK.Version); err == nil {
			return mapping, nil
		}
	}

	mapping, err := c.mapper.RESTMapping(groupKind, gvk.Version)
	if err != nil {
		// If we error out here, it is because we could not match a resource or a kind
		// for the given argument. To maintain consistency with previous behavior,
		// announce that a resource type could not be found.
		// If the error is _not_ a *meta.NoKindMatchError, then we had trouble doing discovery,
		// so we should return the original error since it may help a user diagnose what is
		// actually wrong.
		if meta.IsNoMatchError(err) {
			return nil, &unknownResource{resourceOrKind}
		}
		return nil, err
	}

	return mapping, nil
}

type unknownResource struct {
	arg string
}

func (e *unknownResource) Error() string {
	return fmt.Sprintf("the server doesn't have a resource type %q", e.arg)
}

// ==

func (c *Client) List(ctx context.Context, query Query, target interface{}) error {
	mapping, err := c.mappingFor(query.Kind)
	if err != nil {
		return err
	}

	items := make([]*Unstructured, 0)
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli := c.cliFor(mapping, query.Namespace)
		res, err := cli.List(ctx, ListOptions{
			FieldSelector: query.FieldSelector,
			LabelSelector: query.LabelSelector,
		})
		if err != nil {
			return err
		}

		for _, un := range res.Items {
			copy := un.DeepCopy()
			key := unKey(copy)
			// TODO: Deal with garbage collection in the case
			// where there is no watch.
			c.canonical[key] = copy
			items = append(items, copy)
		}
		return nil
	}(); err != nil {
		return err
	}

	return convert(items, target)
}
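
// For example, listing Pods into a typed slice (a minimal sketch; the label
// selector is hypothetical):
//
//	var pods []*Pod
//	err := client.List(ctx, Query{
//		Kind:          "Pod",
//		Namespace:     "default",
//		LabelSelector: "app=example",
//	}, &pods)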

// ==

func (c *Client) Get(ctx context.Context, resource interface{}, target interface{}) error {
	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	var res *Unstructured
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		res, err = cli.Get(ctx, un.GetName(), GetOptions{})
		if err != nil {
			return err
		}
		key := unKey(res)
		// TODO: Deal with garbage collection in the case
		// where there is no watch.
		c.canonical[key] = res
		return nil
	}(); err != nil {
		return err
	}

	return convert(res, target)
}

// ==

func (c *Client) Create(ctx context.Context, resource interface{}, target interface{}) error {
	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	var res *Unstructured
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		res, err = cli.Create(ctx, &un, CreateOptions{})
		if err != nil {
			return err
		}
		key := unKey(res)
		c.canonical[key] = res
		return nil
	}(); err != nil {
		return err
	}

	return convert(res, target)
}
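
// A minimal create-then-read sketch (the object names are hypothetical, and
// the ConfigMap/TypeMeta/ObjectMeta aliases are assumed to be this package's
// usual re-exports of the corev1/metav1 types):
//
//	cm := &ConfigMap{
//		TypeMeta:   TypeMeta{Kind: "ConfigMap"},
//		ObjectMeta: ObjectMeta{Name: "example", Namespace: "default"},
//		Data:       map[string]string{"key": "value"},
//	}
//	var created ConfigMap
//	if err := client.Create(ctx, cm, &created); err != nil {
//		// handle the error
//	}
//	var fetched ConfigMap
//	if err := client.Get(ctx, cm, &fetched); err != nil {
//		// handle the error
//	}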

// ==

func (c *Client) Update(ctx context.Context, resource interface{}, target interface{}) error {
	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	prev := un.GetResourceVersion()

	var res *Unstructured
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		res, err = cli.Update(ctx, &un, UpdateOptions{})
		if err != nil {
			return err
		}
		if res.GetResourceVersion() != prev {
			key := unKey(res)
			c.canonical[key] = res
		}
		return nil
	}(); err != nil {
		return err
	}

	return convert(res, target)
}

// ==

func (c *Client) Patch(ctx context.Context, resource interface{}, pt PatchType, data []byte, target interface{}) error {
	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	prev := un.GetResourceVersion()

	var res *Unstructured
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		res, err = cli.Patch(ctx, un.GetName(), pt, data, PatchOptions{})
		if err != nil {
			return err
		}
		if res.GetResourceVersion() != prev {
			key := unKey(res)
			c.canonical[key] = res
		}
		return nil
	}(); err != nil {
		return err
	}

	return convert(res, target)
}
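
// For example, a merge patch that adds a label (a sketch; MergePatchType is
// assumed to be re-exported by this package from k8s.io/apimachinery/pkg/types
// alongside the PatchType alias used in the signature above):
//
//	patch := []byte(`{"metadata":{"labels":{"example":"true"}}}`)
//	var patched ConfigMap
//	err := client.Patch(ctx, cm, MergePatchType, patch, &patched)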

// ==

func (c *Client) Upsert(ctx context.Context, resource interface{}, source interface{}, target interface{}) error {
	if resource == nil || reflect.ValueOf(resource).IsNil() {
		resource = source
	}

	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	var unsrc Unstructured
	err = convert(source, &unsrc)
	if err != nil {
		return err
	}
	MergeUpdate(&un, &unsrc)

	prev := un.GetResourceVersion()

	var res *Unstructured
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		create := false
		rsrc := &un
		if prev == "" {
			stored, err := cli.Get(ctx, un.GetName(), GetOptions{})
			if err != nil {
				if IsNotFound(err) {
					create = true
					rsrc = &un
				} else {
					return err
				}
			} else {
				rsrc = stored
				MergeUpdate(rsrc, &unsrc)
			}
		}
		if create {
			res, err = cli.Create(ctx, rsrc, CreateOptions{})
		} else {
			// XXX: need to clean up the conflict case and add a test for it
		update:
			res, err = cli.Update(ctx, rsrc, UpdateOptions{})
			if err != nil && IsConflict(err) {
				stored, err := cli.Get(ctx, un.GetName(), GetOptions{})
				if err != nil {
					return err
				}
				rsrc = stored
				MergeUpdate(rsrc, &unsrc)
				goto update
			}
		}
		if err != nil {
			return err
		}
		if res.GetResourceVersion() != prev {
			key := unKey(res)
			c.canonical[key] = res
		}
		return nil
	}(); err != nil {
		return err
	}

	return convert(res, target)
}
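
// A usage sketch: passing nil as "resource" makes Upsert behave as a plain
// create-or-update of "desired" (a hypothetical object), retrying the update
// on conflict as shown above:
//
//	var result ConfigMap
//	if err := client.Upsert(ctx, nil, desired, &result); err != nil {
//		// handle the error
//	}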

// ==

func (c *Client) UpdateStatus(ctx context.Context, resource interface{}, target interface{}) error {
	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	prev := un.GetResourceVersion()

	var res *Unstructured
	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		res, err = cli.UpdateStatus(ctx, &un, UpdateOptions{})
		if err != nil {
			return err
		}
		if res.GetResourceVersion() != prev {
			key := unKey(res)
			c.canonical[key] = res
		}
		return nil
	}(); err != nil {
		return err
	}

	return convert(res, target)
}

// ==

func (c *Client) Delete(ctx context.Context, resource interface{}, target interface{}) error {
	var un Unstructured
	err := convert(resource, &un)
	if err != nil {
		return err
	}

	if err := func() error {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		cli, err := c.cliForResource(&un)
		if err != nil {
			return err
		}
		err = cli.Delete(ctx, un.GetName(), DeleteOptions{})
		if err != nil {
			return err
		}
		key := unKey(&un)
		c.canonical[key] = nil
		return nil
	}(); err != nil {
		return err
	}

	return convert(nil, target)
}

// ==

// Update the result of a watch with newer items from our local cache. This guarantees we never give
// back stale objects that are known to be modified by us.
func (c *Client) patchWatch(ctx context.Context, field *field) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// The canonical map holds all local changes made by this client which have not been reflected
	// back to it through a watch. This should normally be quite small since objects only occupy
	// this map for the duration of a round trip to the API server. (XXX: We don't yet handle the
	// case of modifying objects that are not watched. Those objects will get stuck in this map, but
	// that is ok for our current set of use cases.)

	// Loop over the canonical map and patch the watch result.
	for key, can := range c.canonical {
		item, ok := field.values[key]
		if ok {
			// An object is both in our canonical map and in the watch.
			if can == nil {
				// The object is deleted, but has not yet been reported so by the apiserver, so we
				// remove it.
				dlog.Println(ctx, "Patching delete", field.mapping.GroupVersionKind.Kind, key)
				delete(field.values, key)
				field.deltas[key] = newDelta(ObjectDelete, item)
			} else if newer, err := gteq(item.GetResourceVersion(), can.GetResourceVersion()); err != nil {
				return err
			} else if newer {
				// The object in the watch result is the same or newer than our canonical value, so
				// no need to track it anymore.
				dlog.Println(ctx, "Patching synced", field.mapping.GroupVersionKind.Kind, key)
				delete(c.canonical, key)
			} else {
				// The object in the watch result is stale, so we update it with the canonical
				// version and track it as a delta.
				dlog.Println(ctx, "Patching update", field.mapping.GroupVersionKind.Kind, key)
				field.values[key] = can
				field.deltas[key] = newDelta(ObjectUpdate, can)
			}
		} else if can != nil && can.GroupVersionKind() == field.mapping.GroupVersionKind &&
			field.selector.Matches(LabelSet(can.GetLabels())) {
			// An object that was created locally is not yet present in the watch result, so we add it.
			dlog.Println(ctx, "Patching add", field.mapping.GroupVersionKind.Kind, key)
			field.values[key] = can
			field.deltas[key] = newDelta(ObjectAdd, can)
		}
	}
	return nil
}

// ==

// The LogEvent struct is used to communicate log output from a pod. It includes PodID and Timestamp information so that
// LogEvents from different pods can be interleaved without losing information about total ordering and/or pod identity.
type LogEvent struct {
	// The PodID field indicates what pod the log output is associated with.
	PodID string `json:"podID"`
	// The Timestamp field indicates when the log output was created.
	Timestamp string `json:"timestamp"`

	// The Output field contains log output from the pod.
	Output string `json:"output,omitempty"`

	// The Closed field is true if the supply of log events from the given pod was terminated. This does not
	// necessarily mean there is no more log data.
	Closed bool
	// The Error field contains error information if the log events were terminated due to an error.
	Error error `json:"error,omitempty"`
}

func parseLogLine(line string) (timestamp string, output string) {
	if parts := strings.SplitN(line, " ", 2); len(parts) == 2 {
		if _, err := time.Parse(time.RFC3339Nano, parts[0]); err == nil {
			timestamp = parts[0]
			output = parts[1]
			return
		}
	}
	output = line
	return
}

// The PodLogs method accesses the log output of a given pod by sending LogEvent structs down the supplied channel. The
// LogEvent struct is designed to hold enough information that it is feasible to invoke PodLogs multiple times with a
// single channel in order to multiplex log output from many pods, e.g.:
//
//	events := make(chan LogEvent)
//	client.PodLogs(ctx, pod1, options, events)
//	client.PodLogs(ctx, pod2, options, events)
//	client.PodLogs(ctx, pod3, options, events)
//
//	for event := range events {
//	    fmt.Printf("%s: %s: %s", event.PodID, event.Timestamp, event.Output)
//	}
//
// The above code will print log output from all 3 pods.
func (c *Client) PodLogs(ctx context.Context, pod *Pod, options *PodLogOptions, events chan<- LogEvent) error {
	// always use timestamps
	options.Timestamps = true
	timeout := 10 * time.Second
	allContainers := true

	requests, err := polymorphichelpers.LogsForObjectFn(c.config, pod, options, timeout,
		allContainers)
	if err != nil {
		return err
	}

	podID := string(pod.GetUID())
	for _, request := range requests {
		go func(request rest.ResponseWrapper) {
			readCloser, err := request.Stream(ctx)
			if err != nil {
				events <- LogEvent{PodID: podID, Error: err, Closed: true}
				return
			}
			defer readCloser.Close()

			r := bufio.NewReader(readCloser)
			for {
				bytes, err := r.ReadBytes('\n')
				if len(bytes) > 0 {
					timestamp, output := parseLogLine(string(bytes))
					events <- LogEvent{
						PodID:     podID,
						Timestamp: timestamp,
						Output:    output,
					}
				}
				if err != nil {
					if err != io.EOF {
						events <- LogEvent{
							PodID:  podID,
							Error:  err,
							Closed: true,
						}
					} else {
						events <- LogEvent{PodID: podID, Closed: true}
					}
					return
				}
			}
		}(request)
	}

	return nil
}

// Technically this is sketchy since resource versions are opaque, however this exact same approach
// is also taken deep in the bowels of client-go and from what I understand of the k3s folks'
// efforts replacing etcd (the source of these resource versions) with a different store, the
// kubernetes team was very adamant about the approach to pluggable stores being to create an etcd
// shim rather than to go more abstract. I believe this makes it relatively safe to depend on in
// practice.
func gteq(v1, v2 string) (bool, error) {
	i1, err := strconv.ParseInt(v1, 10, 64)
	if err != nil {
		return false, err
	}
	i2, err := strconv.ParseInt(v2, 10, 64)
	if err != nil {
		return false, err
	}
	return i1 >= i2, nil
}

func convert(in interface{}, out interface{}) error {
	return kates_internal.Convert(in, out)
}

func unKey(u *Unstructured) string {
	return string(u.GetUID())
}

func (options ClientConfig) toConfigFlags() *ConfigFlags {
	result := NewConfigFlags(false)

	if options.Kubeconfig != "" {
		result.KubeConfig = &options.Kubeconfig
	}
	if options.Context != "" {
		result.Context = &options.Context
	}
	if options.Namespace != "" {
		result.Namespace = &options.Namespace
	}

	return result
}
