...

Source file src/github.com/grpc-ecosystem/go-grpc-prometheus/client_metrics.go

Documentation: github.com/grpc-ecosystem/go-grpc-prometheus

     1  package grpc_prometheus
     2  
     3  import (
     4  	"io"
     5  
     6  	prom "github.com/prometheus/client_golang/prometheus"
     7  	"golang.org/x/net/context"
     8  	"google.golang.org/grpc"
     9  	"google.golang.org/grpc/codes"
    10  	"google.golang.org/grpc/status"
    11  )
    12  
    13  // ClientMetrics represents a collection of metrics to be registered on a
    14  // Prometheus metrics registry for a gRPC client.
    15  type ClientMetrics struct {
    16  	clientStartedCounter          *prom.CounterVec
    17  	clientHandledCounter          *prom.CounterVec
    18  	clientStreamMsgReceived       *prom.CounterVec
    19  	clientStreamMsgSent           *prom.CounterVec
    20  	clientHandledHistogramEnabled bool
    21  	clientHandledHistogramOpts    prom.HistogramOpts
    22  	clientHandledHistogram        *prom.HistogramVec
    23  }
    24  
    25  // NewClientMetrics returns a ClientMetrics object. Use a new instance of
    26  // ClientMetrics when not using the default Prometheus metrics registry, for
    27  // example when wanting to control which metrics are added to a registry as
    28  // opposed to automatically adding metrics via init functions.
    29  func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
    30  	opts := counterOptions(counterOpts)
    31  	return &ClientMetrics{
    32  		clientStartedCounter: prom.NewCounterVec(
    33  			opts.apply(prom.CounterOpts{
    34  				Name: "grpc_client_started_total",
    35  				Help: "Total number of RPCs started on the client.",
    36  			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
    37  
    38  		clientHandledCounter: prom.NewCounterVec(
    39  			opts.apply(prom.CounterOpts{
    40  				Name: "grpc_client_handled_total",
    41  				Help: "Total number of RPCs completed by the client, regardless of success or failure.",
    42  			}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
    43  
    44  		clientStreamMsgReceived: prom.NewCounterVec(
    45  			opts.apply(prom.CounterOpts{
    46  				Name: "grpc_client_msg_received_total",
    47  				Help: "Total number of RPC stream messages received by the client.",
    48  			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
    49  
    50  		clientStreamMsgSent: prom.NewCounterVec(
    51  			opts.apply(prom.CounterOpts{
    52  				Name: "grpc_client_msg_sent_total",
    53  				Help: "Total number of gRPC stream messages sent by the client.",
    54  			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
    55  
    56  		clientHandledHistogramEnabled: false,
    57  		clientHandledHistogramOpts: prom.HistogramOpts{
    58  			Name:    "grpc_client_handling_seconds",
    59  			Help:    "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
    60  			Buckets: prom.DefBuckets,
    61  		},
    62  		clientHandledHistogram: nil,
    63  	}
    64  }
    65  
    66  // Describe sends the super-set of all possible descriptors of metrics
    67  // collected by this Collector to the provided channel and returns once
    68  // the last descriptor has been sent.
    69  func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
    70  	m.clientStartedCounter.Describe(ch)
    71  	m.clientHandledCounter.Describe(ch)
    72  	m.clientStreamMsgReceived.Describe(ch)
    73  	m.clientStreamMsgSent.Describe(ch)
    74  	if m.clientHandledHistogramEnabled {
    75  		m.clientHandledHistogram.Describe(ch)
    76  	}
    77  }
    78  
    79  // Collect is called by the Prometheus registry when collecting
    80  // metrics. The implementation sends each collected metric via the
    81  // provided channel and returns once the last metric has been sent.
    82  func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
    83  	m.clientStartedCounter.Collect(ch)
    84  	m.clientHandledCounter.Collect(ch)
    85  	m.clientStreamMsgReceived.Collect(ch)
    86  	m.clientStreamMsgSent.Collect(ch)
    87  	if m.clientHandledHistogramEnabled {
    88  		m.clientHandledHistogram.Collect(ch)
    89  	}
    90  }
    91  
    92  // EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
    93  // Histogram metrics can be very expensive for Prometheus to retain and query.
    94  func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
    95  	for _, o := range opts {
    96  		o(&m.clientHandledHistogramOpts)
    97  	}
    98  	if !m.clientHandledHistogramEnabled {
    99  		m.clientHandledHistogram = prom.NewHistogramVec(
   100  			m.clientHandledHistogramOpts,
   101  			[]string{"grpc_type", "grpc_service", "grpc_method"},
   102  		)
   103  	}
   104  	m.clientHandledHistogramEnabled = true
   105  }
   106  
   107  // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
   108  func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   109  	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   110  		monitor := newClientReporter(m, Unary, method)
   111  		monitor.SentMessage()
   112  		err := invoker(ctx, method, req, reply, cc, opts...)
   113  		if err != nil {
   114  			monitor.ReceivedMessage()
   115  		}
   116  		st, _ := status.FromError(err)
   117  		monitor.Handled(st.Code())
   118  		return err
   119  	}
   120  }
   121  
   122  // StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
   123  func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
   124  	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
   125  		monitor := newClientReporter(m, clientStreamType(desc), method)
   126  		clientStream, err := streamer(ctx, desc, cc, method, opts...)
   127  		if err != nil {
   128  			st, _ := status.FromError(err)
   129  			monitor.Handled(st.Code())
   130  			return nil, err
   131  		}
   132  		return &monitoredClientStream{clientStream, monitor}, nil
   133  	}
   134  }
   135  
   136  func clientStreamType(desc *grpc.StreamDesc) grpcType {
   137  	if desc.ClientStreams && !desc.ServerStreams {
   138  		return ClientStream
   139  	} else if !desc.ClientStreams && desc.ServerStreams {
   140  		return ServerStream
   141  	}
   142  	return BidiStream
   143  }
   144  
   145  // monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
   146  type monitoredClientStream struct {
   147  	grpc.ClientStream
   148  	monitor *clientReporter
   149  }
   150  
   151  func (s *monitoredClientStream) SendMsg(m interface{}) error {
   152  	err := s.ClientStream.SendMsg(m)
   153  	if err == nil {
   154  		s.monitor.SentMessage()
   155  	}
   156  	return err
   157  }
   158  
   159  func (s *monitoredClientStream) RecvMsg(m interface{}) error {
   160  	err := s.ClientStream.RecvMsg(m)
   161  	if err == nil {
   162  		s.monitor.ReceivedMessage()
   163  	} else if err == io.EOF {
   164  		s.monitor.Handled(codes.OK)
   165  	} else {
   166  		st, _ := status.FromError(err)
   167  		s.monitor.Handled(st.Code())
   168  	}
   169  	return err
   170  }
   171  

View as plain text