...

Text file src/go.opentelemetry.io/otel/internal/shared/otlp/otlpmetric/otest/client.go.tmpl

Documentation: go.opentelemetry.io/otel/internal/shared/otlp/otlpmetric/otest

     1// Code created by gotmpl. DO NOT MODIFY.
     2// source: internal/shared/otlp/otlpmetric/otest/client.go.tmpl
     3
     4// Copyright The OpenTelemetry Authors
     5//
     6// Licensed under the Apache License, Version 2.0 (the "License");
     7// you may not use this file except in compliance with the License.
     8// You may obtain a copy of the License at
     9//
    10//     http://www.apache.org/licenses/LICENSE-2.0
    11//
    12// Unless required by applicable law or agreed to in writing, software
    13// distributed under the License is distributed on an "AS IS" BASIS,
    14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15// See the License for the specific language governing permissions and
    16// limitations under the License.
    17
    18package otest
    19
    20import (
    21	"context"
    22	"fmt"
    23	"testing"
    24	"time"
    25
    26	"github.com/google/go-cmp/cmp"
    27	"github.com/stretchr/testify/assert"
    28	"github.com/stretchr/testify/require"
    29	"google.golang.org/protobuf/proto"
    30
    31	"go.opentelemetry.io/otel"
    32	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
    33	collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
    34	cpb "go.opentelemetry.io/proto/otlp/common/v1"
    35	mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
    36	rpb "go.opentelemetry.io/proto/otlp/resource/v1"
    37)
    38
