1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package zpages
17
18 import (
19 "fmt"
20 "io"
21 "log"
22 "math"
23 "net/http"
24 "sort"
25 "sync"
26 "text/tabwriter"
27 "time"
28
29 "go.opencensus.io/plugin/ocgrpc"
30 "go.opencensus.io/stats/view"
31 )
32
33 const bytesPerKb = 1024
34
35 var (
36 programStartTime = time.Now()
37 mu sync.Mutex
38 snaps = make(map[methodKey]*statSnapshot)
39
40
41
42
43 viewType = map[*view.View]bool{
44 ocgrpc.ClientCompletedRPCsView: false,
45 ocgrpc.ClientSentBytesPerRPCView: false,
46 ocgrpc.ClientSentMessagesPerRPCView: false,
47 ocgrpc.ClientReceivedBytesPerRPCView: false,
48 ocgrpc.ClientReceivedMessagesPerRPCView: false,
49 ocgrpc.ClientRoundtripLatencyView: false,
50 ocgrpc.ServerCompletedRPCsView: true,
51 ocgrpc.ServerReceivedBytesPerRPCView: true,
52 ocgrpc.ServerReceivedMessagesPerRPCView: true,
53 ocgrpc.ServerSentBytesPerRPCView: true,
54 ocgrpc.ServerSentMessagesPerRPCView: true,
55 ocgrpc.ServerLatencyView: true,
56 }
57 )
58
59 func registerRPCViews() {
60 views := make([]*view.View, 0, len(viewType))
61 for v := range viewType {
62 views = append(views, v)
63 }
64 if err := view.Register(views...); err != nil {
65 log.Printf("error subscribing to views: %v", err)
66 }
67 view.RegisterExporter(snapExporter{})
68 }
69
70 func rpczHandler(w http.ResponseWriter, r *http.Request) {
71 w.Header().Set("Content-Type", "text/html; charset=utf-8")
72 WriteHTMLRpczPage(w)
73 }
74
75
76 func WriteHTMLRpczPage(w io.Writer) {
77 if err := headerTemplate.Execute(w, headerData{Title: "RPC Stats"}); err != nil {
78 log.Printf("zpages: executing template: %v", err)
79 }
80 WriteHTMLRpczSummary(w)
81 if err := footerTemplate.Execute(w, nil); err != nil {
82 log.Printf("zpages: executing template: %v", err)
83 }
84 }
85
86
87
88
89 func WriteHTMLRpczSummary(w io.Writer) {
90 mu.Lock()
91 if err := statsTemplate.Execute(w, getStatsPage()); err != nil {
92 log.Printf("zpages: executing template: %v", err)
93 }
94 mu.Unlock()
95 }
96
97
98 func WriteTextRpczPage(w io.Writer) {
99 mu.Lock()
100 defer mu.Unlock()
101 page := getStatsPage()
102
103 for i, sg := range page.StatGroups {
104 switch i {
105 case 0:
106 fmt.Fprint(w, "Sent:\n")
107 case 1:
108 fmt.Fprint(w, "\nReceived:\n")
109 }
110 tw := tabwriter.NewWriter(w, 6, 8, 1, ' ', 0)
111 fmt.Fprint(tw, "Method\tCount\t\t\tAvgLat\t\t\tMaxLat\t\t\tRate\t\t\tIn (MiB/s)\t\t\tOut (MiB/s)\t\t\tErrors\t\t\n")
112 fmt.Fprint(tw, "\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\n")
113 for _, s := range sg.Snapshots {
114 fmt.Fprintf(tw, "%s\t%d\t%d\t%d\t%v\t%v\t%v\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%d\t%d\t%d\n",
115 s.Method,
116 s.CountMinute,
117 s.CountHour,
118 s.CountTotal,
119 s.AvgLatencyMinute,
120 s.AvgLatencyHour,
121 s.AvgLatencyTotal,
122 s.RPCRateMinute,
123 s.RPCRateHour,
124 s.RPCRateTotal,
125 s.InputRateMinute/bytesPerKb,
126 s.InputRateHour/bytesPerKb,
127 s.InputRateTotal/bytesPerKb,
128 s.OutputRateMinute/bytesPerKb,
129 s.OutputRateHour/bytesPerKb,
130 s.OutputRateTotal/bytesPerKb,
131 s.ErrorsMinute,
132 s.ErrorsHour,
133 s.ErrorsTotal)
134 }
135 tw.Flush()
136 }
137 }
138
139
140 type headerData struct {
141 Title string
142 }
143
144
145 type statsPage struct {
146 StatGroups []*statGroup
147 }
148
149
150 type statGroup struct {
151 Direction string
152 Snapshots []*statSnapshot
153 }
154
155 func (s *statGroup) Len() int {
156 return len(s.Snapshots)
157 }
158
159 func (s *statGroup) Swap(i, j int) {
160 s.Snapshots[i], s.Snapshots[j] = s.Snapshots[j], s.Snapshots[i]
161 }
162
163 func (s *statGroup) Less(i, j int) bool {
164 return s.Snapshots[i].Method < s.Snapshots[j].Method
165 }
166
167
168
169 type statSnapshot struct {
170
171 Method string
172 Received bool
173 CountMinute uint64
174 CountHour uint64
175 CountTotal uint64
176 AvgLatencyMinute time.Duration
177 AvgLatencyHour time.Duration
178 AvgLatencyTotal time.Duration
179 RPCRateMinute float64
180 RPCRateHour float64
181 RPCRateTotal float64
182 InputRateMinute float64
183 InputRateHour float64
184 InputRateTotal float64
185 OutputRateMinute float64
186 OutputRateHour float64
187 OutputRateTotal float64
188 ErrorsMinute uint64
189 ErrorsHour uint64
190 ErrorsTotal uint64
191 }
192
193 type methodKey struct {
194 method string
195 received bool
196 }
197
198 type snapExporter struct{}
199
200 func (s snapExporter) ExportView(vd *view.Data) {
201 received, ok := viewType[vd.View]
202 if !ok {
203 return
204 }
205 if len(vd.Rows) == 0 {
206 return
207 }
208 ageSec := float64(time.Since(programStartTime)) / float64(time.Second)
209
210 computeRate := func(maxSec, x float64) float64 {
211 dur := ageSec
212 if maxSec > 0 && dur > maxSec {
213 dur = maxSec
214 }
215 return x / dur
216 }
217
218 convertTime := func(ms float64) time.Duration {
219 if math.IsInf(ms, 0) || math.IsNaN(ms) {
220 return 0
221 }
222 return time.Duration(float64(time.Millisecond) * ms)
223 }
224
225 haveResetErrors := make(map[string]struct{})
226
227 mu.Lock()
228 defer mu.Unlock()
229 for _, row := range vd.Rows {
230 var method string
231 for _, tag := range row.Tags {
232 if tag.Key == ocgrpc.KeyClientMethod || tag.Key == ocgrpc.KeyServerMethod {
233 method = tag.Value
234 break
235 }
236 }
237
238 key := methodKey{method: method, received: received}
239 s := snaps[key]
240 if s == nil {
241 s = &statSnapshot{Method: method, Received: received}
242 snaps[key] = s
243 }
244
245 var (
246 sum float64
247 count float64
248 )
249 switch v := row.Data.(type) {
250 case *view.CountData:
251 sum = float64(v.Value)
252 count = float64(v.Value)
253 case *view.DistributionData:
254 sum = v.Sum()
255 count = float64(v.Count)
256 case *view.SumData:
257 sum = v.Value
258 count = v.Value
259 }
260
261
262 switch vd.View {
263 case ocgrpc.ClientCompletedRPCsView:
264 if _, ok := haveResetErrors[method]; !ok {
265 haveResetErrors[method] = struct{}{}
266 s.ErrorsTotal = 0
267 }
268 for _, tag := range row.Tags {
269 if tag.Key == ocgrpc.KeyClientStatus && tag.Value != "OK" {
270 s.ErrorsTotal += uint64(count)
271 }
272 }
273
274 case ocgrpc.ClientRoundtripLatencyView:
275 s.AvgLatencyTotal = convertTime(sum / count)
276
277 case ocgrpc.ClientSentBytesPerRPCView:
278 s.OutputRateTotal = computeRate(0, sum)
279
280 case ocgrpc.ClientReceivedBytesPerRPCView:
281 s.InputRateTotal = computeRate(0, sum)
282
283 case ocgrpc.ClientSentMessagesPerRPCView:
284 s.CountTotal = uint64(count)
285 s.RPCRateTotal = computeRate(0, count)
286
287 case ocgrpc.ClientReceivedMessagesPerRPCView:
288
289
290 case ocgrpc.ServerCompletedRPCsView:
291 if _, ok := haveResetErrors[method]; !ok {
292 haveResetErrors[method] = struct{}{}
293 s.ErrorsTotal = 0
294 }
295 for _, tag := range row.Tags {
296 if tag.Key == ocgrpc.KeyServerStatus && tag.Value != "OK" {
297 s.ErrorsTotal += uint64(count)
298 }
299 }
300
301 case ocgrpc.ServerLatencyView:
302 s.AvgLatencyTotal = convertTime(sum / count)
303
304 case ocgrpc.ServerSentBytesPerRPCView:
305 s.OutputRateTotal = computeRate(0, sum)
306
307 case ocgrpc.ServerReceivedMessagesPerRPCView:
308 s.CountTotal = uint64(count)
309 s.RPCRateTotal = computeRate(0, count)
310
311 case ocgrpc.ServerSentMessagesPerRPCView:
312
313 }
314 }
315 }
316
317 func getStatsPage() *statsPage {
318 sentStats := statGroup{Direction: "Sent"}
319 receivedStats := statGroup{Direction: "Received"}
320 for key, sg := range snaps {
321 if key.received {
322 receivedStats.Snapshots = append(receivedStats.Snapshots, sg)
323 } else {
324 sentStats.Snapshots = append(sentStats.Snapshots, sg)
325 }
326 }
327 sort.Sort(&sentStats)
328 sort.Sort(&receivedStats)
329
330 return &statsPage{
331 StatGroups: []*statGroup{&sentStats, &receivedStats},
332 }
333 }
334
View as plain text