1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package ocgrpc
17
18 import (
19 "context"
20 "strconv"
21 "strings"
22 "sync/atomic"
23 "time"
24
25 "go.opencensus.io/metric/metricdata"
26 ocstats "go.opencensus.io/stats"
27 "go.opencensus.io/stats/view"
28 "go.opencensus.io/tag"
29 "go.opencensus.io/trace"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/grpclog"
32 "google.golang.org/grpc/stats"
33 "google.golang.org/grpc/status"
34 )
35
// grpcInstrumentationKey is the private string type used for this package's
// context keys (see rpcDataKey). Using a dedicated type keeps these keys from
// colliding with context keys defined by other packages.
type grpcInstrumentationKey string
37
38
39
40
// rpcData carries the per-RPC bookkeeping shared by the stats handlers in
// this file. Handlers look it up from the context under rpcDataKey.
type rpcData struct {
	// Message and byte counters. The handlers in this file only touch them
	// through sync/atomic, so any new access must use atomic operations too.
	sentCount int64
	sentBytes int64
	recvCount int64
	recvBytes int64

	// startTime is the reference point from which the RPC latency is
	// computed when the call ends.
	startTime time.Time
	// method is the RPC method name; handlers strip its leading slashes
	// before using it as a tag value.
	method string
}
52
53
54
55 var (
56 DefaultBytesDistribution = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
57 DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
58 DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
59 )
60
61
62
63 var (
64 KeyServerMethod = tag.MustNewKey("grpc_server_method")
65 KeyServerStatus = tag.MustNewKey("grpc_server_status")
66 )
67
68
69 var (
70 KeyClientMethod = tag.MustNewKey("grpc_client_method")
71 KeyClientStatus = tag.MustNewKey("grpc_client_status")
72 )
73
74 var (
75 rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
76 )
77
// methodName returns fullname with every leading '/' removed, e.g.
// "/pkg.Service/Method" becomes "pkg.Service/Method". Slashes after the first
// non-slash character are left untouched.
func methodName(fullname string) string {
	isSlash := func(r rune) bool { return r == '/' }
	return strings.TrimLeftFunc(fullname, isSlash)
}
81
82
83 func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
84 switch st := s.(type) {
85 case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
86
87 case *stats.Begin:
88 handleRPCBegin(ctx, st)
89 case *stats.OutPayload:
90 handleRPCOutPayload(ctx, st)
91 case *stats.InPayload:
92 handleRPCInPayload(ctx, st)
93 case *stats.End:
94 handleRPCEnd(ctx, st)
95 default:
96 grpclog.Infof("unexpected stats: %T", st)
97 }
98 }
99
100 func handleRPCBegin(ctx context.Context, s *stats.Begin) {
101 d, ok := ctx.Value(rpcDataKey).(*rpcData)
102 if !ok {
103 if grpclog.V(2) {
104 grpclog.Infoln("Failed to retrieve *rpcData from context.")
105 }
106 }
107
108 if s.IsClient() {
109 ocstats.RecordWithOptions(ctx,
110 ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
111 ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
112 } else {
113 ocstats.RecordWithOptions(ctx,
114 ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
115 ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
116 }
117 }
118
119 func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
120 d, ok := ctx.Value(rpcDataKey).(*rpcData)
121 if !ok {
122 if grpclog.V(2) {
123 grpclog.Infoln("Failed to retrieve *rpcData from context.")
124 }
125 return
126 }
127
128 atomic.AddInt64(&d.sentBytes, int64(s.Length))
129 atomic.AddInt64(&d.sentCount, 1)
130 }
131
132 func handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
133 d, ok := ctx.Value(rpcDataKey).(*rpcData)
134 if !ok {
135 if grpclog.V(2) {
136 grpclog.Infoln("Failed to retrieve *rpcData from context.")
137 }
138 return
139 }
140
141 atomic.AddInt64(&d.recvBytes, int64(s.Length))
142 atomic.AddInt64(&d.recvCount, 1)
143 }
144
145 func handleRPCEnd(ctx context.Context, s *stats.End) {
146 d, ok := ctx.Value(rpcDataKey).(*rpcData)
147 if !ok {
148 if grpclog.V(2) {
149 grpclog.Infoln("Failed to retrieve *rpcData from context.")
150 }
151 return
152 }
153
154 elapsedTime := time.Since(d.startTime)
155
156 var st string
157 if s.Error != nil {
158 s, ok := status.FromError(s.Error)
159 if ok {
160 st = statusCodeToString(s)
161 }
162 } else {
163 st = "OK"
164 }
165
166 latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
167 attachments := getSpanCtxAttachment(ctx)
168 if s.Client {
169 ocstats.RecordWithOptions(ctx,
170 ocstats.WithTags(
171 tag.Upsert(KeyClientMethod, methodName(d.method)),
172 tag.Upsert(KeyClientStatus, st)),
173 ocstats.WithAttachments(attachments),
174 ocstats.WithMeasurements(
175 ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
176 ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
177 ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
178 ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
179 ClientRoundtripLatency.M(latencyMillis)))
180 } else {
181 ocstats.RecordWithOptions(ctx,
182 ocstats.WithTags(
183 tag.Upsert(KeyServerStatus, st),
184 ),
185 ocstats.WithAttachments(attachments),
186 ocstats.WithMeasurements(
187 ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
188 ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
189 ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
190 ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
191 ServerLatency.M(latencyMillis)))
192 }
193 }
194
195 func statusCodeToString(s *status.Status) string {
196
197 switch c := s.Code(); c {
198 case codes.OK:
199 return "OK"
200 case codes.Canceled:
201 return "CANCELLED"
202 case codes.Unknown:
203 return "UNKNOWN"
204 case codes.InvalidArgument:
205 return "INVALID_ARGUMENT"
206 case codes.DeadlineExceeded:
207 return "DEADLINE_EXCEEDED"
208 case codes.NotFound:
209 return "NOT_FOUND"
210 case codes.AlreadyExists:
211 return "ALREADY_EXISTS"
212 case codes.PermissionDenied:
213 return "PERMISSION_DENIED"
214 case codes.ResourceExhausted:
215 return "RESOURCE_EXHAUSTED"
216 case codes.FailedPrecondition:
217 return "FAILED_PRECONDITION"
218 case codes.Aborted:
219 return "ABORTED"
220 case codes.OutOfRange:
221 return "OUT_OF_RANGE"
222 case codes.Unimplemented:
223 return "UNIMPLEMENTED"
224 case codes.Internal:
225 return "INTERNAL"
226 case codes.Unavailable:
227 return "UNAVAILABLE"
228 case codes.DataLoss:
229 return "DATA_LOSS"
230 case codes.Unauthenticated:
231 return "UNAUTHENTICATED"
232 default:
233 return "CODE_" + strconv.FormatInt(int64(c), 10)
234 }
235 }
236
237 func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments {
238 attachments := map[string]interface{}{}
239 span := trace.FromContext(ctx)
240 if span == nil {
241 return attachments
242 }
243 spanCtx := span.SpanContext()
244 if spanCtx.IsSampled() {
245 attachments[metricdata.AttachmentKeySpanContext] = spanCtx
246 }
247 return attachments
248 }
249
View as plain text