...

Source file src/k8s.io/client-go/discovery/cached/disk/cached_discovery.go

Documentation: k8s.io/client-go/discovery/cached/disk

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package disk
    18  
    19  import (
    20  	"errors"
    21  	"io"
    22  	"net/http"
    23  	"os"
    24  	"path/filepath"
    25  	"sync"
    26  	"time"
    27  
    28  	openapi_v2 "github.com/google/gnostic-models/openapiv2"
    29  	"k8s.io/klog/v2"
    30  
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/version"
    34  	"k8s.io/client-go/discovery"
    35  	"k8s.io/client-go/discovery/cached/memory"
    36  	"k8s.io/client-go/kubernetes/scheme"
    37  	"k8s.io/client-go/openapi"
    38  	cachedopenapi "k8s.io/client-go/openapi/cached"
    39  	restclient "k8s.io/client-go/rest"
    40  )
    41  
// CachedDiscoveryClient implements the functions that discover server-supported API groups,
// versions and resources, keeping the answers in files under cacheDirectory so that
// later lookups can be served from disk instead of from the server.
type CachedDiscoveryClient struct {
	// delegate is the underlying discovery client that is consulted whenever
	// a usable cache file is not available.
	delegate discovery.DiscoveryInterface

	// cacheDirectory is the directory where discovery docs are held.  It must be unique per host:port combination to work well.
	cacheDirectory string

	// ttl is how long a cache file is considered valid, measured from the
	// file's modification time.
	ttl time.Duration

	// mutex protects the variables below
	mutex sync.Mutex

	// ourFiles are all filenames of cache files created by this process
	ourFiles map[string]struct{}
	// invalidated is true if all cache files should be ignored that are not ours (e.g. after Invalidate() was called)
	invalidated bool
	// fresh is true if all used cache files were ours
	fresh bool

	// openapiClient is the caching OpenAPI v3 client which wraps the delegate's
	// client. It is created lazily by OpenAPIV3 and reset to nil by Invalidate.
	openapiClient openapi.Client
}
    66  
    67  var _ discovery.CachedDiscoveryInterface = &CachedDiscoveryClient{}
    68  
    69  // ServerResourcesForGroupVersion returns the supported resources for a group and version.
    70  func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
    71  	filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
    72  	cachedBytes, err := d.getCachedFile(filename)
    73  	// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
    74  	if err == nil {
    75  		cachedResources := &metav1.APIResourceList{}
    76  		if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
    77  			klog.V(10).Infof("returning cached discovery info from %v", filename)
    78  			return cachedResources, nil
    79  		}
    80  	}
    81  
    82  	liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
    83  	if err != nil {
    84  		klog.V(3).Infof("skipped caching discovery info due to %v", err)
    85  		return liveResources, err
    86  	}
    87  	if liveResources == nil || len(liveResources.APIResources) == 0 {
    88  		klog.V(3).Infof("skipped caching discovery info, no resources found")
    89  		return liveResources, err
    90  	}
    91  
    92  	if err := d.writeCachedFile(filename, liveResources); err != nil {
    93  		klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
    94  	}
    95  
    96  	return liveResources, nil
    97  }
    98  
