1 package k8s
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7
8 "k8s.io/client-go/dynamic"
9 "k8s.io/client-go/rest"
10
11 spv1alpha2 "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
12 l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
13 l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
14 ewinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload/v1beta1"
15 srvinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server/v1beta2"
16 spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha2"
17 "github.com/linkerd/linkerd2/pkg/k8s"
18 "github.com/prometheus/client_golang/prometheus"
19 log "github.com/sirupsen/logrus"
20 "google.golang.org/grpc/codes"
21 "google.golang.org/grpc/status"
22 appsv1 "k8s.io/api/apps/v1"
23 batchv1 "k8s.io/api/batch/v1"
24 corev1 "k8s.io/api/core/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/labels"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/client-go/informers"
31 arinformers "k8s.io/client-go/informers/admissionregistration/v1"
32 appv1informers "k8s.io/client-go/informers/apps/v1"
33 batchv1informers "k8s.io/client-go/informers/batch/v1"
34 coreinformers "k8s.io/client-go/informers/core/v1"
35 discoveryinformers "k8s.io/client-go/informers/discovery/v1"
36 "k8s.io/client-go/kubernetes"
37 "k8s.io/client-go/tools/cache"
38 )
39
40
41 type API struct {
42 promGauges
43
44 Client kubernetes.Interface
45 DynamicClient dynamic.Interface
46
47 cj batchv1informers.CronJobInformer
48 cm coreinformers.ConfigMapInformer
49 deploy appv1informers.DeploymentInformer
50 ds appv1informers.DaemonSetInformer
51 endpoint coreinformers.EndpointsInformer
52 es discoveryinformers.EndpointSliceInformer
53 ew ewinformers.ExternalWorkloadInformer
54 job batchv1informers.JobInformer
55 mwc arinformers.MutatingWebhookConfigurationInformer
56 ns coreinformers.NamespaceInformer
57 pod coreinformers.PodInformer
58 rc coreinformers.ReplicationControllerInformer
59 rs appv1informers.ReplicaSetInformer
60 sp spinformers.ServiceProfileInformer
61 ss appv1informers.StatefulSetInformer
62 svc coreinformers.ServiceInformer
63 node coreinformers.NodeInformer
64 secret coreinformers.SecretInformer
65 srv srvinformers.ServerInformer
66
67 syncChecks []cache.InformerSynced
68 sharedInformers informers.SharedInformerFactory
69 l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory
70 }
71
72
73
74
75
76 func InitializeAPI(ctx context.Context, kubeConfig string, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
77 config, err := k8s.GetConfig(kubeConfig, "")
78 if err != nil {
79 return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err)
80 }
81
82 dynamicClient, err := dynamic.NewForConfig(config)
83 if err != nil {
84 return nil, err
85 }
86
87 k8sClient, err := k8s.NewAPIForConfig(config, "", []string{}, 0, 0, 0)
88 if err != nil {
89 return nil, err
90 }
91
92 return initAPI(ctx, k8sClient, dynamicClient, config, ensureClusterWideAccess, cluster, resources...)
93 }
94
95
96
97
98
99 func InitializeAPIForConfig(ctx context.Context, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
100 k8sClient, err := k8s.NewAPIForConfig(kubeConfig, "", []string{}, 0, 0, 0)
101 if err != nil {
102 return nil, err
103 }
104
105 return initAPI(ctx, k8sClient, nil, kubeConfig, ensureClusterWideAccess, cluster, resources...)
106 }
107
108 func initAPI(ctx context.Context, k8sClient *k8s.KubernetesAPI, dynamicClient dynamic.Interface, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
109
110 var err error
111
112 if ensureClusterWideAccess {
113 err := k8s.ClusterAccess(ctx, k8sClient)
114 if err != nil {
115 return nil, err
116 }
117 }
118
119
120 var l5dCrdClient *l5dcrdclient.Clientset
121 for _, res := range resources {
122 switch {
123 case res == SP:
124 err := k8s.ServiceProfilesAccess(ctx, k8sClient)
125 if err != nil {
126 return nil, err
127 }
128 case res == Srv:
129 err := k8s.ServersAccess(ctx, k8sClient)
130 if err != nil {
131 return nil, err
132 }
133 case res == ExtWorkload:
134 err := k8s.ExtWorkloadAccess(ctx, k8sClient)
135 if err != nil {
136 return nil, err
137 }
138 default:
139 continue
140 }
141 l5dCrdClient, err = NewL5DCRDClient(kubeConfig)
142 if err != nil {
143 return nil, err
144 }
145 break
146 }
147
148 api := NewClusterScopedAPI(k8sClient, dynamicClient, l5dCrdClient, cluster, resources...)
149 for _, gauge := range api.gauges {
150 if err := prometheus.Register(gauge); err != nil {
151 log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
152 }
153 }
154 return api, nil
155 }
156
157
158
159
160
161 func NewClusterScopedAPI(
162 k8sClient kubernetes.Interface,
163 dynamicClient dynamic.Interface,
164 l5dCrdClient l5dcrdclient.Interface,
165 cluster string,
166 resources ...APIResource,
167 ) *API {
168 sharedInformers := informers.NewSharedInformerFactory(k8sClient, ResyncTime)
169 return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...)
170 }
171
172
173
174
175
176 func NewNamespacedAPI(
177 k8sClient kubernetes.Interface,
178 dynamicClient dynamic.Interface,
179 l5dCrdClient l5dcrdclient.Interface,
180 namespace string,
181 cluster string,
182 resources ...APIResource,
183 ) *API {
184 sharedInformers := informers.NewSharedInformerFactoryWithOptions(k8sClient, ResyncTime, informers.WithNamespace(namespace))
185 return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...)
186 }
187
188
189 func newAPI(
190 k8sClient kubernetes.Interface,
191 dynamicClient dynamic.Interface,
192 l5dCrdClient l5dcrdclient.Interface,
193 sharedInformers informers.SharedInformerFactory,
194 cluster string,
195 resources ...APIResource,
196 ) *API {
197 var l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory
198 if l5dCrdClient != nil {
199 l5dCrdSharedInformers = l5dcrdinformer.NewSharedInformerFactory(l5dCrdClient, ResyncTime)
200 }
201
202 api := &API{
203 Client: k8sClient,
204 DynamicClient: dynamicClient,
205 syncChecks: make([]cache.InformerSynced, 0),
206 sharedInformers: sharedInformers,
207 l5dCrdSharedInformers: l5dCrdSharedInformers,
208 }
209
210 informerLabels := prometheus.Labels{
211 "cluster": cluster,
212 }
213
214 for _, resource := range resources {
215 switch resource {
216 case CJ:
217 api.cj = sharedInformers.Batch().V1().CronJobs()
218 api.syncChecks = append(api.syncChecks, api.cj.Informer().HasSynced)
219 api.promGauges.addInformerSize(k8s.CronJob, informerLabels, api.cj.Informer())
220 case CM:
221 api.cm = sharedInformers.Core().V1().ConfigMaps()
222 api.syncChecks = append(api.syncChecks, api.cm.Informer().HasSynced)
223 api.promGauges.addInformerSize(k8s.ConfigMap, informerLabels, api.cm.Informer())
224 case Deploy:
225 api.deploy = sharedInformers.Apps().V1().Deployments()
226 api.syncChecks = append(api.syncChecks, api.deploy.Informer().HasSynced)
227 api.promGauges.addInformerSize(k8s.Deployment, informerLabels, api.deploy.Informer())
228 case DS:
229 api.ds = sharedInformers.Apps().V1().DaemonSets()
230 api.syncChecks = append(api.syncChecks, api.ds.Informer().HasSynced)
231 api.promGauges.addInformerSize(k8s.DaemonSet, informerLabels, api.ds.Informer())
232 case Endpoint:
233 api.endpoint = sharedInformers.Core().V1().Endpoints()
234 api.syncChecks = append(api.syncChecks, api.endpoint.Informer().HasSynced)
235 api.promGauges.addInformerSize(k8s.Endpoints, informerLabels, api.endpoint.Informer())
236 case ES:
237 api.es = sharedInformers.Discovery().V1().EndpointSlices()
238 api.syncChecks = append(api.syncChecks, api.es.Informer().HasSynced)
239 api.promGauges.addInformerSize(k8s.EndpointSlices, informerLabels, api.es.Informer())
240 case ExtWorkload:
241 if l5dCrdSharedInformers == nil {
242 panic("Linkerd CRD shared informer not configured")
243 }
244 api.ew = l5dCrdSharedInformers.Externalworkload().V1beta1().ExternalWorkloads()
245 api.syncChecks = append(api.syncChecks, api.ew.Informer().HasSynced)
246 api.promGauges.addInformerSize(k8s.ExtWorkload, informerLabels, api.ew.Informer())
247 case Job:
248 api.job = sharedInformers.Batch().V1().Jobs()
249 api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced)
250 api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer())
251 case MWC:
252 api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations()
253 api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced)
254 api.promGauges.addInformerSize(k8s.MutatingWebhookConfig, informerLabels, api.mwc.Informer())
255 case NS:
256 api.ns = sharedInformers.Core().V1().Namespaces()
257 api.syncChecks = append(api.syncChecks, api.ns.Informer().HasSynced)
258 api.promGauges.addInformerSize(k8s.Namespace, informerLabels, api.ns.Informer())
259 case Pod:
260 api.pod = sharedInformers.Core().V1().Pods()
261 api.syncChecks = append(api.syncChecks, api.pod.Informer().HasSynced)
262 api.promGauges.addInformerSize(k8s.Pod, informerLabels, api.pod.Informer())
263 case RC:
264 api.rc = sharedInformers.Core().V1().ReplicationControllers()
265 api.syncChecks = append(api.syncChecks, api.rc.Informer().HasSynced)
266 api.promGauges.addInformerSize(k8s.ReplicationController, informerLabels, api.rc.Informer())
267 case RS:
268 api.rs = sharedInformers.Apps().V1().ReplicaSets()
269 api.syncChecks = append(api.syncChecks, api.rs.Informer().HasSynced)
270 api.promGauges.addInformerSize(k8s.ReplicaSet, informerLabels, api.rs.Informer())
271 case SP:
272 if l5dCrdSharedInformers == nil {
273 panic("Linkerd CRD shared informer not configured")
274 }
275 api.sp = l5dCrdSharedInformers.Linkerd().V1alpha2().ServiceProfiles()
276 api.syncChecks = append(api.syncChecks, api.sp.Informer().HasSynced)
277 api.promGauges.addInformerSize(k8s.ServiceProfile, informerLabels, api.sp.Informer())
278 case Srv:
279 if l5dCrdSharedInformers == nil {
280 panic("Linkerd CRD shared informer not configured")
281 }
282 api.srv = l5dCrdSharedInformers.Server().V1beta2().Servers()
283 api.syncChecks = append(api.syncChecks, api.srv.Informer().HasSynced)
284 api.promGauges.addInformerSize(k8s.Server, informerLabels, api.srv.Informer())
285 case SS:
286 api.ss = sharedInformers.Apps().V1().StatefulSets()
287 api.syncChecks = append(api.syncChecks, api.ss.Informer().HasSynced)
288 api.promGauges.addInformerSize(k8s.StatefulSet, informerLabels, api.ss.Informer())
289 case Svc:
290 api.svc = sharedInformers.Core().V1().Services()
291 api.syncChecks = append(api.syncChecks, api.svc.Informer().HasSynced)
292 api.promGauges.addInformerSize(k8s.Service, informerLabels, api.svc.Informer())
293 case Node:
294 api.node = sharedInformers.Core().V1().Nodes()
295 api.syncChecks = append(api.syncChecks, api.node.Informer().HasSynced)
296 api.promGauges.addInformerSize(k8s.Node, informerLabels, api.node.Informer())
297 case Secret:
298 api.secret = sharedInformers.Core().V1().Secrets()
299 api.syncChecks = append(api.syncChecks, api.secret.Informer().HasSynced)
300 api.promGauges.addInformerSize(k8s.Secret, informerLabels, api.secret.Informer())
301 }
302 }
303 return api
304 }
305
306
307 func (api *API) Sync(stopCh <-chan struct{}) {
308 api.sharedInformers.Start(stopCh)
309
310 if api.l5dCrdSharedInformers != nil {
311 api.l5dCrdSharedInformers.Start(stopCh)
312 }
313
314 waitForCacheSync(api.syncChecks)
315 }
316
317
318 func (api *API) UnregisterGauges() {
319 api.promGauges.unregister()
320 }
321
322
323 func (api *API) NS() coreinformers.NamespaceInformer {
324 if api.ns == nil {
325 panic("NS informer not configured")
326 }
327 return api.ns
328 }
329
330
331 func (api *API) Deploy() appv1informers.DeploymentInformer {
332 if api.deploy == nil {
333 panic("Deploy informer not configured")
334 }
335 return api.deploy
336 }
337
338
339 func (api *API) DS() appv1informers.DaemonSetInformer {
340 if api.ds == nil {
341 panic("DS informer not configured")
342 }
343 return api.ds
344 }
345
346
347 func (api *API) SS() appv1informers.StatefulSetInformer {
348 if api.ss == nil {
349 panic("SS informer not configured")
350 }
351 return api.ss
352 }
353
354
355 func (api *API) RS() appv1informers.ReplicaSetInformer {
356 if api.rs == nil {
357 panic("RS informer not configured")
358 }
359 return api.rs
360 }
361
362
363 func (api *API) Pod() coreinformers.PodInformer {
364 if api.pod == nil {
365 panic("Pod informer not configured")
366 }
367 return api.pod
368 }
369
370
371
372 func (api *API) RC() coreinformers.ReplicationControllerInformer {
373 if api.rc == nil {
374 panic("RC informer not configured")
375 }
376 return api.rc
377 }
378
379
380 func (api *API) Svc() coreinformers.ServiceInformer {
381 if api.svc == nil {
382 panic("Svc informer not configured")
383 }
384 return api.svc
385 }
386
387
388 func (api *API) Endpoint() coreinformers.EndpointsInformer {
389 if api.endpoint == nil {
390 panic("Endpoint informer not configured")
391 }
392 return api.endpoint
393 }
394
395
396 func (api *API) ES() discoveryinformers.EndpointSliceInformer {
397 if api.es == nil {
398 panic("EndpointSlices informer not configured")
399 }
400 return api.es
401 }
402
403
404
405 func (api *API) ExtWorkload() ewinformers.ExternalWorkloadInformer {
406 if api.ew == nil {
407 panic("ExternalWorkload informer not configured")
408 }
409 return api.ew
410 }
411
412
413 func (api *API) CM() coreinformers.ConfigMapInformer {
414 if api.cm == nil {
415 panic("CM informer not configured")
416 }
417 return api.cm
418 }
419
420
421 func (api *API) SP() spinformers.ServiceProfileInformer {
422 if api.sp == nil {
423 panic("SP informer not configured")
424 }
425 return api.sp
426 }
427
428
429 func (api *API) Srv() srvinformers.ServerInformer {
430 if api.srv == nil {
431 panic("Srv informer not configured")
432 }
433 return api.srv
434 }
435
436
437 func (api *API) MWC() arinformers.MutatingWebhookConfigurationInformer {
438 if api.mwc == nil {
439 panic("MWC informer not configured")
440 }
441 return api.mwc
442 }
443
444
445 func (api *API) Job() batchv1informers.JobInformer {
446 if api.job == nil {
447 panic("Job informer not configured")
448 }
449 return api.job
450 }
451
452
453
454 func (api *API) SPAvailable() bool {
455 return api.sp != nil
456 }
457
458
459 func (api *API) Node() coreinformers.NodeInformer {
460 if api.node == nil {
461 panic("Node informer not configured")
462 }
463 return api.node
464 }
465
466
467 func (api *API) Secret() coreinformers.SecretInformer {
468 if api.secret == nil {
469 panic("Secret informer not configured")
470 }
471 return api.secret
472 }
473
474
475 func (api *API) CJ() batchv1informers.CronJobInformer {
476 if api.cj == nil {
477 panic("CJ informer not configured")
478 }
479 return api.cj
480 }
481
482
483
484
485
486 func (api *API) GetObjects(namespace, restype, name string, label labels.Selector) ([]runtime.Object, error) {
487 switch restype {
488 case k8s.Namespace:
489 return api.getNamespaces(name, label)
490 case k8s.CronJob:
491 return api.getCronjobs(namespace, name, label)
492 case k8s.DaemonSet:
493 return api.getDaemonsets(namespace, name, label)
494 case k8s.Deployment:
495 return api.getDeployments(namespace, name, label)
496 case k8s.Job:
497 return api.getJobs(namespace, name, label)
498 case k8s.Pod:
499 return api.getPods(namespace, name, label)
500 case k8s.ReplicationController:
501 return api.getRCs(namespace, name, label)
502 case k8s.ReplicaSet:
503 return api.getReplicasets(namespace, name, label)
504 case k8s.Service:
505 return api.getServices(namespace, name)
506 case k8s.StatefulSet:
507 return api.getStatefulsets(namespace, name, label)
508 default:
509 return nil, status.Errorf(codes.Unimplemented, "unimplemented resource type: %s", restype)
510 }
511 }
512
513
514
515 func (api *API) KindSupported(restype string) bool {
516 switch restype {
517 case k8s.Namespace:
518 return api.ns != nil
519 case k8s.CronJob:
520 return api.cj != nil
521 case k8s.DaemonSet:
522 return api.ds != nil
523 case k8s.Deployment:
524 return api.deploy != nil
525 case k8s.Job:
526 return api.job != nil
527 case k8s.Pod:
528 return api.pod != nil
529 case k8s.ReplicationController:
530 return api.rc != nil
531 case k8s.ReplicaSet:
532 return api.rs != nil
533 case k8s.Service:
534 return api.svc != nil
535 case k8s.StatefulSet:
536 return api.ss != nil
537 default:
538 return false
539 }
540 }
541
542
543
544
545
546
547 func (api *API) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string) {
548 ownerRefs := pod.GetOwnerReferences()
549 if len(ownerRefs) == 0 {
550
551 return k8s.Pod, pod.Name
552 } else if len(ownerRefs) > 1 {
553 log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
554 return k8s.Pod, pod.Name
555 }
556
557 parent := ownerRefs[0]
558 var parentObj metav1.Object
559 var err error
560 switch parent.Kind {
561 case "Job":
562 parentObj, err = api.Job().Lister().Jobs(pod.Namespace).Get(parent.Name)
563 if err != nil {
564 log.Warnf("failed to retrieve job from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
565 if retry {
566 parentObj, err = api.Client.BatchV1().Jobs(pod.Namespace).Get(ctx, parent.Name, metav1.GetOptions{})
567 if err != nil {
568 log.Warnf("failed to retrieve job from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
569 }
570 }
571 }
572 case "ReplicaSet":
573 rsObj, err := api.RS().Lister().ReplicaSets(pod.Namespace).Get(parent.Name)
574 if err != nil {
575 log.Warnf("failed to retrieve replicaset from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
576 if retry {
577 rsObj, err = api.Client.AppsV1().ReplicaSets(pod.Namespace).Get(ctx, parent.Name, metav1.GetOptions{})
578 if err != nil {
579 log.Warnf("failed to retrieve replicaset from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
580 }
581 }
582 }
583
584 if rsObj == nil || !isValidRSParent(rsObj.GetObjectMeta()) {
585 return strings.ToLower(parent.Kind), parent.Name
586 }
587 parentObj = rsObj
588
589 default:
590 return strings.ToLower(parent.Kind), parent.Name
591 }
592
593 if err == nil && len(parentObj.GetOwnerReferences()) == 1 {
594 grandParent := parentObj.GetOwnerReferences()[0]
595 return strings.ToLower(grandParent.Kind), grandParent.Name
596 }
597 return strings.ToLower(parent.Kind), parent.Name
598 }
599
600
601
602 func (api *API) GetPodsFor(obj runtime.Object, includeFailed bool) ([]*corev1.Pod, error) {
603 var namespace string
604 var selector labels.Selector
605 var ownerUID types.UID
606 var err error
607
608 pods := []*corev1.Pod{}
609 switch typed := obj.(type) {
610 case *corev1.Namespace:
611 namespace = typed.Name
612 selector = labels.Everything()
613
614 case *batchv1.CronJob:
615 namespace = typed.Namespace
616 selector = labels.Everything()
617 jobs, err := api.Job().Lister().Jobs(namespace).List(selector)
618 if err != nil {
619 return nil, err
620 }
621 for _, job := range jobs {
622 if isOwner(typed.UID, job.GetOwnerReferences()) {
623 jobPods, err := api.GetPodsFor(job, includeFailed)
624 if err != nil {
625 return nil, err
626 }
627 pods = append(pods, jobPods...)
628 }
629 }
630 return pods, nil
631
632 case *appsv1.DaemonSet:
633 namespace = typed.Namespace
634 selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
635 ownerUID = typed.UID
636
637 case *appsv1.Deployment:
638 namespace = typed.Namespace
639 selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
640 ret, err := api.RS().Lister().ReplicaSets(namespace).List(selector)
641 if err != nil {
642 return nil, err
643 }
644 for _, rs := range ret {
645 if isOwner(typed.UID, rs.GetOwnerReferences()) {
646 podsRS, err := api.GetPodsFor(rs, includeFailed)
647 if err != nil {
648 return nil, err
649 }
650 pods = append(pods, podsRS...)
651 }
652 }
653 return pods, nil
654
655 case *appsv1.ReplicaSet:
656 namespace = typed.Namespace
657 selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
658 ownerUID = typed.UID
659
660 case *batchv1.Job:
661 namespace = typed.Namespace
662 selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
663 ownerUID = typed.UID
664
665 case *corev1.ReplicationController:
666 namespace = typed.Namespace
667 selector = labels.Set(typed.Spec.Selector).AsSelector()
668 ownerUID = typed.UID
669
670 case *corev1.Service:
671 if typed.Spec.Type == corev1.ServiceTypeExternalName {
672 return []*corev1.Pod{}, nil
673 }
674 namespace = typed.Namespace
675 if typed.Spec.Selector == nil {
676 selector = labels.Nothing()
677 } else {
678 selector = labels.Set(typed.Spec.Selector).AsSelector()
679 }
680
681 case *appsv1.StatefulSet:
682 namespace = typed.Namespace
683 selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
684 ownerUID = typed.UID
685
686 case *corev1.Pod:
687
688
689 namespace = typed.Namespace
690 pod, err := api.Pod().Lister().Pods(typed.Namespace).Get(typed.Name)
691 if err != nil {
692 return nil, err
693 }
694 pods = []*corev1.Pod{pod}
695
696 default:
697 return nil, fmt.Errorf("Cannot get object selector: %v", obj)
698 }
699
700
701
702 if len(pods) == 0 {
703 pods, err = api.Pod().Lister().Pods(namespace).List(selector)
704 if err != nil {
705 return nil, err
706 }
707 }
708
709 allPods := []*corev1.Pod{}
710 for _, pod := range pods {
711 if isPendingOrRunning(pod) || (includeFailed && isFailed(pod)) {
712 if ownerUID == "" || isOwner(ownerUID, pod.GetOwnerReferences()) {
713 allPods = append(allPods, pod)
714 }
715 }
716 }
717 return allPods, nil
718 }
719
720 func isOwner(u types.UID, ownerRefs []metav1.OwnerReference) bool {
721 for _, or := range ownerRefs {
722 if u == or.UID {
723 return true
724 }
725 }
726 return false
727 }
728
729
730 func GetNameAndNamespaceOf(obj runtime.Object) (string, string, error) {
731 switch typed := obj.(type) {
732 case *corev1.Namespace:
733 return typed.Name, typed.Name, nil
734
735 case *batchv1.CronJob:
736 return typed.Name, typed.Namespace, nil
737
738 case *appsv1.DaemonSet:
739 return typed.Name, typed.Namespace, nil
740
741 case *appsv1.Deployment:
742 return typed.Name, typed.Namespace, nil
743
744 case *batchv1.Job:
745 return typed.Name, typed.Namespace, nil
746
747 case *appsv1.ReplicaSet:
748 return typed.Name, typed.Namespace, nil
749
750 case *corev1.ReplicationController:
751 return typed.Name, typed.Namespace, nil
752
753 case *corev1.Service:
754 return typed.Name, typed.Namespace, nil
755
756 case *appsv1.StatefulSet:
757 return typed.Name, typed.Namespace, nil
758
759 case *corev1.Pod:
760 return typed.Name, typed.Namespace, nil
761
762 default:
763 return "", "", fmt.Errorf("Cannot determine object type: %v", obj)
764 }
765 }
766
767
768 func GetNameOf(obj runtime.Object) (string, error) {
769 name, _, err := GetNameAndNamespaceOf(obj)
770 if err != nil {
771 return "", err
772 }
773 return name, nil
774 }
775
776
777 func GetNamespaceOf(obj runtime.Object) (string, error) {
778 _, namespace, err := GetNameAndNamespaceOf(obj)
779 if err != nil {
780 return "", err
781 }
782 return namespace, nil
783 }
784
785
786
787 func (api *API) getNamespaces(name string, labelSelector labels.Selector) ([]runtime.Object, error) {
788 var namespaces []*corev1.Namespace
789
790 if name == "" {
791 var err error
792 namespaces, err = api.NS().Lister().List(labelSelector)
793 if err != nil {
794 return nil, err
795 }
796 } else {
797 namespace, err := api.NS().Lister().Get(name)
798 if err != nil {
799 return nil, err
800 }
801 namespaces = []*corev1.Namespace{namespace}
802 }
803
804 objects := []runtime.Object{}
805 for _, ns := range namespaces {
806 objects = append(objects, ns)
807 }
808
809 return objects, nil
810 }
811
812 func (api *API) getDeployments(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
813 var err error
814 var deploys []*appsv1.Deployment
815
816 if namespace == "" {
817 deploys, err = api.Deploy().Lister().List(labelSelector)
818 } else if name == "" {
819 deploys, err = api.Deploy().Lister().Deployments(namespace).List(labelSelector)
820 } else {
821 var deploy *appsv1.Deployment
822 deploy, err = api.Deploy().Lister().Deployments(namespace).Get(name)
823 deploys = []*appsv1.Deployment{deploy}
824 }
825
826 if err != nil {
827 return nil, err
828 }
829
830 objects := []runtime.Object{}
831 for _, deploy := range deploys {
832 objects = append(objects, deploy)
833 }
834
835 return objects, nil
836 }
837
838 func (api *API) getPods(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
839 var err error
840 var pods []*corev1.Pod
841
842 if namespace == "" {
843 pods, err = api.Pod().Lister().List(labelSelector)
844 } else if name == "" {
845 pods, err = api.Pod().Lister().Pods(namespace).List(labelSelector)
846 } else {
847 var pod *corev1.Pod
848 pod, err = api.Pod().Lister().Pods(namespace).Get(name)
849 pods = []*corev1.Pod{pod}
850 }
851
852 if err != nil {
853 return nil, err
854 }
855
856 objects := []runtime.Object{}
857 for _, pod := range pods {
858 if !isPendingOrRunning(pod) {
859 continue
860 }
861 objects = append(objects, pod)
862 }
863
864 return objects, nil
865 }
866
867 func (api *API) getRCs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
868 var err error
869 var rcs []*corev1.ReplicationController
870
871 if namespace == "" {
872 rcs, err = api.RC().Lister().List(labelSelector)
873 } else if name == "" {
874 rcs, err = api.RC().Lister().ReplicationControllers(namespace).List(labelSelector)
875 } else {
876 var rc *corev1.ReplicationController
877 rc, err = api.RC().Lister().ReplicationControllers(namespace).Get(name)
878 rcs = []*corev1.ReplicationController{rc}
879 }
880
881 if err != nil {
882 return nil, err
883 }
884
885 objects := []runtime.Object{}
886 for _, rc := range rcs {
887 objects = append(objects, rc)
888 }
889
890 return objects, nil
891 }
892
893 func (api *API) getDaemonsets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
894 var err error
895 var daemonsets []*appsv1.DaemonSet
896
897 if namespace == "" {
898 daemonsets, err = api.DS().Lister().List(labelSelector)
899 } else if name == "" {
900 daemonsets, err = api.DS().Lister().DaemonSets(namespace).List(labelSelector)
901 } else {
902 var ds *appsv1.DaemonSet
903 ds, err = api.DS().Lister().DaemonSets(namespace).Get(name)
904 daemonsets = []*appsv1.DaemonSet{ds}
905 }
906
907 if err != nil {
908 return nil, err
909 }
910
911 objects := []runtime.Object{}
912 for _, ds := range daemonsets {
913 objects = append(objects, ds)
914 }
915
916 return objects, nil
917 }
918
919 func (api *API) getStatefulsets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
920 var err error
921 var statefulsets []*appsv1.StatefulSet
922
923 if namespace == "" {
924 statefulsets, err = api.SS().Lister().List(labelSelector)
925 } else if name == "" {
926 statefulsets, err = api.SS().Lister().StatefulSets(namespace).List(labelSelector)
927 } else {
928 var ss *appsv1.StatefulSet
929 ss, err = api.SS().Lister().StatefulSets(namespace).Get(name)
930 statefulsets = []*appsv1.StatefulSet{ss}
931 }
932
933 if err != nil {
934 return nil, err
935 }
936
937 objects := []runtime.Object{}
938 for _, ss := range statefulsets {
939 objects = append(objects, ss)
940 }
941
942 return objects, nil
943 }
944
945 func (api *API) getJobs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
946 var err error
947 var jobs []*batchv1.Job
948
949 if namespace == "" {
950 jobs, err = api.Job().Lister().List(labelSelector)
951 } else if name == "" {
952 jobs, err = api.Job().Lister().Jobs(namespace).List(labelSelector)
953 } else {
954 var job *batchv1.Job
955 job, err = api.Job().Lister().Jobs(namespace).Get(name)
956 jobs = []*batchv1.Job{job}
957 }
958
959 if err != nil {
960 return nil, err
961 }
962
963 objects := []runtime.Object{}
964 for _, job := range jobs {
965 objects = append(objects, job)
966 }
967
968 return objects, nil
969 }
970
971 func (api *API) getServices(namespace, name string) ([]runtime.Object, error) {
972 services, err := api.GetServices(namespace, name)
973
974 if err != nil {
975 return nil, err
976 }
977
978 objects := []runtime.Object{}
979 for _, svc := range services {
980 objects = append(objects, svc)
981 }
982
983 return objects, nil
984 }
985
986 func (api *API) getCronjobs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
987 var err error
988 var cronjobs []*batchv1.CronJob
989
990 if namespace == "" {
991 cronjobs, err = api.CJ().Lister().List(labelSelector)
992 } else if name == "" {
993 cronjobs, err = api.CJ().Lister().CronJobs(namespace).List(labelSelector)
994 } else {
995 var cronjob *batchv1.CronJob
996 cronjob, err = api.CJ().Lister().CronJobs(namespace).Get(name)
997 cronjobs = []*batchv1.CronJob{cronjob}
998 }
999 if err != nil {
1000 return nil, err
1001 }
1002
1003 objects := []runtime.Object{}
1004 for _, cronjob := range cronjobs {
1005 objects = append(objects, cronjob)
1006 }
1007
1008 return objects, nil
1009 }
1010
1011 func (api *API) getReplicasets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
1012 var err error
1013 var replicasets []*appsv1.ReplicaSet
1014
1015 if namespace == "" {
1016 replicasets, err = api.RS().Lister().List(labelSelector)
1017 } else if name == "" {
1018 replicasets, err = api.RS().Lister().ReplicaSets(namespace).List(labelSelector)
1019 } else {
1020 var replicaset *appsv1.ReplicaSet
1021 replicaset, err = api.RS().Lister().ReplicaSets(namespace).Get(name)
1022 replicasets = []*appsv1.ReplicaSet{replicaset}
1023 }
1024 if err != nil {
1025 return nil, err
1026 }
1027
1028 objects := []runtime.Object{}
1029 for _, replicaset := range replicasets {
1030 objects = append(objects, replicaset)
1031 }
1032
1033 return objects, nil
1034 }
1035
1036
1037
1038 func (api *API) GetServices(namespace, name string) ([]*corev1.Service, error) {
1039 var err error
1040 var services []*corev1.Service
1041
1042 if namespace == "" {
1043 services, err = api.Svc().Lister().List(labels.Everything())
1044 } else if name == "" {
1045 services, err = api.Svc().Lister().Services(namespace).List(labels.Everything())
1046 } else {
1047 var svc *corev1.Service
1048 svc, err = api.Svc().Lister().Services(namespace).Get(name)
1049 services = []*corev1.Service{svc}
1050 }
1051
1052 return services, err
1053 }
1054
1055
1056
1057
1058 func (api *API) GetServicesFor(obj runtime.Object, includeFailed bool) ([]*corev1.Service, error) {
1059 if svc, ok := obj.(*corev1.Service); ok {
1060 return []*corev1.Service{svc}, nil
1061 }
1062
1063 pods, err := api.GetPodsFor(obj, includeFailed)
1064 if err != nil {
1065 return nil, err
1066 }
1067 namespace, err := GetNamespaceOf(obj)
1068 if err != nil {
1069 return nil, err
1070 }
1071 allServices, err := api.GetServices(namespace, "")
1072 if err != nil {
1073 return nil, err
1074 }
1075 services := make([]*corev1.Service, 0)
1076 for _, svc := range allServices {
1077 svcPods, err := api.GetPodsFor(svc, includeFailed)
1078 if err != nil {
1079 return nil, err
1080 }
1081
1082 if hasOverlap(pods, svcPods) {
1083 services = append(services, svc)
1084 }
1085 }
1086 return services, nil
1087 }
1088
1089
1090
1091
1092
1093 func (api *API) GetServiceProfileFor(svc *corev1.Service, clientNs, clusterDomain string) *spv1alpha2.ServiceProfile {
1094 dst := fmt.Sprintf("%s.%s.svc.%s", svc.Name, svc.Namespace, clusterDomain)
1095
1096 if clientNs != "" {
1097 p, err := api.SP().Lister().ServiceProfiles(clientNs).Get(dst)
1098 if err == nil {
1099 return p
1100 }
1101 if !apierrors.IsNotFound(err) {
1102 log.Errorf("error getting service profile for %s in %s namespace: %s", dst, clientNs, err)
1103 }
1104 }
1105
1106 if svc.Namespace != clientNs {
1107 p, err := api.SP().Lister().ServiceProfiles(svc.Namespace).Get(dst)
1108 if err == nil {
1109 return p
1110 }
1111 if !apierrors.IsNotFound(err) {
1112 log.Errorf("error getting service profile for %s in %s namespace: %s", dst, svc.Namespace, err)
1113 }
1114 }
1115
1116 log.Debugf("no Service Profile found for '%s' -- using default", dst)
1117 return &spv1alpha2.ServiceProfile{
1118 ObjectMeta: metav1.ObjectMeta{
1119 Name: dst,
1120 },
1121 Spec: spv1alpha2.ServiceProfileSpec{
1122 Routes: []*spv1alpha2.RouteSpec{},
1123 },
1124 }
1125 }
1126
1127 func hasOverlap(as, bs []*corev1.Pod) bool {
1128 for _, a := range as {
1129 for _, b := range bs {
1130 if a.Name == b.Name {
1131 return true
1132 }
1133 }
1134 }
1135 return false
1136 }
1137
1138 func isPendingOrRunning(pod *corev1.Pod) bool {
1139 pending := pod.Status.Phase == corev1.PodPending
1140 running := pod.Status.Phase == corev1.PodRunning
1141 terminating := pod.DeletionTimestamp != nil
1142 return (pending || running) && !terminating
1143 }
1144
1145 func isFailed(pod *corev1.Pod) bool {
1146 return pod.Status.Phase == corev1.PodFailed
1147 }
1148
View as plain text