...

Source file src/k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics/cache.go

Documentation: k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package metrics
    18  
    19  import (
    20  	"math"
    21  	"sync"
    22  
    23  	"k8s.io/apimachinery/pkg/types"
    24  	endpointsliceutil "k8s.io/endpointslice/util"
    25  )
    26  
    27  // NewCache returns a new Cache with the specified endpointsPerSlice.
    28  func NewCache(endpointsPerSlice int32) *Cache {
    29  	return &Cache{
    30  		maxEndpointsPerSlice: endpointsPerSlice,
    31  		cache:                map[types.NamespacedName]*EndpointPortCache{},
    32  	}
    33  }
    34  
    35  // Cache tracks values for total numbers of desired endpoints as well as the
    36  // efficiency of EndpointSlice endpoints distribution.
    37  type Cache struct {
    38  	// maxEndpointsPerSlice references the maximum number of endpoints that
    39  	// should be added to an EndpointSlice.
    40  	maxEndpointsPerSlice int32
    41  
    42  	// lock protects changes to numEndpoints and cache.
    43  	lock sync.Mutex
    44  	// numEndpoints represents the total number of endpoints stored in
    45  	// EndpointSlices.
    46  	numEndpoints int
    47  	// cache stores a EndpointPortCache grouped by NamespacedNames representing
    48  	// Services.
    49  	cache map[types.NamespacedName]*EndpointPortCache
    50  }
    51  
    52  // EndpointPortCache tracks values for total numbers of desired endpoints as well
    53  // as the efficiency of EndpointSlice endpoints distribution for each unique
    54  // Service Port combination.
    55  type EndpointPortCache struct {
    56  	items map[endpointsliceutil.PortMapKey]EfficiencyInfo
    57  }
    58  
    59  // EfficiencyInfo stores the number of Endpoints and Slices for calculating
    60  // total numbers of desired endpoints and the efficiency of EndpointSlice
    61  // endpoints distribution.
    62  type EfficiencyInfo struct {
    63  	Endpoints int
    64  	Slices    int
    65  }
    66  
    67  // NewEndpointPortCache initializes and returns a new EndpointPortCache.
    68  func NewEndpointPortCache() *EndpointPortCache {
    69  	return &EndpointPortCache{
    70  		items: map[endpointsliceutil.PortMapKey]EfficiencyInfo{},
    71  	}
    72  }
    73  
    74  // Set updates the EndpointPortCache to contain the provided EfficiencyInfo
    75  // for the provided PortMapKey.
    76  func (spc *EndpointPortCache) Set(pmKey endpointsliceutil.PortMapKey, eInfo EfficiencyInfo) {
    77  	spc.items[pmKey] = eInfo
    78  }
    79  
    80  // numEndpoints returns the total number of endpoints represented by a
    81  // EndpointPortCache.
    82  func (spc *EndpointPortCache) numEndpoints() int {
    83  	num := 0
    84  	for _, eInfo := range spc.items {
    85  		num += eInfo.Endpoints
    86  	}
    87  	return num
    88  }
    89  
    90  // UpdateEndpointPortCache updates a EndpointPortCache in the global cache for a
    91  // given Service and updates the corresponding metrics.
    92  // Parameters:
    93  // * endpointsNN refers to a NamespacedName representing the Endpoints resource.
    94  // * epCache refers to a EndpointPortCache for the specified Endpoints reosource.
    95  func (c *Cache) UpdateEndpointPortCache(endpointsNN types.NamespacedName, epCache *EndpointPortCache) {
    96  	c.lock.Lock()
    97  	defer c.lock.Unlock()
    98  
    99  	prevNumEndpoints := 0
   100  	if existingEPCache, ok := c.cache[endpointsNN]; ok {
   101  		prevNumEndpoints = existingEPCache.numEndpoints()
   102  	}
   103  
   104  	currNumEndpoints := epCache.numEndpoints()
   105  	// To keep numEndpoints up to date, add the difference between the number of
   106  	// endpoints in the provided spCache and any spCache it might be replacing.
   107  	c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints
   108  
   109  	c.cache[endpointsNN] = epCache
   110  	c.updateMetrics()
   111  }
   112  
   113  // DeleteEndpoints removes references to an Endpoints resource from the global
   114  // cache and updates the corresponding metrics.
   115  func (c *Cache) DeleteEndpoints(endpointsNN types.NamespacedName) {
   116  	c.lock.Lock()
   117  	defer c.lock.Unlock()
   118  
   119  	if spCache, ok := c.cache[endpointsNN]; ok {
   120  		c.numEndpoints = c.numEndpoints - spCache.numEndpoints()
   121  		delete(c.cache, endpointsNN)
   122  		c.updateMetrics()
   123  	}
   124  }
   125  
   126  // metricsUpdate stores a desired and actual number of EndpointSlices.
   127  type metricsUpdate struct {
   128  	desired, actual int
   129  }
   130  
   131  // desiredAndActualSlices returns a metricsUpdate with the desired and actual
   132  // number of EndpointSlices given the current values in the cache.
   133  // Must be called holding lock.
   134  func (c *Cache) desiredAndActualSlices() metricsUpdate {
   135  	mUpdate := metricsUpdate{}
   136  	for _, spCache := range c.cache {
   137  		for _, eInfo := range spCache.items {
   138  			mUpdate.actual += eInfo.Slices
   139  			mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice))
   140  		}
   141  	}
   142  	return mUpdate
   143  }
   144  
   145  // updateMetrics updates metrics with the values from this Cache.
   146  // Must be called holding lock.
   147  func (c *Cache) updateMetrics() {
   148  	mUpdate := c.desiredAndActualSlices()
   149  	NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual))
   150  	DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired))
   151  	EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))
   152  }
   153  
   154  // numDesiredSlices calculates the number of EndpointSlices that would exist
   155  // with ideal endpoint distribution.
   156  func numDesiredSlices(numEndpoints, maxPerSlice int) int {
   157  	return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice)))
   158  }
   159  

View as plain text