...

Source file src/github.com/docker/distribution/notifications/endpoint.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"net/http"
     5  	"time"
     6  
     7  	"github.com/docker/distribution/configuration"
     8  )
     9  
// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
//
// NewEndpoint applies defaults to its own copy of the config before use: a
// non-positive Timeout or Backoff becomes one second, a non-positive
// Threshold becomes 10, and a nil Transport becomes http.DefaultTransport.
//
// Headers, Timeout and Transport are handed to the HTTP sink; Threshold and
// Backoff are handed to the retrying sink. IgnoredMediaTypes is combined with
// Ignore.MediaTypes and, together with Ignore.Actions, handed to the ignored
// sink. Transport is excluded from JSON encoding.
type EndpointConfig struct {
	Headers           http.Header
	Timeout           time.Duration
	Threshold         int
	Backoff           time.Duration
	IgnoredMediaTypes []string
	Transport         *http.Transport `json:"-"`
	Ignore            configuration.Ignore
}
    21  
    22  // defaults set any zero-valued fields to a reasonable default.
    23  func (ec *EndpointConfig) defaults() {
    24  	if ec.Timeout <= 0 {
    25  		ec.Timeout = time.Second
    26  	}
    27  
    28  	if ec.Threshold <= 0 {
    29  		ec.Threshold = 10
    30  	}
    31  
    32  	if ec.Backoff <= 0 {
    33  		ec.Backoff = time.Second
    34  	}
    35  
    36  	if ec.Transport == nil {
    37  		ec.Transport = http.DefaultTransport.(*http.Transport)
    38  	}
    39  }
    40  
// Endpoint is a reliable, queued, thread-safe sink that notifies external http
// services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
//
// The embedded Sink is the pipeline assembled by NewEndpoint, and the embedded
// EndpointConfig holds the configuration after defaults have been applied.
// The name and url are exposed through Name and URL; metrics is read through
// ReadMetrics.
type Endpoint struct {
	Sink
	url  string
	name string

	EndpointConfig

	metrics *safeMetrics
}
    53  
    54  // NewEndpoint returns a running endpoint, ready to receive events.
    55  func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
    56  	var endpoint Endpoint
    57  	endpoint.name = name
    58  	endpoint.url = url
    59  	endpoint.EndpointConfig = config
    60  	endpoint.defaults()
    61  	endpoint.metrics = newSafeMetrics()
    62  
    63  	// Configures the inmemory queue, retry, http pipeline.
    64  	endpoint.Sink = newHTTPSink(
    65  		endpoint.url, endpoint.Timeout, endpoint.Headers,
    66  		endpoint.Transport, endpoint.metrics.httpStatusListener())
    67  	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
    68  	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
    69  	mediaTypes := append(config.Ignore.MediaTypes, config.IgnoredMediaTypes...)
    70  	endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions)
    71  
    72  	register(&endpoint)
    73  	return &endpoint
    74  }
    75  
    76  // Name returns the name of the endpoint, generally used for debugging.
    77  func (e *Endpoint) Name() string {
    78  	return e.name
    79  }
    80  
    81  // URL returns the url of the endpoint.
    82  func (e *Endpoint) URL() string {
    83  	return e.url
    84  }
    85  
    86  // ReadMetrics populates em with metrics from the endpoint.
    87  func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
    88  	e.metrics.Lock()
    89  	defer e.metrics.Unlock()
    90  
    91  	*em = e.metrics.EndpointMetrics
    92  	// Map still need to copied in a threadsafe manner.
    93  	em.Statuses = make(map[string]int)
    94  	for k, v := range e.metrics.Statuses {
    95  		em.Statuses[k] = v
    96  	}
    97  }
    98  

View as plain text