...
1 package notifications
2
3 import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "sync"
9 "time"
10 )
11
12
13
14
15 type httpSink struct {
16 url string
17
18 mu sync.Mutex
19 closed bool
20 client *http.Client
21 listeners []httpStatusListener
22
23
24
25 }
26
27
28
29 func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport *http.Transport, listeners ...httpStatusListener) *httpSink {
30 if transport == nil {
31 transport = http.DefaultTransport.(*http.Transport)
32 }
33 return &httpSink{
34 url: u,
35 listeners: listeners,
36 client: &http.Client{
37 Transport: &headerRoundTripper{
38 Transport: transport,
39 headers: headers,
40 },
41 Timeout: timeout,
42 },
43 }
44 }
45
46
47 type httpStatusListener interface {
48 success(status int, events ...Event)
49 failure(status int, events ...Event)
50 err(err error, events ...Event)
51 }
52
53
54
55
56 func (hs *httpSink) Write(events ...Event) error {
57 hs.mu.Lock()
58 defer hs.mu.Unlock()
59 defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
60
61 if hs.closed {
62 return ErrSinkClosed
63 }
64
65 envelope := Envelope{
66 Events: events,
67 }
68
69
70
71
72
73 p, err := json.MarshalIndent(envelope, "", " ")
74 if err != nil {
75 for _, listener := range hs.listeners {
76 listener.err(err, events...)
77 }
78 return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
79 }
80
81 body := bytes.NewReader(p)
82 resp, err := hs.client.Post(hs.url, EventsMediaType, body)
83 if err != nil {
84 for _, listener := range hs.listeners {
85 listener.err(err, events...)
86 }
87
88 return fmt.Errorf("%v: error posting: %v", hs, err)
89 }
90 defer resp.Body.Close()
91
92
93
94 switch {
95 case resp.StatusCode >= 200 && resp.StatusCode < 400:
96 for _, listener := range hs.listeners {
97 listener.success(resp.StatusCode, events...)
98 }
99
100
101
102
103
104 return nil
105 default:
106 for _, listener := range hs.listeners {
107 listener.failure(resp.StatusCode, events...)
108 }
109 return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
110 }
111 }
112
113
114 func (hs *httpSink) Close() error {
115 hs.mu.Lock()
116 defer hs.mu.Unlock()
117
118 if hs.closed {
119 return fmt.Errorf("httpsink: already closed")
120 }
121
122 hs.closed = true
123 return nil
124 }
125
126 func (hs *httpSink) String() string {
127 return fmt.Sprintf("httpSink{%s}", hs.url)
128 }
129
// headerRoundTripper wraps an *http.Transport and adds a fixed set of
// headers to every request sent through it.
type headerRoundTripper struct {
	// Transport is embedded, so its methods (for example
	// CloseIdleConnections) are available on the wrapper. RoundTrip is
	// overridden to inject the headers first.
	*http.Transport

	// headers are appended to the headers of each outgoing request.
	headers http.Header
}
134
135 func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
136 var nreq = *req
137 nreq.Header = make(http.Header)
138
139 merge := func(headers http.Header) {
140 for k, v := range headers {
141 nreq.Header[k] = append(nreq.Header[k], v...)
142 }
143 }
144
145 merge(req.Header)
146 merge(hrt.headers)
147
148 return hrt.Transport.RoundTrip(&nreq)
149 }
150
View as plain text