...

Source file src/go.opencensus.io/plugin/ocgrpc/stats_common.go

Documentation: go.opencensus.io/plugin/ocgrpc

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package ocgrpc
    17  
    18  import (
    19  	"context"
    20  	"strconv"
    21  	"strings"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	"go.opencensus.io/metric/metricdata"
    26  	ocstats "go.opencensus.io/stats"
    27  	"go.opencensus.io/stats/view"
    28  	"go.opencensus.io/tag"
    29  	"go.opencensus.io/trace"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/grpclog"
    32  	"google.golang.org/grpc/stats"
    33  	"google.golang.org/grpc/status"
    34  )
    35  
// grpcInstrumentationKey is the type of the context key under which this
// package looks up its per-RPC instrumentation data (see rpcDataKey).
type grpcInstrumentationKey string
    37  
// rpcData holds the instrumentation RPC data that is needed between the start
// and end of a call. It holds the info that this package needs to keep track
// of between the various GRPC events.
type rpcData struct {
	// sentCount, sentBytes, recvCount and recvBytes have to be the first
	// words in order to be 64-bit aligned on 32-bit architectures. They are
	// updated by the payload handlers and read back when the RPC ends.
	sentCount, sentBytes, recvCount, recvBytes int64 // access atomically

	// startTime represents the time at which TagRPC was invoked at the
	// beginning of an RPC. It is an approximation of the time when the
	// application code invoked GRPC code.
	startTime time.Time
	// method is the RPC method name; the handlers pass it through methodName
	// (which strips leading "/") before using it as a tag value.
	method    string
}
    52  
// The following variables define the default hard-coded auxiliary data used by
// both the default GRPC client and GRPC server metrics.
//
// Each one is a view.Distribution built from a fixed, ascending list of values:
//   - DefaultBytesDistribution: 1024 up to 4294967296 (2^10 to 2^32).
//   - DefaultMillisecondsDistribution: 0.01 up to 100000.
//   - DefaultMessageCountDistribution: powers of two from 1 up to 65536.
//
// NOTE(review): the names suggest these are the bucket boundaries for
// byte-size, millisecond-latency and message-count views respectively;
// confirm against the view definitions, which are not in this file.
var (
	DefaultBytesDistribution        = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
	DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
	DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
)
    60  
// Server tags are applied to the context used to process each RPC, as well as
// the measures at the end of each RPC.
//
// KeyServerMethod is the tag key "grpc_server_method" and KeyServerStatus is
// the tag key "grpc_server_status". handleRPCEnd sets KeyServerStatus on the
// server-side measurements to the status string of the finished RPC
// (for example "OK").
var (
	KeyServerMethod = tag.MustNewKey("grpc_server_method")
	KeyServerStatus = tag.MustNewKey("grpc_server_status")
)
    67  
// Client tags are applied to measures at the end of each RPC.
//
// KeyClientMethod is the tag key "grpc_client_method" and KeyClientStatus is
// the tag key "grpc_client_status". handleRPCEnd sets both on the client-side
// measurements: the method with its leading "/" stripped, and the status
// string of the finished RPC (for example "OK").
var (
	KeyClientMethod = tag.MustNewKey("grpc_client_method")
	KeyClientStatus = tag.MustNewKey("grpc_client_status")
)
    73  