var (
	// start is the synthetic collection start time shared by every data
	// point below: Sat Jan 01 2000 00:00:00 GMT+0000.
	start = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
	// end is the synthetic collection end time, 30 seconds after start.
	end = start.Add(30 * time.Second)

	// kvAlice and kvBob are example data-point attributes sharing the
	// same key ("user") with different values.
	kvAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
	}}
	kvBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
	}}
	// kvSrvName and kvSrvVer are example resource attributes identifying
	// the test service.
	kvSrvName = &cpb.KeyValue{Key: "service.name", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
	}}
	kvSrvVer = &cpb.KeyValue{Key: "service.version", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
	}}

	// NOTE(review): min, max, and sum shadow the Go 1.21 predeclared
	// builtins of the same names within this package; consider renaming
	// (e.g. minA, maxA, sumA) in a follow-up to avoid confusion.
	min, max, sum = 2.0, 4.0, 90.0
	// hdp is a single histogram data point for the alice attribute set:
	// 30 measurements, all falling in the (1, 5] bucket, summing to 90.
	hdp = []*mpb.HistogramDataPoint{
		{
			Attributes:        []*cpb.KeyValue{kvAlice},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Count:             30,
			Sum:               &sum,
			ExplicitBounds:    []float64{1, 5},
			BucketCounts:      []uint64{0, 30, 0},
			Min:               &min,
			Max:               &max,
		},
	}

	// hist is a delta-temporality histogram built from hdp.
	hist = &mpb.Histogram{
		AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
		DataPoints:             hdp,
	}

	// dPtsInt64 holds one int64 number data point per user attribute.
	dPtsInt64 = []*mpb.NumberDataPoint{
		{
			Attributes:        []*cpb.KeyValue{kvAlice},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsInt{AsInt: 1},
		},
		{
			Attributes:        []*cpb.KeyValue{kvBob},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsInt{AsInt: 2},
		},
	}
	// dPtsFloat64 holds one float64 number data point per user attribute.
	dPtsFloat64 = []*mpb.NumberDataPoint{
		{
			Attributes:        []*cpb.KeyValue{kvAlice},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsDouble{AsDouble: 1.0},
		},
		{
			Attributes:        []*cpb.KeyValue{kvBob},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsDouble{AsDouble: 2.0},
		},
	}

	// sumInt64 is a monotonic, cumulative-temporality sum of dPtsInt64.
	sumInt64 = &mpb.Sum{
		AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
		IsMonotonic:            true,
		DataPoints:             dPtsInt64,
	}
	// sumFloat64 is a non-monotonic, delta-temporality sum of dPtsFloat64.
	sumFloat64 = &mpb.Sum{
		AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
		IsMonotonic:            false,
		DataPoints:             dPtsFloat64,
	}

	// gaugeInt64 and gaugeFloat64 are gauges over the number data points.
	gaugeInt64   = &mpb.Gauge{DataPoints: dPtsInt64}
	gaugeFloat64 = &mpb.Gauge{DataPoints: dPtsFloat64}

	// metrics is the full set of example metrics (both gauge types, both
	// sum types, and the histogram) uploaded by the client tests.
	metrics = []*mpb.Metric{
		{
			Name:        "int64-gauge",
			Description: "Gauge with int64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Gauge{Gauge: gaugeInt64},
		},
		{
			Name:        "float64-gauge",
			Description: "Gauge with float64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Gauge{Gauge: gaugeFloat64},
		},
		{
			Name:        "int64-sum",
			Description: "Sum with int64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Sum{Sum: sumInt64},
		},
		{
			Name:        "float64-sum",
			Description: "Sum with float64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Sum{Sum: sumFloat64},
		},
		{
			Name:        "histogram",
			Description: "Histogram",
			Unit:        "1",
			Data:        &mpb.Metric_Histogram{Histogram: hist},
		},
	}

	// scope identifies the instrumentation scope the metrics belong to.
	scope = &cpb.InstrumentationScope{
		Name:    "test/code/path",
		Version: "v0.1.0",
	}
	// scopeMetrics wraps metrics in a single ScopeMetrics message.
	scopeMetrics = []*mpb.ScopeMetrics{
		{
			Scope:     scope,
			Metrics:   metrics,
			SchemaUrl: semconv.SchemaURL,
		},
	}

	// res is the resource (service name and version) producing the metrics.
	res = &rpb.Resource{
		Attributes: []*cpb.KeyValue{kvSrvName, kvSrvVer},
	}
	// resourceMetrics is the complete ResourceMetrics payload used as the
	// canonical upload fixture throughout RunClientTests.
	resourceMetrics = &mpb.ResourceMetrics{
		Resource:     res,
		ScopeMetrics: scopeMetrics,
		SchemaUrl:    semconv.SchemaURL,
	}
)
   174
// Client is the OTLP metric client under test. Implementations are
// produced by a ClientFactory and exercised by RunClientTests.
type Client interface {
	// UploadMetrics transmits the passed ResourceMetrics to the connected
	// Collector.
	UploadMetrics(context.Context, *mpb.ResourceMetrics) error
	// ForceFlush sends any buffered, unexported data to the Collector.
	ForceFlush(context.Context) error
	// Shutdown flushes remaining data and releases the client's resources.
	Shutdown(context.Context) error
}
   180
