1
2
3
4 package clusterreader
5
6 import (
7 "context"
8 "errors"
9 "fmt"
10 "sync"
11
12 apierrors "k8s.io/apimachinery/pkg/api/errors"
13 "k8s.io/apimachinery/pkg/api/meta"
14 metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17 "k8s.io/apimachinery/pkg/fields"
18 "k8s.io/apimachinery/pkg/labels"
19 "k8s.io/apimachinery/pkg/runtime"
20 "k8s.io/apimachinery/pkg/runtime/schema"
21 "k8s.io/client-go/tools/pager"
22 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
23 "sigs.k8s.io/cli-utils/pkg/object"
24 "sigs.k8s.io/controller-runtime/pkg/client"
25 )
26
27
28
29
30
31
32 var genGroupKinds = map[schema.GroupKind][]schema.GroupKind{
33 schema.GroupKind{Group: "apps", Kind: "Deployment"}: {
34 {
35 Group: "apps",
36 Kind: "ReplicaSet",
37 },
38 },
39 schema.GroupKind{Group: "apps", Kind: "ReplicaSet"}: {
40 {
41 Group: "",
42 Kind: "Pod",
43 },
44 },
45 schema.GroupKind{Group: "apps", Kind: "StatefulSet"}: {
46 {
47 Group: "",
48 Kind: "Pod",
49 },
50 },
51 }
52
53
54
55
56
57
58
59 func NewCachingClusterReader(reader client.Reader, mapper meta.RESTMapper, identifiers object.ObjMetadataSet) (engine.ClusterReader, error) {
60 gvkNamespaceSet := newGnSet()
61 for _, id := range identifiers {
62
63
64 err := buildGvkNamespaceSet([]schema.GroupKind{id.GroupKind}, id.Namespace, gvkNamespaceSet)
65 if err != nil {
66 return nil, err
67 }
68 }
69
70 return &CachingClusterReader{
71 reader: reader,
72 mapper: mapper,
73 gns: gvkNamespaceSet.gvkNamespaces,
74 }, nil
75 }
76
77 func buildGvkNamespaceSet(gks []schema.GroupKind, namespace string, gvkNamespaceSet *gvkNamespaceSet) error {
78 for _, gk := range gks {
79 gvkNamespaceSet.add(gkNamespace{
80 GroupKind: gk,
81 Namespace: namespace,
82 })
83 genGKs, found := genGroupKinds[gk]
84 if found {
85 err := buildGvkNamespaceSet(genGKs, namespace, gvkNamespaceSet)
86 if err != nil {
87 return err
88 }
89 }
90 }
91 return nil
92 }
93
94 type gvkNamespaceSet struct {
95 gvkNamespaces []gkNamespace
96 seen map[gkNamespace]struct{}
97 }
98
99 func newGnSet() *gvkNamespaceSet {
100 return &gvkNamespaceSet{
101 seen: make(map[gkNamespace]struct{}),
102 }
103 }
104
105 func (g *gvkNamespaceSet) add(gn gkNamespace) {
106 if _, found := g.seen[gn]; !found {
107 g.gvkNamespaces = append(g.gvkNamespaces, gn)
108 g.seen[gn] = struct{}{}
109 }
110 }
111
112
113
114
115
116 type CachingClusterReader struct {
117 mx sync.RWMutex
118
119
120
121 reader client.Reader
122
123
124
125 mapper meta.RESTMapper
126
127
128
129
130
131 gns []gkNamespace
132
133
134
135
136 cache map[gkNamespace]cacheEntry
137 }
138
139 type cacheEntry struct {
140 resources unstructured.UnstructuredList
141 err error
142 }
143
144
145 type gkNamespace struct {
146 GroupKind schema.GroupKind
147 Namespace string
148 }
149
150
151
152 func (c *CachingClusterReader) Get(_ context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
153 c.mx.RLock()
154 defer c.mx.RUnlock()
155 gvk := obj.GetObjectKind().GroupVersionKind()
156 mapping, err := c.mapper.RESTMapping(gvk.GroupKind())
157 if err != nil {
158 return err
159 }
160 gn := gkNamespace{
161 GroupKind: gvk.GroupKind(),
162 Namespace: key.Namespace,
163 }
164 cacheEntry, found := c.cache[gn]
165 if !found {
166 return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
167 }
168
169 if cacheEntry.err != nil {
170 return cacheEntry.err
171 }
172 for _, u := range cacheEntry.resources.Items {
173 if u.GetName() == key.Name {
174 obj.Object = u.Object
175 return nil
176 }
177 }
178 return apierrors.NewNotFound(mapping.Resource.GroupResource(), key.Name)
179 }
180
181
182
183 func (c *CachingClusterReader) ListNamespaceScoped(_ context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error {
184 c.mx.RLock()
185 defer c.mx.RUnlock()
186 gvk := list.GroupVersionKind()
187 gn := gkNamespace{
188 GroupKind: gvk.GroupKind(),
189 Namespace: namespace,
190 }
191
192 cacheEntry, found := c.cache[gn]
193 if !found {
194 return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
195 }
196
197 if cacheEntry.err != nil {
198 return cacheEntry.err
199 }
200
201 var items []unstructured.Unstructured
202 for _, u := range cacheEntry.resources.Items {
203 if selector.Matches(labels.Set(u.GetLabels())) {
204 items = append(items, u)
205 }
206 }
207 list.Items = items
208 return nil
209 }
210
211
212
213
214 func (c *CachingClusterReader) ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error {
215 return c.ListNamespaceScoped(ctx, list, "", selector)
216 }
217
218
219
220 func (c *CachingClusterReader) Sync(ctx context.Context) error {
221 c.mx.Lock()
222 defer c.mx.Unlock()
223 cache := make(map[gkNamespace]cacheEntry)
224 for _, gn := range c.gns {
225 mapping, err := c.mapper.RESTMapping(gn.GroupKind)
226 if err != nil {
227 if meta.IsNoMatchError(err) {
228
229
230
231
232
233 cache[gn] = cacheEntry{
234 err: err,
235 }
236 continue
237 }
238 return err
239 }
240 ns := ""
241 if mapping.Scope == meta.RESTScopeNamespace {
242 ns = gn.Namespace
243 }
244 list, err := c.listUnstructured(ctx, mapping.GroupVersionKind, ns)
245 if err != nil {
246
247
248 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
249 return err
250 }
251
252
253
254 cache[gn] = cacheEntry{
255 err: err,
256 }
257 continue
258 }
259 cache[gn] = cacheEntry{
260 resources: *list,
261 }
262 }
263 c.cache = cache
264 return nil
265 }
266
267
268
269
270 func (c *CachingClusterReader) listUnstructured(
271 ctx context.Context,
272 gvk schema.GroupVersionKind,
273 namespace string,
274 ) (*unstructured.UnstructuredList, error) {
275 mOpts := metav1.ListOptions{}
276 mOpts.SetGroupVersionKind(gvk)
277 obj, _, err := pager.New(c.listPageFunc(namespace)).List(ctx, mOpts)
278 if err != nil {
279 return nil, err
280 }
281
282 switch t := obj.(type) {
283 case *unstructured.UnstructuredList:
284
285 return t, nil
286 case *metainternalversion.List:
287
288 u := &unstructured.UnstructuredList{}
289 u.SetGroupVersionKind(gvk)
290
291
292 if t.ResourceVersion != "" {
293 u.SetResourceVersion(t.ResourceVersion)
294 }
295 if t.SelfLink != "" {
296 u.SetSelfLink(t.SelfLink)
297 }
298 u.Items = make([]unstructured.Unstructured, len(t.Items))
299 for i, item := range t.Items {
300 ui, ok := item.(*unstructured.Unstructured)
301 if !ok {
302 return nil, fmt.Errorf("unexpected list item type: %t", item)
303 }
304 u.Items[i] = *ui
305 }
306 return u, nil
307 default:
308 return nil, fmt.Errorf("unexpected list type: %t", t)
309 }
310 }
311
312 func (c *CachingClusterReader) listPageFunc(namespace string) pager.ListPageFunc {
313 return func(ctx context.Context, mOpts metav1.ListOptions) (runtime.Object, error) {
314 mOptsCopy := mOpts
315 labelSelector, err := labels.Parse(mOpts.LabelSelector)
316 if err != nil {
317 return nil, fmt.Errorf("failed to parse label selector: %w", err)
318 }
319 fieldSelector, err := fields.ParseSelector(mOpts.FieldSelector)
320 if err != nil {
321 return nil, fmt.Errorf("failed to parse field selector: %w", err)
322 }
323 cOpts := &client.ListOptions{
324 LabelSelector: labelSelector,
325 FieldSelector: fieldSelector,
326 Namespace: namespace,
327 Limit: mOpts.Limit,
328 Continue: mOpts.Continue,
329 Raw: &mOptsCopy,
330 }
331 var list unstructured.UnstructuredList
332 list.SetGroupVersionKind(mOpts.GroupVersionKind())
333
334
335 err = c.reader.List(ctx, &list, cOpts)
336 return &list, err
337 }
338 }
339
View as plain text