...

Source file src/google.golang.org/grpc/orca/call_metrics.go

Documentation: google.golang.org/grpc/orca

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package orca
    20  
    21  import (
    22  	"context"
    23  	"sync"
    24  
    25  	"google.golang.org/grpc"
    26  	grpcinternal "google.golang.org/grpc/internal"
    27  	"google.golang.org/grpc/metadata"
    28  	"google.golang.org/grpc/orca/internal"
    29  	"google.golang.org/protobuf/proto"
    30  )
    31  
    32  // CallMetricsRecorder allows a service method handler to record per-RPC
    33  // metrics.  It contains all utilization-based metrics from
    34  // ServerMetricsRecorder as well as additional request cost metrics.
    35  type CallMetricsRecorder interface {
    36  	ServerMetricsRecorder
    37  
    38  	// SetRequestCost sets the relevant server metric.
    39  	SetRequestCost(name string, val float64)
    40  	// DeleteRequestCost deletes the relevant server metric to prevent it
    41  	// from being sent.
    42  	DeleteRequestCost(name string)
    43  
    44  	// SetNamedMetric sets the relevant server metric.
    45  	SetNamedMetric(name string, val float64)
    46  	// DeleteNamedMetric deletes the relevant server metric to prevent it
    47  	// from being sent.
    48  	DeleteNamedMetric(name string)
    49  }
    50  
    51  type callMetricsRecorderCtxKey struct{}
    52  
    53  // CallMetricsRecorderFromContext returns the RPC-specific custom metrics
    54  // recorder embedded in the provided RPC context.
    55  //
    56  // Returns nil if no custom metrics recorder is found in the provided context,
    57  // which will be the case when custom metrics reporting is not enabled.
    58  func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder {
    59  	rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper)
    60  	if !ok {
    61  		return nil
    62  	}
    63  	return rw.recorder()
    64  }
    65  
    66  // recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that
    67  // concurrent calls to CallMetricsRecorderFromContext() results in only one
    68  // allocation of the underlying metrics recorder, while also allowing for lazy
    69  // initialization of the recorder itself.
    70  type recorderWrapper struct {
    71  	once sync.Once
    72  	r    CallMetricsRecorder
    73  	smp  ServerMetricsProvider
    74  }
    75  
    76  func (rw *recorderWrapper) recorder() CallMetricsRecorder {
    77  	rw.once.Do(func() {
    78  		rw.r = newServerMetricsRecorder()
    79  	})
    80  	return rw.r
    81  }
    82  
    83  // setTrailerMetadata adds a trailer metadata entry with key being set to
    84  // `internal.TrailerMetadataKey` and value being set to the binary-encoded
    85  // orca.OrcaLoadReport protobuf message.
    86  //
    87  // This function is called from the unary and streaming interceptors defined
    88  // above. Any errors encountered here are not propagated to the caller because
    89  // they are ignored there. Hence we simply log any errors encountered here at
    90  // warning level, and return nothing.
    91  func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
    92  	var sm *ServerMetrics
    93  	if rw.smp != nil {
    94  		sm = rw.smp.ServerMetrics()
    95  		sm.merge(rw.r.ServerMetrics())
    96  	} else {
    97  		sm = rw.r.ServerMetrics()
    98  	}
    99  
   100  	b, err := proto.Marshal(sm.toLoadReportProto())
   101  	if err != nil {
   102  		logger.Warningf("Failed to marshal load report: %v", err)
   103  		return
   104  	}
   105  	if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil {
   106  		logger.Warningf("Failed to set trailer metadata: %v", err)
   107  	}
   108  }
   109  
   110  var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
   111  
   112  // CallMetricsServerOption returns a server option which enables the reporting
   113  // of per-RPC custom backend metrics for unary and streaming RPCs.
   114  //
   115  // Server applications interested in injecting custom backend metrics should
   116  // pass the server option returned from this function as the first argument to
   117  // grpc.NewServer().
   118  //
   119  // Subsequently, server RPC handlers can retrieve a reference to the RPC
   120  // specific custom metrics recorder [CallMetricsRecorder] to be used, via a call
   121  // to CallMetricsRecorderFromContext(), and inject custom metrics at any time
   122  // during the RPC lifecycle.
   123  //
   124  // The injected custom metrics will be sent as part of trailer metadata, as a
   125  // binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
   126  // being set be "endpoint-load-metrics-bin".
   127  //
   128  // If a non-nil ServerMetricsProvider is provided, the gRPC server will
   129  // transmit the metrics it provides, overwritten by any per-RPC metrics given
   130  // to the CallMetricsRecorder.  A ServerMetricsProvider is typically obtained
   131  // by calling NewServerMetricsRecorder.
   132  //
   133  // [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
   134  func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption {
   135  	return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp)))
   136  }
   137  
   138  func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
   139  	return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
   140  		// We don't allocate the metric recorder here. It will be allocated the
   141  		// first time the user calls CallMetricsRecorderFromContext().
   142  		rw := &recorderWrapper{smp: smp}
   143  		ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw)
   144  
   145  		resp, err := handler(ctxWithRecorder, req)
   146  
   147  		// It is safe to access the underlying metric recorder inside the wrapper at
   148  		// this point, as the user's RPC handler is done executing, and therefore
   149  		// there will be no more calls to CallMetricsRecorderFromContext(), which is
   150  		// where the metric recorder is lazy allocated.
   151  		if rw.r != nil {
   152  			rw.setTrailerMetadata(ctx)
   153  		}
   154  		return resp, err
   155  	}
   156  }
   157  
   158  func streamInt(smp ServerMetricsProvider) func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   159  	return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   160  		// We don't allocate the metric recorder here. It will be allocated the
   161  		// first time the user calls CallMetricsRecorderFromContext().
   162  		rw := &recorderWrapper{smp: smp}
   163  		ws := &wrappedStream{
   164  			ServerStream: ss,
   165  			ctx:          newContextWithRecorderWrapper(ss.Context(), rw),
   166  		}
   167  
   168  		err := handler(srv, ws)
   169  
   170  		// It is safe to access the underlying metric recorder inside the wrapper at
   171  		// this point, as the user's RPC handler is done executing, and therefore
   172  		// there will be no more calls to CallMetricsRecorderFromContext(), which is
   173  		// where the metric recorder is lazy allocated.
   174  		if rw.r != nil {
   175  			rw.setTrailerMetadata(ss.Context())
   176  		}
   177  		return err
   178  	}
   179  }
   180  
   181  func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context {
   182  	return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r)
   183  }
   184  
   185  // wrappedStream wraps the grpc.ServerStream received by the streaming
   186  // interceptor. Overrides only the Context() method to return a context which
   187  // contains a reference to the CallMetricsRecorder corresponding to this
   188  // stream.
   189  type wrappedStream struct {
   190  	grpc.ServerStream
   191  	ctx context.Context
   192  }
   193  
   194  func (w *wrappedStream) Context() context.Context {
   195  	return w.ctx
   196  }
   197  

View as plain text