1
16
17 package internal
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 apimeta "k8s.io/apimachinery/pkg/api/meta"
26 "k8s.io/apimachinery/pkg/fields"
27 "k8s.io/apimachinery/pkg/labels"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/runtime/schema"
30 "k8s.io/client-go/tools/cache"
31
32 "sigs.k8s.io/controller-runtime/pkg/client"
33 "sigs.k8s.io/controller-runtime/pkg/internal/field/selector"
34 )
35
36
37 var _ client.Reader = &CacheReader{}
38
39
40 type CacheReader struct {
41
42 indexer cache.Indexer
43
44
45 groupVersionKind schema.GroupVersionKind
46
47
48 scopeName apimeta.RESTScopeName
49
50
51
52
53 disableDeepCopy bool
54 }
55
56
57 func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error {
58 if c.scopeName == apimeta.RESTScopeNameRoot {
59 key.Namespace = ""
60 }
61 storeKey := objectKeyToStoreKey(key)
62
63
64 obj, exists, err := c.indexer.GetByKey(storeKey)
65 if err != nil {
66 return err
67 }
68
69
70 if !exists {
71 return apierrors.NewNotFound(schema.GroupResource{
72 Group: c.groupVersionKind.Group,
73
74 Resource: c.groupVersionKind.Kind,
75 }, key.Name)
76 }
77
78
79 if _, isObj := obj.(runtime.Object); !isObj {
80
81 return fmt.Errorf("cache contained %T, which is not an Object", obj)
82 }
83
84 if c.disableDeepCopy {
85
86
87 } else {
88
89 obj = obj.(runtime.Object).DeepCopyObject()
90 }
91
92
93
94 outVal := reflect.ValueOf(out)
95 objVal := reflect.ValueOf(obj)
96 if !objVal.Type().AssignableTo(outVal.Type()) {
97 return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
98 }
99 reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
100 if !c.disableDeepCopy {
101 out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
102 }
103
104 return nil
105 }
106
107
108 func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
109 var objs []interface{}
110 var err error
111
112 listOpts := client.ListOptions{}
113 listOpts.ApplyOptions(opts)
114
115 if listOpts.Continue != "" {
116 return fmt.Errorf("continue list option is not supported by the cache")
117 }
118
119 switch {
120 case listOpts.FieldSelector != nil:
121 requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
122 if !requiresExact {
123 return fmt.Errorf("non-exact field matches are not supported by the cache")
124 }
125
126
127
128 objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), listOpts.Namespace)
129 case listOpts.Namespace != "":
130 objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
131 default:
132 objs = c.indexer.List()
133 }
134 if err != nil {
135 return err
136 }
137 var labelSel labels.Selector
138 if listOpts.LabelSelector != nil {
139 labelSel = listOpts.LabelSelector
140 }
141
142 limitSet := listOpts.Limit > 0
143
144 runtimeObjs := make([]runtime.Object, 0, len(objs))
145 for _, item := range objs {
146
147
148 if limitSet && int64(len(runtimeObjs)) >= listOpts.Limit {
149 break
150 }
151 obj, isObj := item.(runtime.Object)
152 if !isObj {
153 return fmt.Errorf("cache contained %T, which is not an Object", item)
154 }
155 meta, err := apimeta.Accessor(obj)
156 if err != nil {
157 return err
158 }
159 if labelSel != nil {
160 lbls := labels.Set(meta.GetLabels())
161 if !labelSel.Matches(lbls) {
162 continue
163 }
164 }
165
166 var outObj runtime.Object
167 if c.disableDeepCopy || (listOpts.UnsafeDisableDeepCopy != nil && *listOpts.UnsafeDisableDeepCopy) {
168
169
170 outObj = obj
171 } else {
172 outObj = obj.DeepCopyObject()
173 outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
174 }
175 runtimeObjs = append(runtimeObjs, outObj)
176 }
177 return apimeta.SetList(out, runtimeObjs)
178 }
179
180 func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) {
181 var (
182 err error
183 objs []interface{}
184 vals []string
185 )
186 indexers := indexer.GetIndexers()
187 for idx, req := range requires {
188 indexName := FieldIndexName(req.Field)
189 indexedValue := KeyToNamespacedKey(namespace, req.Value)
190 if idx == 0 {
191
192
193
194 objs, err = indexer.ByIndex(indexName, indexedValue)
195 if err != nil {
196 return nil, err
197 }
198 if len(objs) == 0 {
199 return nil, nil
200 }
201 continue
202 }
203 fn, exist := indexers[indexName]
204 if !exist {
205 return nil, fmt.Errorf("index with name %s does not exist", indexName)
206 }
207 filteredObjects := make([]interface{}, 0, len(objs))
208 for _, obj := range objs {
209 vals, err = fn(obj)
210 if err != nil {
211 return nil, err
212 }
213 for _, val := range vals {
214 if val == indexedValue {
215 filteredObjects = append(filteredObjects, obj)
216 break
217 }
218 }
219 }
220 if len(filteredObjects) == 0 {
221 return nil, nil
222 }
223 objs = filteredObjects
224 }
225 return objs, nil
226 }
227
228
229
230
231
232 func objectKeyToStoreKey(k client.ObjectKey) string {
233 if k.Namespace == "" {
234 return k.Name
235 }
236 return k.Namespace + "/" + k.Name
237 }
238
239
240
241 func FieldIndexName(field string) string {
242 return "field:" + field
243 }
244
245
246 const allNamespacesNamespace = "__all_namespaces"
247
248
249
250 func KeyToNamespacedKey(ns string, baseKey string) string {
251 if ns != "" {
252 return ns + "/" + baseKey
253 }
254 return allNamespacesNamespace + "/" + baseKey
255 }
256
View as plain text