...

Source file src/github.com/docker/distribution/notifications/http.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"bytes"
     5  	"encoding/json"
     6  	"fmt"
     7  	"net/http"
     8  	"sync"
     9  	"time"
    10  )
    11  
    12  // httpSink implements a single-flight, http notification endpoint. This is
    13  // very lightweight in that it only makes an attempt at an http request.
    14  // Reliability should be provided by the caller.
    15  type httpSink struct {
    16  	url string
    17  
    18  	mu        sync.Mutex
    19  	closed    bool
    20  	client    *http.Client
    21  	listeners []httpStatusListener
    22  
    23  	// TODO(stevvooe): Allow one to configure the media type accepted by this
    24  	// sink and choose the serialization based on that.
    25  }
    26  
    27  // newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
    28  // sinks for increased reliability.
    29  func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport *http.Transport, listeners ...httpStatusListener) *httpSink {
    30  	if transport == nil {
    31  		transport = http.DefaultTransport.(*http.Transport)
    32  	}
    33  	return &httpSink{
    34  		url:       u,
    35  		listeners: listeners,
    36  		client: &http.Client{
    37  			Transport: &headerRoundTripper{
    38  				Transport: transport,
    39  				headers:   headers,
    40  			},
    41  			Timeout: timeout,
    42  		},
    43  	}
    44  }
    45  
    46  // httpStatusListener is called on various outcomes of sending notifications.
    47  type httpStatusListener interface {
    48  	success(status int, events ...Event)
    49  	failure(status int, events ...Event)
    50  	err(err error, events ...Event)
    51  }
    52  
    53  // Accept makes an attempt to notify the endpoint, returning an error if it
    54  // fails. It is the caller's responsibility to retry on error. The events are
    55  // accepted or rejected as a group.
    56  func (hs *httpSink) Write(events ...Event) error {
    57  	hs.mu.Lock()
    58  	defer hs.mu.Unlock()
    59  	defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
    60  
    61  	if hs.closed {
    62  		return ErrSinkClosed
    63  	}
    64  
    65  	envelope := Envelope{
    66  		Events: events,
    67  	}
    68  
    69  	// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
    70  	// retry but we are going to do it to keep the code simple. It is likely
    71  	// we could change the event struct to manage its own buffer.
    72  
    73  	p, err := json.MarshalIndent(envelope, "", "   ")
    74  	if err != nil {
    75  		for _, listener := range hs.listeners {
    76  			listener.err(err, events...)
    77  		}
    78  		return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
    79  	}
    80  
    81  	body := bytes.NewReader(p)
    82  	resp, err := hs.client.Post(hs.url, EventsMediaType, body)
    83  	if err != nil {
    84  		for _, listener := range hs.listeners {
    85  			listener.err(err, events...)
    86  		}
    87  
    88  		return fmt.Errorf("%v: error posting: %v", hs, err)
    89  	}
    90  	defer resp.Body.Close()
    91  
    92  	// The notifier will treat any 2xx or 3xx response as accepted by the
    93  	// endpoint.
    94  	switch {
    95  	case resp.StatusCode >= 200 && resp.StatusCode < 400:
    96  		for _, listener := range hs.listeners {
    97  			listener.success(resp.StatusCode, events...)
    98  		}
    99  
   100  		// TODO(stevvooe): This is a little accepting: we may want to support
   101  		// unsupported media type responses with retries using the correct
   102  		// media type. There may also be cases that will never work.
   103  
   104  		return nil
   105  	default:
   106  		for _, listener := range hs.listeners {
   107  			listener.failure(resp.StatusCode, events...)
   108  		}
   109  		return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
   110  	}
   111  }
   112  
   113  // Close the endpoint
   114  func (hs *httpSink) Close() error {
   115  	hs.mu.Lock()
   116  	defer hs.mu.Unlock()
   117  
   118  	if hs.closed {
   119  		return fmt.Errorf("httpsink: already closed")
   120  	}
   121  
   122  	hs.closed = true
   123  	return nil
   124  }
   125  
   126  func (hs *httpSink) String() string {
   127  	return fmt.Sprintf("httpSink{%s}", hs.url)
   128  }
   129  
   130  type headerRoundTripper struct {
   131  	*http.Transport // must be transport to support CancelRequest
   132  	headers         http.Header
   133  }
   134  
   135  func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
   136  	var nreq = *req
   137  	nreq.Header = make(http.Header)
   138  
   139  	merge := func(headers http.Header) {
   140  		for k, v := range headers {
   141  			nreq.Header[k] = append(nreq.Header[k], v...)
   142  		}
   143  	}
   144  
   145  	merge(req.Header)
   146  	merge(hrt.headers)
   147  
   148  	return hrt.Transport.RoundTrip(&nreq)
   149  }
   150  

View as plain text