...

Source file src/github.com/grpc-ecosystem/go-grpc-prometheus/server_metrics.go

Documentation: github.com/grpc-ecosystem/go-grpc-prometheus

     1  package grpc_prometheus
     2  
     3  import (
     4  	prom "github.com/prometheus/client_golang/prometheus"
     5  	"golang.org/x/net/context"
     6  	"google.golang.org/grpc"
     7  	"google.golang.org/grpc/status"
     8  )
     9  
    10  // ServerMetrics represents a collection of metrics to be registered on a
    11  // Prometheus metrics registry for a gRPC server.
    12  type ServerMetrics struct {
    13  	serverStartedCounter          *prom.CounterVec
    14  	serverHandledCounter          *prom.CounterVec
    15  	serverStreamMsgReceived       *prom.CounterVec
    16  	serverStreamMsgSent           *prom.CounterVec
    17  	serverHandledHistogramEnabled bool
    18  	serverHandledHistogramOpts    prom.HistogramOpts
    19  	serverHandledHistogram        *prom.HistogramVec
    20  }
    21  
    22  // NewServerMetrics returns a ServerMetrics object. Use a new instance of
    23  // ServerMetrics when not using the default Prometheus metrics registry, for
    24  // example when wanting to control which metrics are added to a registry as
    25  // opposed to automatically adding metrics via init functions.
    26  func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
    27  	opts := counterOptions(counterOpts)
    28  	return &ServerMetrics{
    29  		serverStartedCounter: prom.NewCounterVec(
    30  			opts.apply(prom.CounterOpts{
    31  				Name: "grpc_server_started_total",
    32  				Help: "Total number of RPCs started on the server.",
    33  			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
    34  		serverHandledCounter: prom.NewCounterVec(
    35  			opts.apply(prom.CounterOpts{
    36  				Name: "grpc_server_handled_total",
    37  				Help: "Total number of RPCs completed on the server, regardless of success or failure.",
    38  			}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
    39  		serverStreamMsgReceived: prom.NewCounterVec(
    40  			opts.apply(prom.CounterOpts{
    41  				Name: "grpc_server_msg_received_total",
    42  				Help: "Total number of RPC stream messages received on the server.",
    43  			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
    44  		serverStreamMsgSent: prom.NewCounterVec(
    45  			opts.apply(prom.CounterOpts{
    46  				Name: "grpc_server_msg_sent_total",
    47  				Help: "Total number of gRPC stream messages sent by the server.",
    48  			}), []string{"grpc_type", "grpc_service", "grpc_method"}),
    49  		serverHandledHistogramEnabled: false,
    50  		serverHandledHistogramOpts: prom.HistogramOpts{
    51  			Name:    "grpc_server_handling_seconds",
    52  			Help:    "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
    53  			Buckets: prom.DefBuckets,
    54  		},
    55  		serverHandledHistogram: nil,
    56  	}
    57  }
    58  
    59  // EnableHandlingTimeHistogram enables histograms being registered when
    60  // registering the ServerMetrics on a Prometheus registry. Histograms can be
    61  // expensive on Prometheus servers. It takes options to configure histogram
    62  // options such as the defined buckets.
    63  func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
    64  	for _, o := range opts {
    65  		o(&m.serverHandledHistogramOpts)
    66  	}
    67  	if !m.serverHandledHistogramEnabled {
    68  		m.serverHandledHistogram = prom.NewHistogramVec(
    69  			m.serverHandledHistogramOpts,
    70  			[]string{"grpc_type", "grpc_service", "grpc_method"},
    71  		)
    72  	}
    73  	m.serverHandledHistogramEnabled = true
    74  }
    75  
    76  // Describe sends the super-set of all possible descriptors of metrics
    77  // collected by this Collector to the provided channel and returns once
    78  // the last descriptor has been sent.
    79  func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
    80  	m.serverStartedCounter.Describe(ch)
    81  	m.serverHandledCounter.Describe(ch)
    82  	m.serverStreamMsgReceived.Describe(ch)
    83  	m.serverStreamMsgSent.Describe(ch)
    84  	if m.serverHandledHistogramEnabled {
    85  		m.serverHandledHistogram.Describe(ch)
    86  	}
    87  }
    88  
    89  // Collect is called by the Prometheus registry when collecting
    90  // metrics. The implementation sends each collected metric via the
    91  // provided channel and returns once the last metric has been sent.
    92  func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
    93  	m.serverStartedCounter.Collect(ch)
    94  	m.serverHandledCounter.Collect(ch)
    95  	m.serverStreamMsgReceived.Collect(ch)
    96  	m.serverStreamMsgSent.Collect(ch)
    97  	if m.serverHandledHistogramEnabled {
    98  		m.serverHandledHistogram.Collect(ch)
    99  	}
   100  }
   101  
   102  // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
   103  func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
   104  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
   105  		monitor := newServerReporter(m, Unary, info.FullMethod)
   106  		monitor.ReceivedMessage()
   107  		resp, err := handler(ctx, req)
   108  		st, _ := status.FromError(err)
   109  		monitor.Handled(st.Code())
   110  		if err == nil {
   111  			monitor.SentMessage()
   112  		}
   113  		return resp, err
   114  	}
   115  }
   116  
   117  // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
   118  func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   119  	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   120  		monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
   121  		err := handler(srv, &monitoredServerStream{ss, monitor})
   122  		st, _ := status.FromError(err)
   123  		monitor.Handled(st.Code())
   124  		return err
   125  	}
   126  }
   127  
   128  // InitializeMetrics initializes all metrics, with their appropriate null
   129  // value, for all gRPC methods registered on a gRPC server. This is useful, to
   130  // ensure that all metrics exist when collecting and querying.
   131  func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
   132  	serviceInfo := server.GetServiceInfo()
   133  	for serviceName, info := range serviceInfo {
   134  		for _, mInfo := range info.Methods {
   135  			preRegisterMethod(m, serviceName, &mInfo)
   136  		}
   137  	}
   138  }
   139  
   140  func streamRPCType(info *grpc.StreamServerInfo) grpcType {
   141  	if info.IsClientStream && !info.IsServerStream {
   142  		return ClientStream
   143  	} else if !info.IsClientStream && info.IsServerStream {
   144  		return ServerStream
   145  	}
   146  	return BidiStream
   147  }
   148  
   149  // monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
   150  type monitoredServerStream struct {
   151  	grpc.ServerStream
   152  	monitor *serverReporter
   153  }
   154  
   155  func (s *monitoredServerStream) SendMsg(m interface{}) error {
   156  	err := s.ServerStream.SendMsg(m)
   157  	if err == nil {
   158  		s.monitor.SentMessage()
   159  	}
   160  	return err
   161  }
   162  
   163  func (s *monitoredServerStream) RecvMsg(m interface{}) error {
   164  	err := s.ServerStream.RecvMsg(m)
   165  	if err == nil {
   166  		s.monitor.ReceivedMessage()
   167  	}
   168  	return err
   169  }
   170  
   171  // preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
   172  func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
   173  	methodName := mInfo.Name
   174  	methodType := string(typeFromMethodInfo(mInfo))
   175  	// These are just references (no increments), as just referencing will create the labels but not set values.
   176  	metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
   177  	metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
   178  	metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
   179  	if metrics.serverHandledHistogramEnabled {
   180  		metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
   181  	}
   182  	for _, code := range allCodes {
   183  		metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
   184  	}
   185  }
   186  

View as plain text