1
2
3
4 package otelhttp
5
6 import (
7 "context"
8 "io"
9 "net/http"
10 "net/http/httptrace"
11 "sync/atomic"
12 "time"
13
14 "go.opentelemetry.io/otel/metric"
15
16 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil"
17 "go.opentelemetry.io/otel"
18 "go.opentelemetry.io/otel/codes"
19 "go.opentelemetry.io/otel/propagation"
20 "go.opentelemetry.io/otel/trace"
21
22 semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
23 )
24
25
26
27 type Transport struct {
28 rt http.RoundTripper
29
30 tracer trace.Tracer
31 meter metric.Meter
32 propagators propagation.TextMapPropagator
33 spanStartOptions []trace.SpanStartOption
34 filters []Filter
35 spanNameFormatter func(string, *http.Request) string
36 clientTrace func(context.Context) *httptrace.ClientTrace
37
38 requestBytesCounter metric.Int64Counter
39 responseBytesCounter metric.Int64Counter
40 latencyMeasure metric.Float64Histogram
41 }
42
43 var _ http.RoundTripper = &Transport{}
44
45
46
47
48
49
50
51 func NewTransport(base http.RoundTripper, opts ...Option) *Transport {
52 if base == nil {
53 base = http.DefaultTransport
54 }
55
56 t := Transport{
57 rt: base,
58 }
59
60 defaultOpts := []Option{
61 WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
62 WithSpanNameFormatter(defaultTransportFormatter),
63 }
64
65 c := newConfig(append(defaultOpts, opts...)...)
66 t.applyConfig(c)
67 t.createMeasures()
68
69 return &t
70 }
71
72 func (t *Transport) applyConfig(c *config) {
73 t.tracer = c.Tracer
74 t.meter = c.Meter
75 t.propagators = c.Propagators
76 t.spanStartOptions = c.SpanStartOptions
77 t.filters = c.Filters
78 t.spanNameFormatter = c.SpanNameFormatter
79 t.clientTrace = c.ClientTrace
80 }
81
82 func (t *Transport) createMeasures() {
83 var err error
84 t.requestBytesCounter, err = t.meter.Int64Counter(
85 clientRequestSize,
86 metric.WithUnit("By"),
87 metric.WithDescription("Measures the size of HTTP request messages."),
88 )
89 handleErr(err)
90
91 t.responseBytesCounter, err = t.meter.Int64Counter(
92 clientResponseSize,
93 metric.WithUnit("By"),
94 metric.WithDescription("Measures the size of HTTP response messages."),
95 )
96 handleErr(err)
97
98 t.latencyMeasure, err = t.meter.Float64Histogram(
99 clientDuration,
100 metric.WithUnit("ms"),
101 metric.WithDescription("Measures the duration of outbound HTTP requests."),
102 )
103 handleErr(err)
104 }
105
// defaultTransportFormatter is the default span name formatter for Transport.
// It ignores its first argument and names the span "HTTP " followed by the
// request method.
func defaultTransportFormatter(_ string, r *http.Request) string {
	const prefix = "HTTP "
	return prefix + r.Method
}
109
110
111
112
113 func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
114 requestStartTime := time.Now()
115 for _, f := range t.filters {
116 if !f(r) {
117
118 return t.rt.RoundTrip(r)
119 }
120 }
121
122 tracer := t.tracer
123
124 if tracer == nil {
125 if span := trace.SpanFromContext(r.Context()); span.SpanContext().IsValid() {
126 tracer = newTracer(span.TracerProvider())
127 } else {
128 tracer = newTracer(otel.GetTracerProvider())
129 }
130 }
131
132 opts := append([]trace.SpanStartOption{}, t.spanStartOptions...)
133
134 ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
135
136 if t.clientTrace != nil {
137 ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx))
138 }
139
140 labeler := &Labeler{}
141 ctx = injectLabeler(ctx, labeler)
142
143 r = r.Clone(ctx)
144
145
146 var bw bodyWrapper
147
148
149
150 if r.Body != nil && r.Body != http.NoBody {
151 bw.ReadCloser = r.Body
152
153 bw.record = func(int64) {}
154 r.Body = &bw
155 }
156
157 span.SetAttributes(semconvutil.HTTPClientRequest(r)...)
158 t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header))
159
160 res, err := t.rt.RoundTrip(r)
161 if err != nil {
162 span.RecordError(err)
163 span.SetStatus(codes.Error, err.Error())
164 span.End()
165 return res, err
166 }
167
168
169 metricAttrs := append(labeler.Get(), semconvutil.HTTPClientRequestMetrics(r)...)
170 if res.StatusCode > 0 {
171 metricAttrs = append(metricAttrs, semconv.HTTPStatusCode(res.StatusCode))
172 }
173 o := metric.WithAttributes(metricAttrs...)
174 t.requestBytesCounter.Add(ctx, bw.read.Load(), o)
175
176 readRecordFunc := func(n int64) {
177 t.responseBytesCounter.Add(ctx, n, o)
178 }
179
180
181 span.SetAttributes(semconvutil.HTTPClientResponse(res)...)
182 span.SetStatus(semconvutil.HTTPClientStatus(res.StatusCode))
183
184 res.Body = newWrappedBody(span, readRecordFunc, res.Body)
185
186
187 elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
188
189 t.latencyMeasure.Record(ctx, elapsedTime, o)
190
191 return res, err
192 }
193
194
195
196
197 func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
198
199
200
201 if _, ok := body.(io.ReadWriteCloser); ok {
202 return &wrappedBody{span: span, record: record, body: body}
203 }
204
205
206
207 return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
208 }
209
210
211
212
213
214
215
216
217
// wrappedBody wraps a response body. It counts the bytes returned by Read,
// reports the total through record at most once, records read/write errors on
// span, and ends span when Read returns io.EOF or when Close is called.
type wrappedBody struct {
	span     trace.Span    // span that receives errors and is ended on io.EOF or Close
	recorded atomic.Bool   // set once record has been invoked; see recordBytesRead
	record   func(n int64) // callback that receives the total number of bytes read
	body     io.ReadCloser // wrapped body that Read, Write and Close delegate to
	read     atomic.Int64  // running total of bytes returned by Read
}
225
226 var _ io.ReadWriteCloser = &wrappedBody{}
227
228 func (wb *wrappedBody) Write(p []byte) (int, error) {
229
230 n, err := wb.body.(io.Writer).Write(p)
231 if err != nil {
232 wb.span.RecordError(err)
233 wb.span.SetStatus(codes.Error, err.Error())
234 }
235 return n, err
236 }
237
238 func (wb *wrappedBody) Read(b []byte) (int, error) {
239 n, err := wb.body.Read(b)
240
241 wb.read.Add(int64(n))
242
243 switch err {
244 case nil:
245
246 case io.EOF:
247 wb.recordBytesRead()
248 wb.span.End()
249 default:
250 wb.span.RecordError(err)
251 wb.span.SetStatus(codes.Error, err.Error())
252 }
253 return n, err
254 }
255
256
257 func (wb *wrappedBody) recordBytesRead() {
258
259
260
261
262 if wb.recorded.CompareAndSwap(false, true) {
263
264 wb.record(wb.read.Load())
265 }
266 }
267
268 func (wb *wrappedBody) Close() error {
269 wb.recordBytesRead()
270 wb.span.End()
271 if wb.body != nil {
272 return wb.body.Close()
273 }
274 return nil
275 }
276
View as plain text