...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/cluster_store.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"sync"
     8  
     9  	"github.com/linkerd/linkerd2/controller/k8s"
    10  	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
    11  	"github.com/prometheus/client_golang/prometheus"
    12  	"github.com/prometheus/client_golang/prometheus/promauto"
    13  	logging "github.com/sirupsen/logrus"
    14  	v1 "k8s.io/api/core/v1"
    15  	"k8s.io/client-go/kubernetes"
    16  	"k8s.io/client-go/tools/cache"
    17  	"k8s.io/client-go/tools/clientcmd"
    18  )
    19  
    20  type (
    21  	// ClusterStore indexes clusters in which remote service discovery may be
    22  	// performed. For each store item, an EndpointsWatcher is created to read
    23  	// state directly from the respective cluster's API Server. In addition,
    24  	// each store item has some associated and immutable configuration that is
    25  	// required for service discovery.
    26  	ClusterStore struct {
    27  		// Protects against illegal accesses
    28  		sync.RWMutex
    29  
    30  		api                  *k8s.API
    31  		store                map[string]remoteCluster
    32  		enableEndpointSlices bool
    33  		log                  *logging.Entry
    34  
    35  		// Function used to parse a kubeconfig from a byte buffer. Based on the
    36  		// kubeconfig, it creates API Server clients
    37  		decodeFn configDecoder
    38  
    39  		size_gauge prometheus.GaugeFunc
    40  	}
    41  
    42  	// remoteCluster is a helper struct that represents a store item
    43  	remoteCluster struct {
    44  		watcher *EndpointsWatcher
    45  		config  clusterConfig
    46  
    47  		// Used to signal shutdown to the associated watcher's informers
    48  		stopCh chan<- struct{}
    49  	}
    50  
    51  	// clusterConfig holds immutable configuration for a given cluster
    52  	clusterConfig struct {
    53  		TrustDomain   string
    54  		ClusterDomain string
    55  	}
    56  
    57  	// configDecoder is the type of a function that given a byte buffer, returns
    58  	// a pair of API Server clients. The cache uses this function to dynamically
    59  	// create clients after discovering a Secret.
    60  	configDecoder = func(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error)
    61  )
    62  
    63  const (
    64  	clusterNameLabel        = "multicluster.linkerd.io/cluster-name"
    65  	trustDomainAnnotation   = "multicluster.linkerd.io/trust-domain"
    66  	clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
    67  )
    68  
    69  // NewClusterStore creates a new (empty) ClusterStore. It
    70  // requires a Kubernetes API Server client instantiated for the local cluster.
    71  //
    72  // When created, a pair of event handlers are registered for the local cluster's
    73  // Secret informer. The event handlers are responsible for driving the discovery
    74  // of remote clusters and their configuration
    75  func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error) {
    76  	return NewClusterStoreWithDecoder(client, namespace, enableEndpointSlices, decodeK8sConfigFromSecret)
    77  }
    78  
    79  func (cs *ClusterStore) Sync(stopCh <-chan struct{}) {
    80  	cs.api.Sync(stopCh)
    81  }
    82  
    83  func (cs *ClusterStore) UnregisterGauges() {
    84  	prometheus.Unregister(cs.size_gauge)
    85  }
    86  
    87  // newClusterStoreWithDecoder is a helper function that allows the creation of a
    88  // store with an arbitrary `configDecoder` function.
    89  func NewClusterStoreWithDecoder(client kubernetes.Interface, namespace string, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) {
    90  	api := k8s.NewNamespacedAPI(client, nil, nil, namespace, "local", k8s.Secret)
    91  
    92  	cs := &ClusterStore{
    93  		store: make(map[string]remoteCluster),
    94  		log: logging.WithFields(logging.Fields{
    95  			"component": "cluster-store",
    96  		}),
    97  		enableEndpointSlices: enableEndpointSlices,
    98  		api:                  api,
    99  		decodeFn:             decodeFn,
   100  	}
   101  
   102  	cs.size_gauge = promauto.NewGaugeFunc(prometheus.GaugeOpts{
   103  		Name: "cluster_store_size",
   104  		Help: "The number of linked clusters in the remote discovery cluster store",
   105  	}, func() float64 { return (float64)(len(cs.store)) })
   106  
   107  	_, err := cs.api.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   108  		AddFunc: func(obj interface{}) {
   109  			secret, ok := obj.(*v1.Secret)
   110  			if !ok {
   111  				cs.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret)
   112  				return
   113  			}
   114  
   115  			if secret.Type != pkgK8s.MirrorSecretType {
   116  				cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type)
   117  				return
   118  			}
   119  
   120  			clusterName, found := secret.GetLabels()[clusterNameLabel]
   121  			if !found {
   122  				cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
   123  				return
   124  			}
   125  
   126  			if err := cs.addCluster(clusterName, secret); err != nil {
   127  				cs.log.Errorf("Error adding cluster %s to store: %v", clusterName, err)
   128  			}
   129  		},
   130  		DeleteFunc: func(obj interface{}) {
   131  			secret, ok := obj.(*v1.Secret)
   132  			if !ok {
   133  				// If the Secret was deleted when the watch was disconnected
   134  				// (for whatever reason) and the event was missed, the object is
   135  				// added with 'DeletedFinalStateUnknown'. Its state may be
   136  				// stale, so it should be cleaned-up.
   137  				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   138  				if !ok {
   139  					cs.log.Debugf("Unable to get object from DeletedFinalStateUnknown %#v", obj)
   140  					return
   141  				}
   142  				// If the zombie object is a `Secret` we can delete it.
   143  				secret, ok = tombstone.Obj.(*v1.Secret)
   144  				if !ok {
   145  					cs.log.Debugf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj)
   146  					return
   147  				}
   148  			}
   149  
   150  			clusterName, found := secret.GetLabels()[clusterNameLabel]
   151  			if !found {
   152  				cs.log.Tracef("Skipping Delete event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
   153  				return
   154  			}
   155  
   156  			cs.removeCluster(clusterName)
   157  
   158  		},
   159  	})
   160  
   161  	if err != nil {
   162  		return nil, err
   163  	}
   164  
   165  	return cs, nil
   166  }
   167  
   168  // Get safely retrieves a store item given a cluster name.
   169  func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool) {
   170  	cs.RLock()
   171  	defer cs.RUnlock()
   172  	cw, found := cs.store[clusterName]
   173  	return cw.watcher, cw.config, found
   174  }
   175  
   176  // removeCluster is triggered by the cache's Secret informer when a secret is
   177  // removed. Given a cluster name, it removes the entry from the cache after
   178  // stopping the associated watcher.
   179  func (cs *ClusterStore) removeCluster(clusterName string) {
   180  	cs.Lock()
   181  	defer cs.Unlock()
   182  	r, found := cs.store[clusterName]
   183  	if !found {
   184  		return
   185  	}
   186  	r.watcher.removeHandlers()
   187  	r.watcher.k8sAPI.UnregisterGauges()
   188  	r.watcher.metadataAPI.UnregisterGauges()
   189  	close(r.stopCh)
   190  	delete(cs.store, clusterName)
   191  	cs.log.Infof("Removed cluster %s from ClusterStore", clusterName)
   192  }
   193  
   194  // addCluster is triggered by the cache's Secret informer when a secret is
   195  // discovered for the first time. Given a cluster name and a Secret
   196  // object, it creates an EndpointsWatcher for a remote cluster and syncs its
   197  // informers before returning.
   198  func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error {
   199  	data, found := secret.Data[pkgK8s.ConfigKeyName]
   200  	if !found {
   201  		return errors.New("missing kubeconfig file")
   202  	}
   203  
   204  	clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation]
   205  	if !found {
   206  		return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation)
   207  	}
   208  
   209  	trustDomain, found := secret.GetAnnotations()[trustDomainAnnotation]
   210  	if !found {
   211  		return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation)
   212  	}
   213  
   214  	remoteAPI, metadataAPI, err := cs.decodeFn(data, clusterName, cs.enableEndpointSlices)
   215  	if err != nil {
   216  		return err
   217  	}
   218  
   219  	stopCh := make(chan struct{}, 1)
   220  	watcher, err := NewEndpointsWatcher(
   221  		remoteAPI,
   222  		metadataAPI,
   223  		logging.WithFields(logging.Fields{
   224  			"remote-cluster": clusterName,
   225  		}),
   226  		cs.enableEndpointSlices,
   227  		clusterName,
   228  	)
   229  	if err != nil {
   230  		return err
   231  	}
   232  
   233  	cs.Lock()
   234  	defer cs.Unlock()
   235  	cs.store[clusterName] = remoteCluster{
   236  		watcher,
   237  		clusterConfig{
   238  			trustDomain,
   239  			clusterDomain,
   240  		},
   241  		stopCh,
   242  	}
   243  
   244  	go remoteAPI.Sync(stopCh)
   245  	go metadataAPI.Sync(stopCh)
   246  
   247  	cs.log.Infof("Added cluster %s to ClusterStore", clusterName)
   248  
   249  	return nil
   250  }
   251  
   252  // decodeK8sConfigFromSecret implements the decoder function type. Given a byte
   253  // buffer, it attempts to parse it as a kubeconfig file. If successful, returns
   254  // a pair of API Server clients.
   255  func decodeK8sConfigFromSecret(data []byte, cluster string, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
   256  	cfg, err := clientcmd.RESTConfigFromKubeConfig(data)
   257  	if err != nil {
   258  		return nil, nil, err
   259  	}
   260  
   261  	ctx := context.Background()
   262  	var remoteAPI *k8s.API
   263  	if enableEndpointSlices {
   264  		remoteAPI, err = k8s.InitializeAPIForConfig(
   265  			ctx,
   266  			cfg,
   267  			true,
   268  			cluster,
   269  			k8s.ES, k8s.Pod, k8s.Svc, k8s.Srv,
   270  		)
   271  	} else {
   272  		remoteAPI, err = k8s.InitializeAPIForConfig(
   273  			ctx,
   274  			cfg,
   275  			true,
   276  			cluster,
   277  			k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.Srv,
   278  		)
   279  	}
   280  	if err != nil {
   281  		return nil, nil, err
   282  	}
   283  
   284  	metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, cluster, k8s.RS)
   285  	if err != nil {
   286  		return nil, nil, err
   287  	}
   288  
   289  	return remoteAPI, metadataAPI, nil
   290  }
   291  

View as plain text