// rpcDataKey is the context key under which the handlers in this file look up
// the *rpcData belonging to the current RPC.
var (
	rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
)
    77  
    78  func methodName(fullname string) string {
    79  	return strings.TrimLeft(fullname, "/")
    80  }
    81  
    82  // statsHandleRPC processes the RPC events.
    83  func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
    84  	switch st := s.(type) {
    85  	case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
    86  		// do nothing for client
    87  	case *stats.Begin:
    88  		handleRPCBegin(ctx, st)
    89  	case *stats.OutPayload:
    90  		handleRPCOutPayload(ctx, st)
    91  	case *stats.InPayload:
    92  		handleRPCInPayload(ctx, st)
    93  	case *stats.End:
    94  		handleRPCEnd(ctx, st)
    95  	default:
    96  		grpclog.Infof("unexpected stats: %T", st)
    97  	}
    98  }
    99  
   100  func handleRPCBegin(ctx context.Context, s *stats.Begin) {
   101  	d, ok := ctx.Value(rpcDataKey).(*rpcData)
   102  	if !ok {
   103  		if grpclog.V(2) {
   104  			grpclog.Infoln("Failed to retrieve *rpcData from context.")
   105  		}
   106  	}
   107  
   108  	if s.IsClient() {
   109  		ocstats.RecordWithOptions(ctx,
   110  			ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
   111  			ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
   112  	} else {
   113  		ocstats.RecordWithOptions(ctx,
   114  			ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
   115  			ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
   116  	}
   117  }
   118  
   119  func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
   120  	d, ok := ctx.Value(rpcDataKey).(*rpcData)
   121  	if !ok {
   122  		if grpclog.V(2) {
   123  			grpclog.Infoln("Failed to retrieve *rpcData from context.")
   124  		}
   125  		return
   126  	}
   127  
   128  	atomic.AddInt64(&d.sentBytes, int64(s.Length))
   129  	atomic.AddInt64(&d.sentCount, 1)
   130  }
   131  
   132  func handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
   133  	d, ok := ctx.Value(rpcDataKey).(*rpcData)
   134  	if !ok {
   135  		if grpclog.V(2) {
   136  			grpclog.Infoln("Failed to retrieve *rpcData from context.")
   137  		}
   138  		return
   139  	}
   140  
   141  	atomic.AddInt64(&d.recvBytes, int64(s.Length))
   142  	atomic.AddInt64(&d.recvCount, 1)
   143  }
   144  
   145  func handleRPCEnd(ctx context.Context, s *stats.End) {
   146  	d, ok := ctx.Value(rpcDataKey).(*rpcData)
   147  	if !ok {
   148  		if grpclog.V(2) {
   149  			grpclog.Infoln("Failed to retrieve *rpcData from context.")
   150  		}
   151  		return
   152  	}
   153  
   154  	elapsedTime := time.Since(d.startTime)
   155  
   156  	var st string
   157  	if s.Error != nil {
   158  		s, ok := status.FromError(s.Error)
   159  		if ok {
   160  			st = statusCodeToString(s)
   161  		}
   162  	} else {
   163  		st = "OK"
   164  	}
   165  
   166  	latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
   167  	attachments := getSpanCtxAttachment(ctx)
   168  	if s.Client {
   169  		ocstats.RecordWithOptions(ctx,
   170  			ocstats.WithTags(
   171  				tag.Upsert(KeyClientMethod, methodName(d.method)),
   172  				tag.Upsert(KeyClientStatus, st)),
   173  			ocstats.WithAttachments(attachments),
   174  			ocstats.WithMeasurements(
   175  				ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
   176  				ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
   177  				ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
   178  				ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
   179  				ClientRoundtripLatency.M(latencyMillis)))
   180  	} else {
   181  		ocstats.RecordWithOptions(ctx,
   182  			ocstats.WithTags(
   183  				tag.Upsert(KeyServerStatus, st),
   184  			),
   185  			ocstats.WithAttachments(attachments),
   186  			ocstats.WithMeasurements(
   187  				ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
   188  				ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
   189  				ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
   190  				ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
   191  				ServerLatency.M(latencyMillis)))
   192  	}
   193  }
   194  
   195  func statusCodeToString(s *status.Status) string {
   196  	// see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
   197  	switch c := s.Code(); c {
   198  	case codes.OK:
   199  		return "OK"
   200  	case codes.Canceled:
   201  		return "CANCELLED"
   202  	case codes.Unknown:
   203  		return "UNKNOWN"
   204  	case codes.InvalidArgument:
   205  		return "INVALID_ARGUMENT"
   206  	case codes.DeadlineExceeded:
   207  		return "DEADLINE_EXCEEDED"
   208  	case codes.NotFound:
   209  		return "NOT_FOUND"
   210  	case codes.AlreadyExists:
   211  		return "ALREADY_EXISTS"
   212  	case codes.PermissionDenied:
   213  		return "PERMISSION_DENIED"
   214  	case codes.ResourceExhausted:
   215  		return "RESOURCE_EXHAUSTED"
   216  	case codes.FailedPrecondition:
   217  		return "FAILED_PRECONDITION"
   218  	case codes.Aborted:
   219  		return "ABORTED"
   220  	case codes.OutOfRange:
   221  		return "OUT_OF_RANGE"
   222  	case codes.Unimplemented:
   223  		return "UNIMPLEMENTED"
   224  	case codes.Internal:
   225  		return "INTERNAL"
   226  	case codes.Unavailable:
   227  		return "UNAVAILABLE"
   228  	case codes.DataLoss:
   229  		return "DATA_LOSS"
   230  	case codes.Unauthenticated:
   231  		return "UNAUTHENTICATED"
   232  	default:
   233  		return "CODE_" + strconv.FormatInt(int64(c), 10)
   234  	}
   235  }
   236  
   237  func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments {
   238  	attachments := map[string]interface{}{}
   239  	span := trace.FromContext(ctx)
   240  	if span == nil {
   241  		return attachments
   242  	}
   243  	spanCtx := span.SpanContext()
   244  	if spanCtx.IsSampled() {
   245  		attachments[metricdata.AttachmentKeySpanContext] = spanCtx
   246  	}
   247  	return attachments
   248  }
   249  

View as plain text