// ServerGroupsAndResources returns the supported groups and resources for all groups and versions.
// It delegates to discovery.ServerGroupsAndResources, passing this client as the
// discovery source.
func (d *CachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
	return discovery.ServerGroupsAndResources(d)
}
   103  
   104  // ServerGroups returns the supported groups, with information like supported versions and the
   105  // preferred version.
   106  func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
   107  	filename := filepath.Join(d.cacheDirectory, "servergroups.json")
   108  	cachedBytes, err := d.getCachedFile(filename)
   109  	// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
   110  	if err == nil {
   111  		cachedGroups := &metav1.APIGroupList{}
   112  		if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
   113  			klog.V(10).Infof("returning cached discovery info from %v", filename)
   114  			return cachedGroups, nil
   115  		}
   116  	}
   117  
   118  	liveGroups, err := d.delegate.ServerGroups()
   119  	if err != nil {
   120  		klog.V(3).Infof("skipped caching discovery info due to %v", err)
   121  		return liveGroups, err
   122  	}
   123  	if liveGroups == nil || len(liveGroups.Groups) == 0 {
   124  		klog.V(3).Infof("skipped caching discovery info, no groups found")
   125  		return liveGroups, err
   126  	}
   127  
   128  	if err := d.writeCachedFile(filename, liveGroups); err != nil {
   129  		klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
   130  	}
   131  
   132  	return liveGroups, nil
   133  }
   134  
   135  func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
   136  	// after invalidation ignore cache files not created by this process
   137  	d.mutex.Lock()
   138  	_, ourFile := d.ourFiles[filename]
   139  	if d.invalidated && !ourFile {
   140  		d.mutex.Unlock()
   141  		return nil, errors.New("cache invalidated")
   142  	}
   143  	d.mutex.Unlock()
   144  
   145  	file, err := os.Open(filename)
   146  	if err != nil {
   147  		return nil, err
   148  	}
   149  	defer file.Close()
   150  
   151  	fileInfo, err := file.Stat()
   152  	if err != nil {
   153  		return nil, err
   154  	}
   155  
   156  	if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
   157  		return nil, errors.New("cache expired")
   158  	}
   159  
   160  	// the cache is present and its valid.  Try to read and use it.
   161  	cachedBytes, err := io.ReadAll(file)
   162  	if err != nil {
   163  		return nil, err
   164  	}
   165  
   166  	d.mutex.Lock()
   167  	defer d.mutex.Unlock()
   168  	d.fresh = d.fresh && ourFile
   169  
   170  	return cachedBytes, nil
   171  }
   172  
   173  func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
   174  	if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
   175  		return err
   176  	}
   177  
   178  	bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
   179  	if err != nil {
   180  		return err
   181  	}
   182  
   183  	f, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)+".")
   184  	if err != nil {
   185  		return err
   186  	}
   187  	defer os.Remove(f.Name())
   188  	_, err = f.Write(bytes)
   189  	if err != nil {
   190  		return err
   191  	}
   192  
   193  	err = os.Chmod(f.Name(), 0660)
   194  	if err != nil {
   195  		return err
   196  	}
   197  
   198  	name := f.Name()
   199  	err = f.Close()
   200  	if err != nil {
   201  		return err
   202  	}
   203  
   204  	// atomic rename
   205  	d.mutex.Lock()
   206  	defer d.mutex.Unlock()
   207  	err = os.Rename(name, filename)
   208  	if err == nil {
   209  		d.ourFiles[filename] = struct{}{}
   210  	}
   211  	return err
   212  }
   213  
// RESTClient returns a RESTClient that is used to communicate with API server
// by this client implementation. The value is the delegate's RESTClient.
func (d *CachedDiscoveryClient) RESTClient() restclient.Interface {
	return d.delegate.RESTClient()
}
   219  
// ServerPreferredResources returns the supported resources with the version preferred by the
// server. It delegates to discovery.ServerPreferredResources, passing this client
// as the discovery source.
func (d *CachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
	return discovery.ServerPreferredResources(d)
}
   225  
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
// version preferred by the server. It delegates to
// discovery.ServerPreferredNamespacedResources, passing this client as the
// discovery source.
func (d *CachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
	return discovery.ServerPreferredNamespacedResources(d)
}
   231  
// ServerVersion retrieves and parses the server's version (git version).
// The call is forwarded directly to the delegate; this client does not write
// the result to its disk cache.
func (d *CachedDiscoveryClient) ServerVersion() (*version.Info, error) {
	return d.delegate.ServerVersion()
}
   236  
