...

Source file src/github.com/docker/distribution/notifications/metrics.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"expvar"
     5  	"fmt"
     6  	"net/http"
     7  	"sync"
     8  )
     9  
    10  // EndpointMetrics track various actions taken by the endpoint, typically by
    11  // number of events. The goal of this to export it via expvar but we may find
    12  // some other future solution to be better.
    13  type EndpointMetrics struct {
    14  	Pending   int            // events pending in queue
    15  	Events    int            // total events incoming
    16  	Successes int            // total events written successfully
    17  	Failures  int            // total events failed
    18  	Errors    int            // total events errored
    19  	Statuses  map[string]int // status code histogram, per call event
    20  }
    21  
    22  // safeMetrics guards the metrics implementation with a lock and provides a
    23  // safe update function.
    24  type safeMetrics struct {
    25  	EndpointMetrics
    26  	sync.Mutex // protects statuses map
    27  }
    28  
    29  // newSafeMetrics returns safeMetrics with map allocated.
    30  func newSafeMetrics() *safeMetrics {
    31  	var sm safeMetrics
    32  	sm.Statuses = make(map[string]int)
    33  	return &sm
    34  }
    35  
    36  // httpStatusListener returns the listener for the http sink that updates the
    37  // relevant counters.
    38  func (sm *safeMetrics) httpStatusListener() httpStatusListener {
    39  	return &endpointMetricsHTTPStatusListener{
    40  		safeMetrics: sm,
    41  	}
    42  }
    43  
    44  // eventQueueListener returns a listener that maintains queue related counters.
    45  func (sm *safeMetrics) eventQueueListener() eventQueueListener {
    46  	return &endpointMetricsEventQueueListener{
    47  		safeMetrics: sm,
    48  	}
    49  }
    50  
    51  // endpointMetricsHTTPStatusListener increments counters related to http sinks
    52  // for the relevant events.
    53  type endpointMetricsHTTPStatusListener struct {
    54  	*safeMetrics
    55  }
    56  
    57  var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
    58  
    59  func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
    60  	emsl.safeMetrics.Lock()
    61  	defer emsl.safeMetrics.Unlock()
    62  	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
    63  	emsl.Successes += len(events)
    64  }
    65  
    66  func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
    67  	emsl.safeMetrics.Lock()
    68  	defer emsl.safeMetrics.Unlock()
    69  	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
    70  	emsl.Failures += len(events)
    71  }
    72  
    73  func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
    74  	emsl.safeMetrics.Lock()
    75  	defer emsl.safeMetrics.Unlock()
    76  	emsl.Errors += len(events)
    77  }
    78  
    79  // endpointMetricsEventQueueListener maintains the incoming events counter and
    80  // the queues pending count.
    81  type endpointMetricsEventQueueListener struct {
    82  	*safeMetrics
    83  }
    84  
    85  func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
    86  	eqc.Lock()
    87  	defer eqc.Unlock()
    88  	eqc.Events += len(events)
    89  	eqc.Pending += len(events)
    90  }
    91  
    92  func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
    93  	eqc.Lock()
    94  	defer eqc.Unlock()
    95  	eqc.Pending -= len(events)
    96  }
    97  
    98  // endpoints is global registry of endpoints used to report metrics to expvar
    99  var endpoints struct {
   100  	registered []*Endpoint
   101  	mu         sync.Mutex
   102  }
   103  
   104  // register places the endpoint into expvar so that stats are tracked.
   105  func register(e *Endpoint) {
   106  	endpoints.mu.Lock()
   107  	defer endpoints.mu.Unlock()
   108  
   109  	endpoints.registered = append(endpoints.registered, e)
   110  }
   111  
   112  func init() {
   113  	// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
   114  	// Ideally, we do more metrics through logging but we need some nice
   115  	// realtime metrics for queue state for now.
   116  
   117  	registry := expvar.Get("registry")
   118  
   119  	if registry == nil {
   120  		registry = expvar.NewMap("registry")
   121  	}
   122  
   123  	var notifications expvar.Map
   124  	notifications.Init()
   125  	notifications.Set("endpoints", expvar.Func(func() interface{} {
   126  		endpoints.mu.Lock()
   127  		defer endpoints.mu.Unlock()
   128  
   129  		var names []interface{}
   130  		for _, v := range endpoints.registered {
   131  			var epjson struct {
   132  				Name string `json:"name"`
   133  				URL  string `json:"url"`
   134  				EndpointConfig
   135  
   136  				Metrics EndpointMetrics
   137  			}
   138  
   139  			epjson.Name = v.Name()
   140  			epjson.URL = v.URL()
   141  			epjson.EndpointConfig = v.EndpointConfig
   142  
   143  			v.ReadMetrics(&epjson.Metrics)
   144  
   145  			names = append(names, epjson)
   146  		}
   147  
   148  		return names
   149  	}))
   150  
   151  	registry.(*expvar.Map).Set("notifications", &notifications)
   152  }
   153  

View as plain text