...

Source file src/k8s.io/client-go/discovery/cached/memory/memcache.go

Documentation: k8s.io/client-go/discovery/cached/memory

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package memory
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"sync"
    23  	"syscall"
    24  
    25  	openapi_v2 "github.com/google/gnostic-models/openapiv2"
    26  
    27  	errorsutil "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/runtime/schema"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/version"
    32  	"k8s.io/client-go/discovery"
    33  	"k8s.io/client-go/openapi"
    34  	cachedopenapi "k8s.io/client-go/openapi/cached"
    35  	restclient "k8s.io/client-go/rest"
    36  	"k8s.io/klog/v2"
    37  )
    38  
    39  type cacheEntry struct {
    40  	resourceList *metav1.APIResourceList
    41  	err          error
    42  }
    43  
    44  // memCacheClient can Invalidate() to stay up-to-date with discovery
    45  // information.
    46  //
    47  // TODO: Switch to a watch interface. Right now it will poll after each
    48  // Invalidate() call.
    49  type memCacheClient struct {
    50  	delegate discovery.DiscoveryInterface
    51  
    52  	lock                        sync.RWMutex
    53  	groupToServerResources      map[string]*cacheEntry
    54  	groupList                   *metav1.APIGroupList
    55  	cacheValid                  bool
    56  	openapiClient               openapi.Client
    57  	receivedAggregatedDiscovery bool
    58  }
    59  
    60  // Error Constants
    61  var (
    62  	ErrCacheNotFound = errors.New("not found")
    63  )
    64  
    65  // Server returning empty ResourceList for Group/Version.
    66  type emptyResponseError struct {
    67  	gv string
    68  }
    69  
    70  func (e *emptyResponseError) Error() string {
    71  	return fmt.Sprintf("received empty response for: %s", e.gv)
    72  }
    73  
    74  var _ discovery.CachedDiscoveryInterface = &memCacheClient{}
    75  
    76  // isTransientConnectionError checks whether given error is "Connection refused" or
    77  // "Connection reset" error which usually means that apiserver is temporarily
    78  // unavailable.
    79  func isTransientConnectionError(err error) bool {
    80  	var errno syscall.Errno
    81  	if errors.As(err, &errno) {
    82  		return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET
    83  	}
    84  	return false
    85  }
    86  
    87  func isTransientError(err error) bool {
    88  	if isTransientConnectionError(err) {
    89  		return true
    90  	}
    91  
    92  	if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 {
    93  		return true
    94  	}
    95  
    96  	return errorsutil.IsTooManyRequests(err)
    97  }
    98  
    99  // ServerResourcesForGroupVersion returns the supported resources for a group and version.
   100  func (d *memCacheClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
   101  	d.lock.Lock()
   102  	defer d.lock.Unlock()
   103  	if !d.cacheValid {
   104  		if err := d.refreshLocked(); err != nil {
   105  			return nil, err
   106  		}
   107  	}
   108  	cachedVal, ok := d.groupToServerResources[groupVersion]
   109  	if !ok {
   110  		return nil, ErrCacheNotFound
   111  	}
   112  
   113  	if cachedVal.err != nil && isTransientError(cachedVal.err) {
   114  		r, err := d.serverResourcesForGroupVersion(groupVersion)
   115  		if err != nil {
   116  			// Don't log "empty response" as an error; it is a common response for metrics.
   117  			if _, emptyErr := err.(*emptyResponseError); emptyErr {
   118  				// Log at same verbosity as disk cache.
   119  				klog.V(3).Infof("%v", err)
   120  			} else {
   121  				utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err))
   122  			}
   123  		}
   124  		cachedVal = &cacheEntry{r, err}
   125  		d.groupToServerResources[groupVersion] = cachedVal
   126  	}
   127  
   128  	return cachedVal.resourceList, cachedVal.err
   129  }
   130  
   131  // ServerGroupsAndResources returns the groups and supported resources for all groups and versions.
   132  func (d *memCacheClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
   133  	return discovery.ServerGroupsAndResources(d)
   134  }
   135  
   136  // GroupsAndMaybeResources returns the list of APIGroups, and possibly the map of group/version
   137  // to resources. The returned groups will never be nil, but the resources map can be nil
   138  // if there are no cached resources.
   139  func (d *memCacheClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error) {
   140  	d.lock.Lock()
   141  	defer d.lock.Unlock()
   142  
   143  	if !d.cacheValid {
   144  		if err := d.refreshLocked(); err != nil {
   145  			return nil, nil, nil, err
   146  		}
   147  	}
   148  	// Build the resourceList from the cache?
   149  	var resourcesMap map[schema.GroupVersion]*metav1.APIResourceList
   150  	var failedGVs map[schema.GroupVersion]error
   151  	if d.receivedAggregatedDiscovery && len(d.groupToServerResources) > 0 {
   152  		resourcesMap = map[schema.GroupVersion]*metav1.APIResourceList{}
   153  		failedGVs = map[schema.GroupVersion]error{}
   154  		for gv, cacheEntry := range d.groupToServerResources {
   155  			groupVersion, err := schema.ParseGroupVersion(gv)
   156  			if err != nil {
   157  				return nil, nil, nil, fmt.Errorf("failed to parse group version (%v): %v", gv, err)
   158  			}
   159  			if cacheEntry.err != nil {
   160  				failedGVs[groupVersion] = cacheEntry.err
   161  			} else {
   162  				resourcesMap[groupVersion] = cacheEntry.resourceList
   163  			}
   164  		}
   165  	}
   166  	return d.groupList, resourcesMap, failedGVs, nil
   167  }
   168  
   169  func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
   170  	groups, _, _, err := d.GroupsAndMaybeResources()
   171  	if err != nil {
   172  		return nil, err
   173  	}
   174  	return groups, nil
   175  }
   176  
   177  func (d *memCacheClient) RESTClient() restclient.Interface {
   178  	return d.delegate.RESTClient()
   179  }
   180  
   181  func (d *memCacheClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
   182  	return discovery.ServerPreferredResources(d)
   183  }
   184  
   185  func (d *memCacheClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
   186  	return discovery.ServerPreferredNamespacedResources(d)
   187  }
   188  
   189  func (d *memCacheClient) ServerVersion() (*version.Info, error) {
   190  	return d.delegate.ServerVersion()
   191  }
   192  
   193  func (d *memCacheClient) OpenAPISchema() (*openapi_v2.Document, error) {
   194  	return d.delegate.OpenAPISchema()
   195  }
   196  
   197  func (d *memCacheClient) OpenAPIV3() openapi.Client {
   198  	// Must take lock since Invalidate call may modify openapiClient
   199  	d.lock.Lock()
   200  	defer d.lock.Unlock()
   201  
   202  	if d.openapiClient == nil {
   203  		d.openapiClient = cachedopenapi.NewClient(d.delegate.OpenAPIV3())
   204  	}
   205  
   206  	return d.openapiClient
   207  }
   208  
   209  func (d *memCacheClient) Fresh() bool {
   210  	d.lock.RLock()
   211  	defer d.lock.RUnlock()
   212  	// Return whether the cache is populated at all. It is still possible that
   213  	// a single entry is missing due to transient errors and the attempt to read
   214  	// that entry will trigger retry.
   215  	return d.cacheValid
   216  }
   217  
   218  // Invalidate enforces that no cached data that is older than the current time
   219  // is used.
   220  func (d *memCacheClient) Invalidate() {
   221  	d.lock.Lock()
   222  	defer d.lock.Unlock()
   223  	d.cacheValid = false
   224  	d.groupToServerResources = nil
   225  	d.groupList = nil
   226  	d.openapiClient = nil
   227  	d.receivedAggregatedDiscovery = false
   228  	if ad, ok := d.delegate.(discovery.CachedDiscoveryInterface); ok {
   229  		ad.Invalidate()
   230  	}
   231  }
   232  
   233  // refreshLocked refreshes the state of cache. The caller must hold d.lock for
   234  // writing.
   235  func (d *memCacheClient) refreshLocked() error {
   236  	// TODO: Could this multiplicative set of calls be replaced by a single call
   237  	// to ServerResources? If it's possible for more than one resulting
   238  	// APIResourceList to have the same GroupVersion, the lists would need merged.
   239  	var gl *metav1.APIGroupList
   240  	var err error
   241  
   242  	if ad, ok := d.delegate.(discovery.AggregatedDiscoveryInterface); ok {
   243  		var resources map[schema.GroupVersion]*metav1.APIResourceList
   244  		var failedGVs map[schema.GroupVersion]error
   245  		gl, resources, failedGVs, err = ad.GroupsAndMaybeResources()
   246  		if resources != nil && err == nil {
   247  			// Cache the resources.
   248  			d.groupToServerResources = map[string]*cacheEntry{}
   249  			d.groupList = gl
   250  			for gv, resources := range resources {
   251  				d.groupToServerResources[gv.String()] = &cacheEntry{resources, nil}
   252  			}
   253  			// Cache GroupVersion discovery errors
   254  			for gv, err := range failedGVs {
   255  				d.groupToServerResources[gv.String()] = &cacheEntry{nil, err}
   256  			}
   257  			d.receivedAggregatedDiscovery = true
   258  			d.cacheValid = true
   259  			return nil
   260  		}
   261  	} else {
   262  		gl, err = d.delegate.ServerGroups()
   263  	}
   264  	if err != nil || len(gl.Groups) == 0 {
   265  		utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
   266  		return err
   267  	}
   268  
   269  	wg := &sync.WaitGroup{}
   270  	resultLock := &sync.Mutex{}
   271  	rl := map[string]*cacheEntry{}
   272  	for _, g := range gl.Groups {
   273  		for _, v := range g.Versions {
   274  			gv := v.GroupVersion
   275  			wg.Add(1)
   276  			go func() {
   277  				defer wg.Done()
   278  				defer utilruntime.HandleCrash()
   279  
   280  				r, err := d.serverResourcesForGroupVersion(gv)
   281  				if err != nil {
   282  					// Don't log "empty response" as an error; it is a common response for metrics.
   283  					if _, emptyErr := err.(*emptyResponseError); emptyErr {
   284  						// Log at same verbosity as disk cache.
   285  						klog.V(3).Infof("%v", err)
   286  					} else {
   287  						utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
   288  					}
   289  				}
   290  
   291  				resultLock.Lock()
   292  				defer resultLock.Unlock()
   293  				rl[gv] = &cacheEntry{r, err}
   294  			}()
   295  		}
   296  	}
   297  	wg.Wait()
   298  
   299  	d.groupToServerResources, d.groupList = rl, gl
   300  	d.cacheValid = true
   301  	return nil
   302  }
   303  
   304  func (d *memCacheClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
   305  	r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
   306  	if err != nil {
   307  		return r, err
   308  	}
   309  	if len(r.APIResources) == 0 {
   310  		return r, &emptyResponseError{gv: groupVersion}
   311  	}
   312  	return r, nil
   313  }
   314  
   315  // WithLegacy returns current memory-cached discovery client;
   316  // current client does not support legacy-only discovery.
   317  func (d *memCacheClient) WithLegacy() discovery.DiscoveryInterface {
   318  	return d
   319  }
   320  
   321  // NewMemCacheClient creates a new CachedDiscoveryInterface which caches
   322  // discovery information in memory and will stay up-to-date if Invalidate is
   323  // called with regularity.
   324  //
   325  // NOTE: The client will NOT resort to live lookups on cache misses.
   326  func NewMemCacheClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
   327  	return &memCacheClient{
   328  		delegate:                    delegate,
   329  		groupToServerResources:      map[string]*cacheEntry{},
   330  		receivedAggregatedDiscovery: false,
   331  	}
   332  }
   333  

View as plain text