...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ochttp
16
17 import (
18 "context"
19 "io"
20 "net/http"
21 "strconv"
22 "sync"
23 "time"
24
25 "go.opencensus.io/stats"
26 "go.opencensus.io/tag"
27 )
28
29
// statsTransport is an http.RoundTripper that forwards every request to base
// and records client-side stats measurements about it.
type statsTransport struct {
	// base is the RoundTripper that actually performs the request.
	base http.RoundTripper
}
33
34
35 func (t statsTransport) RoundTrip(req *http.Request) (*http.Response, error) {
36 ctx, _ := tag.New(req.Context(),
37 tag.Upsert(KeyClientHost, req.Host),
38 tag.Upsert(Host, req.Host),
39 tag.Upsert(KeyClientPath, req.URL.Path),
40 tag.Upsert(Path, req.URL.Path),
41 tag.Upsert(KeyClientMethod, req.Method),
42 tag.Upsert(Method, req.Method))
43 req = req.WithContext(ctx)
44 track := &tracker{
45 start: time.Now(),
46 ctx: ctx,
47 }
48 if req.Body == nil {
49
50 track.reqSize = -1
51 } else if req.ContentLength > 0 {
52 track.reqSize = req.ContentLength
53 }
54 stats.Record(ctx, ClientRequestCount.M(1))
55
56
57 resp, err := t.base.RoundTrip(req)
58
59 if err != nil {
60 track.statusCode = http.StatusInternalServerError
61 track.end()
62 } else {
63 track.statusCode = resp.StatusCode
64 if req.Method != "HEAD" {
65 track.respContentLength = resp.ContentLength
66 }
67 if resp.Body == nil {
68 track.end()
69 } else {
70 track.body = resp.Body
71 resp.Body = wrappedBody(track, resp.Body)
72 }
73 }
74 return resp, err
75 }
76
77
78 func (t statsTransport) CancelRequest(req *http.Request) {
79 type canceler interface {
80 CancelRequest(*http.Request)
81 }
82 if cr, ok := t.base.(canceler); ok {
83 cr.CancelRequest(req)
84 }
85 }
86
// tracker accumulates the per-request data needed to record client stats and
// stands in for the response body (via Read and Close) so the measurements
// can be recorded once the body has been consumed or closed.
type tracker struct {
	// ctx is the tagged context the measurements are recorded against.
	ctx context.Context
	// respSize is the running total of bytes returned by Read.
	respSize int64
	// respContentLength is the response's ContentLength (left at zero for
	// HEAD requests); end uses it when no body bytes were read.
	respContentLength int64
	// reqSize is the request's ContentLength when positive, -1 when the
	// request has no body, and zero otherwise.
	reqSize int64
	// start is when the tracker was created; latency is measured from it.
	start time.Time
	// body is the underlying response body that Read and Close delegate to.
	body io.ReadCloser
	// statusCode is the HTTP status used for the status tags.
	statusCode int
	// endOnce guarantees the final measurements are recorded at most once.
	endOnce sync.Once
}
97
98 var _ io.ReadCloser = (*tracker)(nil)
99
100 func (t *tracker) end() {
101 t.endOnce.Do(func() {
102 latencyMs := float64(time.Since(t.start)) / float64(time.Millisecond)
103 respSize := t.respSize
104 if t.respSize == 0 && t.respContentLength > 0 {
105 respSize = t.respContentLength
106 }
107 m := []stats.Measurement{
108 ClientSentBytes.M(t.reqSize),
109 ClientReceivedBytes.M(respSize),
110 ClientRoundtripLatency.M(latencyMs),
111 ClientLatency.M(latencyMs),
112 ClientResponseBytes.M(t.respSize),
113 }
114 if t.reqSize >= 0 {
115 m = append(m, ClientRequestBytes.M(t.reqSize))
116 }
117
118 stats.RecordWithTags(t.ctx, []tag.Mutator{
119 tag.Upsert(StatusCode, strconv.Itoa(t.statusCode)),
120 tag.Upsert(KeyClientStatus, strconv.Itoa(t.statusCode)),
121 }, m...)
122 })
123 }
124
125 func (t *tracker) Read(b []byte) (int, error) {
126 n, err := t.body.Read(b)
127 t.respSize += int64(n)
128 switch err {
129 case nil:
130 return n, nil
131 case io.EOF:
132 t.end()
133 }
134 return n, err
135 }
136
137 func (t *tracker) Close() error {
138
139
140
141 t.end()
142 return t.body.Close()
143 }
144
View as plain text