...

Source file src/k8s.io/client-go/util/workqueue/metrics.go

Documentation: k8s.io/client-go/util/workqueue

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package workqueue
    18  
    19  import (
    20  	"sync"
    21  	"time"
    22  
    23  	"k8s.io/utils/clock"
    24  )
    25  
// This file provides abstractions for plugging in a metrics provider
// (e.g., Prometheus).
    28  
    29  type queueMetrics interface {
    30  	add(item t)
    31  	get(item t)
    32  	done(item t)
    33  	updateUnfinishedWork()
    34  }
    35  
    36  // GaugeMetric represents a single numerical value that can arbitrarily go up
    37  // and down.
    38  type GaugeMetric interface {
    39  	Inc()
    40  	Dec()
    41  }
    42  
    43  // SettableGaugeMetric represents a single numerical value that can arbitrarily go up
    44  // and down. (Separate from GaugeMetric to preserve backwards compatibility.)
    45  type SettableGaugeMetric interface {
    46  	Set(float64)
    47  }
    48  
    49  // CounterMetric represents a single numerical value that only ever
    50  // goes up.
    51  type CounterMetric interface {
    52  	Inc()
    53  }
    54  
    55  // SummaryMetric captures individual observations.
    56  type SummaryMetric interface {
    57  	Observe(float64)
    58  }
    59  
    60  // HistogramMetric counts individual observations.
    61  type HistogramMetric interface {
    62  	Observe(float64)
    63  }
    64  
    65  type noopMetric struct{}
    66  
    67  func (noopMetric) Inc()            {}
    68  func (noopMetric) Dec()            {}
    69  func (noopMetric) Set(float64)     {}
    70  func (noopMetric) Observe(float64) {}
    71  
    72  // defaultQueueMetrics expects the caller to lock before setting any metrics.
    73  type defaultQueueMetrics struct {
    74  	clock clock.Clock
    75  
    76  	// current depth of a workqueue
    77  	depth GaugeMetric
    78  	// total number of adds handled by a workqueue
    79  	adds CounterMetric
    80  	// how long an item stays in a workqueue
    81  	latency HistogramMetric
    82  	// how long processing an item from a workqueue takes
    83  	workDuration         HistogramMetric
    84  	addTimes             map[t]time.Time
    85  	processingStartTimes map[t]time.Time
    86  
    87  	// how long have current threads been working?
    88  	unfinishedWorkSeconds   SettableGaugeMetric
    89  	longestRunningProcessor SettableGaugeMetric
    90  }
    91  
    92  func (m *defaultQueueMetrics) add(item t) {
    93  	if m == nil {
    94  		return
    95  	}
    96  
    97  	m.adds.Inc()
    98  	m.depth.Inc()
    99  	if _, exists := m.addTimes[item]; !exists {
   100  		m.addTimes[item] = m.clock.Now()
   101  	}
   102  }
   103  
   104  func (m *defaultQueueMetrics) get(item t) {
   105  	if m == nil {
   106  		return
   107  	}
   108  
   109  	m.depth.Dec()
   110  	m.processingStartTimes[item] = m.clock.Now()
   111  	if startTime, exists := m.addTimes[item]; exists {
   112  		m.latency.Observe(m.sinceInSeconds(startTime))
   113  		delete(m.addTimes, item)
   114  	}
   115  }
   116  
   117  func (m *defaultQueueMetrics) done(item t) {
   118  	if m == nil {
   119  		return
   120  	}
   121  
   122  	if startTime, exists := m.processingStartTimes[item]; exists {
   123  		m.workDuration.Observe(m.sinceInSeconds(startTime))
   124  		delete(m.processingStartTimes, item)
   125  	}
   126  }
   127  
   128  func (m *defaultQueueMetrics) updateUnfinishedWork() {
   129  	// Note that a summary metric would be better for this, but prometheus
   130  	// doesn't seem to have non-hacky ways to reset the summary metrics.
   131  	var total float64
   132  	var oldest float64
   133  	for _, t := range m.processingStartTimes {
   134  		age := m.sinceInSeconds(t)
   135  		total += age
   136  		if age > oldest {
   137  			oldest = age
   138  		}
   139  	}
   140  	m.unfinishedWorkSeconds.Set(total)
   141  	m.longestRunningProcessor.Set(oldest)
   142  }
   143  
   144  type noMetrics struct{}
   145  
   146  func (noMetrics) add(item t)            {}
   147  func (noMetrics) get(item t)            {}
   148  func (noMetrics) done(item t)           {}
   149  func (noMetrics) updateUnfinishedWork() {}
   150  
   151  // Gets the time since the specified start in seconds.
   152  func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
   153  	return m.clock.Since(start).Seconds()
   154  }
   155  
   156  type retryMetrics interface {
   157  	retry()
   158  }
   159  
   160  type defaultRetryMetrics struct {
   161  	retries CounterMetric
   162  }
   163  
   164  func (m *defaultRetryMetrics) retry() {
   165  	if m == nil {
   166  		return
   167  	}
   168  
   169  	m.retries.Inc()
   170  }
   171  
   172  // MetricsProvider generates various metrics used by the queue.
   173  type MetricsProvider interface {
   174  	NewDepthMetric(name string) GaugeMetric
   175  	NewAddsMetric(name string) CounterMetric
   176  	NewLatencyMetric(name string) HistogramMetric
   177  	NewWorkDurationMetric(name string) HistogramMetric
   178  	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
   179  	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
   180  	NewRetriesMetric(name string) CounterMetric
   181  }
   182  
   183  type noopMetricsProvider struct{}
   184  
   185  func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
   186  	return noopMetric{}
   187  }
   188  
   189  func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
   190  	return noopMetric{}
   191  }
   192  
   193  func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
   194  	return noopMetric{}
   195  }
   196  
   197  func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
   198  	return noopMetric{}
   199  }
   200  
   201  func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
   202  	return noopMetric{}
   203  }
   204  
   205  func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
   206  	return noopMetric{}
   207  }
   208  
   209  func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
   210  	return noopMetric{}
   211  }
   212  
   213  var globalMetricsFactory = queueMetricsFactory{
   214  	metricsProvider: noopMetricsProvider{},
   215  }
   216  
   217  type queueMetricsFactory struct {
   218  	metricsProvider MetricsProvider
   219  
   220  	onlyOnce sync.Once
   221  }
   222  
   223  func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
   224  	f.onlyOnce.Do(func() {
   225  		f.metricsProvider = mp
   226  	})
   227  }
   228  
   229  func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
   230  	mp := f.metricsProvider
   231  	if len(name) == 0 || mp == (noopMetricsProvider{}) {
   232  		return noMetrics{}
   233  	}
   234  	return &defaultQueueMetrics{
   235  		clock:                   clock,
   236  		depth:                   mp.NewDepthMetric(name),
   237  		adds:                    mp.NewAddsMetric(name),
   238  		latency:                 mp.NewLatencyMetric(name),
   239  		workDuration:            mp.NewWorkDurationMetric(name),
   240  		unfinishedWorkSeconds:   mp.NewUnfinishedWorkSecondsMetric(name),
   241  		longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
   242  		addTimes:                map[t]time.Time{},
   243  		processingStartTimes:    map[t]time.Time{},
   244  	}
   245  }
   246  
   247  func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
   248  	var ret *defaultRetryMetrics
   249  	if len(name) == 0 {
   250  		return ret
   251  	}
   252  
   253  	if provider == nil {
   254  		provider = globalMetricsFactory.metricsProvider
   255  	}
   256  
   257  	return &defaultRetryMetrics{
   258  		retries: provider.NewRetriesMetric(name),
   259  	}
   260  }
   261  
   262  // SetProvider sets the metrics provider for all subsequently created work
   263  // queues. Only the first call has an effect.
   264  func SetProvider(metricsProvider MetricsProvider) {
   265  	globalMetricsFactory.setProvider(metricsProvider)
   266  }
   267  

View as plain text