// ClientFactory is a function that, when called, returns a Client
// implementation connected to the also-returned Collector implementation.
// The Client is ready to upload metric data to the Collector, which is
// ready to store that data.
//
// If resultCh is not nil, the returned Collector needs to use the responses
// from that channel to send back to the client for every export request.
type ClientFactory func(resultCh <-chan ExportResult) (Client, Collector)
   189
   190// RunClientTests runs a suite of Client integration tests. For example:
   191//
   192//	t.Run("Integration", RunClientTests(factory))
   193func RunClientTests(f ClientFactory) func(*testing.T) {
   194	return func(t *testing.T) {
   195		t.Run("ClientHonorsContextErrors", func(t *testing.T) {
   196			t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
   197				c, _ := f(nil)
   198				return c.Shutdown
   199			}))
   200
   201			t.Run("ForceFlush", testCtxErrs(func() func(context.Context) error {
   202				c, _ := f(nil)
   203				return c.ForceFlush
   204			}))
   205
   206			t.Run("UploadMetrics", testCtxErrs(func() func(context.Context) error {
   207				c, _ := f(nil)
   208				return func(ctx context.Context) error {
   209					return c.UploadMetrics(ctx, nil)
   210				}
   211			}))
   212		})
   213
   214		t.Run("ForceFlushFlushes", func(t *testing.T) {
   215			ctx := context.Background()
   216			client, collector := f(nil)
   217			require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
   218
   219			require.NoError(t, client.ForceFlush(ctx))
   220			rm := collector.Collect().Dump()
   221			// Data correctness is not important, just it was received.
   222			require.Greater(t, len(rm), 0, "no data uploaded")
   223
   224			require.NoError(t, client.Shutdown(ctx))
   225			rm = collector.Collect().Dump()
   226			assert.Len(t, rm, 0, "client did not flush all data")
   227		})
   228
   229		t.Run("UploadMetrics", func(t *testing.T) {
   230			ctx := context.Background()
   231			client, coll := f(nil)
   232
   233			require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
   234			require.NoError(t, client.Shutdown(ctx))
   235			got := coll.Collect().Dump()
   236			require.Len(t, got, 1, "upload of one ResourceMetrics")
   237			diff := cmp.Diff(got[0], resourceMetrics, cmp.Comparer(proto.Equal))
   238			if diff != "" {
   239				t.Fatalf("unexpected ResourceMetrics:\n%s", diff)
   240			}
   241		})
   242
   243		t.Run("PartialSuccess", func(t *testing.T) {
   244			const n, msg = 2, "bad data"
   245			rCh := make(chan ExportResult, 3)
   246			rCh <- ExportResult{
   247				Response: &collpb.ExportMetricsServiceResponse{
   248					PartialSuccess: &collpb.ExportMetricsPartialSuccess{
   249						RejectedDataPoints: n,
   250						ErrorMessage:       msg,
   251					},
   252				},
   253			}
   254			rCh <- ExportResult{
   255				Response: &collpb.ExportMetricsServiceResponse{
   256					PartialSuccess: &collpb.ExportMetricsPartialSuccess{
   257						// Should not be logged.
   258						RejectedDataPoints: 0,
   259						ErrorMessage:       "",
   260					},
   261				},
   262			}
   263			rCh <- ExportResult{
   264				Response: &collpb.ExportMetricsServiceResponse{},
   265			}
   266
   267			ctx := context.Background()
   268			client, _ := f(rCh)
   269
   270			defer func(orig otel.ErrorHandler) {
   271				otel.SetErrorHandler(orig)
   272			}(otel.GetErrorHandler())
   273
   274			errs := []error{}
   275			eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
   276			otel.SetErrorHandler(eh)
   277
   278			require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
   279			require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
   280			require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
   281			require.NoError(t, client.Shutdown(ctx))
   282
   283			require.Equal(t, 1, len(errs))
   284			want := fmt.Sprintf("%s (%d metric data points rejected)", msg, n)
   285			assert.ErrorContains(t, errs[0], want)
   286		})
   287	}
   288}
   289
   290func testCtxErrs(factory func() func(context.Context) error) func(t *testing.T) {
   291	return func(t *testing.T) {
   292		t.Helper()
   293		ctx, cancel := context.WithCancel(context.Background())
   294		t.Cleanup(cancel)
   295
   296		t.Run("DeadlineExceeded", func(t *testing.T) {
   297			innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
   298			t.Cleanup(innerCancel)
   299			<-innerCtx.Done()
   300
   301			f := factory()
   302			assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
   303		})
   304
   305		t.Run("Canceled", func(t *testing.T) {
   306			innerCtx, innerCancel := context.WithCancel(ctx)
   307			innerCancel()
   308
   309			f := factory()
   310			assert.ErrorIs(t, f(innerCtx), context.Canceled)
   311		})
   312	}
   313}

View as plain text