1// Code created by gotmpl. DO NOT MODIFY.
2// source: internal/shared/otlp/otlpmetric/otest/client.go.tmpl
3
4// Copyright The OpenTelemetry Authors
5//
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18package otest
19
20import (
21 "context"
22 "fmt"
23 "testing"
24 "time"
25
26 "github.com/google/go-cmp/cmp"
27 "github.com/stretchr/testify/assert"
28 "github.com/stretchr/testify/require"
29 "google.golang.org/protobuf/proto"
30
31 "go.opentelemetry.io/otel"
32 semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
33 collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
34 cpb "go.opentelemetry.io/proto/otlp/common/v1"
35 mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
36 rpb "go.opentelemetry.io/proto/otlp/resource/v1"
37)
38
// Shared fixtures for the Client test suite. Together they build
// resourceMetrics, the single payload uploaded by the tests in this file.
var (
	// start is Sat Jan 01 2000 00:00:00 GMT+0000; end is 30 seconds later.
	// They bound the time window of every data point below.
	start = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
	end   = start.Add(30 * time.Second)

	// kvAlice and kvBob are two values of the same "user" attribute key, used
	// to give each data point a distinct attribute set.
	kvAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
	}}
	kvBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
	}}
	// kvSrvName and kvSrvVer are the attributes of the resource (see res).
	kvSrvName = &cpb.KeyValue{Key: "service.name", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
	}}
	kvSrvVer = &cpb.KeyValue{Key: "service.version", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
	}}

	// min, max and sum are declared as variables because the histogram data
	// point stores them by pointer.
	// NOTE(review): min and max shadow the predeclared Go 1.21+ functions of
	// the same name for the whole package.
	min, max, sum = 2.0, 4.0, 90.0

	// hdp is a single histogram data point for kvAlice: 30 measurements, all
	// counted in the middle of the three buckets delimited by the bounds 1
	// and 5.
	hdp = []*mpb.HistogramDataPoint{
		{
			Attributes:        []*cpb.KeyValue{kvAlice},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Count:             30,
			Sum:               &sum,
			ExplicitBounds:    []float64{1, 5},
			BucketCounts:      []uint64{0, 30, 0},
			Min:               &min,
			Max:               &max,
		},
	}

	// hist wraps hdp with delta temporality.
	hist = &mpb.Histogram{
		AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
		DataPoints:             hdp,
	}

	// dPtsInt64 holds one integer data point per user (alice=1, bob=2).
	dPtsInt64 = []*mpb.NumberDataPoint{
		{
			Attributes:        []*cpb.KeyValue{kvAlice},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsInt{AsInt: 1},
		},
		{
			Attributes:        []*cpb.KeyValue{kvBob},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsInt{AsInt: 2},
		},
	}
	// dPtsFloat64 holds one floating-point data point per user (alice=1.0,
	// bob=2.0).
	dPtsFloat64 = []*mpb.NumberDataPoint{
		{
			Attributes:        []*cpb.KeyValue{kvAlice},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsDouble{AsDouble: 1.0},
		},
		{
			Attributes:        []*cpb.KeyValue{kvBob},
			StartTimeUnixNano: uint64(start.UnixNano()),
			TimeUnixNano:      uint64(end.UnixNano()),
			Value:             &mpb.NumberDataPoint_AsDouble{AsDouble: 2.0},
		},
	}

	// sumInt64 is a monotonic, cumulative sum over the integer data points.
	sumInt64 = &mpb.Sum{
		AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
		IsMonotonic:            true,
		DataPoints:             dPtsInt64,
	}
	// sumFloat64 is a non-monotonic, delta sum over the floating-point data
	// points.
	sumFloat64 = &mpb.Sum{
		AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
		IsMonotonic:            false,
		DataPoints:             dPtsFloat64,
	}

	// The gauges reuse the same data point slices as the sums above.
	gaugeInt64   = &mpb.Gauge{DataPoints: dPtsInt64}
	gaugeFloat64 = &mpb.Gauge{DataPoints: dPtsFloat64}

	// metrics lists one metric per fixture above: two gauges, two sums and a
	// histogram.
	metrics = []*mpb.Metric{
		{
			Name:        "int64-gauge",
			Description: "Gauge with int64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Gauge{Gauge: gaugeInt64},
		},
		{
			Name:        "float64-gauge",
			Description: "Gauge with float64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Gauge{Gauge: gaugeFloat64},
		},
		{
			Name:        "int64-sum",
			Description: "Sum with int64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Sum{Sum: sumInt64},
		},
		{
			Name:        "float64-sum",
			Description: "Sum with float64 values",
			Unit:        "1",
			Data:        &mpb.Metric_Sum{Sum: sumFloat64},
		},
		{
			Name:        "histogram",
			Description: "Histogram",
			Unit:        "1",
			Data:        &mpb.Metric_Histogram{Histogram: hist},
		},
	}

	// scope identifies the instrumentation scope all metrics are reported
	// under.
	scope = &cpb.InstrumentationScope{
		Name:    "test/code/path",
		Version: "v0.1.0",
	}
	// scopeMetrics groups every metric under the single scope.
	scopeMetrics = []*mpb.ScopeMetrics{
		{
			Scope:     scope,
			Metrics:   metrics,
			SchemaUrl: semconv.SchemaURL,
		},
	}

	// res is the resource the metrics are attributed to.
	res = &rpb.Resource{
		Attributes: []*cpb.KeyValue{kvSrvName, kvSrvVer},
	}
	// resourceMetrics is the complete payload passed to Client.UploadMetrics
	// by the tests in this file.
	resourceMetrics = &mpb.ResourceMetrics{
		Resource:     res,
		ScopeMetrics: scopeMetrics,
		SchemaUrl:    semconv.SchemaURL,
	}
)
174
175type Client interface {
176 UploadMetrics(context.Context, *mpb.ResourceMetrics) error
177 ForceFlush(context.Context) error
178 Shutdown(context.Context) error
179}
180
// ClientFactory is a function that, when called, returns a Client
// implementation together with the Collector implementation it is connected
// to. The Client is ready to upload metric data to the Collector, which is
// ready to store that data.
//
// If resultCh is not nil, the returned Collector needs to use the responses
// from that channel to send back to the client for every export request.
type ClientFactory func(resultCh <-chan ExportResult) (Client, Collector)
189
190// RunClientTests runs a suite of Client integration tests. For example:
191//
192// t.Run("Integration", RunClientTests(factory))
193func RunClientTests(f ClientFactory) func(*testing.T) {
194 return func(t *testing.T) {
195 t.Run("ClientHonorsContextErrors", func(t *testing.T) {
196 t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
197 c, _ := f(nil)
198 return c.Shutdown
199 }))
200
201 t.Run("ForceFlush", testCtxErrs(func() func(context.Context) error {
202 c, _ := f(nil)
203 return c.ForceFlush
204 }))
205
206 t.Run("UploadMetrics", testCtxErrs(func() func(context.Context) error {
207 c, _ := f(nil)
208 return func(ctx context.Context) error {
209 return c.UploadMetrics(ctx, nil)
210 }
211 }))
212 })
213
214 t.Run("ForceFlushFlushes", func(t *testing.T) {
215 ctx := context.Background()
216 client, collector := f(nil)
217 require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
218
219 require.NoError(t, client.ForceFlush(ctx))
220 rm := collector.Collect().Dump()
221 // Data correctness is not important, just it was received.
222 require.Greater(t, len(rm), 0, "no data uploaded")
223
224 require.NoError(t, client.Shutdown(ctx))
225 rm = collector.Collect().Dump()
226 assert.Len(t, rm, 0, "client did not flush all data")
227 })
228
229 t.Run("UploadMetrics", func(t *testing.T) {
230 ctx := context.Background()
231 client, coll := f(nil)
232
233 require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
234 require.NoError(t, client.Shutdown(ctx))
235 got := coll.Collect().Dump()
236 require.Len(t, got, 1, "upload of one ResourceMetrics")
237 diff := cmp.Diff(got[0], resourceMetrics, cmp.Comparer(proto.Equal))
238 if diff != "" {
239 t.Fatalf("unexpected ResourceMetrics:\n%s", diff)
240 }
241 })
242
243 t.Run("PartialSuccess", func(t *testing.T) {
244 const n, msg = 2, "bad data"
245 rCh := make(chan ExportResult, 3)
246 rCh <- ExportResult{
247 Response: &collpb.ExportMetricsServiceResponse{
248 PartialSuccess: &collpb.ExportMetricsPartialSuccess{
249 RejectedDataPoints: n,
250 ErrorMessage: msg,
251 },
252 },
253 }
254 rCh <- ExportResult{
255 Response: &collpb.ExportMetricsServiceResponse{
256 PartialSuccess: &collpb.ExportMetricsPartialSuccess{
257 // Should not be logged.
258 RejectedDataPoints: 0,
259 ErrorMessage: "",
260 },
261 },
262 }
263 rCh <- ExportResult{
264 Response: &collpb.ExportMetricsServiceResponse{},
265 }
266
267 ctx := context.Background()
268 client, _ := f(rCh)
269
270 defer func(orig otel.ErrorHandler) {
271 otel.SetErrorHandler(orig)
272 }(otel.GetErrorHandler())
273
274 errs := []error{}
275 eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
276 otel.SetErrorHandler(eh)
277
278 require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
279 require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
280 require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
281 require.NoError(t, client.Shutdown(ctx))
282
283 require.Equal(t, 1, len(errs))
284 want := fmt.Sprintf("%s (%d metric data points rejected)", msg, n)
285 assert.ErrorContains(t, errs[0], want)
286 })
287 }
288}
289
290func testCtxErrs(factory func() func(context.Context) error) func(t *testing.T) {
291 return func(t *testing.T) {
292 t.Helper()
293 ctx, cancel := context.WithCancel(context.Background())
294 t.Cleanup(cancel)
295
296 t.Run("DeadlineExceeded", func(t *testing.T) {
297 innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
298 t.Cleanup(innerCancel)
299 <-innerCtx.Done()
300
301 f := factory()
302 assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
303 })
304
305 t.Run("Canceled", func(t *testing.T) {
306 innerCtx, innerCancel := context.WithCancel(ctx)
307 innerCancel()
308
309 f := factory()
310 assert.ErrorIs(t, f(innerCtx), context.Canceled)
311 })
312 }
313}