1
16
17 package disk
18
19 import (
20 "errors"
21 "io"
22 "net/http"
23 "os"
24 "path/filepath"
25 "sync"
26 "time"
27
28 openapi_v2 "github.com/google/gnostic-models/openapiv2"
29 "k8s.io/klog/v2"
30
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/version"
34 "k8s.io/client-go/discovery"
35 "k8s.io/client-go/discovery/cached/memory"
36 "k8s.io/client-go/kubernetes/scheme"
37 "k8s.io/client-go/openapi"
38 cachedopenapi "k8s.io/client-go/openapi/cached"
39 restclient "k8s.io/client-go/rest"
40 )
41
42
43
// CachedDiscoveryClient is a discovery client that stores server group and
// resource responses as files under cacheDirectory and serves them from disk
// until they are older than ttl, falling back to delegate otherwise.
type CachedDiscoveryClient struct {
	// delegate is the underlying client that is queried whenever no usable
	// cached file is available.
	delegate discovery.DiscoveryInterface

	// cacheDirectory is the root directory under which the cache files
	// (servergroups.json, <groupVersion>/serverresources.json) are kept.
	cacheDirectory string

	// ttl is added to a cache file's modification time to decide whether the
	// file has expired.
	ttl time.Duration

	// mutex guards the fields below.
	mutex sync.Mutex

	// ourFiles is the set of cache file paths written by this instance.
	ourFiles map[string]struct{}
	// invalidated is set by Invalidate; once true, only files recorded in
	// ourFiles are read back from disk.
	invalidated bool
	// fresh starts out true and becomes false as soon as a cached file that is
	// not in ourFiles has been served.
	fresh bool

	// openapiClient is created on first use by OpenAPIV3 and reset to nil by
	// Invalidate.
	openapiClient openapi.Client
}

