...

Source file src/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/client_test.go

Documentation: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc

     1  // Copyright The OpenTelemetry Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //	http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package otlptracegrpc_test
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"net"
    22  	"strings"
    23  	"testing"
    24  	"time"
    25  
    26  	"github.com/stretchr/testify/assert"
    27  	"github.com/stretchr/testify/require"
    28  	"go.uber.org/goleak"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/encoding/gzip"
    32  	"google.golang.org/grpc/status"
    33  
    34  	"go.opentelemetry.io/otel"
    35  	"go.opentelemetry.io/otel/attribute"
    36  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
    37  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    38  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
    39  	sdktrace "go.opentelemetry.io/otel/sdk/trace"
    40  	"go.opentelemetry.io/otel/sdk/trace/tracetest"
    41  	coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
    42  	commonpb "go.opentelemetry.io/proto/otlp/common/v1"
    43  )
    44  
// TestMain hands control of the package's test run to goleak.VerifyTestMain
// instead of calling m.Run directly, so the whole package is executed under
// goleak's goroutine-leak verification.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
    48  
// roSpans is a snapshot of a single stub span named "Span 0". Tests in this
// file pass it to ExportSpans when they need some span data to export.
var roSpans = tracetest.SpanStubs{{Name: "Span 0"}}.Snapshots()
    50  
    51  func contextWithTimeout(parent context.Context, t *testing.T, timeout time.Duration) (context.Context, context.CancelFunc) {
    52  	d, ok := t.Deadline()
    53  	if !ok {
    54  		d = time.Now().Add(timeout)
    55  	} else {
    56  		d = d.Add(-1 * time.Millisecond)
    57  		now := time.Now()
    58  		if d.Sub(now) > timeout {
    59  			d = now.Add(timeout)
    60  		}
    61  	}
    62  	return context.WithDeadline(parent, d)
    63  }
    64  
    65  func TestNewEndToEnd(t *testing.T) {
    66  	tests := []struct {
    67  		name           string
    68  		additionalOpts []otlptracegrpc.Option
    69  	}{
    70  		{
    71  			name: "StandardExporter",
    72  		},
    73  		{
    74  			name: "WithCompressor",
    75  			additionalOpts: []otlptracegrpc.Option{
    76  				otlptracegrpc.WithCompressor(gzip.Name),
    77  			},
    78  		},
    79  		{
    80  			name: "WithServiceConfig",
    81  			additionalOpts: []otlptracegrpc.Option{
    82  				otlptracegrpc.WithServiceConfig("{}"),
    83  			},
    84  		},
    85  		{
    86  			name: "WithDialOptions",
    87  			additionalOpts: []otlptracegrpc.Option{
    88  				otlptracegrpc.WithDialOption(grpc.WithBlock()),
    89  			},
    90  		},
    91  	}
    92  
    93  	for _, test := range tests {
    94  		t.Run(test.name, func(t *testing.T) {
    95  			newExporterEndToEndTest(t, test.additionalOpts)
    96  		})
    97  	}
    98  }
    99  
   100  func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlptracegrpc.Option) *otlptrace.Exporter {
   101  	opts := []otlptracegrpc.Option{
   102  		otlptracegrpc.WithInsecure(),
   103  		otlptracegrpc.WithEndpoint(endpoint),
   104  		otlptracegrpc.WithReconnectionPeriod(50 * time.Millisecond),
   105  	}
   106  
   107  	opts = append(opts, additionalOpts...)
   108  	client := otlptracegrpc.NewClient(opts...)
   109  	exp, err := otlptrace.New(ctx, client)
   110  	if err != nil {
   111  		t.Fatalf("failed to create a new collector exporter: %v", err)
   112  	}
   113  	return exp
   114  }
   115  
   116  func newExporterEndToEndTest(t *testing.T, additionalOpts []otlptracegrpc.Option) {
   117  	mc := runMockCollector(t)
   118  
   119  	ctx := context.Background()
   120  	exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...)
   121  	t.Cleanup(func() {
   122  		ctx, cancel := contextWithTimeout(ctx, t, 10*time.Second)
   123  		defer cancel()
   124  
   125  		require.NoError(t, exp.Shutdown(ctx))
   126  	})
   127  
   128  	// RunEndToEndTest closes mc.
   129  	otlptracetest.RunEndToEndTest(ctx, t, exp, mc)
   130  }
   131  
   132  func TestExporterShutdown(t *testing.T) {
   133  	mc := runMockCollectorAtEndpoint(t, "localhost:0")
   134  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   135  
   136  	factory := func() otlptrace.Client {
   137  		return otlptracegrpc.NewClient(
   138  			otlptracegrpc.WithEndpoint(mc.endpoint),
   139  			otlptracegrpc.WithInsecure(),
   140  			otlptracegrpc.WithDialOption(grpc.WithBlock()),
   141  		)
   142  	}
   143  	otlptracetest.RunExporterShutdownTest(t, factory)
   144  }
   145  
   146  func TestNewInvokeStartThenStopManyTimes(t *testing.T) {
   147  	mc := runMockCollector(t)
   148  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   149  
   150  	ctx := context.Background()
   151  	exp := newGRPCExporter(t, ctx, mc.endpoint)
   152  	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
   153  
   154  	// Invoke Start numerous times, should return errAlreadyStarted
   155  	for i := 0; i < 10; i++ {
   156  		if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
   157  			t.Fatalf("#%d unexpected Start error: %v", i, err)
   158  		}
   159  	}
   160  
   161  	if err := exp.Shutdown(ctx); err != nil {
   162  		t.Fatalf("failed to Shutdown the exporter: %v", err)
   163  	}
   164  	// Invoke Shutdown numerous times
   165  	for i := 0; i < 10; i++ {
   166  		if err := exp.Shutdown(ctx); err != nil {
   167  			t.Fatalf(`#%d got error (%v) expected none`, i, err)
   168  		}
   169  	}
   170  }
   171  
   172  // This test takes a long time to run: to skip it, run tests using: -short.
   173  func TestNewCollectorOnBadConnection(t *testing.T) {
   174  	if testing.Short() {
   175  		t.Skipf("Skipping this long running test")
   176  	}
   177  
   178  	ln, err := net.Listen("tcp", "localhost:0")
   179  	if err != nil {
   180  		t.Fatalf("Failed to grab an available port: %v", err)
   181  	}
   182  	// Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP
   183  	// However, our goal of closing it is to simulate an unavailable connection
   184  	_ = ln.Close()
   185  
   186  	_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
   187  
   188  	endpoint := fmt.Sprintf("localhost:%s", collectorPortStr)
   189  	ctx := context.Background()
   190  	exp := newGRPCExporter(t, ctx, endpoint)
   191  	require.NoError(t, exp.Shutdown(ctx))
   192  }
   193  
   194  func TestNewWithEndpoint(t *testing.T) {
   195  	mc := runMockCollector(t)
   196  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   197  
   198  	ctx := context.Background()
   199  	exp := newGRPCExporter(t, ctx, mc.endpoint)
   200  	require.NoError(t, exp.Shutdown(ctx))
   201  }
   202  
   203  func TestNewWithHeaders(t *testing.T) {
   204  	mc := runMockCollector(t)
   205  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   206  
   207  	ctx := context.Background()
   208  	exp := newGRPCExporter(t, ctx, mc.endpoint,
   209  		otlptracegrpc.WithHeaders(map[string]string{"header1": "value1"}))
   210  	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
   211  	require.NoError(t, exp.ExportSpans(ctx, roSpans))
   212  
   213  	headers := mc.getHeaders()
   214  	require.Regexp(t, "OTel OTLP Exporter Go/1\\..*", headers.Get("user-agent"))
   215  	require.Len(t, headers.Get("header1"), 1)
   216  	assert.Equal(t, "value1", headers.Get("header1")[0])
   217  }
   218  
   219  func TestExportSpansTimeoutHonored(t *testing.T) {
   220  	ctx, cancel := contextWithTimeout(context.Background(), t, 1*time.Minute)
   221  	t.Cleanup(cancel)
   222  
   223  	mc := runMockCollector(t)
   224  	exportBlock := make(chan struct{})
   225  	mc.traceSvc.exportBlock = exportBlock
   226  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   227  
   228  	exp := newGRPCExporter(
   229  		t,
   230  		ctx,
   231  		mc.endpoint,
   232  		otlptracegrpc.WithTimeout(1*time.Nanosecond),
   233  		otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}),
   234  	)
   235  	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
   236  
   237  	err := exp.ExportSpans(ctx, roSpans)
   238  	// Release the export so everything is cleaned up on shutdown.
   239  	close(exportBlock)
   240  
   241  	unwrapped := errors.Unwrap(err)
   242  	require.Equal(t, codes.DeadlineExceeded, status.Convert(unwrapped).Code())
   243  	require.True(t, strings.HasPrefix(err.Error(), "traces export: "), err)
   244  }
   245  
