...

Text file src/go.opentelemetry.io/otel/internal/shared/otlp/otlpmetric/otest/collector.go.tmpl

Documentation: go.opentelemetry.io/otel/internal/shared/otlp/otlpmetric/otest

     1// Code created by gotmpl. DO NOT MODIFY.
     2// source: internal/shared/otlp/otlpmetric/otest/collector.go.tmpl
     3
     4// Copyright The OpenTelemetry Authors
     5//
     6// Licensed under the Apache License, Version 2.0 (the "License");
     7// you may not use this file except in compliance with the License.
     8// You may obtain a copy of the License at
     9//
    10//     http://www.apache.org/licenses/LICENSE-2.0
    11//
    12// Unless required by applicable law or agreed to in writing, software
    13// distributed under the License is distributed on an "AS IS" BASIS,
    14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15// See the License for the specific language governing permissions and
    16// limitations under the License.
    17
    18package otest
    19
    20import (
    21	"bytes"
    22	"compress/gzip"
    23	"context"
    24	"crypto/ecdsa"
    25	"crypto/elliptic"
    26	"crypto/rand"
    27	"crypto/tls"
    28	"crypto/x509"
    29	"crypto/x509/pkix" // nolint:depguard  // This is for testing.
    30	"encoding/pem"
    31	"errors"
    32	"fmt"
    33	"io"
    34	"math/big"
    35	"net"
    36	"net/http"
    37	"net/url"
    38	"sync"
    39	"time"
    40
    41	"google.golang.org/grpc"
    42	"google.golang.org/grpc/metadata"
    43	"google.golang.org/protobuf/proto"
    44
    45	"{{ .oconfImportPath }}"
    46	collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
    47	mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
    48)
    49
    50// Collector is the collection target a Client sends metric uploads to.
    51type Collector interface {
    52	Collect() *Storage
    53}
    54
    55type ExportResult struct {
    56	Response *collpb.ExportMetricsServiceResponse
    57	Err      error
    58}
    59
    60// Storage stores uploaded OTLP metric data in their proto form.
    61type Storage struct {
    62	dataMu sync.Mutex
    63	data   []*mpb.ResourceMetrics
    64}
    65
    66// NewStorage returns a configure storage ready to store received requests.
    67func NewStorage() *Storage {
    68	return &Storage{}
    69}
    70
    71// Add adds the request to the Storage.
    72func (s *Storage) Add(request *collpb.ExportMetricsServiceRequest) {
    73	s.dataMu.Lock()
    74	defer s.dataMu.Unlock()
    75	s.data = append(s.data, request.ResourceMetrics...)
    76}
    77
    78// Dump returns all added ResourceMetrics and clears the storage.
    79func (s *Storage) Dump() []*mpb.ResourceMetrics {
    80	s.dataMu.Lock()
    81	defer s.dataMu.Unlock()
    82
    83	var data []*mpb.ResourceMetrics
    84	data, s.data = s.data, []*mpb.ResourceMetrics{}
    85	return data
    86}
    87
    88// GRPCCollector is an OTLP gRPC server that collects all requests it receives.
    89type GRPCCollector struct {
    90	collpb.UnimplementedMetricsServiceServer
    91
    92	headersMu sync.Mutex
    93	headers   metadata.MD
    94	storage   *Storage
    95
    96	resultCh <-chan ExportResult
    97	listener net.Listener
    98	srv      *grpc.Server
    99}
   100
   101// NewGRPCCollector returns a *GRPCCollector that is listening at the provided
   102// endpoint.
   103//
   104// If endpoint is an empty string, the returned collector will be listening on
   105// the localhost interface at an OS chosen port.
   106//
   107// If errCh is not nil, the collector will respond to Export calls with errors
   108// sent on that channel. This means that if errCh is not nil Export calls will
   109// block until an error is received.
   110func NewGRPCCollector(endpoint string, resultCh <-chan ExportResult) (*GRPCCollector, error) {
   111	if endpoint == "" {
   112		endpoint = "localhost:0"
   113	}
   114
   115	c := &GRPCCollector{
   116		storage:  NewStorage(),
   117		resultCh: resultCh,
   118	}
   119
   120	var err error
   121	c.listener, err = net.Listen("tcp", endpoint)
   122	if err != nil {
   123		return nil, err
   124	}
   125
   126	c.srv = grpc.NewServer()
   127	collpb.RegisterMetricsServiceServer(c.srv, c)
   128	go func() { _ = c.srv.Serve(c.listener) }()
   129
   130	return c, nil
   131}
   132
   133// Shutdown shuts down the gRPC server closing all open connections and
   134// listeners immediately.
   135func (c *GRPCCollector) Shutdown() { c.srv.Stop() }
   136
   137// Addr returns the net.Addr c is listening at.
   138func (c *GRPCCollector) Addr() net.Addr {
   139	return c.listener.Addr()
   140}
   141
   142// Collect returns the Storage holding all collected requests.
   143func (c *GRPCCollector) Collect() *Storage {
   144	return c.storage
   145}
   146
   147// Headers returns the headers received for all requests.
   148func (c *GRPCCollector) Headers() map[string][]string {
   149	// Makes a copy.
   150	c.headersMu.Lock()
   151	defer c.headersMu.Unlock()
   152	return metadata.Join(c.headers)
   153}
   154
   155// Export handles the export req.
   156func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsServiceRequest) (*collpb.ExportMetricsServiceResponse, error) {
   157	c.storage.Add(req)
   158
   159	if h, ok := metadata.FromIncomingContext(ctx); ok {
   160		c.headersMu.Lock()
   161		c.headers = metadata.Join(c.headers, h)
   162		c.headersMu.Unlock()
   163	}
   164
   165	if c.resultCh != nil {
   166		r := <-c.resultCh
   167		if r.Response == nil {
   168			return &collpb.ExportMetricsServiceResponse{}, r.Err
   169		}
   170		return r.Response, r.Err
   171	}
   172	return &collpb.ExportMetricsServiceResponse{}, nil
   173}
   174
   175var emptyExportMetricsServiceResponse = func() []byte {
   176	body := collpb.ExportMetricsServiceResponse{}
   177	r, err := proto.Marshal(&body)
   178	if err != nil {
   179		panic(err)
   180	}
   181	return r
   182}()
   183
   184type HTTPResponseError struct {
   185	Err    error
   186	Status int
   187	Header http.Header
   188}
   189
   190func (e *HTTPResponseError) Error() string {
   191	return fmt.Sprintf("%d: %s", e.Status, e.Err)
   192}
   193
   194func (e *HTTPResponseError) Unwrap() error { return e.Err }
   195
   196// HTTPCollector is an OTLP HTTP server that collects all requests it receives.
   197type HTTPCollector struct {
   198	plainTextResponse bool
   199
   200	headersMu sync.Mutex
   201	headers   http.Header
   202	storage   *Storage
   203
   204	resultCh <-chan ExportResult
   205	listener net.Listener
   206	srv      *http.Server
   207}
   208
   209// NewHTTPCollector returns a *HTTPCollector that is listening at the provided
   210// endpoint.
   211//
   212// If endpoint is an empty string, the returned collector will be listening on
   213// the localhost interface at an OS chosen port, not use TLS, and listen at the
   214// default OTLP metric endpoint path ("/v1/metrics"). If the endpoint contains
   215// a prefix of "https" the server will generate weak self-signed TLS
   216// certificates and use them to server data. If the endpoint contains a path,
   217// that path will be used instead of the default OTLP metric endpoint path.
   218//
   219// If errCh is not nil, the collector will respond to HTTP requests with errors
   220// sent on that channel. This means that if errCh is not nil Export calls will
   221// block until an error is received.
   222func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) {
   223	u, err := url.Parse(endpoint)
   224	if err != nil {
   225		return nil, err
   226	}
   227	if u.Host == "" {
   228		u.Host = "localhost:0"
   229	}
   230	if u.Path == "" {
   231		u.Path = oconf.DefaultMetricsPath
   232	}
   233
   234	c := &HTTPCollector{
   235		headers:  http.Header{},
   236		storage:  NewStorage(),
   237		resultCh: resultCh,
   238	}
   239	for _, opt := range opts {
   240		opt(c)
   241	}
   242
   243	c.listener, err = net.Listen("tcp", u.Host)
   244	if err != nil {
   245		return nil, err
   246	}
   247
   248	mux := http.NewServeMux()
   249	mux.Handle(u.Path, http.HandlerFunc(c.handler))
   250	c.srv = &http.Server{
   251		Handler:      mux,
   252		ReadTimeout:  10 * time.Second,
   253		WriteTimeout: 10 * time.Second,
   254	}
   255	if u.Scheme == "https" {
   256		cert, err := weakCertificate()
   257		if err != nil {
   258			return nil, err
   259		}
   260		c.srv.TLSConfig = &tls.Config{
   261			Certificates: []tls.Certificate{cert},
   262		}
   263		go func() { _ = c.srv.ServeTLS(c.listener, "", "") }()
   264	} else {
   265		go func() { _ = c.srv.Serve(c.listener) }()
   266	}
   267	return c, nil
   268}
   269
   270// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return
   271// a plaintext, instead of protobuf, response.
   272func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) {
   273	return func(s *HTTPCollector) {
   274		s.plainTextResponse = true
   275	}
   276}
   277
   278// Shutdown shuts down the HTTP server closing all open connections and
   279// listeners.
   280func (c *HTTPCollector) Shutdown(ctx context.Context) error {
   281	return c.srv.Shutdown(ctx)
   282}
   283
   284// Addr returns the net.Addr c is listening at.
   285func (c *HTTPCollector) Addr() net.Addr {
   286	return c.listener.Addr()
   287}
   288
   289// Collect returns the Storage holding all collected requests.
   290func (c *HTTPCollector) Collect() *Storage {
   291	return c.storage
   292}
   293
   294// Headers returns the headers received for all requests.
   295func (c *HTTPCollector) Headers() map[string][]string {
   296	// Makes a copy.
   297	c.headersMu.Lock()
   298	defer c.headersMu.Unlock()
   299	return c.headers.Clone()
   300}
   301
   302func (c *HTTPCollector) handler(w http.ResponseWriter, r *http.Request) {
   303	c.respond(w, c.record(r))
   304}
   305
   306func (c *HTTPCollector) record(r *http.Request) ExportResult {
   307	// Currently only supports protobuf.
   308	if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" {
   309		err := fmt.Errorf("content-type not supported: %s", v)
   310		return ExportResult{Err: err}
   311	}
   312
   313	body, err := c.readBody(r)
   314	if err != nil {
   315		return ExportResult{Err: err}
   316	}
   317	pbRequest := &collpb.ExportMetricsServiceRequest{}
   318	err = proto.Unmarshal(body, pbRequest)
   319	if err != nil {
   320		return ExportResult{
   321			Err: &HTTPResponseError{
   322				Err:    err,
   323				Status: http.StatusInternalServerError,
   324			},
   325		}
   326	}
   327	c.storage.Add(pbRequest)
   328
   329	c.headersMu.Lock()
   330	for k, vals := range r.Header {
   331		for _, v := range vals {
   332			c.headers.Add(k, v)
   333		}
   334	}
   335	c.headersMu.Unlock()
   336
   337	if c.resultCh != nil {
   338		return <-c.resultCh
   339	}
   340	return ExportResult{Err: err}
   341}
   342
   343func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
   344	var reader io.ReadCloser
   345	switch r.Header.Get("Content-Encoding") {
   346	case "gzip":
   347		reader, err = gzip.NewReader(r.Body)
   348		if err != nil {
   349			_ = reader.Close()
   350			return nil, &HTTPResponseError{
   351				Err:    err,
   352				Status: http.StatusInternalServerError,
   353			}
   354		}
   355	default:
   356		reader = r.Body
   357	}
   358
   359	defer func() {
   360		cErr := reader.Close()
   361		if err == nil && cErr != nil {
   362			err = &HTTPResponseError{
   363				Err:    cErr,
   364				Status: http.StatusInternalServerError,
   365			}
   366		}
   367	}()
   368	body, err = io.ReadAll(reader)
   369	if err != nil {
   370		err = &HTTPResponseError{
   371			Err:    err,
   372			Status: http.StatusInternalServerError,
   373		}
   374	}
   375	return body, err
   376}
   377
   378func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
   379	if resp.Err != nil {
   380		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
   381		w.Header().Set("X-Content-Type-Options", "nosniff")
   382		var e *HTTPResponseError
   383		if errors.As(resp.Err, &e) {
   384			for k, vals := range e.Header {
   385				for _, v := range vals {
   386					w.Header().Add(k, v)
   387				}
   388			}
   389			w.WriteHeader(e.Status)
   390			fmt.Fprintln(w, e.Error())
   391		} else {
   392			w.WriteHeader(http.StatusBadRequest)
   393			fmt.Fprintln(w, resp.Err.Error())
   394		}
   395		return
   396	}
   397
   398	if c.plainTextResponse {
   399		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
   400		w.WriteHeader(http.StatusOK)
   401		_, _ = w.Write([]byte("OK"))
   402		return
   403	}
   404
   405	w.Header().Set("Content-Type", "application/x-protobuf")
   406	w.WriteHeader(http.StatusOK)
   407	if resp.Response == nil {
   408		_, _ = w.Write(emptyExportMetricsServiceResponse)
   409	} else {
   410		r, err := proto.Marshal(resp.Response)
   411		if err != nil {
   412			panic(err)
   413		}
   414		_, _ = w.Write(r)
   415	}
   416}
   417
   418// Based on https://golang.org/src/crypto/tls/generate_cert.go,
   419// simplified and weakened.
   420func weakCertificate() (tls.Certificate, error) {
   421	priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
   422	if err != nil {
   423		return tls.Certificate{}, err
   424	}
   425	notBefore := time.Now()
   426	notAfter := notBefore.Add(time.Hour)
   427	max := new(big.Int).Lsh(big.NewInt(1), 128)
   428	sn, err := rand.Int(rand.Reader, max)
   429	if err != nil {
   430		return tls.Certificate{}, err
   431	}
   432	tmpl := x509.Certificate{
   433		SerialNumber:          sn,
   434		Subject:               pkix.Name{Organization: []string{"otel-go"}},
   435		NotBefore:             notBefore,
   436		NotAfter:              notAfter,
   437		KeyUsage:              x509.KeyUsageDigitalSignature,
   438		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
   439		BasicConstraintsValid: true,
   440		DNSNames:              []string{"localhost"},
   441		IPAddresses:           []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)},
   442	}
   443	derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv)
   444	if err != nil {
   445		return tls.Certificate{}, err
   446	}
   447	var certBuf bytes.Buffer
   448	err = pem.Encode(&certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
   449	if err != nil {
   450		return tls.Certificate{}, err
   451	}
   452	privBytes, err := x509.MarshalPKCS8PrivateKey(priv)
   453	if err != nil {
   454		return tls.Certificate{}, err
   455	}
   456	var privBuf bytes.Buffer
   457	err = pem.Encode(&privBuf, &pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
   458	if err != nil {
   459		return tls.Certificate{}, err
   460	}
   461	return tls.X509KeyPair(certBuf.Bytes(), privBuf.Bytes())
   462}

View as plain text