1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package otlptracegrpc_test
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "net"
22 "strings"
23 "testing"
24 "time"
25
26 "github.com/stretchr/testify/assert"
27 "github.com/stretchr/testify/require"
28 "go.uber.org/goleak"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/encoding/gzip"
32 "google.golang.org/grpc/status"
33
34 "go.opentelemetry.io/otel"
35 "go.opentelemetry.io/otel/attribute"
36 "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
37 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
38 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
39 sdktrace "go.opentelemetry.io/otel/sdk/trace"
40 "go.opentelemetry.io/otel/sdk/trace/tracetest"
41 coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
42 commonpb "go.opentelemetry.io/proto/otlp/common/v1"
43 )
44
// TestMain is the package test entry point. It hands the run to
// goleak.VerifyTestMain so the whole test binary is checked for leaked
// goroutines (see the goleak documentation for the exact failure semantics).
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
48
49 var roSpans = tracetest.SpanStubs{{Name: "Span 0"}}.Snapshots()
50
// contextWithTimeout derives a context from parent that expires at the
// earlier of:
//
//   - timeout from now, and
//   - one millisecond before the deadline of t, when t has one.
//
// The returned CancelFunc must be called by the caller to release resources.
func contextWithTimeout(parent context.Context, t *testing.T, timeout time.Duration) (context.Context, context.CancelFunc) {
	testDeadline, hasDeadline := t.Deadline()
	if !hasDeadline {
		// No test deadline to respect: the requested timeout applies as is.
		return context.WithDeadline(parent, time.Now().Add(timeout))
	}

	// Expire slightly before the test itself would be killed.
	deadline := testDeadline.Add(-1 * time.Millisecond)
	if limit := time.Now().Add(timeout); deadline.After(limit) {
		// The test deadline is further away than the requested timeout.
		deadline = limit
	}
	return context.WithDeadline(parent, deadline)
}
64
65 func TestNewEndToEnd(t *testing.T) {
66 tests := []struct {
67 name string
68 additionalOpts []otlptracegrpc.Option
69 }{
70 {
71 name: "StandardExporter",
72 },
73 {
74 name: "WithCompressor",
75 additionalOpts: []otlptracegrpc.Option{
76 otlptracegrpc.WithCompressor(gzip.Name),
77 },
78 },
79 {
80 name: "WithServiceConfig",
81 additionalOpts: []otlptracegrpc.Option{
82 otlptracegrpc.WithServiceConfig("{}"),
83 },
84 },
85 {
86 name: "WithDialOptions",
87 additionalOpts: []otlptracegrpc.Option{
88 otlptracegrpc.WithDialOption(grpc.WithBlock()),
89 },
90 },
91 }
92
93 for _, test := range tests {
94 t.Run(test.name, func(t *testing.T) {
95 newExporterEndToEndTest(t, test.additionalOpts)
96 })
97 }
98 }
99
100 func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlptracegrpc.Option) *otlptrace.Exporter {
101 opts := []otlptracegrpc.Option{
102 otlptracegrpc.WithInsecure(),
103 otlptracegrpc.WithEndpoint(endpoint),
104 otlptracegrpc.WithReconnectionPeriod(50 * time.Millisecond),
105 }
106
107 opts = append(opts, additionalOpts...)
108 client := otlptracegrpc.NewClient(opts...)
109 exp, err := otlptrace.New(ctx, client)
110 if err != nil {
111 t.Fatalf("failed to create a new collector exporter: %v", err)
112 }
113 return exp
114 }
115
116 func newExporterEndToEndTest(t *testing.T, additionalOpts []otlptracegrpc.Option) {
117 mc := runMockCollector(t)
118
119 ctx := context.Background()
120 exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...)
121 t.Cleanup(func() {
122 ctx, cancel := contextWithTimeout(ctx, t, 10*time.Second)
123 defer cancel()
124
125 require.NoError(t, exp.Shutdown(ctx))
126 })
127
128
129 otlptracetest.RunEndToEndTest(ctx, t, exp, mc)
130 }
131
132 func TestExporterShutdown(t *testing.T) {
133 mc := runMockCollectorAtEndpoint(t, "localhost:0")
134 t.Cleanup(func() { require.NoError(t, mc.stop()) })
135
136 factory := func() otlptrace.Client {
137 return otlptracegrpc.NewClient(
138 otlptracegrpc.WithEndpoint(mc.endpoint),
139 otlptracegrpc.WithInsecure(),
140 otlptracegrpc.WithDialOption(grpc.WithBlock()),
141 )
142 }
143 otlptracetest.RunExporterShutdownTest(t, factory)
144 }
145
146 func TestNewInvokeStartThenStopManyTimes(t *testing.T) {
147 mc := runMockCollector(t)
148 t.Cleanup(func() { require.NoError(t, mc.stop()) })
149
150 ctx := context.Background()
151 exp := newGRPCExporter(t, ctx, mc.endpoint)
152 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
153
154
155 for i := 0; i < 10; i++ {
156 if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
157 t.Fatalf("#%d unexpected Start error: %v", i, err)
158 }
159 }
160
161 if err := exp.Shutdown(ctx); err != nil {
162 t.Fatalf("failed to Shutdown the exporter: %v", err)
163 }
164
165 for i := 0; i < 10; i++ {
166 if err := exp.Shutdown(ctx); err != nil {
167 t.Fatalf(`#%d got error (%v) expected none`, i, err)
168 }
169 }
170 }
171
172
173 func TestNewCollectorOnBadConnection(t *testing.T) {
174 if testing.Short() {
175 t.Skipf("Skipping this long running test")
176 }
177
178 ln, err := net.Listen("tcp", "localhost:0")
179 if err != nil {
180 t.Fatalf("Failed to grab an available port: %v", err)
181 }
182
183
184 _ = ln.Close()
185
186 _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
187
188 endpoint := fmt.Sprintf("localhost:%s", collectorPortStr)
189 ctx := context.Background()
190 exp := newGRPCExporter(t, ctx, endpoint)
191 require.NoError(t, exp.Shutdown(ctx))
192 }
193
194 func TestNewWithEndpoint(t *testing.T) {
195 mc := runMockCollector(t)
196 t.Cleanup(func() { require.NoError(t, mc.stop()) })
197
198 ctx := context.Background()
199 exp := newGRPCExporter(t, ctx, mc.endpoint)
200 require.NoError(t, exp.Shutdown(ctx))
201 }
202
203 func TestNewWithHeaders(t *testing.T) {
204 mc := runMockCollector(t)
205 t.Cleanup(func() { require.NoError(t, mc.stop()) })
206
207 ctx := context.Background()
208 exp := newGRPCExporter(t, ctx, mc.endpoint,
209 otlptracegrpc.WithHeaders(map[string]string{"header1": "value1"}))
210 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
211 require.NoError(t, exp.ExportSpans(ctx, roSpans))
212
213 headers := mc.getHeaders()
214 require.Regexp(t, "OTel OTLP Exporter Go/1\\..*", headers.Get("user-agent"))
215 require.Len(t, headers.Get("header1"), 1)
216 assert.Equal(t, "value1", headers.Get("header1")[0])
217 }
218
219 func TestExportSpansTimeoutHonored(t *testing.T) {
220 ctx, cancel := contextWithTimeout(context.Background(), t, 1*time.Minute)
221 t.Cleanup(cancel)
222
223 mc := runMockCollector(t)
224 exportBlock := make(chan struct{})
225 mc.traceSvc.exportBlock = exportBlock
226 t.Cleanup(func() { require.NoError(t, mc.stop()) })
227
228 exp := newGRPCExporter(
229 t,
230 ctx,
231 mc.endpoint,
232 otlptracegrpc.WithTimeout(1*time.Nanosecond),
233 otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}),
234 )
235 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
236
237 err := exp.ExportSpans(ctx, roSpans)
238
239 close(exportBlock)
240
241 unwrapped := errors.Unwrap(err)
242 require.Equal(t, codes.DeadlineExceeded, status.Convert(unwrapped).Code())
243 require.True(t, strings.HasPrefix(err.Error(), "traces export: "), err)
244 }
245
246 func TestNewWithMultipleAttributeTypes(t *testing.T) {
247 mc := runMockCollector(t)
248
249 ctx, cancel := contextWithTimeout(context.Background(), t, 10*time.Second)
250 t.Cleanup(cancel)
251
252 exp := newGRPCExporter(t, ctx, mc.endpoint)
253 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
254
255 tp := sdktrace.NewTracerProvider(
256 sdktrace.WithSampler(sdktrace.AlwaysSample()),
257 sdktrace.WithBatcher(
258 exp,
259
260 sdktrace.WithBatchTimeout(5*time.Second),
261 sdktrace.WithMaxExportBatchSize(10),
262 ),
263 )
264 t.Cleanup(func() { require.NoError(t, tp.Shutdown(ctx)) })
265
266 tr := tp.Tracer("test-tracer")
267 testKvs := []attribute.KeyValue{
268 attribute.Int("Int", 1),
269 attribute.Int64("Int64", int64(3)),
270 attribute.Float64("Float64", 2.22),
271 attribute.Bool("Bool", true),
272 attribute.String("String", "test"),
273 }
274 _, span := tr.Start(ctx, "AlwaysSample")
275 span.SetAttributes(testKvs...)
276 span.End()
277
278
279 func() {
280 ctx, cancel := contextWithTimeout(ctx, t, 10*time.Second)
281 defer cancel()
282 require.NoError(t, tp.Shutdown(ctx))
283 }()
284
285
286 <-time.After(40 * time.Millisecond)
287
288
289 require.NoError(t, exp.Shutdown(ctx))
290
291
292
293 require.NoError(t, mc.stop())
294
295
296 rss := mc.getSpans()
297 if got, want := len(rss), 1; got != want {
298 t.Fatalf("resource span count: got %d, want %d\n", got, want)
299 }
300
301 expected := []*commonpb.KeyValue{
302 {
303 Key: "Int",
304 Value: &commonpb.AnyValue{
305 Value: &commonpb.AnyValue_IntValue{
306 IntValue: 1,
307 },
308 },
309 },
310 {
311 Key: "Int64",
312 Value: &commonpb.AnyValue{
313 Value: &commonpb.AnyValue_IntValue{
314 IntValue: 3,
315 },
316 },
317 },
318 {
319 Key: "Float64",
320 Value: &commonpb.AnyValue{
321 Value: &commonpb.AnyValue_DoubleValue{
322 DoubleValue: 2.22,
323 },
324 },
325 },
326 {
327 Key: "Bool",
328 Value: &commonpb.AnyValue{
329 Value: &commonpb.AnyValue_BoolValue{
330 BoolValue: true,
331 },
332 },
333 },
334 {
335 Key: "String",
336 Value: &commonpb.AnyValue{
337 Value: &commonpb.AnyValue_StringValue{
338 StringValue: "test",
339 },
340 },
341 },
342 }
343
344
345 if !assert.Len(t, rss[0].Attributes, len(expected)) {
346 t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected))
347 }
348 for i, actual := range rss[0].Attributes {
349 if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok {
350 e, ok := expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue)
351 if !ok {
352 t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value)
353 continue
354 }
355 if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) {
356 continue
357 }
358 e.DoubleValue = a.DoubleValue
359 }
360 assert.Equal(t, expected[i], actual)
361 }
362 }
363
364 func TestStartErrorInvalidAddress(t *testing.T) {
365 client := otlptracegrpc.NewClient(
366 otlptracegrpc.WithInsecure(),
367
368 otlptracegrpc.WithDialOption(
369 grpc.WithBlock(),
370 grpc.FailOnNonTempDialError(true),
371 ),
372 otlptracegrpc.WithEndpoint("invalid"),
373 otlptracegrpc.WithReconnectionPeriod(time.Hour),
374 )
375 err := client.Start(context.Background())
376 assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`)
377 }
378
379 func TestEmptyData(t *testing.T) {
380 mc := runMockCollector(t)
381 t.Cleanup(func() { require.NoError(t, mc.stop()) })
382
383 ctx := context.Background()
384 exp := newGRPCExporter(t, ctx, mc.endpoint)
385 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
386
387 assert.NoError(t, exp.ExportSpans(ctx, nil))
388 }
389
390 func TestPartialSuccess(t *testing.T) {
391 mc := runMockCollectorWithConfig(t, &mockConfig{
392 partial: &coltracepb.ExportTracePartialSuccess{
393 RejectedSpans: 2,
394 ErrorMessage: "partially successful",
395 },
396 })
397 t.Cleanup(func() { require.NoError(t, mc.stop()) })
398
399 errs := []error{}
400 otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
401 errs = append(errs, err)
402 }))
403 ctx := context.Background()
404 exp := newGRPCExporter(t, ctx, mc.endpoint)
405 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
406 require.NoError(t, exp.ExportSpans(ctx, roSpans))
407
408 require.Equal(t, 1, len(errs))
409 require.Contains(t, errs[0].Error(), "partially successful")
410 require.Contains(t, errs[0].Error(), "2 spans rejected")
411 }
412
413 func TestCustomUserAgent(t *testing.T) {
414 customUserAgent := "custom-user-agent"
415 mc := runMockCollector(t)
416 t.Cleanup(func() { require.NoError(t, mc.stop()) })
417
418 ctx := context.Background()
419 exp := newGRPCExporter(t, ctx, mc.endpoint,
420 otlptracegrpc.WithDialOption(grpc.WithUserAgent(customUserAgent)))
421 t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
422 require.NoError(t, exp.ExportSpans(ctx, roSpans))
423
424 headers := mc.getHeaders()
425 require.Contains(t, headers.Get("user-agent")[0], customUserAgent)
426 }
427
View as plain text