// OpenAPISchema retrieves and parses the swagger API schema the server supports.
// The call is forwarded directly to the delegate; this client does not write
// the result to its disk cache.
func (d *CachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
	return d.delegate.OpenAPISchema()
}
   241  
   242  // OpenAPIV3 retrieves and parses the OpenAPIV3 specs exposed by the server
   243  func (d *CachedDiscoveryClient) OpenAPIV3() openapi.Client {
   244  	// Must take lock since Invalidate call may modify openapiClient
   245  	d.mutex.Lock()
   246  	defer d.mutex.Unlock()
   247  
   248  	if d.openapiClient == nil {
   249  		// Delegate is discovery client created with special HTTP client which
   250  		// respects E-Tag cache responses to serve cache from disk.
   251  		d.openapiClient = cachedopenapi.NewClient(d.delegate.OpenAPIV3())
   252  	}
   253  
   254  	return d.openapiClient
   255  }
   256  
   257  // Fresh is supposed to tell the caller whether or not to retry if the cache
   258  // fails to find something (false = retry, true = no need to retry).
   259  func (d *CachedDiscoveryClient) Fresh() bool {
   260  	d.mutex.Lock()
   261  	defer d.mutex.Unlock()
   262  
   263  	return d.fresh
   264  }
   265  
   266  // Invalidate enforces that no cached data is used in the future that is older than the current time.
   267  func (d *CachedDiscoveryClient) Invalidate() {
   268  	d.mutex.Lock()
   269  	defer d.mutex.Unlock()
   270  
   271  	d.ourFiles = map[string]struct{}{}
   272  	d.fresh = true
   273  	d.invalidated = true
   274  	d.openapiClient = nil
   275  	if ad, ok := d.delegate.(discovery.CachedDiscoveryInterface); ok {
   276  		ad.Invalidate()
   277  	}
   278  }
   279  
// WithLegacy returns the current cached discovery client unchanged;
// this client does not support legacy-only discovery.
func (d *CachedDiscoveryClient) WithLegacy() discovery.DiscoveryInterface {
	return d
}
   285  
   286  // NewCachedDiscoveryClientForConfig creates a new DiscoveryClient for the given config, and wraps
   287  // the created client in a CachedDiscoveryClient. The provided configuration is updated with a
   288  // custom transport that understands cache responses.
   289  // We receive two distinct cache directories for now, in order to preserve old behavior
   290  // which makes use of the --cache-dir flag value for storing cache data from the CacheRoundTripper,
   291  // and makes use of the hardcoded destination (~/.kube/cache/discovery/...) for storing
   292  // CachedDiscoveryClient cache data. If httpCacheDir is empty, the restconfig's transport will not
   293  // be updated with a roundtripper that understands cache responses.
   294  // If discoveryCacheDir is empty, cached server resource data will be looked up in the current directory.
   295  func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
   296  	if len(httpCacheDir) > 0 {
   297  		// update the given restconfig with a custom roundtripper that
   298  		// understands how to handle cache responses.
   299  		config = restclient.CopyConfig(config)
   300  		config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
   301  			return newCacheRoundTripper(httpCacheDir, rt)
   302  		})
   303  	}
   304  
   305  	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
   306  	if err != nil {
   307  		return nil, err
   308  	}
   309  
   310  	// The delegate caches the discovery groups and resources (memcache). "ServerGroups",
   311  	// which usually only returns (and caches) the groups, can now store the resources as
   312  	// well if the server supports the newer aggregated discovery format.
   313  	return newCachedDiscoveryClient(memory.NewMemCacheClient(discoveryClient), discoveryCacheDir, ttl), nil
   314  }
   315  
   316  // NewCachedDiscoveryClient creates a new DiscoveryClient.  cacheDirectory is the directory where discovery docs are held.  It must be unique per host:port combination to work well.
   317  func newCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
   318  	return &CachedDiscoveryClient{
   319  		delegate:       delegate,
   320  		cacheDirectory: cacheDirectory,
   321  		ttl:            ttl,
   322  		ourFiles:       map[string]struct{}{},
   323  		fresh:          true,
   324  	}
   325  }
   326  

View as plain text