1
16
17 package memory
18
19 import (
20 "errors"
21 "fmt"
22 "sync"
23 "syscall"
24
25 openapi_v2 "github.com/google/gnostic-models/openapiv2"
26
27 errorsutil "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime/schema"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/version"
32 "k8s.io/client-go/discovery"
33 "k8s.io/client-go/openapi"
34 cachedopenapi "k8s.io/client-go/openapi/cached"
35 restclient "k8s.io/client-go/rest"
36 "k8s.io/klog/v2"
37 )
38
39 type cacheEntry struct {
40 resourceList *metav1.APIResourceList
41 err error
42 }
43
44
45
46
47
48
49 type memCacheClient struct {
50 delegate discovery.DiscoveryInterface
51
52 lock sync.RWMutex
53 groupToServerResources map[string]*cacheEntry
54 groupList *metav1.APIGroupList
55 cacheValid bool
56 openapiClient openapi.Client
57 receivedAggregatedDiscovery bool
58 }
59
60
61 var (
62 ErrCacheNotFound = errors.New("not found")
63 )
64
65
66 type emptyResponseError struct {
67 gv string
68 }
69
70 func (e *emptyResponseError) Error() string {
71 return fmt.Sprintf("received empty response for: %s", e.gv)
72 }
73
74 var _ discovery.CachedDiscoveryInterface = &memCacheClient{}
75
76
77
78
79 func isTransientConnectionError(err error) bool {
80 var errno syscall.Errno
81 if errors.As(err, &errno) {
82 return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
83 }
84 return false
85 }
86
87 func isTransientError(err error) bool {
88 if isTransientConnectionError(err) {
89 return true
90 }
91
92 if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 {
93 return true
94 }
95
96 return errorsutil.IsTooManyRequests(err)
97 }
98
99
100 func (d *memCacheClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
101 d.lock.Lock()
102 defer d.lock.Unlock()
103 if !d.cacheValid {
104 if err := d.refreshLocked(); err != nil {
105 return nil, err
106 }
107 }
108 cachedVal, ok := d.groupToServerResources[groupVersion]
109 if !ok {
110 return nil, ErrCacheNotFound
111 }
112
113 if cachedVal.err != nil && isTransientError(cachedVal.err) {
114 r, err := d.serverResourcesForGroupVersion(groupVersion)
115 if err != nil {
116
117 if _, emptyErr := err.(*emptyResponseError); emptyErr {
118
119 klog.V(3).Infof("%v", err)
120 } else {
121 utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err))
122 }
123 }
124 cachedVal = &cacheEntry{r, err}
125 d.groupToServerResources[groupVersion] = cachedVal
126 }
127
128 return cachedVal.resourceList, cachedVal.err
129 }
130
131
132 func (d *memCacheClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
133 return discovery.ServerGroupsAndResources(d)
134 }
135
136
137
138
139 func (d *memCacheClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error) {
140 d.lock.Lock()
141 defer d.lock.Unlock()
142
143 if !d.cacheValid {
144 if err := d.refreshLocked(); err != nil {
145 return nil, nil, nil, err
146 }
147 }
148
149 var resourcesMap map[schema.GroupVersion]*metav1.APIResourceList
150 var failedGVs map[schema.GroupVersion]error
151 if d.receivedAggregatedDiscovery && len(d.groupToServerResources) > 0 {
152 resourcesMap = map[schema.GroupVersion]*metav1.APIResourceList{}
153 failedGVs = map[schema.GroupVersion]error{}
154 for gv, cacheEntry := range d.groupToServerResources {
155 groupVersion, err := schema.ParseGroupVersion(gv)
156 if err != nil {
157 return nil, nil, nil, fmt.Errorf("failed to parse group version (%v): %v", gv, err)
158 }
159 if cacheEntry.err != nil {
160 failedGVs[groupVersion] = cacheEntry.err
161 } else {
162 resourcesMap[groupVersion] = cacheEntry.resourceList
163 }
164 }
165 }
166 return d.groupList, resourcesMap, failedGVs, nil
167 }
168
169 func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
170 groups, _, _, err := d.GroupsAndMaybeResources()
171 if err != nil {
172 return nil, err
173 }
174 return groups, nil
175 }
176
177 func (d *memCacheClient) RESTClient() restclient.Interface {
178 return d.delegate.RESTClient()
179 }
180
181 func (d *memCacheClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
182 return discovery.ServerPreferredResources(d)
183 }
184
185 func (d *memCacheClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
186 return discovery.ServerPreferredNamespacedResources(d)
187 }
188
189 func (d *memCacheClient) ServerVersion() (*version.Info, error) {
190 return d.delegate.ServerVersion()
191 }
192
193 func (d *memCacheClient) OpenAPISchema() (*openapi_v2.Document, error) {
194 return d.delegate.OpenAPISchema()
195 }
196
197 func (d *memCacheClient) OpenAPIV3() openapi.Client {
198
199 d.lock.Lock()
200 defer d.lock.Unlock()
201
202 if d.openapiClient == nil {
203 d.openapiClient = cachedopenapi.NewClient(d.delegate.OpenAPIV3())
204 }
205
206 return d.openapiClient
207 }
208
209 func (d *memCacheClient) Fresh() bool {
210 d.lock.RLock()
211 defer d.lock.RUnlock()
212
213
214
215 return d.cacheValid
216 }
217
218
219
220 func (d *memCacheClient) Invalidate() {
221 d.lock.Lock()
222 defer d.lock.Unlock()
223 d.cacheValid = false
224 d.groupToServerResources = nil
225 d.groupList = nil
226 d.openapiClient = nil
227 d.receivedAggregatedDiscovery = false
228 if ad, ok := d.delegate.(discovery.CachedDiscoveryInterface); ok {
229 ad.Invalidate()
230 }
231 }
232
233
234
235 func (d *memCacheClient) refreshLocked() error {
236
237
238
239 var gl *metav1.APIGroupList
240 var err error
241
242 if ad, ok := d.delegate.(discovery.AggregatedDiscoveryInterface); ok {
243 var resources map[schema.GroupVersion]*metav1.APIResourceList
244 var failedGVs map[schema.GroupVersion]error
245 gl, resources, failedGVs, err = ad.GroupsAndMaybeResources()
246 if resources != nil && err == nil {
247
248 d.groupToServerResources = map[string]*cacheEntry{}
249 d.groupList = gl
250 for gv, resources := range resources {
251 d.groupToServerResources[gv.String()] = &cacheEntry{resources, nil}
252 }
253
254 for gv, err := range failedGVs {
255 d.groupToServerResources[gv.String()] = &cacheEntry{nil, err}
256 }
257 d.receivedAggregatedDiscovery = true
258 d.cacheValid = true
259 return nil
260 }
261 } else {
262 gl, err = d.delegate.ServerGroups()
263 }
264 if err != nil || len(gl.Groups) == 0 {
265 utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
266 return err
267 }
268
269 wg := &sync.WaitGroup{}
270 resultLock := &sync.Mutex{}
271 rl := map[string]*cacheEntry{}
272 for _, g := range gl.Groups {
273 for _, v := range g.Versions {
274 gv := v.GroupVersion
275 wg.Add(1)
276 go func() {
277 defer wg.Done()
278 defer utilruntime.HandleCrash()
279
280 r, err := d.serverResourcesForGroupVersion(gv)
281 if err != nil {
282
283 if _, emptyErr := err.(*emptyResponseError); emptyErr {
284
285 klog.V(3).Infof("%v", err)
286 } else {
287 utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
288 }
289 }
290
291 resultLock.Lock()
292 defer resultLock.Unlock()
293 rl[gv] = &cacheEntry{r, err}
294 }()
295 }
296 }
297 wg.Wait()
298
299 d.groupToServerResources, d.groupList = rl, gl
300 d.cacheValid = true
301 return nil
302 }
303
304 func (d *memCacheClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
305 r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
306 if err != nil {
307 return r, err
308 }
309 if len(r.APIResources) == 0 {
310 return r, &emptyResponseError{gv: groupVersion}
311 }
312 return r, nil
313 }
314
315
316
317 func (d *memCacheClient) WithLegacy() discovery.DiscoveryInterface {
318 return d
319 }
320
321
322
323
324
325
326 func NewMemCacheClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
327 return &memCacheClient{
328 delegate: delegate,
329 groupToServerResources: map[string]*cacheEntry{},
330 receivedAggregatedDiscovery: false,
331 }
332 }
333
View as plain text