...

Source file src/k8s.io/component-base/metrics/prometheusextension/weighted_histogram.go

Documentation: k8s.io/component-base/metrics/prometheusextension

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package prometheusextension
    18  
    19  import (
    20  	"fmt"
    21  	"math"
    22  	"sort"
    23  	"sync"
    24  
    25  	"github.com/prometheus/client_golang/prometheus"
    26  	dto "github.com/prometheus/client_model/go"
    27  )
    28  
    29  // WeightedHistogram generalizes Histogram: each observation has
    30  // an associated _weight_. For a given `x` and `N`,
    31  // `1` call on `ObserveWithWeight(x, N)` has the same meaning as
    32  // `N` calls on `ObserveWithWeight(x, 1)`.
    33  // The weighted sum might differ slightly due to the use of
    34  // floating point, although the implementation takes some steps
    35  // to mitigate that.
    36  // If every weight were 1,
    37  // this would be the same as the existing Histogram abstraction.
    38  type WeightedHistogram interface {
    39  	prometheus.Metric
    40  	prometheus.Collector
    41  	WeightedObserver
    42  }
    43  
    44  // WeightedObserver generalizes the Observer interface.
    45  type WeightedObserver interface {
    46  	// Set the variable to the given value with the given weight.
    47  	ObserveWithWeight(value float64, weight uint64)
    48  }
    49  
    50  // WeightedHistogramOpts is the same as for an ordinary Histogram
    51  type WeightedHistogramOpts = prometheus.HistogramOpts
    52  
    53  // NewWeightedHistogram creates a new WeightedHistogram
    54  func NewWeightedHistogram(opts WeightedHistogramOpts) (WeightedHistogram, error) {
    55  	desc := prometheus.NewDesc(
    56  		prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
    57  		wrapWeightedHelp(opts.Help),
    58  		nil,
    59  		opts.ConstLabels,
    60  	)
    61  	return newWeightedHistogram(desc, opts)
    62  }
    63  
    64  func wrapWeightedHelp(given string) string {
    65  	return "EXPERIMENTAL: " + given
    66  }
    67  
    68  func newWeightedHistogram(desc *prometheus.Desc, opts WeightedHistogramOpts, variableLabelValues ...string) (*weightedHistogram, error) {
    69  	if len(opts.Buckets) == 0 {
    70  		opts.Buckets = prometheus.DefBuckets
    71  	}
    72  
    73  	for i, upperBound := range opts.Buckets {
    74  		if i < len(opts.Buckets)-1 {
    75  			if upperBound >= opts.Buckets[i+1] {
    76  				return nil, fmt.Errorf(
    77  					"histogram buckets must be in increasing order: %f >= %f",
    78  					upperBound, opts.Buckets[i+1],
    79  				)
    80  			}
    81  		} else {
    82  			if math.IsInf(upperBound, +1) {
    83  				// The +Inf bucket is implicit. Remove it here.
    84  				opts.Buckets = opts.Buckets[:i]
    85  			}
    86  		}
    87  	}
    88  	upperBounds := make([]float64, len(opts.Buckets))
    89  	copy(upperBounds, opts.Buckets)
    90  
    91  	return &weightedHistogram{
    92  		desc:                desc,
    93  		variableLabelValues: variableLabelValues,
    94  		upperBounds:         upperBounds,
    95  		buckets:             make([]uint64, len(upperBounds)+1),
    96  		hotCount:            initialHotCount,
    97  	}, nil
    98  }
    99  
   100  type weightedHistogram struct {
   101  	desc                *prometheus.Desc
   102  	variableLabelValues []string
   103  	upperBounds         []float64 // exclusive of +Inf
   104  
   105  	lock sync.Mutex // applies to all the following
   106  
   107  	// buckets is longer by one than upperBounds.
   108  	// For 0 <= idx < len(upperBounds), buckets[idx] holds the
   109  	// accumulated time.Duration that value has been <=
   110  	// upperBounds[idx] but not <= upperBounds[idx-1].
   111  	// buckets[len(upperBounds)] holds the accumulated
   112  	// time.Duration when value fit in no other bucket.
   113  	buckets []uint64
   114  
   115  	// sumHot + sumCold is the weighted sum of value.
   116  	// Rather than risk loss of precision in one
   117  	// float64, we do this sum hierarchically.  Many successive
   118  	// increments are added into sumHot; once in a while
   119  	// the magnitude of sumHot is compared to the magnitude
   120  	// of sumCold and, if the ratio is high enough,
   121  	// sumHot is transferred into sumCold.
   122  	sumHot  float64
   123  	sumCold float64
   124  
   125  	transferThreshold float64 // = math.Abs(sumCold) / 2^26 (that's about half of the bits of precision in a float64)
   126  
   127  	// hotCount is used to decide when to consider dumping sumHot into sumCold.
   128  	// hotCount counts upward from initialHotCount to zero.
   129  	hotCount int
   130  }
   131  
   132  // initialHotCount is the negative of the number of terms
   133  // that are summed into sumHot before considering whether
   134  // to transfer to sumCold.  This only has to be big enough
   135  // to make the extra floating point operations occur in a
   136  // distinct minority of cases.
   137  const initialHotCount = -15
   138  
   139  var _ WeightedHistogram = &weightedHistogram{}
   140  var _ prometheus.Metric = &weightedHistogram{}
   141  var _ prometheus.Collector = &weightedHistogram{}
   142  
   143  func (sh *weightedHistogram) ObserveWithWeight(value float64, weight uint64) {
   144  	idx := sort.SearchFloat64s(sh.upperBounds, value)
   145  	sh.lock.Lock()
   146  	defer sh.lock.Unlock()
   147  	sh.updateLocked(idx, value, weight)
   148  }
   149  
   150  func (sh *weightedHistogram) observeWithWeightLocked(value float64, weight uint64) {
   151  	idx := sort.SearchFloat64s(sh.upperBounds, value)
   152  	sh.updateLocked(idx, value, weight)
   153  }
   154  
   155  func (sh *weightedHistogram) updateLocked(idx int, value float64, weight uint64) {
   156  	sh.buckets[idx] += weight
   157  	newSumHot := sh.sumHot + float64(weight)*value
   158  	sh.hotCount++
   159  	if sh.hotCount >= 0 {
   160  		sh.hotCount = initialHotCount
   161  		if math.Abs(newSumHot) > sh.transferThreshold {
   162  			newSumCold := sh.sumCold + newSumHot
   163  			sh.sumCold = newSumCold
   164  			sh.transferThreshold = math.Abs(newSumCold / 67108864)
   165  			sh.sumHot = 0
   166  			return
   167  		}
   168  	}
   169  	sh.sumHot = newSumHot
   170  }
   171  
   172  func (sh *weightedHistogram) Desc() *prometheus.Desc {
   173  	return sh.desc
   174  }
   175  
   176  func (sh *weightedHistogram) Write(dest *dto.Metric) error {
   177  	count, sum, buckets := func() (uint64, float64, map[float64]uint64) {
   178  		sh.lock.Lock()
   179  		defer sh.lock.Unlock()
   180  		nBounds := len(sh.upperBounds)
   181  		buckets := make(map[float64]uint64, nBounds)
   182  		var count uint64
   183  		for idx, upperBound := range sh.upperBounds {
   184  			count += sh.buckets[idx]
   185  			buckets[upperBound] = count
   186  		}
   187  		count += sh.buckets[nBounds]
   188  		return count, sh.sumHot + sh.sumCold, buckets
   189  	}()
   190  	metric, err := prometheus.NewConstHistogram(sh.desc, count, sum, buckets, sh.variableLabelValues...)
   191  	if err != nil {
   192  		return err
   193  	}
   194  	return metric.Write(dest)
   195  }
   196  
   197  func (sh *weightedHistogram) Describe(ch chan<- *prometheus.Desc) {
   198  	ch <- sh.desc
   199  }
   200  
   201  func (sh *weightedHistogram) Collect(ch chan<- prometheus.Metric) {
   202  	ch <- sh
   203  }
   204  

View as plain text