...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package otlptracegrpc
16
17 import (
18 "context"
19 "errors"
20 "sync"
21 "time"
22
23 "google.golang.org/genproto/googleapis/rpc/errdetails"
24 "google.golang.org/grpc"
25 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/metadata"
27 "google.golang.org/grpc/status"
28
29 "go.opentelemetry.io/otel"
30 "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
31 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
32 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
33 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
34 coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
35 tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
36 )
37
38 type client struct {
39 endpoint string
40 dialOpts []grpc.DialOption
41 metadata metadata.MD
42 exportTimeout time.Duration
43 requestFunc retry.RequestFunc
44
45
46
47 stopCtx context.Context
48
49 stopFunc context.CancelFunc
50
51
52
53
54
55 ourConn bool
56 conn *grpc.ClientConn
57 tscMu sync.RWMutex
58 tsc coltracepb.TraceServiceClient
59 }
60
61
62 var _ otlptrace.Client = (*client)(nil)
63
64
65 func NewClient(opts ...Option) otlptrace.Client {
66 return newClient(opts...)
67 }
68
69 func newClient(opts ...Option) *client {
70 cfg := otlpconfig.NewGRPCConfig(asGRPCOptions(opts)...)
71
72 ctx, cancel := context.WithCancel(context.Background())
73
74 c := &client{
75 endpoint: cfg.Traces.Endpoint,
76 exportTimeout: cfg.Traces.Timeout,
77 requestFunc: cfg.RetryConfig.RequestFunc(retryable),
78 dialOpts: cfg.DialOptions,
79 stopCtx: ctx,
80 stopFunc: cancel,
81 conn: cfg.GRPCConn,
82 }
83
84 if len(cfg.Traces.Headers) > 0 {
85 c.metadata = metadata.New(cfg.Traces.Headers)
86 }
87
88 return c
89 }
90
91
92 func (c *client) Start(ctx context.Context) error {
93 if c.conn == nil {
94
95
96 conn, err := grpc.DialContext(ctx, c.endpoint, c.dialOpts...)
97 if err != nil {
98 return err
99 }
100
101
102 c.ourConn = true
103 c.conn = conn
104 }
105
106
107
108 c.tscMu.Lock()
109 c.tsc = coltracepb.NewTraceServiceClient(c.conn)
110 c.tscMu.Unlock()
111
112 return nil
113 }
114
115 var errAlreadyStopped = errors.New("the client is already stopped")
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 func (c *client) Stop(ctx context.Context) error {
133
134 err := ctx.Err()
135
136
137 acquired := make(chan struct{})
138 go func() {
139 c.tscMu.Lock()
140 close(acquired)
141 }()
142
143 select {
144 case <-ctx.Done():
145
146
147
148 c.stopFunc()
149 err = ctx.Err()
150
151
152
153
154 <-acquired
155 case <-acquired:
156 }
157
158
159 defer c.tscMu.Unlock()
160
161
162
163
164
165 if c.tsc == nil {
166 return errAlreadyStopped
167 }
168
169
170 c.tsc = nil
171
172 if c.ourConn {
173 closeErr := c.conn.Close()
174
175 if err == nil && closeErr != nil {
176 err = closeErr
177 }
178 }
179 return err
180 }
181
182 var errShutdown = errors.New("the client is shutdown")
183
184
185
186
187
188 func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
189
190
191
192
193 c.tscMu.RLock()
194 defer c.tscMu.RUnlock()
195
196 if c.tsc == nil {
197 return errShutdown
198 }
199
200 ctx, cancel := c.exportContext(ctx)
201 defer cancel()
202
203 return c.requestFunc(ctx, func(iCtx context.Context) error {
204 resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
205 ResourceSpans: protoSpans,
206 })
207 if resp != nil && resp.PartialSuccess != nil {
208 msg := resp.PartialSuccess.GetErrorMessage()
209 n := resp.PartialSuccess.GetRejectedSpans()
210 if n != 0 || msg != "" {
211 err := internal.TracePartialSuccessError(n, msg)
212 otel.Handle(err)
213 }
214 }
215
216 if status.Code(err) == codes.OK {
217
218 return nil
219 }
220 return err
221 })
222 }
223
224
225
226
227
228
229
230 func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
231 var (
232 ctx context.Context
233 cancel context.CancelFunc
234 )
235
236 if c.exportTimeout > 0 {
237 ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
238 } else {
239 ctx, cancel = context.WithCancel(parent)
240 }
241
242 if c.metadata.Len() > 0 {
243 ctx = metadata.NewOutgoingContext(ctx, c.metadata)
244 }
245
246
247 go func() {
248 select {
249 case <-ctx.Done():
250 case <-c.stopCtx.Done():
251
252 cancel()
253 }
254 }()
255
256 return ctx, cancel
257 }
258
259
260
261 func retryable(err error) (bool, time.Duration) {
262 s := status.Convert(err)
263 return retryableGRPCStatus(s)
264 }
265
266 func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
267 switch s.Code() {
268 case codes.Canceled,
269 codes.DeadlineExceeded,
270 codes.Aborted,
271 codes.OutOfRange,
272 codes.Unavailable,
273 codes.DataLoss:
274
275 _, d := throttleDelay(s)
276 return true, d
277 case codes.ResourceExhausted:
278
279 return throttleDelay(s)
280 }
281
282
283 return false, 0
284 }
285
286
287
288 func throttleDelay(s *status.Status) (bool, time.Duration) {
289 for _, detail := range s.Details() {
290 if t, ok := detail.(*errdetails.RetryInfo); ok {
291 return true, t.RetryDelay.AsDuration()
292 }
293 }
294 return false, 0
295 }
296
297
298 func (c *client) MarshalLog() interface{} {
299 return struct {
300 Type string
301 Endpoint string
302 }{
303 Type: "otlphttpgrpc",
304 Endpoint: c.endpoint,
305 }
306 }
307
View as plain text