// TestNewWithMultipleAttributeTypes records one span carrying Int, Int64,
// Float64, Bool and String attributes through a batching tracer provider,
// then checks that the mock collector received exactly one span whose
// attributes match the expected OTLP key/value encoding, in order.
func TestNewWithMultipleAttributeTypes(t *testing.T) {
	mc := runMockCollector(t)

	ctx, cancel := contextWithTimeout(context.Background(), t, 10*time.Second)
	t.Cleanup(cancel)

	exp := newGRPCExporter(t, ctx, mc.endpoint)
	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
		sdktrace.WithBatcher(
			exp,
			// add following two options to ensure flush
			sdktrace.WithBatchTimeout(5*time.Second),
			sdktrace.WithMaxExportBatchSize(10),
		),
	)
	t.Cleanup(func() { require.NoError(t, tp.Shutdown(ctx)) })

	tr := tp.Tracer("test-tracer")
	testKvs := []attribute.KeyValue{
		attribute.Int("Int", 1),
		attribute.Int64("Int64", int64(3)),
		attribute.Float64("Float64", 2.22),
		attribute.Bool("Bool", true),
		attribute.String("String", "test"),
	}
	_, span := tr.Start(ctx, "AlwaysSample")
	span.SetAttributes(testKvs...)
	span.End()

	// Flush and close.
	// The shutdown runs in its own function so the bounded context is
	// cancelled as soon as the shutdown returns.
	func() {
		ctx, cancel := contextWithTimeout(ctx, t, 10*time.Second)
		defer cancel()
		require.NoError(t, tp.Shutdown(ctx))
	}()

	// Wait >2 cycles.
	// NOTE(review): which "cycles" this 40ms wait refers to is not visible
	// in this file - confirm before changing the duration.
	<-time.After(40 * time.Millisecond)

	// Now shutdown the exporter
	require.NoError(t, exp.Shutdown(ctx))

	// Shutdown the collector too so that we can begin
	// verification checks of expected data back.
	require.NoError(t, mc.stop())

	// Now verify that we only got one span
	rss := mc.getSpans()
	if got, want := len(rss), 1; got != want {
		t.Fatalf("resource span count: got %d, want %d\n", got, want)
	}

	// expected lists the attributes in the same order as testKvs; Int and
	// Int64 are both expected as IntValue.
	expected := []*commonpb.KeyValue{
		{
			Key: "Int",
			Value: &commonpb.AnyValue{
				Value: &commonpb.AnyValue_IntValue{
					IntValue: 1,
				},
			},
		},
		{
			Key: "Int64",
			Value: &commonpb.AnyValue{
				Value: &commonpb.AnyValue_IntValue{
					IntValue: 3,
				},
			},
		},
		{
			Key: "Float64",
			Value: &commonpb.AnyValue{
				Value: &commonpb.AnyValue_DoubleValue{
					DoubleValue: 2.22,
				},
			},
		},
		{
			Key: "Bool",
			Value: &commonpb.AnyValue{
				Value: &commonpb.AnyValue_BoolValue{
					BoolValue: true,
				},
			},
		},
		{
			Key: "String",
			Value: &commonpb.AnyValue{
				Value: &commonpb.AnyValue_StringValue{
					StringValue: "test",
				},
			},
		},
	}

	// Verify attributes
	// The length check must abort the test on failure: the loop below
	// indexes expected by the position of each received attribute.
	if !assert.Len(t, rss[0].Attributes, len(expected)) {
		t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected))
	}
	for i, actual := range rss[0].Attributes {
		if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok {
			e, ok := expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue)
			if !ok {
				t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value)
				continue
			}
			// Doubles are compared with a tolerance instead of exactly.
			if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) {
				continue
			}
			// Within tolerance: copy the received value into the expectation
			// so the exact comparison below does not fail on the double.
			e.DoubleValue = a.DoubleValue
		}
		assert.Equal(t, expected[i], actual)
	}
}
   363  
   364  func TestStartErrorInvalidAddress(t *testing.T) {
   365  	client := otlptracegrpc.NewClient(
   366  		otlptracegrpc.WithInsecure(),
   367  		// Validate the connection in Start (which should return the error).
   368  		otlptracegrpc.WithDialOption(
   369  			grpc.WithBlock(),
   370  			grpc.FailOnNonTempDialError(true),
   371  		),
   372  		otlptracegrpc.WithEndpoint("invalid"),
   373  		otlptracegrpc.WithReconnectionPeriod(time.Hour),
   374  	)
   375  	err := client.Start(context.Background())
   376  	assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`)
   377  }
   378  
   379  func TestEmptyData(t *testing.T) {
   380  	mc := runMockCollector(t)
   381  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   382  
   383  	ctx := context.Background()
   384  	exp := newGRPCExporter(t, ctx, mc.endpoint)
   385  	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
   386  
   387  	assert.NoError(t, exp.ExportSpans(ctx, nil))
   388  }
   389  
   390  func TestPartialSuccess(t *testing.T) {
   391  	mc := runMockCollectorWithConfig(t, &mockConfig{
   392  		partial: &coltracepb.ExportTracePartialSuccess{
   393  			RejectedSpans: 2,
   394  			ErrorMessage:  "partially successful",
   395  		},
   396  	})
   397  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   398  
   399  	errs := []error{}
   400  	otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
   401  		errs = append(errs, err)
   402  	}))
   403  	ctx := context.Background()
   404  	exp := newGRPCExporter(t, ctx, mc.endpoint)
   405  	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
   406  	require.NoError(t, exp.ExportSpans(ctx, roSpans))
   407  
   408  	require.Equal(t, 1, len(errs))
   409  	require.Contains(t, errs[0].Error(), "partially successful")
   410  	require.Contains(t, errs[0].Error(), "2 spans rejected")
   411  }
   412  
   413  func TestCustomUserAgent(t *testing.T) {
   414  	customUserAgent := "custom-user-agent"
   415  	mc := runMockCollector(t)
   416  	t.Cleanup(func() { require.NoError(t, mc.stop()) })
   417  
   418  	ctx := context.Background()
   419  	exp := newGRPCExporter(t, ctx, mc.endpoint,
   420  		otlptracegrpc.WithDialOption(grpc.WithUserAgent(customUserAgent)))
   421  	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
   422  	require.NoError(t, exp.ExportSpans(ctx, roSpans))
   423  
   424  	headers := mc.getHeaders()
   425  	require.Contains(t, headers.Get("user-agent")[0], customUserAgent)
   426  }
   427  

View as plain text