...

Source file src/github.com/linkerd/linkerd2/controller/k8s/api.go

Documentation: github.com/linkerd/linkerd2/controller/k8s

     1  package k8s
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  
     8  	"k8s.io/client-go/dynamic"
     9  	"k8s.io/client-go/rest"
    10  
    11  	spv1alpha2 "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
    12  	l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
    13  	l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
    14  	ewinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload/v1beta1"
    15  	srvinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server/v1beta2"
    16  	spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha2"
    17  	"github.com/linkerd/linkerd2/pkg/k8s"
    18  	"github.com/prometheus/client_golang/prometheus"
    19  	log "github.com/sirupsen/logrus"
    20  	"google.golang.org/grpc/codes"
    21  	"google.golang.org/grpc/status"
    22  	appsv1 "k8s.io/api/apps/v1"
    23  	batchv1 "k8s.io/api/batch/v1"
    24  	corev1 "k8s.io/api/core/v1"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/labels"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/client-go/informers"
    31  	arinformers "k8s.io/client-go/informers/admissionregistration/v1"
    32  	appv1informers "k8s.io/client-go/informers/apps/v1"
    33  	batchv1informers "k8s.io/client-go/informers/batch/v1"
    34  	coreinformers "k8s.io/client-go/informers/core/v1"
    35  	discoveryinformers "k8s.io/client-go/informers/discovery/v1"
    36  	"k8s.io/client-go/kubernetes"
    37  	"k8s.io/client-go/tools/cache"
    38  )
    39  
    40  // API provides shared informers for all Kubernetes objects
    41  type API struct {
    42  	promGauges
    43  
    44  	Client        kubernetes.Interface
    45  	DynamicClient dynamic.Interface
    46  
    47  	cj       batchv1informers.CronJobInformer
    48  	cm       coreinformers.ConfigMapInformer
    49  	deploy   appv1informers.DeploymentInformer
    50  	ds       appv1informers.DaemonSetInformer
    51  	endpoint coreinformers.EndpointsInformer
    52  	es       discoveryinformers.EndpointSliceInformer
    53  	ew       ewinformers.ExternalWorkloadInformer
    54  	job      batchv1informers.JobInformer
    55  	mwc      arinformers.MutatingWebhookConfigurationInformer
    56  	ns       coreinformers.NamespaceInformer
    57  	pod      coreinformers.PodInformer
    58  	rc       coreinformers.ReplicationControllerInformer
    59  	rs       appv1informers.ReplicaSetInformer
    60  	sp       spinformers.ServiceProfileInformer
    61  	ss       appv1informers.StatefulSetInformer
    62  	svc      coreinformers.ServiceInformer
    63  	node     coreinformers.NodeInformer
    64  	secret   coreinformers.SecretInformer
    65  	srv      srvinformers.ServerInformer
    66  
    67  	syncChecks            []cache.InformerSynced
    68  	sharedInformers       informers.SharedInformerFactory
    69  	l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory
    70  }
    71  
    72  // InitializeAPI creates Kubernetes clients and returns an initialized API
    73  // wrapper. This creates informers on each one of resources passed, registering
    74  // metrics on each one; don't forget to call UnregisterGauges() on the returned
    75  // API reference to clean them up!
    76  func InitializeAPI(ctx context.Context, kubeConfig string, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
    77  	config, err := k8s.GetConfig(kubeConfig, "")
    78  	if err != nil {
    79  		return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err)
    80  	}
    81  
    82  	dynamicClient, err := dynamic.NewForConfig(config)
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	k8sClient, err := k8s.NewAPIForConfig(config, "", []string{}, 0, 0, 0)
    88  	if err != nil {
    89  		return nil, err
    90  	}
    91  
    92  	return initAPI(ctx, k8sClient, dynamicClient, config, ensureClusterWideAccess, cluster, resources...)
    93  }
    94  
    95  // InitializeAPIForConfig creates Kubernetes clients and returns an initialized
    96  // API wrapper. This creates informers on each one of resources passed,
    97  // registering metrics on each one; don't forget to call UnregisterGauges() on
    98  // the returned API reference to clean them up!
    99  func InitializeAPIForConfig(ctx context.Context, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
   100  	k8sClient, err := k8s.NewAPIForConfig(kubeConfig, "", []string{}, 0, 0, 0)
   101  	if err != nil {
   102  		return nil, err
   103  	}
   104  
   105  	return initAPI(ctx, k8sClient, nil, kubeConfig, ensureClusterWideAccess, cluster, resources...)
   106  }
   107  
   108  func initAPI(ctx context.Context, k8sClient *k8s.KubernetesAPI, dynamicClient dynamic.Interface, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
   109  	// check for cluster-wide access
   110  	var err error
   111  
   112  	if ensureClusterWideAccess {
   113  		err := k8s.ClusterAccess(ctx, k8sClient)
   114  		if err != nil {
   115  			return nil, err
   116  		}
   117  	}
   118  
   119  	// check for need and access to Linkerd CRD clients
   120  	var l5dCrdClient *l5dcrdclient.Clientset
   121  	for _, res := range resources {
   122  		switch {
   123  		case res == SP:
   124  			err := k8s.ServiceProfilesAccess(ctx, k8sClient)
   125  			if err != nil {
   126  				return nil, err
   127  			}
   128  		case res == Srv:
   129  			err := k8s.ServersAccess(ctx, k8sClient)
   130  			if err != nil {
   131  				return nil, err
   132  			}
   133  		case res == ExtWorkload:
   134  			err := k8s.ExtWorkloadAccess(ctx, k8sClient)
   135  			if err != nil {
   136  				return nil, err
   137  			}
   138  		default:
   139  			continue
   140  		}
   141  		l5dCrdClient, err = NewL5DCRDClient(kubeConfig)
   142  		if err != nil {
   143  			return nil, err
   144  		}
   145  		break
   146  	}
   147  
   148  	api := NewClusterScopedAPI(k8sClient, dynamicClient, l5dCrdClient, cluster, resources...)
   149  	for _, gauge := range api.gauges {
   150  		if err := prometheus.Register(gauge); err != nil {
   151  			log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
   152  		}
   153  	}
   154  	return api, nil
   155  }
   156  
   157  // NewClusterScopedAPI takes a Kubernetes client and returns an initialized
   158  // cluster-wide API. This creates informers on each one of resources passed,
   159  // registering metrics on each one; don't forget to call UnregisterGauges() on
   160  // the returned API reference to clean them up!
   161  func NewClusterScopedAPI(
   162  	k8sClient kubernetes.Interface,
   163  	dynamicClient dynamic.Interface,
   164  	l5dCrdClient l5dcrdclient.Interface,
   165  	cluster string,
   166  	resources ...APIResource,
   167  ) *API {
   168  	sharedInformers := informers.NewSharedInformerFactory(k8sClient, ResyncTime)
   169  	return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...)
   170  }
   171  
   172  // NewNamespacedAPI takes a Kubernetes client and returns an initialized API
   173  // scoped to namespace. This creates informers on each one of resources passed,
   174  // registering metrics on each one; don't forget to call UnregisterGauges() on
   175  // the returned API reference to clean them up!
   176  func NewNamespacedAPI(
   177  	k8sClient kubernetes.Interface,
   178  	dynamicClient dynamic.Interface,
   179  	l5dCrdClient l5dcrdclient.Interface,
   180  	namespace string,
   181  	cluster string,
   182  	resources ...APIResource,
   183  ) *API {
   184  	sharedInformers := informers.NewSharedInformerFactoryWithOptions(k8sClient, ResyncTime, informers.WithNamespace(namespace))
   185  	return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...)
   186  }
   187  
   188  // newAPI takes a Kubernetes client and returns an initialized API.
   189  func newAPI(
   190  	k8sClient kubernetes.Interface,
   191  	dynamicClient dynamic.Interface,
   192  	l5dCrdClient l5dcrdclient.Interface,
   193  	sharedInformers informers.SharedInformerFactory,
   194  	cluster string,
   195  	resources ...APIResource,
   196  ) *API {
   197  	var l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory
   198  	if l5dCrdClient != nil {
   199  		l5dCrdSharedInformers = l5dcrdinformer.NewSharedInformerFactory(l5dCrdClient, ResyncTime)
   200  	}
   201  
   202  	api := &API{
   203  		Client:                k8sClient,
   204  		DynamicClient:         dynamicClient,
   205  		syncChecks:            make([]cache.InformerSynced, 0),
   206  		sharedInformers:       sharedInformers,
   207  		l5dCrdSharedInformers: l5dCrdSharedInformers,
   208  	}
   209  
   210  	informerLabels := prometheus.Labels{
   211  		"cluster": cluster,
   212  	}
   213  
   214  	for _, resource := range resources {
   215  		switch resource {
   216  		case CJ:
   217  			api.cj = sharedInformers.Batch().V1().CronJobs()
   218  			api.syncChecks = append(api.syncChecks, api.cj.Informer().HasSynced)
   219  			api.promGauges.addInformerSize(k8s.CronJob, informerLabels, api.cj.Informer())
   220  		case CM:
   221  			api.cm = sharedInformers.Core().V1().ConfigMaps()
   222  			api.syncChecks = append(api.syncChecks, api.cm.Informer().HasSynced)
   223  			api.promGauges.addInformerSize(k8s.ConfigMap, informerLabels, api.cm.Informer())
   224  		case Deploy:
   225  			api.deploy = sharedInformers.Apps().V1().Deployments()
   226  			api.syncChecks = append(api.syncChecks, api.deploy.Informer().HasSynced)
   227  			api.promGauges.addInformerSize(k8s.Deployment, informerLabels, api.deploy.Informer())
   228  		case DS:
   229  			api.ds = sharedInformers.Apps().V1().DaemonSets()
   230  			api.syncChecks = append(api.syncChecks, api.ds.Informer().HasSynced)
   231  			api.promGauges.addInformerSize(k8s.DaemonSet, informerLabels, api.ds.Informer())
   232  		case Endpoint:
   233  			api.endpoint = sharedInformers.Core().V1().Endpoints()
   234  			api.syncChecks = append(api.syncChecks, api.endpoint.Informer().HasSynced)
   235  			api.promGauges.addInformerSize(k8s.Endpoints, informerLabels, api.endpoint.Informer())
   236  		case ES:
   237  			api.es = sharedInformers.Discovery().V1().EndpointSlices()
   238  			api.syncChecks = append(api.syncChecks, api.es.Informer().HasSynced)
   239  			api.promGauges.addInformerSize(k8s.EndpointSlices, informerLabels, api.es.Informer())
   240  		case ExtWorkload:
   241  			if l5dCrdSharedInformers == nil {
   242  				panic("Linkerd CRD shared informer not configured")
   243  			}
   244  			api.ew = l5dCrdSharedInformers.Externalworkload().V1beta1().ExternalWorkloads()
   245  			api.syncChecks = append(api.syncChecks, api.ew.Informer().HasSynced)
   246  			api.promGauges.addInformerSize(k8s.ExtWorkload, informerLabels, api.ew.Informer())
   247  		case Job:
   248  			api.job = sharedInformers.Batch().V1().Jobs()
   249  			api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced)
   250  			api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer())
   251  		case MWC:
   252  			api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations()
   253  			api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced)
   254  			api.promGauges.addInformerSize(k8s.MutatingWebhookConfig, informerLabels, api.mwc.Informer())
   255  		case NS:
   256  			api.ns = sharedInformers.Core().V1().Namespaces()
   257  			api.syncChecks = append(api.syncChecks, api.ns.Informer().HasSynced)
   258  			api.promGauges.addInformerSize(k8s.Namespace, informerLabels, api.ns.Informer())
   259  		case Pod:
   260  			api.pod = sharedInformers.Core().V1().Pods()
   261  			api.syncChecks = append(api.syncChecks, api.pod.Informer().HasSynced)
   262  			api.promGauges.addInformerSize(k8s.Pod, informerLabels, api.pod.Informer())
   263  		case RC:
   264  			api.rc = sharedInformers.Core().V1().ReplicationControllers()
   265  			api.syncChecks = append(api.syncChecks, api.rc.Informer().HasSynced)
   266  			api.promGauges.addInformerSize(k8s.ReplicationController, informerLabels, api.rc.Informer())
   267  		case RS:
   268  			api.rs = sharedInformers.Apps().V1().ReplicaSets()
   269  			api.syncChecks = append(api.syncChecks, api.rs.Informer().HasSynced)
   270  			api.promGauges.addInformerSize(k8s.ReplicaSet, informerLabels, api.rs.Informer())
   271  		case SP:
   272  			if l5dCrdSharedInformers == nil {
   273  				panic("Linkerd CRD shared informer not configured")
   274  			}
   275  			api.sp = l5dCrdSharedInformers.Linkerd().V1alpha2().ServiceProfiles()
   276  			api.syncChecks = append(api.syncChecks, api.sp.Informer().HasSynced)
   277  			api.promGauges.addInformerSize(k8s.ServiceProfile, informerLabels, api.sp.Informer())
   278  		case Srv:
   279  			if l5dCrdSharedInformers == nil {
   280  				panic("Linkerd CRD shared informer not configured")
   281  			}
   282  			api.srv = l5dCrdSharedInformers.Server().V1beta2().Servers()
   283  			api.syncChecks = append(api.syncChecks, api.srv.Informer().HasSynced)
   284  			api.promGauges.addInformerSize(k8s.Server, informerLabels, api.srv.Informer())
   285  		case SS:
   286  			api.ss = sharedInformers.Apps().V1().StatefulSets()
   287  			api.syncChecks = append(api.syncChecks, api.ss.Informer().HasSynced)
   288  			api.promGauges.addInformerSize(k8s.StatefulSet, informerLabels, api.ss.Informer())
   289  		case Svc:
   290  			api.svc = sharedInformers.Core().V1().Services()
   291  			api.syncChecks = append(api.syncChecks, api.svc.Informer().HasSynced)
   292  			api.promGauges.addInformerSize(k8s.Service, informerLabels, api.svc.Informer())
   293  		case Node:
   294  			api.node = sharedInformers.Core().V1().Nodes()
   295  			api.syncChecks = append(api.syncChecks, api.node.Informer().HasSynced)
   296  			api.promGauges.addInformerSize(k8s.Node, informerLabels, api.node.Informer())
   297  		case Secret:
   298  			api.secret = sharedInformers.Core().V1().Secrets()
   299  			api.syncChecks = append(api.syncChecks, api.secret.Informer().HasSynced)
   300  			api.promGauges.addInformerSize(k8s.Secret, informerLabels, api.secret.Informer())
   301  		}
   302  	}
   303  	return api
   304  }
   305  
   306  // Sync waits for all informers to be synced.
   307  func (api *API) Sync(stopCh <-chan struct{}) {
   308  	api.sharedInformers.Start(stopCh)
   309  
   310  	if api.l5dCrdSharedInformers != nil {
   311  		api.l5dCrdSharedInformers.Start(stopCh)
   312  	}
   313  
   314  	waitForCacheSync(api.syncChecks)
   315  }
   316  
   317  // UnregisterGauges unregisters all the prometheus cache gauges associated to this API
   318  func (api *API) UnregisterGauges() {
   319  	api.promGauges.unregister()
   320  }
   321  
   322  // NS provides access to a shared informer and lister for Namespaces.
   323  func (api *API) NS() coreinformers.NamespaceInformer {
   324  	if api.ns == nil {
   325  		panic("NS informer not configured")
   326  	}
   327  	return api.ns
   328  }
   329  
   330  // Deploy provides access to a shared informer and lister for Deployments.
   331  func (api *API) Deploy() appv1informers.DeploymentInformer {
   332  	if api.deploy == nil {
   333  		panic("Deploy informer not configured")
   334  	}
   335  	return api.deploy
   336  }
   337  
   338  // DS provides access to a shared informer and lister for Daemonsets.
   339  func (api *API) DS() appv1informers.DaemonSetInformer {
   340  	if api.ds == nil {
   341  		panic("DS informer not configured")
   342  	}
   343  	return api.ds
   344  }
   345  
   346  // SS provides access to a shared informer and lister for Statefulsets.
   347  func (api *API) SS() appv1informers.StatefulSetInformer {
   348  	if api.ss == nil {
   349  		panic("SS informer not configured")
   350  	}
   351  	return api.ss
   352  }
   353  
   354  // RS provides access to a shared informer and lister for ReplicaSets.
   355  func (api *API) RS() appv1informers.ReplicaSetInformer {
   356  	if api.rs == nil {
   357  		panic("RS informer not configured")
   358  	}
   359  	return api.rs
   360  }
   361  
   362  // Pod provides access to a shared informer and lister for Pods.
   363  func (api *API) Pod() coreinformers.PodInformer {
   364  	if api.pod == nil {
   365  		panic("Pod informer not configured")
   366  	}
   367  	return api.pod
   368  }
   369  
   370  // RC provides access to a shared informer and lister for
   371  // ReplicationControllers.
   372  func (api *API) RC() coreinformers.ReplicationControllerInformer {
   373  	if api.rc == nil {
   374  		panic("RC informer not configured")
   375  	}
   376  	return api.rc
   377  }
   378  
   379  // Svc provides access to a shared informer and lister for Services.
   380  func (api *API) Svc() coreinformers.ServiceInformer {
   381  	if api.svc == nil {
   382  		panic("Svc informer not configured")
   383  	}
   384  	return api.svc
   385  }
   386  
   387  // Endpoint provides access to a shared informer and lister for Endpoints.
   388  func (api *API) Endpoint() coreinformers.EndpointsInformer {
   389  	if api.endpoint == nil {
   390  		panic("Endpoint informer not configured")
   391  	}
   392  	return api.endpoint
   393  }
   394  
   395  // ES provides access to a shared informer and lister for EndpointSlices
   396  func (api *API) ES() discoveryinformers.EndpointSliceInformer {
   397  	if api.es == nil {
   398  		panic("EndpointSlices informer not configured")
   399  	}
   400  	return api.es
   401  }
   402  
   403  // ExtWorkload() provides access to a shared informer and lister for
   404  // ExternalWorkload CRDs
   405  func (api *API) ExtWorkload() ewinformers.ExternalWorkloadInformer {
   406  	if api.ew == nil {
   407  		panic("ExternalWorkload informer not configured")
   408  	}
   409  	return api.ew
   410  }
   411  
   412  // CM provides access to a shared informer and lister for ConfigMaps.
   413  func (api *API) CM() coreinformers.ConfigMapInformer {
   414  	if api.cm == nil {
   415  		panic("CM informer not configured")
   416  	}
   417  	return api.cm
   418  }
   419  
   420  // SP provides access to a shared informer and lister for ServiceProfiles.
   421  func (api *API) SP() spinformers.ServiceProfileInformer {
   422  	if api.sp == nil {
   423  		panic("SP informer not configured")
   424  	}
   425  	return api.sp
   426  }
   427  
   428  // Srv provides access to a shared informer and lister for Servers.
   429  func (api *API) Srv() srvinformers.ServerInformer {
   430  	if api.srv == nil {
   431  		panic("Srv informer not configured")
   432  	}
   433  	return api.srv
   434  }
   435  
   436  // MWC provides access to a shared informer and lister for MutatingWebhookConfigurations.
   437  func (api *API) MWC() arinformers.MutatingWebhookConfigurationInformer {
   438  	if api.mwc == nil {
   439  		panic("MWC informer not configured")
   440  	}
   441  	return api.mwc
   442  }
   443  
   444  // Job provides access to a shared informer and lister for Jobs.
   445  func (api *API) Job() batchv1informers.JobInformer {
   446  	if api.job == nil {
   447  		panic("Job informer not configured")
   448  	}
   449  	return api.job
   450  }
   451  
   452  // SPAvailable informs the caller whether this API is configured to retrieve
   453  // ServiceProfiles
   454  func (api *API) SPAvailable() bool {
   455  	return api.sp != nil
   456  }
   457  
   458  // Node provides access to a shared informer and lister for Nodes.
   459  func (api *API) Node() coreinformers.NodeInformer {
   460  	if api.node == nil {
   461  		panic("Node informer not configured")
   462  	}
   463  	return api.node
   464  }
   465  
   466  // Secret provides access to a shared informer and lister for Secrets.
   467  func (api *API) Secret() coreinformers.SecretInformer {
   468  	if api.secret == nil {
   469  		panic("Secret informer not configured")
   470  	}
   471  	return api.secret
   472  }
   473  
   474  // CJ provides access to a shared informer and lister for CronJobs.
   475  func (api *API) CJ() batchv1informers.CronJobInformer {
   476  	if api.cj == nil {
   477  		panic("CJ informer not configured")
   478  	}
   479  	return api.cj
   480  }
   481  
   482  // GetObjects returns a list of Kubernetes objects, given a namespace, type, name and label selector.
   483  // If namespace is an empty string, match objects in all namespaces.
   484  // If name is an empty string, match all objects of the given type.
   485  // If label selector is an empty string, match all labels.
   486  func (api *API) GetObjects(namespace, restype, name string, label labels.Selector) ([]runtime.Object, error) {
   487  	switch restype {
   488  	case k8s.Namespace:
   489  		return api.getNamespaces(name, label)
   490  	case k8s.CronJob:
   491  		return api.getCronjobs(namespace, name, label)
   492  	case k8s.DaemonSet:
   493  		return api.getDaemonsets(namespace, name, label)
   494  	case k8s.Deployment:
   495  		return api.getDeployments(namespace, name, label)
   496  	case k8s.Job:
   497  		return api.getJobs(namespace, name, label)
   498  	case k8s.Pod:
   499  		return api.getPods(namespace, name, label)
   500  	case k8s.ReplicationController:
   501  		return api.getRCs(namespace, name, label)
   502  	case k8s.ReplicaSet:
   503  		return api.getReplicasets(namespace, name, label)
   504  	case k8s.Service:
   505  		return api.getServices(namespace, name)
   506  	case k8s.StatefulSet:
   507  		return api.getStatefulsets(namespace, name, label)
   508  	default:
   509  		return nil, status.Errorf(codes.Unimplemented, "unimplemented resource type: %s", restype)
   510  	}
   511  }
   512  
   513  // KindSupported returns true if there is an informer configured for the
   514  // specified resource type.
   515  func (api *API) KindSupported(restype string) bool {
   516  	switch restype {
   517  	case k8s.Namespace:
   518  		return api.ns != nil
   519  	case k8s.CronJob:
   520  		return api.cj != nil
   521  	case k8s.DaemonSet:
   522  		return api.ds != nil
   523  	case k8s.Deployment:
   524  		return api.deploy != nil
   525  	case k8s.Job:
   526  		return api.job != nil
   527  	case k8s.Pod:
   528  		return api.pod != nil
   529  	case k8s.ReplicationController:
   530  		return api.rc != nil
   531  	case k8s.ReplicaSet:
   532  		return api.rs != nil
   533  	case k8s.Service:
   534  		return api.svc != nil
   535  	case k8s.StatefulSet:
   536  		return api.ss != nil
   537  	default:
   538  		return false
   539  	}
   540  }
   541  
   542  // GetOwnerKindAndName returns the pod owner's kind and name, using owner
   543  // references from the Kubernetes API. The kind is represented as the Kubernetes
   544  // singular resource type (e.g. deployment, daemonset, job, etc.).
   545  // If retry is true, when the shared informer cache doesn't return anything
   546  // we try again with a direct Kubernetes API call.
   547  func (api *API) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string) {
   548  	ownerRefs := pod.GetOwnerReferences()
   549  	if len(ownerRefs) == 0 {
   550  		// pod without a parent
   551  		return k8s.Pod, pod.Name
   552  	} else if len(ownerRefs) > 1 {
   553  		log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
   554  		return k8s.Pod, pod.Name
   555  	}
   556  
   557  	parent := ownerRefs[0]
   558  	var parentObj metav1.Object
   559  	var err error
   560  	switch parent.Kind {
   561  	case "Job":
   562  		parentObj, err = api.Job().Lister().Jobs(pod.Namespace).Get(parent.Name)
   563  		if err != nil {
   564  			log.Warnf("failed to retrieve job from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
   565  			if retry {
   566  				parentObj, err = api.Client.BatchV1().Jobs(pod.Namespace).Get(ctx, parent.Name, metav1.GetOptions{})
   567  				if err != nil {
   568  					log.Warnf("failed to retrieve job from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
   569  				}
   570  			}
   571  		}
   572  	case "ReplicaSet":
   573  		rsObj, err := api.RS().Lister().ReplicaSets(pod.Namespace).Get(parent.Name)
   574  		if err != nil {
   575  			log.Warnf("failed to retrieve replicaset from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
   576  			if retry {
   577  				rsObj, err = api.Client.AppsV1().ReplicaSets(pod.Namespace).Get(ctx, parent.Name, metav1.GetOptions{})
   578  				if err != nil {
   579  					log.Warnf("failed to retrieve replicaset from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
   580  				}
   581  			}
   582  		}
   583  
   584  		if rsObj == nil || !isValidRSParent(rsObj.GetObjectMeta()) {
   585  			return strings.ToLower(parent.Kind), parent.Name
   586  		}
   587  		parentObj = rsObj
   588  
   589  	default:
   590  		return strings.ToLower(parent.Kind), parent.Name
   591  	}
   592  
   593  	if err == nil && len(parentObj.GetOwnerReferences()) == 1 {
   594  		grandParent := parentObj.GetOwnerReferences()[0]
   595  		return strings.ToLower(grandParent.Kind), grandParent.Name
   596  	}
   597  	return strings.ToLower(parent.Kind), parent.Name
   598  }
   599  
   600  // GetPodsFor returns all running and pending Pods associated with a given
   601  // Kubernetes object. Use includeFailed to also get failed Pods
   602  func (api *API) GetPodsFor(obj runtime.Object, includeFailed bool) ([]*corev1.Pod, error) {
   603  	var namespace string
   604  	var selector labels.Selector
   605  	var ownerUID types.UID
   606  	var err error
   607  
   608  	pods := []*corev1.Pod{}
   609  	switch typed := obj.(type) {
   610  	case *corev1.Namespace:
   611  		namespace = typed.Name
   612  		selector = labels.Everything()
   613  
   614  	case *batchv1.CronJob:
   615  		namespace = typed.Namespace
   616  		selector = labels.Everything()
   617  		jobs, err := api.Job().Lister().Jobs(namespace).List(selector)
   618  		if err != nil {
   619  			return nil, err
   620  		}
   621  		for _, job := range jobs {
   622  			if isOwner(typed.UID, job.GetOwnerReferences()) {
   623  				jobPods, err := api.GetPodsFor(job, includeFailed)
   624  				if err != nil {
   625  					return nil, err
   626  				}
   627  				pods = append(pods, jobPods...)
   628  			}
   629  		}
   630  		return pods, nil
   631  
   632  	case *appsv1.DaemonSet:
   633  		namespace = typed.Namespace
   634  		selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
   635  		ownerUID = typed.UID
   636  
   637  	case *appsv1.Deployment:
   638  		namespace = typed.Namespace
   639  		selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
   640  		ret, err := api.RS().Lister().ReplicaSets(namespace).List(selector)
   641  		if err != nil {
   642  			return nil, err
   643  		}
   644  		for _, rs := range ret {
   645  			if isOwner(typed.UID, rs.GetOwnerReferences()) {
   646  				podsRS, err := api.GetPodsFor(rs, includeFailed)
   647  				if err != nil {
   648  					return nil, err
   649  				}
   650  				pods = append(pods, podsRS...)
   651  			}
   652  		}
   653  		return pods, nil
   654  
   655  	case *appsv1.ReplicaSet:
   656  		namespace = typed.Namespace
   657  		selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
   658  		ownerUID = typed.UID
   659  
   660  	case *batchv1.Job:
   661  		namespace = typed.Namespace
   662  		selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
   663  		ownerUID = typed.UID
   664  
   665  	case *corev1.ReplicationController:
   666  		namespace = typed.Namespace
   667  		selector = labels.Set(typed.Spec.Selector).AsSelector()
   668  		ownerUID = typed.UID
   669  
   670  	case *corev1.Service:
   671  		if typed.Spec.Type == corev1.ServiceTypeExternalName {
   672  			return []*corev1.Pod{}, nil
   673  		}
   674  		namespace = typed.Namespace
   675  		if typed.Spec.Selector == nil {
   676  			selector = labels.Nothing()
   677  		} else {
   678  			selector = labels.Set(typed.Spec.Selector).AsSelector()
   679  		}
   680  
   681  	case *appsv1.StatefulSet:
   682  		namespace = typed.Namespace
   683  		selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
   684  		ownerUID = typed.UID
   685  
   686  	case *corev1.Pod:
   687  		// Special case for pods:
   688  		// GetPodsFor a pod should just return the pod itself
   689  		namespace = typed.Namespace
   690  		pod, err := api.Pod().Lister().Pods(typed.Namespace).Get(typed.Name)
   691  		if err != nil {
   692  			return nil, err
   693  		}
   694  		pods = []*corev1.Pod{pod}
   695  
   696  	default:
   697  		return nil, fmt.Errorf("Cannot get object selector: %v", obj)
   698  	}
   699  
   700  	// if obj.(type) is Pod, we've already retrieved it and put it in pods
   701  	// for the other types, pods will still be empty
   702  	if len(pods) == 0 {
   703  		pods, err = api.Pod().Lister().Pods(namespace).List(selector)
   704  		if err != nil {
   705  			return nil, err
   706  		}
   707  	}
   708  
   709  	allPods := []*corev1.Pod{}
   710  	for _, pod := range pods {
   711  		if isPendingOrRunning(pod) || (includeFailed && isFailed(pod)) {
   712  			if ownerUID == "" || isOwner(ownerUID, pod.GetOwnerReferences()) {
   713  				allPods = append(allPods, pod)
   714  			}
   715  		}
   716  	}
   717  	return allPods, nil
   718  }
   719  
   720  func isOwner(u types.UID, ownerRefs []metav1.OwnerReference) bool {
   721  	for _, or := range ownerRefs {
   722  		if u == or.UID {
   723  			return true
   724  		}
   725  	}
   726  	return false
   727  }
   728  
   729  // GetNameAndNamespaceOf returns the name and namespace of the given object.
   730  func GetNameAndNamespaceOf(obj runtime.Object) (string, string, error) {
   731  	switch typed := obj.(type) {
   732  	case *corev1.Namespace:
   733  		return typed.Name, typed.Name, nil
   734  
   735  	case *batchv1.CronJob:
   736  		return typed.Name, typed.Namespace, nil
   737  
   738  	case *appsv1.DaemonSet:
   739  		return typed.Name, typed.Namespace, nil
   740  
   741  	case *appsv1.Deployment:
   742  		return typed.Name, typed.Namespace, nil
   743  
   744  	case *batchv1.Job:
   745  		return typed.Name, typed.Namespace, nil
   746  
   747  	case *appsv1.ReplicaSet:
   748  		return typed.Name, typed.Namespace, nil
   749  
   750  	case *corev1.ReplicationController:
   751  		return typed.Name, typed.Namespace, nil
   752  
   753  	case *corev1.Service:
   754  		return typed.Name, typed.Namespace, nil
   755  
   756  	case *appsv1.StatefulSet:
   757  		return typed.Name, typed.Namespace, nil
   758  
   759  	case *corev1.Pod:
   760  		return typed.Name, typed.Namespace, nil
   761  
   762  	default:
   763  		return "", "", fmt.Errorf("Cannot determine object type: %v", obj)
   764  	}
   765  }
   766  
   767  // GetNameOf returns the name of the given object.
   768  func GetNameOf(obj runtime.Object) (string, error) {
   769  	name, _, err := GetNameAndNamespaceOf(obj)
   770  	if err != nil {
   771  		return "", err
   772  	}
   773  	return name, nil
   774  }
   775  
   776  // GetNamespaceOf returns the namespace of the given object.
   777  func GetNamespaceOf(obj runtime.Object) (string, error) {
   778  	_, namespace, err := GetNameAndNamespaceOf(obj)
   779  	if err != nil {
   780  		return "", err
   781  	}
   782  	return namespace, nil
   783  }
   784  
   785  // getNamespaces returns the namespace matching the specified name. If no name
   786  // is given, it returns all namespaces.
   787  func (api *API) getNamespaces(name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   788  	var namespaces []*corev1.Namespace
   789  
   790  	if name == "" {
   791  		var err error
   792  		namespaces, err = api.NS().Lister().List(labelSelector)
   793  		if err != nil {
   794  			return nil, err
   795  		}
   796  	} else {
   797  		namespace, err := api.NS().Lister().Get(name)
   798  		if err != nil {
   799  			return nil, err
   800  		}
   801  		namespaces = []*corev1.Namespace{namespace}
   802  	}
   803  
   804  	objects := []runtime.Object{}
   805  	for _, ns := range namespaces {
   806  		objects = append(objects, ns)
   807  	}
   808  
   809  	return objects, nil
   810  }
   811  
   812  func (api *API) getDeployments(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   813  	var err error
   814  	var deploys []*appsv1.Deployment
   815  
   816  	if namespace == "" {
   817  		deploys, err = api.Deploy().Lister().List(labelSelector)
   818  	} else if name == "" {
   819  		deploys, err = api.Deploy().Lister().Deployments(namespace).List(labelSelector)
   820  	} else {
   821  		var deploy *appsv1.Deployment
   822  		deploy, err = api.Deploy().Lister().Deployments(namespace).Get(name)
   823  		deploys = []*appsv1.Deployment{deploy}
   824  	}
   825  
   826  	if err != nil {
   827  		return nil, err
   828  	}
   829  
   830  	objects := []runtime.Object{}
   831  	for _, deploy := range deploys {
   832  		objects = append(objects, deploy)
   833  	}
   834  
   835  	return objects, nil
   836  }
   837  
   838  func (api *API) getPods(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   839  	var err error
   840  	var pods []*corev1.Pod
   841  
   842  	if namespace == "" {
   843  		pods, err = api.Pod().Lister().List(labelSelector)
   844  	} else if name == "" {
   845  		pods, err = api.Pod().Lister().Pods(namespace).List(labelSelector)
   846  	} else {
   847  		var pod *corev1.Pod
   848  		pod, err = api.Pod().Lister().Pods(namespace).Get(name)
   849  		pods = []*corev1.Pod{pod}
   850  	}
   851  
   852  	if err != nil {
   853  		return nil, err
   854  	}
   855  
   856  	objects := []runtime.Object{}
   857  	for _, pod := range pods {
   858  		if !isPendingOrRunning(pod) {
   859  			continue
   860  		}
   861  		objects = append(objects, pod)
   862  	}
   863  
   864  	return objects, nil
   865  }
   866  
   867  func (api *API) getRCs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   868  	var err error
   869  	var rcs []*corev1.ReplicationController
   870  
   871  	if namespace == "" {
   872  		rcs, err = api.RC().Lister().List(labelSelector)
   873  	} else if name == "" {
   874  		rcs, err = api.RC().Lister().ReplicationControllers(namespace).List(labelSelector)
   875  	} else {
   876  		var rc *corev1.ReplicationController
   877  		rc, err = api.RC().Lister().ReplicationControllers(namespace).Get(name)
   878  		rcs = []*corev1.ReplicationController{rc}
   879  	}
   880  
   881  	if err != nil {
   882  		return nil, err
   883  	}
   884  
   885  	objects := []runtime.Object{}
   886  	for _, rc := range rcs {
   887  		objects = append(objects, rc)
   888  	}
   889  
   890  	return objects, nil
   891  }
   892  
   893  func (api *API) getDaemonsets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   894  	var err error
   895  	var daemonsets []*appsv1.DaemonSet
   896  
   897  	if namespace == "" {
   898  		daemonsets, err = api.DS().Lister().List(labelSelector)
   899  	} else if name == "" {
   900  		daemonsets, err = api.DS().Lister().DaemonSets(namespace).List(labelSelector)
   901  	} else {
   902  		var ds *appsv1.DaemonSet
   903  		ds, err = api.DS().Lister().DaemonSets(namespace).Get(name)
   904  		daemonsets = []*appsv1.DaemonSet{ds}
   905  	}
   906  
   907  	if err != nil {
   908  		return nil, err
   909  	}
   910  
   911  	objects := []runtime.Object{}
   912  	for _, ds := range daemonsets {
   913  		objects = append(objects, ds)
   914  	}
   915  
   916  	return objects, nil
   917  }
   918  
   919  func (api *API) getStatefulsets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   920  	var err error
   921  	var statefulsets []*appsv1.StatefulSet
   922  
   923  	if namespace == "" {
   924  		statefulsets, err = api.SS().Lister().List(labelSelector)
   925  	} else if name == "" {
   926  		statefulsets, err = api.SS().Lister().StatefulSets(namespace).List(labelSelector)
   927  	} else {
   928  		var ss *appsv1.StatefulSet
   929  		ss, err = api.SS().Lister().StatefulSets(namespace).Get(name)
   930  		statefulsets = []*appsv1.StatefulSet{ss}
   931  	}
   932  
   933  	if err != nil {
   934  		return nil, err
   935  	}
   936  
   937  	objects := []runtime.Object{}
   938  	for _, ss := range statefulsets {
   939  		objects = append(objects, ss)
   940  	}
   941  
   942  	return objects, nil
   943  }
   944  
   945  func (api *API) getJobs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   946  	var err error
   947  	var jobs []*batchv1.Job
   948  
   949  	if namespace == "" {
   950  		jobs, err = api.Job().Lister().List(labelSelector)
   951  	} else if name == "" {
   952  		jobs, err = api.Job().Lister().Jobs(namespace).List(labelSelector)
   953  	} else {
   954  		var job *batchv1.Job
   955  		job, err = api.Job().Lister().Jobs(namespace).Get(name)
   956  		jobs = []*batchv1.Job{job}
   957  	}
   958  
   959  	if err != nil {
   960  		return nil, err
   961  	}
   962  
   963  	objects := []runtime.Object{}
   964  	for _, job := range jobs {
   965  		objects = append(objects, job)
   966  	}
   967  
   968  	return objects, nil
   969  }
   970  
   971  func (api *API) getServices(namespace, name string) ([]runtime.Object, error) {
   972  	services, err := api.GetServices(namespace, name)
   973  
   974  	if err != nil {
   975  		return nil, err
   976  	}
   977  
   978  	objects := []runtime.Object{}
   979  	for _, svc := range services {
   980  		objects = append(objects, svc)
   981  	}
   982  
   983  	return objects, nil
   984  }
   985  
   986  func (api *API) getCronjobs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
   987  	var err error
   988  	var cronjobs []*batchv1.CronJob
   989  
   990  	if namespace == "" {
   991  		cronjobs, err = api.CJ().Lister().List(labelSelector)
   992  	} else if name == "" {
   993  		cronjobs, err = api.CJ().Lister().CronJobs(namespace).List(labelSelector)
   994  	} else {
   995  		var cronjob *batchv1.CronJob
   996  		cronjob, err = api.CJ().Lister().CronJobs(namespace).Get(name)
   997  		cronjobs = []*batchv1.CronJob{cronjob}
   998  	}
   999  	if err != nil {
  1000  		return nil, err
  1001  	}
  1002  
  1003  	objects := []runtime.Object{}
  1004  	for _, cronjob := range cronjobs {
  1005  		objects = append(objects, cronjob)
  1006  	}
  1007  
  1008  	return objects, nil
  1009  }
  1010  
  1011  func (api *API) getReplicasets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
  1012  	var err error
  1013  	var replicasets []*appsv1.ReplicaSet
  1014  
  1015  	if namespace == "" {
  1016  		replicasets, err = api.RS().Lister().List(labelSelector)
  1017  	} else if name == "" {
  1018  		replicasets, err = api.RS().Lister().ReplicaSets(namespace).List(labelSelector)
  1019  	} else {
  1020  		var replicaset *appsv1.ReplicaSet
  1021  		replicaset, err = api.RS().Lister().ReplicaSets(namespace).Get(name)
  1022  		replicasets = []*appsv1.ReplicaSet{replicaset}
  1023  	}
  1024  	if err != nil {
  1025  		return nil, err
  1026  	}
  1027  
  1028  	objects := []runtime.Object{}
  1029  	for _, replicaset := range replicasets {
  1030  		objects = append(objects, replicaset)
  1031  	}
  1032  
  1033  	return objects, nil
  1034  }
  1035  
  1036  // GetServices returns a list of Service resources, based on input namespace and
  1037  // name.
  1038  func (api *API) GetServices(namespace, name string) ([]*corev1.Service, error) {
  1039  	var err error
  1040  	var services []*corev1.Service
  1041  
  1042  	if namespace == "" {
  1043  		services, err = api.Svc().Lister().List(labels.Everything())
  1044  	} else if name == "" {
  1045  		services, err = api.Svc().Lister().Services(namespace).List(labels.Everything())
  1046  	} else {
  1047  		var svc *corev1.Service
  1048  		svc, err = api.Svc().Lister().Services(namespace).Get(name)
  1049  		services = []*corev1.Service{svc}
  1050  	}
  1051  
  1052  	return services, err
  1053  }
  1054  
  1055  // GetServicesFor returns all Service resources which include a pod of the given
  1056  // resource object.  In other words, it returns all Services of which the given
  1057  // resource object is a part of.
  1058  func (api *API) GetServicesFor(obj runtime.Object, includeFailed bool) ([]*corev1.Service, error) {
  1059  	if svc, ok := obj.(*corev1.Service); ok {
  1060  		return []*corev1.Service{svc}, nil
  1061  	}
  1062  
  1063  	pods, err := api.GetPodsFor(obj, includeFailed)
  1064  	if err != nil {
  1065  		return nil, err
  1066  	}
  1067  	namespace, err := GetNamespaceOf(obj)
  1068  	if err != nil {
  1069  		return nil, err
  1070  	}
  1071  	allServices, err := api.GetServices(namespace, "")
  1072  	if err != nil {
  1073  		return nil, err
  1074  	}
  1075  	services := make([]*corev1.Service, 0)
  1076  	for _, svc := range allServices {
  1077  		svcPods, err := api.GetPodsFor(svc, includeFailed)
  1078  		if err != nil {
  1079  			return nil, err
  1080  		}
  1081  
  1082  		if hasOverlap(pods, svcPods) {
  1083  			services = append(services, svc)
  1084  		}
  1085  	}
  1086  	return services, nil
  1087  }
  1088  
  1089  // GetServiceProfileFor returns the service profile for a given service.  We
  1090  // first look for a matching service profile in the client's namespace.  If not
  1091  // found, we then look in the service's namespace.  If no service profile is
  1092  // found, we return the default service profile.
  1093  func (api *API) GetServiceProfileFor(svc *corev1.Service, clientNs, clusterDomain string) *spv1alpha2.ServiceProfile {
  1094  	dst := fmt.Sprintf("%s.%s.svc.%s", svc.Name, svc.Namespace, clusterDomain)
  1095  	// First attempt to lookup profile in client namespace
  1096  	if clientNs != "" {
  1097  		p, err := api.SP().Lister().ServiceProfiles(clientNs).Get(dst)
  1098  		if err == nil {
  1099  			return p
  1100  		}
  1101  		if !apierrors.IsNotFound(err) {
  1102  			log.Errorf("error getting service profile for %s in %s namespace: %s", dst, clientNs, err)
  1103  		}
  1104  	}
  1105  	// Second, attempt to lookup profile in server namespace
  1106  	if svc.Namespace != clientNs {
  1107  		p, err := api.SP().Lister().ServiceProfiles(svc.Namespace).Get(dst)
  1108  		if err == nil {
  1109  			return p
  1110  		}
  1111  		if !apierrors.IsNotFound(err) {
  1112  			log.Errorf("error getting service profile for %s in %s namespace: %s", dst, svc.Namespace, err)
  1113  		}
  1114  	}
  1115  	// Not found; return default.
  1116  	log.Debugf("no Service Profile found for '%s' -- using default", dst)
  1117  	return &spv1alpha2.ServiceProfile{
  1118  		ObjectMeta: metav1.ObjectMeta{
  1119  			Name: dst,
  1120  		},
  1121  		Spec: spv1alpha2.ServiceProfileSpec{
  1122  			Routes: []*spv1alpha2.RouteSpec{},
  1123  		},
  1124  	}
  1125  }
  1126  
  1127  func hasOverlap(as, bs []*corev1.Pod) bool {
  1128  	for _, a := range as {
  1129  		for _, b := range bs {
  1130  			if a.Name == b.Name {
  1131  				return true
  1132  			}
  1133  		}
  1134  	}
  1135  	return false
  1136  }
  1137  
  1138  func isPendingOrRunning(pod *corev1.Pod) bool {
  1139  	pending := pod.Status.Phase == corev1.PodPending
  1140  	running := pod.Status.Phase == corev1.PodRunning
  1141  	terminating := pod.DeletionTimestamp != nil
  1142  	return (pending || running) && !terminating
  1143  }
  1144  
  1145  func isFailed(pod *corev1.Pod) bool {
  1146  	return pod.Status.Phase == corev1.PodFailed
  1147  }
  1148  

View as plain text