...

Source file src/github.com/linkerd/linkerd2/controller/k8s/metadata_api.go

Documentation: github.com/linkerd/linkerd2/controller/k8s

     1  package k8s
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  
     8  	"github.com/linkerd/linkerd2/pkg/k8s"
     9  	"github.com/prometheus/client_golang/prometheus"
    10  	log "github.com/sirupsen/logrus"
    11  	appsv1 "k8s.io/api/apps/v1"
    12  	batchv1 "k8s.io/api/batch/v1"
    13  	corev1 "k8s.io/api/core/v1"
    14  	"k8s.io/apimachinery/pkg/api/meta"
    15  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    16  	"k8s.io/apimachinery/pkg/labels"
    17  	"k8s.io/apimachinery/pkg/runtime"
    18  	"k8s.io/client-go/informers"
    19  	"k8s.io/client-go/metadata"
    20  	"k8s.io/client-go/metadata/metadatainformer"
    21  	"k8s.io/client-go/rest"
    22  	"k8s.io/client-go/tools/cache"
    23  )
    24  
    25  // MetadataAPI provides shared metadata informers for some Kubernetes resources
    26  type MetadataAPI struct {
    27  	promGauges
    28  
    29  	client          metadata.Interface
    30  	inf             map[APIResource]informers.GenericInformer
    31  	syncChecks      []cache.InformerSynced
    32  	sharedInformers metadatainformer.SharedInformerFactory
    33  }
    34  
    35  // InitializeMetadataAPI returns an instance of MetadataAPI with metadata
    36  // informers for the provided resources
    37  func InitializeMetadataAPI(kubeConfig string, cluster string, resources ...APIResource) (*MetadataAPI, error) {
    38  	config, err := k8s.GetConfig(kubeConfig, "")
    39  	if err != nil {
    40  		return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err)
    41  	}
    42  	return InitializeMetadataAPIForConfig(config, cluster, resources...)
    43  }
    44  
    45  func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, cluster string, resources ...APIResource) (*MetadataAPI, error) {
    46  	client, err := metadata.NewForConfig(kubeConfig)
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  
    51  	api, err := newClusterScopedMetadataAPI(client, cluster, resources...)
    52  	if err != nil {
    53  		return nil, err
    54  	}
    55  
    56  	for _, gauge := range api.gauges {
    57  		if err := prometheus.Register(gauge); err != nil {
    58  			log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
    59  		}
    60  	}
    61  	return api, nil
    62  
    63  }
    64  
    65  func newClusterScopedMetadataAPI(
    66  	metadataClient metadata.Interface,
    67  	cluster string,
    68  	resources ...APIResource,
    69  ) (*MetadataAPI, error) {
    70  	sharedInformers := metadatainformer.NewFilteredSharedInformerFactory(
    71  		metadataClient,
    72  		ResyncTime,
    73  		metav1.NamespaceAll,
    74  		nil,
    75  	)
    76  
    77  	api := &MetadataAPI{
    78  		client:          metadataClient,
    79  		inf:             make(map[APIResource]informers.GenericInformer),
    80  		syncChecks:      make([]cache.InformerSynced, 0),
    81  		sharedInformers: sharedInformers,
    82  	}
    83  
    84  	informerLabels := prometheus.Labels{
    85  		"cluster": cluster,
    86  	}
    87  
    88  	for _, resource := range resources {
    89  		if err := api.addInformer(resource, informerLabels); err != nil {
    90  			return nil, err
    91  		}
    92  	}
    93  	return api, nil
    94  }
    95  
    96  // Sync waits for all informers to be synced.
    97  func (api *MetadataAPI) Sync(stopCh <-chan struct{}) {
    98  	api.sharedInformers.Start(stopCh)
    99  
   100  	waitForCacheSync(api.syncChecks)
   101  }
   102  
   103  // UnregisterGauges unregisters all the prometheus cache gauges associated to this API
   104  func (api *MetadataAPI) UnregisterGauges() {
   105  	api.promGauges.unregister()
   106  }
   107  
   108  func (api *MetadataAPI) getLister(res APIResource) (cache.GenericLister, error) {
   109  	inf, ok := api.inf[res]
   110  	if !ok {
   111  		return nil, fmt.Errorf("metadata informer (%v) not configured", res)
   112  	}
   113  
   114  	return inf.Lister(), nil
   115  }
   116  
   117  // Get returns the metadata for the supplied object type and name. This uses a
   118  // shared informer and the results may be out of date if the informer is
   119  // lagging behind.
   120  func (api *MetadataAPI) Get(res APIResource, name string) (*metav1.PartialObjectMetadata, error) {
   121  	ls, err := api.getLister(res)
   122  	if err != nil {
   123  		return nil, err
   124  	}
   125  
   126  	obj, err := ls.Get(name)
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  
   131  	// ls' concrete type is metadatalister.metadataListerShim, whose
   132  	// Get method always returns *metav1.PartialObjectMetadata
   133  	nsMeta, ok := obj.(*metav1.PartialObjectMetadata)
   134  	if !ok {
   135  		return nil, fmt.Errorf("couldn't convert obj %v to PartialObjectMetadata", obj)
   136  	}
   137  
   138  	return nsMeta, nil
   139  }
   140  
   141  func (api *MetadataAPI) getByNamespace(res APIResource, ns, name string) (*metav1.PartialObjectMetadata, error) {
   142  	ls, err := api.getLister(res)
   143  	if err != nil {
   144  		return nil, err
   145  	}
   146  
   147  	obj, err := ls.ByNamespace(ns).Get(name)
   148  	if err != nil {
   149  		return nil, err
   150  	}
   151  
   152  	nsMeta, ok := obj.(*metav1.PartialObjectMetadata)
   153  	if !ok {
   154  		return nil, fmt.Errorf("couldn't convert obj %v to PartialObjectMetadata", obj)
   155  	}
   156  
   157  	return nsMeta, nil
   158  }
   159  
   160  // GetByNamespaceFiltered returns a list of Kubernetes object references, given
   161  // a type, namespace, name and label selector. This uses a shared informer and
   162  // the results may be out of date if the informer is lagging behind.
   163  func (api *MetadataAPI) GetByNamespaceFiltered(
   164  	restype APIResource,
   165  	ns string,
   166  	name string,
   167  	labelSelector labels.Selector,
   168  ) ([]*metav1.PartialObjectMetadata, error) {
   169  	ls, err := api.getLister(restype)
   170  	if err != nil {
   171  		return nil, err
   172  	}
   173  
   174  	var objs []runtime.Object
   175  	if ns == "" {
   176  		objs, err = ls.List(labelSelector)
   177  	} else if name == "" {
   178  		objs, err = ls.ByNamespace(ns).List(labelSelector)
   179  	} else {
   180  		var obj runtime.Object
   181  		obj, err = ls.ByNamespace(ns).Get(name)
   182  		objs = []runtime.Object{obj}
   183  	}
   184  
   185  	if err != nil {
   186  		return nil, err
   187  	}
   188  
   189  	objMetas := []*metav1.PartialObjectMetadata{}
   190  	for _, obj := range objs {
   191  		// ls' concrete type is metadatalister.metadataListerShim, which
   192  		// guarantees this cast won't fail
   193  		objMeta, ok := obj.(*metav1.PartialObjectMetadata)
   194  		if !ok {
   195  			return nil, fmt.Errorf("couldn't convert obj %v to PartialObjectMetadata", obj)
   196  		}
   197  		gvk, err := restype.GVK()
   198  		if err != nil {
   199  			return nil, err
   200  		}
   201  
   202  		// objMeta's TypeMeta fields aren't getting populated, so we do it
   203  		// manually here
   204  		objMeta.SetGroupVersionKind(gvk)
   205  		objMetas = append(objMetas, objMeta)
   206  	}
   207  
   208  	return objMetas, nil
   209  }
   210  
   211  // GetOwnerKindAndName returns the pod owner's kind and name, using owner
   212  // references from the Kubernetes API. The kind is represented as the
   213  // Kubernetes singular resource type (e.g. deployment, daemonset, job, etc.).
   214  // If retry is true, when the shared informer cache doesn't return anything we
   215  // try again with a direct Kubernetes API call.
   216  func (api *MetadataAPI) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string, error) {
   217  	ownerRefs := pod.GetOwnerReferences()
   218  	if len(ownerRefs) == 0 {
   219  		// pod without a parent
   220  		return "pod", pod.Name, nil
   221  	} else if len(ownerRefs) > 1 {
   222  		log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
   223  		return "pod", pod.Name, nil
   224  	}
   225  
   226  	parent := ownerRefs[0]
   227  	var parentObj metav1.Object
   228  	var err error
   229  	switch parent.Kind {
   230  	case "Job":
   231  		parentObj, err = api.getByNamespace(Job, pod.Namespace, parent.Name)
   232  		if err != nil {
   233  			log.Warnf("failed to retrieve job from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
   234  			if retry {
   235  				parentObj, err = api.client.
   236  					Resource(batchv1.SchemeGroupVersion.WithResource("jobs")).
   237  					Namespace(pod.Namespace).
   238  					Get(ctx, parent.Name, metav1.GetOptions{})
   239  				if err != nil {
   240  					log.Warnf("failed to retrieve job from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
   241  				}
   242  			}
   243  		}
   244  	case "ReplicaSet":
   245  		var rsObj *metav1.PartialObjectMetadata
   246  		rsObj, err = api.getByNamespace(RS, pod.Namespace, parent.Name)
   247  		if err != nil {
   248  			log.Warnf("failed to retrieve replicaset from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
   249  			if retry {
   250  				rsObj, err = api.client.
   251  					Resource(appsv1.SchemeGroupVersion.WithResource("replicasets")).
   252  					Namespace(pod.Namespace).
   253  					Get(ctx, parent.Name, metav1.GetOptions{})
   254  				if err != nil {
   255  					log.Warnf("failed to retrieve replicaset from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
   256  				}
   257  			}
   258  		}
   259  
   260  		if rsObj == nil || !isValidRSParent(rsObj) {
   261  			return strings.ToLower(parent.Kind), parent.Name, nil
   262  		}
   263  		parentObj = rsObj
   264  	default:
   265  		return strings.ToLower(parent.Kind), parent.Name, nil
   266  	}
   267  
   268  	if err == nil && len(parentObj.GetOwnerReferences()) == 1 {
   269  		grandParent := parentObj.GetOwnerReferences()[0]
   270  		return strings.ToLower(grandParent.Kind), grandParent.Name, nil
   271  	}
   272  	return strings.ToLower(parent.Kind), parent.Name, nil
   273  }
   274  
   275  func (api *MetadataAPI) addInformer(res APIResource, informerLabels prometheus.Labels) error {
   276  	gvk, err := res.GVK()
   277  	if err != nil {
   278  		return err
   279  	}
   280  	gvr, _ := meta.UnsafeGuessKindToResource(gvk)
   281  	inf := api.sharedInformers.ForResource(gvr)
   282  	api.syncChecks = append(api.syncChecks, inf.Informer().HasSynced)
   283  	api.promGauges.addInformerSize(strings.ToLower(gvk.Kind), informerLabels, inf.Informer())
   284  	api.inf[res] = inf
   285  
   286  	return nil
   287  }
   288  

View as plain text