1 /* 2 * 3 * Copyright 2022 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package orca 20 21 import ( 22 "context" 23 "sync" 24 25 "google.golang.org/grpc" 26 grpcinternal "google.golang.org/grpc/internal" 27 "google.golang.org/grpc/metadata" 28 "google.golang.org/grpc/orca/internal" 29 "google.golang.org/protobuf/proto" 30 ) 31 32 // CallMetricsRecorder allows a service method handler to record per-RPC 33 // metrics. It contains all utilization-based metrics from 34 // ServerMetricsRecorder as well as additional request cost metrics. 35 type CallMetricsRecorder interface { 36 ServerMetricsRecorder 37 38 // SetRequestCost sets the relevant server metric. 39 SetRequestCost(name string, val float64) 40 // DeleteRequestCost deletes the relevant server metric to prevent it 41 // from being sent. 42 DeleteRequestCost(name string) 43 44 // SetNamedMetric sets the relevant server metric. 45 SetNamedMetric(name string, val float64) 46 // DeleteNamedMetric deletes the relevant server metric to prevent it 47 // from being sent. 48 DeleteNamedMetric(name string) 49 } 50 51 type callMetricsRecorderCtxKey struct{} 52 53 // CallMetricsRecorderFromContext returns the RPC-specific custom metrics 54 // recorder embedded in the provided RPC context. 55 // 56 // Returns nil if no custom metrics recorder is found in the provided context, 57 // which will be the case when custom metrics reporting is not enabled. 58 func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder { 59 rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper) 60 if !ok { 61 return nil 62 } 63 return rw.recorder() 64 } 65 66 // recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that 67 // concurrent calls to CallMetricsRecorderFromContext() results in only one 68 // allocation of the underlying metrics recorder, while also allowing for lazy 69 // initialization of the recorder itself. 70 type recorderWrapper struct { 71 once sync.Once 72 r CallMetricsRecorder 73 smp ServerMetricsProvider 74 } 75 76 func (rw *recorderWrapper) recorder() CallMetricsRecorder { 77 rw.once.Do(func() { 78 rw.r = newServerMetricsRecorder() 79 }) 80 return rw.r 81 } 82 83 // setTrailerMetadata adds a trailer metadata entry with key being set to 84 // `internal.TrailerMetadataKey` and value being set to the binary-encoded 85 // orca.OrcaLoadReport protobuf message. 86 // 87 // This function is called from the unary and streaming interceptors defined 88 // above. Any errors encountered here are not propagated to the caller because 89 // they are ignored there. Hence we simply log any errors encountered here at 90 // warning level, and return nothing. 91 func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) { 92 var sm *ServerMetrics 93 if rw.smp != nil { 94 sm = rw.smp.ServerMetrics() 95 sm.merge(rw.r.ServerMetrics()) 96 } else { 97 sm = rw.r.ServerMetrics() 98 } 99 100 b, err := proto.Marshal(sm.toLoadReportProto()) 101 if err != nil { 102 logger.Warningf("Failed to marshal load report: %v", err) 103 return 104 } 105 if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil { 106 logger.Warningf("Failed to set trailer metadata: %v", err) 107 } 108 } 109 110 var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) 111 112 // CallMetricsServerOption returns a server option which enables the reporting 113 // of per-RPC custom backend metrics for unary and streaming RPCs. 114 // 115 // Server applications interested in injecting custom backend metrics should 116 // pass the server option returned from this function as the first argument to 117 // grpc.NewServer(). 118 // 119 // Subsequently, server RPC handlers can retrieve a reference to the RPC 120 // specific custom metrics recorder [CallMetricsRecorder] to be used, via a call 121 // to CallMetricsRecorderFromContext(), and inject custom metrics at any time 122 // during the RPC lifecycle. 123 // 124 // The injected custom metrics will be sent as part of trailer metadata, as a 125 // binary-encoded [ORCA LoadReport] protobuf message, with the metadata key 126 // being set be "endpoint-load-metrics-bin". 127 // 128 // If a non-nil ServerMetricsProvider is provided, the gRPC server will 129 // transmit the metrics it provides, overwritten by any per-RPC metrics given 130 // to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained 131 // by calling NewServerMetricsRecorder. 132 // 133 // [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15 134 func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption { 135 return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp))) 136 } 137 138 func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { 139 return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { 140 // We don't allocate the metric recorder here. It will be allocated the 141 // first time the user calls CallMetricsRecorderFromContext(). 142 rw := &recorderWrapper{smp: smp} 143 ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw) 144 145 resp, err := handler(ctxWithRecorder, req) 146 147 // It is safe to access the underlying metric recorder inside the wrapper at 148 // this point, as the user's RPC handler is done executing, and therefore 149 // there will be no more calls to CallMetricsRecorderFromContext(), which is 150 // where the metric recorder is lazy allocated. 151 if rw.r != nil { 152 rw.setTrailerMetadata(ctx) 153 } 154 return resp, err 155 } 156 } 157 158 func streamInt(smp ServerMetricsProvider) func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 159 return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 160 // We don't allocate the metric recorder here. It will be allocated the 161 // first time the user calls CallMetricsRecorderFromContext(). 162 rw := &recorderWrapper{smp: smp} 163 ws := &wrappedStream{ 164 ServerStream: ss, 165 ctx: newContextWithRecorderWrapper(ss.Context(), rw), 166 } 167 168 err := handler(srv, ws) 169 170 // It is safe to access the underlying metric recorder inside the wrapper at 171 // this point, as the user's RPC handler is done executing, and therefore 172 // there will be no more calls to CallMetricsRecorderFromContext(), which is 173 // where the metric recorder is lazy allocated. 174 if rw.r != nil { 175 rw.setTrailerMetadata(ss.Context()) 176 } 177 return err 178 } 179 } 180 181 func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context { 182 return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r) 183 } 184 185 // wrappedStream wraps the grpc.ServerStream received by the streaming 186 // interceptor. Overrides only the Context() method to return a context which 187 // contains a reference to the CallMetricsRecorder corresponding to this 188 // stream. 189 type wrappedStream struct { 190 grpc.ServerStream 191 ctx context.Context 192 } 193 194 func (w *wrappedStream) Context() context.Context { 195 return w.ctx 196 } 197