// Compile-time check that *CachedDiscoveryClient satisfies
// discovery.CachedDiscoveryInterface.
var _ discovery.CachedDiscoveryInterface = &CachedDiscoveryClient{}
68
69
70 func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
71 filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
72 cachedBytes, err := d.getCachedFile(filename)
73
74 if err == nil {
75 cachedResources := &metav1.APIResourceList{}
76 if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
77 klog.V(10).Infof("returning cached discovery info from %v", filename)
78 return cachedResources, nil
79 }
80 }
81
82 liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
83 if err != nil {
84 klog.V(3).Infof("skipped caching discovery info due to %v", err)
85 return liveResources, err
86 }
87 if liveResources == nil || len(liveResources.APIResources) == 0 {
88 klog.V(3).Infof("skipped caching discovery info, no resources found")
89 return liveResources, err
90 }
91
92 if err := d.writeCachedFile(filename, liveResources); err != nil {
93 klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
94 }
95
96 return liveResources, nil
97 }
98
99
// ServerGroupsAndResources returns whatever discovery.ServerGroupsAndResources
// produces when given this client as its discovery source.
func (d *CachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
	return discovery.ServerGroupsAndResources(d)
}
103
104
105
106 func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
107 filename := filepath.Join(d.cacheDirectory, "servergroups.json")
108 cachedBytes, err := d.getCachedFile(filename)
109
110 if err == nil {
111 cachedGroups := &metav1.APIGroupList{}
112 if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
113 klog.V(10).Infof("returning cached discovery info from %v", filename)
114 return cachedGroups, nil
115 }
116 }
117
118 liveGroups, err := d.delegate.ServerGroups()
119 if err != nil {
120 klog.V(3).Infof("skipped caching discovery info due to %v", err)
121 return liveGroups, err
122 }
123 if liveGroups == nil || len(liveGroups.Groups) == 0 {
124 klog.V(3).Infof("skipped caching discovery info, no groups found")
125 return liveGroups, err
126 }
127
128 if err := d.writeCachedFile(filename, liveGroups); err != nil {
129 klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
130 }
131
132 return liveGroups, nil
133 }
134
135 func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
136
137 d.mutex.Lock()
138 _, ourFile := d.ourFiles[filename]
139 if d.invalidated && !ourFile {
140 d.mutex.Unlock()
141 return nil, errors.New("cache invalidated")
142 }
143 d.mutex.Unlock()
144
145 file, err := os.Open(filename)
146 if err != nil {
147 return nil, err
148 }
149 defer file.Close()
150
151 fileInfo, err := file.Stat()
152 if err != nil {
153 return nil, err
154 }
155
156 if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
157 return nil, errors.New("cache expired")
158 }
159
160
161 cachedBytes, err := io.ReadAll(file)
162 if err != nil {
163 return nil, err
164 }
165
166 d.mutex.Lock()
167 defer d.mutex.Unlock()
168 d.fresh = d.fresh && ourFile
169
170 return cachedBytes, nil
171 }
172
173 func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
174 if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
175 return err
176 }
177
178 bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
179 if err != nil {
180 return err
181 }
182
183 f, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)+".")
184 if err != nil {
185 return err
186 }
187 defer os.Remove(f.Name())
188 _, err = f.Write(bytes)
189 if err != nil {
190 return err
191 }
192
193 err = os.Chmod(f.Name(), 0660)
194 if err != nil {
195 return err
196 }
197
198 name := f.Name()
199 err = f.Close()
200 if err != nil {
201 return err
202 }
203
204
205 d.mutex.Lock()
206 defer d.mutex.Unlock()
207 err = os.Rename(name, filename)
208 if err == nil {
209 d.ourFiles[filename] = struct{}{}
210 }
211 return err
212 }
213
214
215
// RESTClient returns the REST client of the delegate discovery client.
func (d *CachedDiscoveryClient) RESTClient() restclient.Interface {
	return d.delegate.RESTClient()
}
219
220
221
// ServerPreferredResources returns whatever discovery.ServerPreferredResources
// produces when given this client as its discovery source.
func (d *CachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
	return discovery.ServerPreferredResources(d)
}
225
226
227
// ServerPreferredNamespacedResources returns whatever
// discovery.ServerPreferredNamespacedResources produces when given this client
// as its discovery source.
func (d *CachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
	return discovery.ServerPreferredNamespacedResources(d)
}
231
232
// ServerVersion forwards to the delegate; this method does not read or write
// the disk cache.
func (d *CachedDiscoveryClient) ServerVersion() (*version.Info, error) {
	return d.delegate.ServerVersion()
}
236
237
// OpenAPISchema forwards to the delegate; this method does not read or write
// the disk cache.
func (d *CachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
	return d.delegate.OpenAPISchema()
}
241
242
243 func (d *CachedDiscoveryClient) OpenAPIV3() openapi.Client {
244
245 d.mutex.Lock()
246 defer d.mutex.Unlock()
247
248 if d.openapiClient == nil {
249
250
251 d.openapiClient = cachedopenapi.NewClient(d.delegate.OpenAPIV3())
252 }
253
254 return d.openapiClient
255 }
256
257
258
259 func (d *CachedDiscoveryClient) Fresh() bool {
260 d.mutex.Lock()
261 defer d.mutex.Unlock()
262
263 return d.fresh
264 }
265
266
267 func (d *CachedDiscoveryClient) Invalidate() {
268 d.mutex.Lock()
269 defer d.mutex.Unlock()
270
271 d.ourFiles = map[string]struct{}{}
272 d.fresh = true
273 d.invalidated = true
274 d.openapiClient = nil
275 if ad, ok := d.delegate.(discovery.CachedDiscoveryInterface); ok {
276 ad.Invalidate()
277 }
278 }
279
280
281
// WithLegacy returns the receiver unchanged.
func (d *CachedDiscoveryClient) WithLegacy() discovery.DiscoveryInterface {
	return d
}
285
286
287
288
289
290
291
292
293
294
295 func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
296 if len(httpCacheDir) > 0 {
297
298
299 config = restclient.CopyConfig(config)
300 config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
301 return newCacheRoundTripper(httpCacheDir, rt)
302 })
303 }
304
305 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
306 if err != nil {
307 return nil, err
308 }
309
310
311
312
313 return newCachedDiscoveryClient(memory.NewMemCacheClient(discoveryClient), discoveryCacheDir, ttl), nil
314 }
315
316
317 func newCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
318 return &CachedDiscoveryClient{
319 delegate: delegate,
320 cacheDirectory: cacheDirectory,
321 ttl: ttl,
322 ourFiles: map[string]struct{}{},
323 fresh: true,
324 }
325 }
326
View as plain text