1 package k8s
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7
8 "github.com/linkerd/linkerd2/pkg/k8s"
9 "github.com/prometheus/client_golang/prometheus"
10 log "github.com/sirupsen/logrus"
11 appsv1 "k8s.io/api/apps/v1"
12 batchv1 "k8s.io/api/batch/v1"
13 corev1 "k8s.io/api/core/v1"
14 "k8s.io/apimachinery/pkg/api/meta"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/labels"
17 "k8s.io/apimachinery/pkg/runtime"
18 "k8s.io/client-go/informers"
19 "k8s.io/client-go/metadata"
20 "k8s.io/client-go/metadata/metadatainformer"
21 "k8s.io/client-go/rest"
22 "k8s.io/client-go/tools/cache"
23 )
24
25
26 type MetadataAPI struct {
27 promGauges
28
29 client metadata.Interface
30 inf map[APIResource]informers.GenericInformer
31 syncChecks []cache.InformerSynced
32 sharedInformers metadatainformer.SharedInformerFactory
33 }
34
35
36
37 func InitializeMetadataAPI(kubeConfig string, cluster string, resources ...APIResource) (*MetadataAPI, error) {
38 config, err := k8s.GetConfig(kubeConfig, "")
39 if err != nil {
40 return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err)
41 }
42 return InitializeMetadataAPIForConfig(config, cluster, resources...)
43 }
44
45 func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, cluster string, resources ...APIResource) (*MetadataAPI, error) {
46 client, err := metadata.NewForConfig(kubeConfig)
47 if err != nil {
48 return nil, err
49 }
50
51 api, err := newClusterScopedMetadataAPI(client, cluster, resources...)
52 if err != nil {
53 return nil, err
54 }
55
56 for _, gauge := range api.gauges {
57 if err := prometheus.Register(gauge); err != nil {
58 log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
59 }
60 }
61 return api, nil
62
63 }
64
65 func newClusterScopedMetadataAPI(
66 metadataClient metadata.Interface,
67 cluster string,
68 resources ...APIResource,
69 ) (*MetadataAPI, error) {
70 sharedInformers := metadatainformer.NewFilteredSharedInformerFactory(
71 metadataClient,
72 ResyncTime,
73 metav1.NamespaceAll,
74 nil,
75 )
76
77 api := &MetadataAPI{
78 client: metadataClient,
79 inf: make(map[APIResource]informers.GenericInformer),
80 syncChecks: make([]cache.InformerSynced, 0),
81 sharedInformers: sharedInformers,
82 }
83
84 informerLabels := prometheus.Labels{
85 "cluster": cluster,
86 }
87
88 for _, resource := range resources {
89 if err := api.addInformer(resource, informerLabels); err != nil {
90 return nil, err
91 }
92 }
93 return api, nil
94 }
95
96
97 func (api *MetadataAPI) Sync(stopCh <-chan struct{}) {
98 api.sharedInformers.Start(stopCh)
99
100 waitForCacheSync(api.syncChecks)
101 }
102
103
104 func (api *MetadataAPI) UnregisterGauges() {
105 api.promGauges.unregister()
106 }
107
108 func (api *MetadataAPI) getLister(res APIResource) (cache.GenericLister, error) {
109 inf, ok := api.inf[res]
110 if !ok {
111 return nil, fmt.Errorf("metadata informer (%v) not configured", res)
112 }
113
114 return inf.Lister(), nil
115 }
116
117
118
119
120 func (api *MetadataAPI) Get(res APIResource, name string) (*metav1.PartialObjectMetadata, error) {
121 ls, err := api.getLister(res)
122 if err != nil {
123 return nil, err
124 }
125
126 obj, err := ls.Get(name)
127 if err != nil {
128 return nil, err
129 }
130
131
132
133 nsMeta, ok := obj.(*metav1.PartialObjectMetadata)
134 if !ok {
135 return nil, fmt.Errorf("couldn't convert obj %v to PartialObjectMetadata", obj)
136 }
137
138 return nsMeta, nil
139 }
140
141 func (api *MetadataAPI) getByNamespace(res APIResource, ns, name string) (*metav1.PartialObjectMetadata, error) {
142 ls, err := api.getLister(res)
143 if err != nil {
144 return nil, err
145 }
146
147 obj, err := ls.ByNamespace(ns).Get(name)
148 if err != nil {
149 return nil, err
150 }
151
152 nsMeta, ok := obj.(*metav1.PartialObjectMetadata)
153 if !ok {
154 return nil, fmt.Errorf("couldn't convert obj %v to PartialObjectMetadata", obj)
155 }
156
157 return nsMeta, nil
158 }
159
160
161
162
163 func (api *MetadataAPI) GetByNamespaceFiltered(
164 restype APIResource,
165 ns string,
166 name string,
167 labelSelector labels.Selector,
168 ) ([]*metav1.PartialObjectMetadata, error) {
169 ls, err := api.getLister(restype)
170 if err != nil {
171 return nil, err
172 }
173
174 var objs []runtime.Object
175 if ns == "" {
176 objs, err = ls.List(labelSelector)
177 } else if name == "" {
178 objs, err = ls.ByNamespace(ns).List(labelSelector)
179 } else {
180 var obj runtime.Object
181 obj, err = ls.ByNamespace(ns).Get(name)
182 objs = []runtime.Object{obj}
183 }
184
185 if err != nil {
186 return nil, err
187 }
188
189 objMetas := []*metav1.PartialObjectMetadata{}
190 for _, obj := range objs {
191
192
193 objMeta, ok := obj.(*metav1.PartialObjectMetadata)
194 if !ok {
195 return nil, fmt.Errorf("couldn't convert obj %v to PartialObjectMetadata", obj)
196 }
197 gvk, err := restype.GVK()
198 if err != nil {
199 return nil, err
200 }
201
202
203
204 objMeta.SetGroupVersionKind(gvk)
205 objMetas = append(objMetas, objMeta)
206 }
207
208 return objMetas, nil
209 }
210
211
212
213
214
215
216 func (api *MetadataAPI) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string, error) {
217 ownerRefs := pod.GetOwnerReferences()
218 if len(ownerRefs) == 0 {
219
220 return "pod", pod.Name, nil
221 } else if len(ownerRefs) > 1 {
222 log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
223 return "pod", pod.Name, nil
224 }
225
226 parent := ownerRefs[0]
227 var parentObj metav1.Object
228 var err error
229 switch parent.Kind {
230 case "Job":
231 parentObj, err = api.getByNamespace(Job, pod.Namespace, parent.Name)
232 if err != nil {
233 log.Warnf("failed to retrieve job from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
234 if retry {
235 parentObj, err = api.client.
236 Resource(batchv1.SchemeGroupVersion.WithResource("jobs")).
237 Namespace(pod.Namespace).
238 Get(ctx, parent.Name, metav1.GetOptions{})
239 if err != nil {
240 log.Warnf("failed to retrieve job from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
241 }
242 }
243 }
244 case "ReplicaSet":
245 var rsObj *metav1.PartialObjectMetadata
246 rsObj, err = api.getByNamespace(RS, pod.Namespace, parent.Name)
247 if err != nil {
248 log.Warnf("failed to retrieve replicaset from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
249 if retry {
250 rsObj, err = api.client.
251 Resource(appsv1.SchemeGroupVersion.WithResource("replicasets")).
252 Namespace(pod.Namespace).
253 Get(ctx, parent.Name, metav1.GetOptions{})
254 if err != nil {
255 log.Warnf("failed to retrieve replicaset from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
256 }
257 }
258 }
259
260 if rsObj == nil || !isValidRSParent(rsObj) {
261 return strings.ToLower(parent.Kind), parent.Name, nil
262 }
263 parentObj = rsObj
264 default:
265 return strings.ToLower(parent.Kind), parent.Name, nil
266 }
267
268 if err == nil && len(parentObj.GetOwnerReferences()) == 1 {
269 grandParent := parentObj.GetOwnerReferences()[0]
270 return strings.ToLower(grandParent.Kind), grandParent.Name, nil
271 }
272 return strings.ToLower(parent.Kind), parent.Name, nil
273 }
274
275 func (api *MetadataAPI) addInformer(res APIResource, informerLabels prometheus.Labels) error {
276 gvk, err := res.GVK()
277 if err != nil {
278 return err
279 }
280 gvr, _ := meta.UnsafeGuessKindToResource(gvk)
281 inf := api.sharedInformers.ForResource(gvr)
282 api.syncChecks = append(api.syncChecks, inf.Informer().HasSynced)
283 api.promGauges.addInformerSize(strings.ToLower(gvk.Kind), informerLabels, inf.Informer())
284 api.inf[res] = inf
285
286 return nil
287 }
288
View as plain text