...

Source file src/go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/transport.go

Documentation: go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp

     1  // Copyright The OpenTelemetry Authors
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
     5  
     6  import (
     7  	"context"
     8  	"io"
     9  	"net/http"
    10  	"net/http/httptrace"
    11  	"sync/atomic"
    12  	"time"
    13  
    14  	"go.opentelemetry.io/otel/metric"
    15  
    16  	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil"
    17  	"go.opentelemetry.io/otel"
    18  	"go.opentelemetry.io/otel/codes"
    19  	"go.opentelemetry.io/otel/propagation"
    20  	"go.opentelemetry.io/otel/trace"
    21  
    22  	semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
    23  )
    24  
    25  // Transport implements the http.RoundTripper interface and wraps
    26  // outbound HTTP(S) requests with a span and enriches it with metrics.
    27  type Transport struct {
    28  	rt http.RoundTripper
    29  
    30  	tracer            trace.Tracer
    31  	meter             metric.Meter
    32  	propagators       propagation.TextMapPropagator
    33  	spanStartOptions  []trace.SpanStartOption
    34  	filters           []Filter
    35  	spanNameFormatter func(string, *http.Request) string
    36  	clientTrace       func(context.Context) *httptrace.ClientTrace
    37  
    38  	requestBytesCounter  metric.Int64Counter
    39  	responseBytesCounter metric.Int64Counter
    40  	latencyMeasure       metric.Float64Histogram
    41  }
    42  
    43  var _ http.RoundTripper = &Transport{}
    44  
    45  // NewTransport wraps the provided http.RoundTripper with one that
    46  // starts a span, injects the span context into the outbound request headers,
    47  // and enriches it with metrics.
    48  //
    49  // If the provided http.RoundTripper is nil, http.DefaultTransport will be used
    50  // as the base http.RoundTripper.
    51  func NewTransport(base http.RoundTripper, opts ...Option) *Transport {
    52  	if base == nil {
    53  		base = http.DefaultTransport
    54  	}
    55  
    56  	t := Transport{
    57  		rt: base,
    58  	}
    59  
    60  	defaultOpts := []Option{
    61  		WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
    62  		WithSpanNameFormatter(defaultTransportFormatter),
    63  	}
    64  
    65  	c := newConfig(append(defaultOpts, opts...)...)
    66  	t.applyConfig(c)
    67  	t.createMeasures()
    68  
    69  	return &t
    70  }
    71  
    72  func (t *Transport) applyConfig(c *config) {
    73  	t.tracer = c.Tracer
    74  	t.meter = c.Meter
    75  	t.propagators = c.Propagators
    76  	t.spanStartOptions = c.SpanStartOptions
    77  	t.filters = c.Filters
    78  	t.spanNameFormatter = c.SpanNameFormatter
    79  	t.clientTrace = c.ClientTrace
    80  }
    81  
    82  func (t *Transport) createMeasures() {
    83  	var err error
    84  	t.requestBytesCounter, err = t.meter.Int64Counter(
    85  		clientRequestSize,
    86  		metric.WithUnit("By"),
    87  		metric.WithDescription("Measures the size of HTTP request messages."),
    88  	)
    89  	handleErr(err)
    90  
    91  	t.responseBytesCounter, err = t.meter.Int64Counter(
    92  		clientResponseSize,
    93  		metric.WithUnit("By"),
    94  		metric.WithDescription("Measures the size of HTTP response messages."),
    95  	)
    96  	handleErr(err)
    97  
    98  	t.latencyMeasure, err = t.meter.Float64Histogram(
    99  		clientDuration,
   100  		metric.WithUnit("ms"),
   101  		metric.WithDescription("Measures the duration of outbound HTTP requests."),
   102  	)
   103  	handleErr(err)
   104  }
   105  
   106  func defaultTransportFormatter(_ string, r *http.Request) string {
   107  	return "HTTP " + r.Method
   108  }
   109  
   110  // RoundTrip creates a Span and propagates its context via the provided request's headers
   111  // before handing the request to the configured base RoundTripper. The created span will
   112  // end when the response body is closed or when a read from the body returns io.EOF.
   113  func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
   114  	requestStartTime := time.Now()
   115  	for _, f := range t.filters {
   116  		if !f(r) {
   117  			// Simply pass through to the base RoundTripper if a filter rejects the request
   118  			return t.rt.RoundTrip(r)
   119  		}
   120  	}
   121  
   122  	tracer := t.tracer
   123  
   124  	if tracer == nil {
   125  		if span := trace.SpanFromContext(r.Context()); span.SpanContext().IsValid() {
   126  			tracer = newTracer(span.TracerProvider())
   127  		} else {
   128  			tracer = newTracer(otel.GetTracerProvider())
   129  		}
   130  	}
   131  
   132  	opts := append([]trace.SpanStartOption{}, t.spanStartOptions...) // start with the configured options
   133  
   134  	ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
   135  
   136  	if t.clientTrace != nil {
   137  		ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx))
   138  	}
   139  
   140  	labeler := &Labeler{}
   141  	ctx = injectLabeler(ctx, labeler)
   142  
   143  	r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.
   144  
   145  	// use a body wrapper to determine the request size
   146  	var bw bodyWrapper
   147  	// if request body is nil or NoBody, we don't want to mutate the body as it
   148  	// will affect the identity of it in an unforeseeable way because we assert
   149  	// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
   150  	if r.Body != nil && r.Body != http.NoBody {
   151  		bw.ReadCloser = r.Body
   152  		// noop to prevent nil panic. not using this record fun yet.
   153  		bw.record = func(int64) {}
   154  		r.Body = &bw
   155  	}
   156  
   157  	span.SetAttributes(semconvutil.HTTPClientRequest(r)...)
   158  	t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header))
   159  
   160  	res, err := t.rt.RoundTrip(r)
   161  	if err != nil {
   162  		span.RecordError(err)
   163  		span.SetStatus(codes.Error, err.Error())
   164  		span.End()
   165  		return res, err
   166  	}
   167  
   168  	// metrics
   169  	metricAttrs := append(labeler.Get(), semconvutil.HTTPClientRequestMetrics(r)...)
   170  	if res.StatusCode > 0 {
   171  		metricAttrs = append(metricAttrs, semconv.HTTPStatusCode(res.StatusCode))
   172  	}
   173  	o := metric.WithAttributes(metricAttrs...)
   174  	t.requestBytesCounter.Add(ctx, bw.read.Load(), o)
   175  	// For handling response bytes we leverage a callback when the client reads the http response
   176  	readRecordFunc := func(n int64) {
   177  		t.responseBytesCounter.Add(ctx, n, o)
   178  	}
   179  
   180  	// traces
   181  	span.SetAttributes(semconvutil.HTTPClientResponse(res)...)
   182  	span.SetStatus(semconvutil.HTTPClientStatus(res.StatusCode))
   183  
   184  	res.Body = newWrappedBody(span, readRecordFunc, res.Body)
   185  
   186  	// Use floating point division here for higher precision (instead of Millisecond method).
   187  	elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
   188  
   189  	t.latencyMeasure.Record(ctx, elapsedTime, o)
   190  
   191  	return res, err
   192  }
   193  
   194  // newWrappedBody returns a new and appropriately scoped *wrappedBody as an
   195  // io.ReadCloser. If the passed body implements io.Writer, the returned value
   196  // will implement io.ReadWriteCloser.
   197  func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
   198  	// The successful protocol switch responses will have a body that
   199  	// implement an io.ReadWriteCloser. Ensure this interface type continues
   200  	// to be satisfied if that is the case.
   201  	if _, ok := body.(io.ReadWriteCloser); ok {
   202  		return &wrappedBody{span: span, record: record, body: body}
   203  	}
   204  
   205  	// Remove the implementation of the io.ReadWriteCloser and only implement
   206  	// the io.ReadCloser.
   207  	return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
   208  }
   209  
   210  // wrappedBody is the response body type returned by the transport
   211  // instrumentation to complete a span. Errors encountered when using the
   212  // response body are recorded in span tracking the response.
   213  //
   214  // The span tracking the response is ended when this body is closed.
   215  //
   216  // If the response body implements the io.Writer interface (i.e. for
   217  // successful protocol switches), the wrapped body also will.
   218  type wrappedBody struct {
   219  	span     trace.Span
   220  	recorded atomic.Bool
   221  	record   func(n int64)
   222  	body     io.ReadCloser
   223  	read     atomic.Int64
   224  }
   225  
   226  var _ io.ReadWriteCloser = &wrappedBody{}
   227  
   228  func (wb *wrappedBody) Write(p []byte) (int, error) {
   229  	// This will not panic given the guard in newWrappedBody.
   230  	n, err := wb.body.(io.Writer).Write(p)
   231  	if err != nil {
   232  		wb.span.RecordError(err)
   233  		wb.span.SetStatus(codes.Error, err.Error())
   234  	}
   235  	return n, err
   236  }
   237  
   238  func (wb *wrappedBody) Read(b []byte) (int, error) {
   239  	n, err := wb.body.Read(b)
   240  	// Record the number of bytes read
   241  	wb.read.Add(int64(n))
   242  
   243  	switch err {
   244  	case nil:
   245  		// nothing to do here but fall through to the return
   246  	case io.EOF:
   247  		wb.recordBytesRead()
   248  		wb.span.End()
   249  	default:
   250  		wb.span.RecordError(err)
   251  		wb.span.SetStatus(codes.Error, err.Error())
   252  	}
   253  	return n, err
   254  }
   255  
   256  // recordBytesRead is a function that ensures the number of bytes read is recorded once and only once.
   257  func (wb *wrappedBody) recordBytesRead() {
   258  	// note: it is more performant (and equally correct) to use atomic.Bool over sync.Once here. In the event that
   259  	// two goroutines are racing to call this method, the number of bytes read will no longer increase. Using
   260  	// CompareAndSwap allows later goroutines to return quickly and not block waiting for the race winner to finish
   261  	// calling wb.record(wb.read.Load()).
   262  	if wb.recorded.CompareAndSwap(false, true) {
   263  		// Record the total number of bytes read
   264  		wb.record(wb.read.Load())
   265  	}
   266  }
   267  
   268  func (wb *wrappedBody) Close() error {
   269  	wb.recordBytesRead()
   270  	wb.span.End()
   271  	if wb.body != nil {
   272  		return wb.body.Close()
   273  	}
   274  	return nil
   275  }
   276  

View as plain text