1// Code created by gotmpl. DO NOT MODIFY.
2// source: internal/shared/otlp/otlpmetric/otest/collector.go.tmpl
3
4// Copyright The OpenTelemetry Authors
5//
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18package otest
19
20import (
21 "bytes"
22 "compress/gzip"
23 "context"
24 "crypto/ecdsa"
25 "crypto/elliptic"
26 "crypto/rand"
27 "crypto/tls"
28 "crypto/x509"
29 "crypto/x509/pkix" // nolint:depguard // This is for testing.
30 "encoding/pem"
31 "errors"
32 "fmt"
33 "io"
34 "math/big"
35 "net"
36 "net/http"
37 "net/url"
38 "sync"
39 "time"
40
41 "google.golang.org/grpc"
42 "google.golang.org/grpc/metadata"
43 "google.golang.org/protobuf/proto"
44
45 "{{ .oconfImportPath }}"
46 collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
47 mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
48)
49
50// Collector is the collection target a Client sends metric uploads to.
51type Collector interface {
52 Collect() *Storage
53}
54
55type ExportResult struct {
56 Response *collpb.ExportMetricsServiceResponse
57 Err error
58}
59
60// Storage stores uploaded OTLP metric data in their proto form.
61type Storage struct {
62 dataMu sync.Mutex
63 data []*mpb.ResourceMetrics
64}
65
66// NewStorage returns a configure storage ready to store received requests.
67func NewStorage() *Storage {
68 return &Storage{}
69}
70
71// Add adds the request to the Storage.
72func (s *Storage) Add(request *collpb.ExportMetricsServiceRequest) {
73 s.dataMu.Lock()
74 defer s.dataMu.Unlock()
75 s.data = append(s.data, request.ResourceMetrics...)
76}
77
78// Dump returns all added ResourceMetrics and clears the storage.
79func (s *Storage) Dump() []*mpb.ResourceMetrics {
80 s.dataMu.Lock()
81 defer s.dataMu.Unlock()
82
83 var data []*mpb.ResourceMetrics
84 data, s.data = s.data, []*mpb.ResourceMetrics{}
85 return data
86}
87
88// GRPCCollector is an OTLP gRPC server that collects all requests it receives.
89type GRPCCollector struct {
90 collpb.UnimplementedMetricsServiceServer
91
92 headersMu sync.Mutex
93 headers metadata.MD
94 storage *Storage
95
96 resultCh <-chan ExportResult
97 listener net.Listener
98 srv *grpc.Server
99}
100
101// NewGRPCCollector returns a *GRPCCollector that is listening at the provided
102// endpoint.
103//
104// If endpoint is an empty string, the returned collector will be listening on
105// the localhost interface at an OS chosen port.
106//
107// If errCh is not nil, the collector will respond to Export calls with errors
108// sent on that channel. This means that if errCh is not nil Export calls will
109// block until an error is received.
110func NewGRPCCollector(endpoint string, resultCh <-chan ExportResult) (*GRPCCollector, error) {
111 if endpoint == "" {
112 endpoint = "localhost:0"
113 }
114
115 c := &GRPCCollector{
116 storage: NewStorage(),
117 resultCh: resultCh,
118 }
119
120 var err error
121 c.listener, err = net.Listen("tcp", endpoint)
122 if err != nil {
123 return nil, err
124 }
125
126 c.srv = grpc.NewServer()
127 collpb.RegisterMetricsServiceServer(c.srv, c)
128 go func() { _ = c.srv.Serve(c.listener) }()
129
130 return c, nil
131}
132
133// Shutdown shuts down the gRPC server closing all open connections and
134// listeners immediately.
135func (c *GRPCCollector) Shutdown() { c.srv.Stop() }
136
137// Addr returns the net.Addr c is listening at.
138func (c *GRPCCollector) Addr() net.Addr {
139 return c.listener.Addr()
140}
141
142// Collect returns the Storage holding all collected requests.
143func (c *GRPCCollector) Collect() *Storage {
144 return c.storage
145}
146
147// Headers returns the headers received for all requests.
148func (c *GRPCCollector) Headers() map[string][]string {
149 // Makes a copy.
150 c.headersMu.Lock()
151 defer c.headersMu.Unlock()
152 return metadata.Join(c.headers)
153}
154
155// Export handles the export req.
156func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsServiceRequest) (*collpb.ExportMetricsServiceResponse, error) {
157 c.storage.Add(req)
158
159 if h, ok := metadata.FromIncomingContext(ctx); ok {
160 c.headersMu.Lock()
161 c.headers = metadata.Join(c.headers, h)
162 c.headersMu.Unlock()
163 }
164
165 if c.resultCh != nil {
166 r := <-c.resultCh
167 if r.Response == nil {
168 return &collpb.ExportMetricsServiceResponse{}, r.Err
169 }
170 return r.Response, r.Err
171 }
172 return &collpb.ExportMetricsServiceResponse{}, nil
173}
174
175var emptyExportMetricsServiceResponse = func() []byte {
176 body := collpb.ExportMetricsServiceResponse{}
177 r, err := proto.Marshal(&body)
178 if err != nil {
179 panic(err)
180 }
181 return r
182}()
183
184type HTTPResponseError struct {
185 Err error
186 Status int
187 Header http.Header
188}
189
190func (e *HTTPResponseError) Error() string {
191 return fmt.Sprintf("%d: %s", e.Status, e.Err)
192}
193
194func (e *HTTPResponseError) Unwrap() error { return e.Err }
195
196// HTTPCollector is an OTLP HTTP server that collects all requests it receives.
197type HTTPCollector struct {
198 plainTextResponse bool
199
200 headersMu sync.Mutex
201 headers http.Header
202 storage *Storage
203
204 resultCh <-chan ExportResult
205 listener net.Listener
206 srv *http.Server
207}
208
209// NewHTTPCollector returns a *HTTPCollector that is listening at the provided
210// endpoint.
211//
212// If endpoint is an empty string, the returned collector will be listening on
213// the localhost interface at an OS chosen port, not use TLS, and listen at the
214// default OTLP metric endpoint path ("/v1/metrics"). If the endpoint contains
215// a prefix of "https" the server will generate weak self-signed TLS
216// certificates and use them to server data. If the endpoint contains a path,
217// that path will be used instead of the default OTLP metric endpoint path.
218//
219// If errCh is not nil, the collector will respond to HTTP requests with errors
220// sent on that channel. This means that if errCh is not nil Export calls will
221// block until an error is received.
222func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) {
223 u, err := url.Parse(endpoint)
224 if err != nil {
225 return nil, err
226 }
227 if u.Host == "" {
228 u.Host = "localhost:0"
229 }
230 if u.Path == "" {
231 u.Path = oconf.DefaultMetricsPath
232 }
233
234 c := &HTTPCollector{
235 headers: http.Header{},
236 storage: NewStorage(),
237 resultCh: resultCh,
238 }
239 for _, opt := range opts {
240 opt(c)
241 }
242
243 c.listener, err = net.Listen("tcp", u.Host)
244 if err != nil {
245 return nil, err
246 }
247
248 mux := http.NewServeMux()
249 mux.Handle(u.Path, http.HandlerFunc(c.handler))
250 c.srv = &http.Server{
251 Handler: mux,
252 ReadTimeout: 10 * time.Second,
253 WriteTimeout: 10 * time.Second,
254 }
255 if u.Scheme == "https" {
256 cert, err := weakCertificate()
257 if err != nil {
258 return nil, err
259 }
260 c.srv.TLSConfig = &tls.Config{
261 Certificates: []tls.Certificate{cert},
262 }
263 go func() { _ = c.srv.ServeTLS(c.listener, "", "") }()
264 } else {
265 go func() { _ = c.srv.Serve(c.listener) }()
266 }
267 return c, nil
268}
269
270// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return
271// a plaintext, instead of protobuf, response.
272func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) {
273 return func(s *HTTPCollector) {
274 s.plainTextResponse = true
275 }
276}
277
278// Shutdown shuts down the HTTP server closing all open connections and
279// listeners.
280func (c *HTTPCollector) Shutdown(ctx context.Context) error {
281 return c.srv.Shutdown(ctx)
282}
283
284// Addr returns the net.Addr c is listening at.
285func (c *HTTPCollector) Addr() net.Addr {
286 return c.listener.Addr()
287}
288
289// Collect returns the Storage holding all collected requests.
290func (c *HTTPCollector) Collect() *Storage {
291 return c.storage
292}
293
294// Headers returns the headers received for all requests.
295func (c *HTTPCollector) Headers() map[string][]string {
296 // Makes a copy.
297 c.headersMu.Lock()
298 defer c.headersMu.Unlock()
299 return c.headers.Clone()
300}
301
302func (c *HTTPCollector) handler(w http.ResponseWriter, r *http.Request) {
303 c.respond(w, c.record(r))
304}
305
306func (c *HTTPCollector) record(r *http.Request) ExportResult {
307 // Currently only supports protobuf.
308 if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" {
309 err := fmt.Errorf("content-type not supported: %s", v)
310 return ExportResult{Err: err}
311 }
312
313 body, err := c.readBody(r)
314 if err != nil {
315 return ExportResult{Err: err}
316 }
317 pbRequest := &collpb.ExportMetricsServiceRequest{}
318 err = proto.Unmarshal(body, pbRequest)
319 if err != nil {
320 return ExportResult{
321 Err: &HTTPResponseError{
322 Err: err,
323 Status: http.StatusInternalServerError,
324 },
325 }
326 }
327 c.storage.Add(pbRequest)
328
329 c.headersMu.Lock()
330 for k, vals := range r.Header {
331 for _, v := range vals {
332 c.headers.Add(k, v)
333 }
334 }
335 c.headersMu.Unlock()
336
337 if c.resultCh != nil {
338 return <-c.resultCh
339 }
340 return ExportResult{Err: err}
341}
342
343func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
344 var reader io.ReadCloser
345 switch r.Header.Get("Content-Encoding") {
346 case "gzip":
347 reader, err = gzip.NewReader(r.Body)
348 if err != nil {
349 _ = reader.Close()
350 return nil, &HTTPResponseError{
351 Err: err,
352 Status: http.StatusInternalServerError,
353 }
354 }
355 default:
356 reader = r.Body
357 }
358
359 defer func() {
360 cErr := reader.Close()
361 if err == nil && cErr != nil {
362 err = &HTTPResponseError{
363 Err: cErr,
364 Status: http.StatusInternalServerError,
365 }
366 }
367 }()
368 body, err = io.ReadAll(reader)
369 if err != nil {
370 err = &HTTPResponseError{
371 Err: err,
372 Status: http.StatusInternalServerError,
373 }
374 }
375 return body, err
376}
377
378func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
379 if resp.Err != nil {
380 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
381 w.Header().Set("X-Content-Type-Options", "nosniff")
382 var e *HTTPResponseError
383 if errors.As(resp.Err, &e) {
384 for k, vals := range e.Header {
385 for _, v := range vals {
386 w.Header().Add(k, v)
387 }
388 }
389 w.WriteHeader(e.Status)
390 fmt.Fprintln(w, e.Error())
391 } else {
392 w.WriteHeader(http.StatusBadRequest)
393 fmt.Fprintln(w, resp.Err.Error())
394 }
395 return
396 }
397
398 if c.plainTextResponse {
399 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
400 w.WriteHeader(http.StatusOK)
401 _, _ = w.Write([]byte("OK"))
402 return
403 }
404
405 w.Header().Set("Content-Type", "application/x-protobuf")
406 w.WriteHeader(http.StatusOK)
407 if resp.Response == nil {
408 _, _ = w.Write(emptyExportMetricsServiceResponse)
409 } else {
410 r, err := proto.Marshal(resp.Response)
411 if err != nil {
412 panic(err)
413 }
414 _, _ = w.Write(r)
415 }
416}
417
418// Based on https://golang.org/src/crypto/tls/generate_cert.go,
419// simplified and weakened.
420func weakCertificate() (tls.Certificate, error) {
421 priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
422 if err != nil {
423 return tls.Certificate{}, err
424 }
425 notBefore := time.Now()
426 notAfter := notBefore.Add(time.Hour)
427 max := new(big.Int).Lsh(big.NewInt(1), 128)
428 sn, err := rand.Int(rand.Reader, max)
429 if err != nil {
430 return tls.Certificate{}, err
431 }
432 tmpl := x509.Certificate{
433 SerialNumber: sn,
434 Subject: pkix.Name{Organization: []string{"otel-go"}},
435 NotBefore: notBefore,
436 NotAfter: notAfter,
437 KeyUsage: x509.KeyUsageDigitalSignature,
438 ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
439 BasicConstraintsValid: true,
440 DNSNames: []string{"localhost"},
441 IPAddresses: []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)},
442 }
443 derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv)
444 if err != nil {
445 return tls.Certificate{}, err
446 }
447 var certBuf bytes.Buffer
448 err = pem.Encode(&certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
449 if err != nil {
450 return tls.Certificate{}, err
451 }
452 privBytes, err := x509.MarshalPKCS8PrivateKey(priv)
453 if err != nil {
454 return tls.Certificate{}, err
455 }
456 var privBuf bytes.Buffer
457 err = pem.Encode(&privBuf, &pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
458 if err != nil {
459 return tls.Certificate{}, err
460 }
461 return tls.X509KeyPair(certBuf.Bytes(), privBuf.Bytes())
462}
View as plain text