package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"strings"

	ppb "google.golang.org/grpc/profiling/proto"
)

// jsonNode is a single trace event in the Catapult (Trace Event Format) JSON
// output.
type jsonNode struct {
	Name      string  `json:"name"`
	Cat       string  `json:"cat"`
	ID        string  `json:"id"`
	Cname     string  `json:"cname"`
	Phase     string  `json:"ph"`
	Timestamp float64 `json:"ts"`
	PID       string  `json:"pid"`
	TID       string  `json:"tid"`
}
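
// Illustrative example: a hypothetical "B" (begin) event for an
// "/http2/recv/header" timer would marshal to something like the following
// (all values below are made up):
//
//	{"name": "/Recv/1/3/http2/recv/header", "cat": "Recv", "id": "/Recv/1/3",
//	 "cname": "cq_build_attempt_failed", "ph": "B", "ts": 1500,
//	 "pid": "/Recv/1/3", "tid": "42"}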

// hashCname maps a timer tag to one of the Catapult trace viewer's predefined
// cname (colour) labels. Custom colours cannot be specified, so predefined
// labels are reused purely for the colours they map to.
func hashCname(tag string) string {
	if strings.Contains(tag, "encoding") {
		return "rail_response"
	}

	if strings.Contains(tag, "compression") {
		return "cq_build_passed"
	}

	if strings.Contains(tag, "transport") {
		if strings.Contains(tag, "blocking") {
			return "rail_animation"
		}
		return "good"
	}

	if strings.Contains(tag, "header") {
		return "cq_build_attempt_failed"
	}

	if tag == "/" {
		return "heap_dump_stack_frame"
	}

	if strings.Contains(tag, "flow") || strings.Contains(tag, "tmp") {
		return "heap_dump_stack_frame"
	}

	return ""
}

// filterCounter returns the index of the timer in stat.Timers that is the
// counter-th (zero-based) timer whose Tags equal filter, or -1 if no such
// timer exists.
func filterCounter(stat *ppb.Stat, filter string, counter int) int {
	localCounter := 0
	for i := 0; i < len(stat.Timers); i++ {
		if stat.Timers[i].Tags == filter {
			if localCounter == counter {
				return i
			}
			localCounter++
		}
	}

	return -1
}

// counter maintains an independent, zero-based running count for every string
// key it has seen.
type counter struct {
	c map[string]int
}

func newCounter() *counter {
	return &counter{c: make(map[string]int)}
}

// GetAndInc returns the current count for s and then increments it.
func (c *counter) GetAndInc(s string) int {
	ret := c.c[s]
	c.c[s]++
	return ret
}
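
// Illustrative usage: every key has its own zero-based running count, which is
// what lets the conversion code pair the n-th occurrence of one timer tag with
// the n-th occurrence of another via filterCounter.
//
//	c := newCounter()
//	c.GetAndInc("/http2/recv/header") // returns 0
//	c.GetAndInc("/http2/recv/header") // returns 1
//	c.GetAndInc("/transport/enqueue") // returns 0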

// catapultNs converts a (seconds, nanoseconds) pair to a single nanosecond
// value as a float64 for JSON encoding; for example, catapultNs(1, 500)
// returns 1000000500.
func catapultNs(sec int64, nsec int32) float64 {
	return float64((sec * 1000000000) + int64(nsec))
}

// streamStatsCatapultJSONSingle converts the profiling stat for a single
// stream into Catapult trace events: instant ("i") markers for the loopyReader
// and loopyWriter goroutines, start/finish ("s"/"f") flow events linking
// related timers across goroutines, and a begin/end ("B"/"E") pair for every
// timer. Timestamps are expressed relative to the given base time.
func streamStatsCatapultJSONSingle(stat *ppb.Stat, baseSec int64, baseNsec int32) []jsonNode {
	if len(stat.Timers) == 0 {
		return nil
	}

	connectionCounter := binary.BigEndian.Uint64(stat.Metadata[0:8])
	streamID := binary.BigEndian.Uint32(stat.Metadata[8:12])
	opid := fmt.Sprintf("/%s/%d/%d", stat.Tags, connectionCounter, streamID)

	var loopyReaderGoID, loopyWriterGoID int64
	for i := 0; i < len(stat.Timers) && (loopyReaderGoID == 0 || loopyWriterGoID == 0); i++ {
		if strings.Contains(stat.Timers[i].Tags, "/loopyReader") {
			loopyReaderGoID = stat.Timers[i].GoId
		} else if strings.Contains(stat.Timers[i].Tags, "/loopyWriter") {
			loopyWriterGoID = stat.Timers[i].GoId
		}
	}

	lrc, lwc := newCounter(), newCounter()

	var result []jsonNode
	result = append(result,
		jsonNode{
			Name:      "loopyReaderTmp",
			ID:        opid,
			Cname:     hashCname("tmp"),
			Phase:     "i",
			Timestamp: 0,
			PID:       fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter),
			TID:       fmt.Sprintf("%d", loopyReaderGoID),
		},
		jsonNode{
			Name:      "loopyWriterTmp",
			ID:        opid,
			Cname:     hashCname("tmp"),
			Phase:     "i",
			Timestamp: 0,
			PID:       fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter),
			TID:       fmt.Sprintf("%d", loopyWriterGoID),
		},
	)

	for i := 0; i < len(stat.Timers); i++ {
		categories := stat.Tags
		pid, tid := opid, fmt.Sprintf("%d", stat.Timers[i].GoId)

		if stat.Timers[i].GoId == loopyReaderGoID {
			pid, tid = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId)

			var flowEndID int
			var flowEndPID, flowEndTID string
			switch stat.Timers[i].Tags {
			case "/http2/recv/header":
				flowEndID = filterCounter(stat, "/grpc/stream/recv/header", lrc.GetAndInc("/http2/recv/header"))
				if flowEndID != -1 {
					flowEndPID = opid
					flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId)
				} else {
					logger.Infof("cannot find %s/grpc/stream/recv/header for %s/http2/recv/header", opid, opid)
				}
			case "/http2/recv/dataFrame/loopyReader":
				flowEndID = filterCounter(stat, "/recvAndDecompress", lrc.GetAndInc("/http2/recv/dataFrame/loopyReader"))
				if flowEndID != -1 {
					flowEndPID = opid
					flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId)
				} else {
					logger.Infof("cannot find %s/recvAndDecompress for %s/http2/recv/dataFrame/loopyReader", opid, opid)
				}
			default:
				flowEndID = -1
			}

			if flowEndID != -1 {
				flowID := fmt.Sprintf("lrc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[i].Tags, connectionCounter, stat.Timers[flowEndID].Tags, i, pid, tid, flowEndID, flowEndPID, flowEndTID)
				result = append(result,
					jsonNode{
						Name:      fmt.Sprintf("%s/flow", opid),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "s",
						Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec),
						PID:       pid,
						TID:       tid,
					},
					jsonNode{
						Name:      fmt.Sprintf("%s/flow", opid),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "f",
						Timestamp: catapultNs(stat.Timers[flowEndID].BeginSec-baseSec, stat.Timers[flowEndID].BeginNsec-baseNsec),
						PID:       flowEndPID,
						TID:       flowEndTID,
					},
				)
			}
		} else if stat.Timers[i].GoId == loopyWriterGoID {
			pid, tid = fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId)

			var flowBeginID int
			var flowBeginPID, flowBeginTID string
			switch stat.Timers[i].Tags {
			case "/http2/recv/header/loopyWriter/registerOutStream":
				flowBeginID = filterCounter(stat, "/http2/recv/header", lwc.GetAndInc("/http2/recv/header/loopyWriter/registerOutStream"))
				flowBeginPID = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter)
				flowBeginTID = fmt.Sprintf("%d", loopyReaderGoID)
			case "/http2/send/dataFrame/loopyWriter/preprocess":
				flowBeginID = filterCounter(stat, "/transport/enqueue", lwc.GetAndInc("/http2/send/dataFrame/loopyWriter/preprocess"))
				if flowBeginID != -1 {
					flowBeginPID = opid
					flowBeginTID = fmt.Sprintf("%d", stat.Timers[flowBeginID].GoId)
				} else {
					logger.Infof("cannot find /%d/transport/enqueue for /%d/http2/send/dataFrame/loopyWriter/preprocess", connectionCounter, connectionCounter)
				}
			default:
				flowBeginID = -1
			}

			if flowBeginID != -1 {
				flowID := fmt.Sprintf("lwc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[flowBeginID].Tags, connectionCounter, stat.Timers[i].Tags, flowBeginID, flowBeginPID, flowBeginTID, i, pid, tid)
				result = append(result,
					jsonNode{
						Name:      fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "s",
						Timestamp: catapultNs(stat.Timers[flowBeginID].EndSec-baseSec, stat.Timers[flowBeginID].EndNsec-baseNsec),
						PID:       flowBeginPID,
						TID:       flowBeginTID,
					},
					jsonNode{
						Name:      fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID),
						Cat:       categories + ",flow",
						ID:        flowID,
						Cname:     hashCname("flow"),
						Phase:     "f",
						Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec),
						PID:       pid,
						TID:       tid,
					},
				)
			}
		}

		result = append(result,
			jsonNode{
				Name:      fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags),
				Cat:       categories,
				ID:        opid,
				Cname:     hashCname(stat.Timers[i].Tags),
				Phase:     "B",
				Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec),
				PID:       pid,
				TID:       tid,
			},
			jsonNode{
				Name:      fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags),
				Cat:       categories,
				ID:        opid,
				Cname:     hashCname(stat.Timers[i].Tags),
				Phase:     "E",
				Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec),
				PID:       pid,
				TID:       tid,
			},
		)
	}

	return result
}
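
// Illustrative sketch: the events for a single stream stat can be converted
// and marshalled on their own; stat, baseSec and baseNsec here stand in for a
// decoded *ppb.Stat and a previously chosen base timestamp.
//
//	nodes := streamStatsCatapultJSONSingle(stat, baseSec, baseNsec)
//	if b, err := json.Marshal(nodes); err == nil {
//		fmt.Println(string(b)) // a JSON array loadable by the Catapult trace viewer
//	}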

// timerBeginIsBefore reports whether timer ti begins before timer tj.
func timerBeginIsBefore(ti *ppb.Timer, tj *ppb.Timer) bool {
	if ti.BeginSec == tj.BeginSec {
		return ti.BeginNsec < tj.BeginNsec
	}
	return ti.BeginSec < tj.BeginSec
}

// streamStatsCatapultJSON filters and sorts the stream stats in the given
// snapshot, converts them to Catapult's JSON trace format, and writes the
// result to the named file, which can then be loaded into a Catapult trace
// viewer such as chrome://tracing.
func streamStatsCatapultJSON(s *snapshot, streamStatsCatapultJSONFileName string) (err error) {
	logger.Infof("calculating stream stats filters")
	filterArray := strings.Split(*flagStreamStatsFilter, ",")
	filter := make(map[string]bool)
	for _, f := range filterArray {
		filter[f] = true
	}

	logger.Infof("filter stream stats for %s", *flagStreamStatsFilter)
	var streamStats []*ppb.Stat
	for _, stat := range s.StreamStats {
		if _, ok := filter[stat.Tags]; ok {
			streamStats = append(streamStats, stat)
		}
	}

	logger.Infof("sorting timers within all stats")
	for id := range streamStats {
		sort.Slice(streamStats[id].Timers, func(i, j int) bool {
			return timerBeginIsBefore(streamStats[id].Timers[i], streamStats[id].Timers[j])
		})
	}

	logger.Infof("sorting stream stats")
	sort.Slice(streamStats, func(i, j int) bool {
		if len(streamStats[j].Timers) == 0 {
			return true
		} else if len(streamStats[i].Timers) == 0 {
			return false
		}
		pi := binary.BigEndian.Uint64(streamStats[i].Metadata[0:8])
		pj := binary.BigEndian.Uint64(streamStats[j].Metadata[0:8])
		if pi == pj {
			return timerBeginIsBefore(streamStats[i].Timers[0], streamStats[j].Timers[0])
		}

		return pi < pj
	})

	// Drop the last stream stat after sorting; it presumably corresponds to
	// the stream that was used to retrieve this profiling data itself.
	if len(streamStats) > 0 {
		streamStats = streamStats[:len(streamStats)-1]
	}

	logger.Infof("calculating the earliest timestamp across all timers")
	var base *ppb.Timer
	for _, stat := range streamStats {
		for _, timer := range stat.Timers {
			if base == nil || timerBeginIsBefore(timer, base) {
				base = timer
			}
		}
	}

	logger.Infof("converting %d stats to catapult JSON format", len(streamStats))
	var jsonNodes []jsonNode
	for _, stat := range streamStats {
		jsonNodes = append(jsonNodes, streamStatsCatapultJSONSingle(stat, base.BeginSec, base.BeginNsec)...)
	}

	logger.Infof("marshalling catapult JSON")
	b, err := json.Marshal(jsonNodes)
	if err != nil {
		logger.Errorf("cannot marshal JSON: %v", err)
		return err
	}

	logger.Infof("creating catapult JSON file")
	streamStatsCatapultJSONFile, err := os.Create(streamStatsCatapultJSONFileName)
	if err != nil {
		logger.Errorf("cannot create file %s: %v", streamStatsCatapultJSONFileName, err)
		return err
	}
	defer streamStatsCatapultJSONFile.Close()

	logger.Infof("writing catapult JSON to disk")
	_, err = streamStatsCatapultJSONFile.Write(b)
	if err != nil {
		logger.Errorf("cannot write marshalled JSON: %v", err)
		return err
	}

	logger.Infof("successfully wrote catapult JSON file %s", streamStatsCatapultJSONFileName)
	return nil
}