1
2
3
4
5
6
7
8
9 package trace
10
11 import (
12 "bufio"
13 "fmt"
14 "io"
15 "slices"
16 "strings"
17
18 "golang.org/x/exp/trace/internal/event/go122"
19 "golang.org/x/exp/trace/internal/oldtrace"
20 "golang.org/x/exp/trace/internal/version"
21 )
22
23
// Reader produces trace events from a byte stream. Create one with NewReader
// and call ReadEvent repeatedly to obtain events.
type Reader struct {
	// r is the buffered input stream. NewReader sets it only for Go 1.22+
	// traces; ReadEvent passes it to readGeneration.
	r *bufio.Reader
	// lastTs is the timestamp of the last event returned by the Go 1.22+
	// path of ReadEvent. It is used to keep returned timestamps strictly
	// increasing.
	lastTs Time
	// gen is the generation currently being consumed, as returned by
	// readGeneration. It is nil until the first generation has been read.
	gen *generation
	// spill is the spilled batch returned by the last readGeneration call
	// and handed back to the next one. ReadEvent treats a nil spill (with a
	// non-nil gen) as "no further generation".
	spill *spilledBatch
	// frontier holds one cursor per M that still has events to offer in the
	// current generation. It is maintained with heapInsert, heapUpdate and
	// heapRemove.
	frontier []*batchCursor
	// cpuSamples is the not-yet-emitted tail of the current generation's
	// CPU samples.
	cpuSamples []cpuSample
	// order receives events via Advance and hands back events via Next.
	order ordering
	// emittedSync records whether the sync event for the current generation
	// has already been returned. NewReader initializes it to true for
	// Go 1.22+ traces.
	emittedSync bool

	// go121Events is non-nil only for traces older than Go 1.22, in which
	// case ReadEvent delegates to it entirely.
	go121Events *oldTraceConverter
}
36
37
38 func NewReader(r io.Reader) (*Reader, error) {
39 br := bufio.NewReader(r)
40 v, err := version.ReadHeader(br)
41 if err != nil {
42 return nil, err
43 }
44 switch v {
45 case version.Go111, version.Go119, version.Go121:
46 tr, err := oldtrace.Parse(br, v)
47 if err != nil {
48 return nil, err
49 }
50 return &Reader{
51 go121Events: convertOldFormat(tr),
52 }, nil
53 case version.Go122, version.Go123:
54 return &Reader{
55 r: br,
56 order: ordering{
57 mStates: make(map[ThreadID]*mState),
58 pStates: make(map[ProcID]*pState),
59 gStates: make(map[GoID]*gState),
60 activeTasks: make(map[TaskID]taskState),
61 },
62
63 emittedSync: true,
64 }, nil
65 default:
66 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
67 }
68 }
69
70
71
72
73
// ReadEvent returns the next event in the trace.
//
// For traces older than Go 1.22 it simply forwards to the old-format
// converter. For Go 1.22+ traces it returns io.EOF once the current
// generation is fully consumed and no spilled batch remains, and a
// "broken trace" error if no cursor in the frontier can be advanced.
//
// On the Go 1.22+ path every successfully returned event has its table IDs
// validated, and its timestamp is bumped if necessary so that timestamps
// returned by consecutive calls are strictly increasing.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Old-format traces were fully parsed in NewReader; just drain the
	// converter. This returns before the deferred post-processing below is
	// registered, so these events are not validated or re-timestamped here.
	if r.go121Events != nil {
		ev, err := r.go121Events.next()
		if err != nil {

			return Event{}, err
		}
		return ev, nil
	}

	// Go 1.22+ trace parsing. Overview of the steps below:
	//
	//  1. Return any event already queued in the ordering.
	//  2. If both the frontier and the CPU samples are exhausted, finish
	//     the current generation (sync event, then EOF or the next
	//     generation) and rebuild the frontier from the new batches.
	//  3. Emit a CPU sample if its timestamp is strictly earlier than the
	//     event at the head of the frontier.
	//  4. Otherwise try to advance the head of the frontier; if the
	//     ordering rejects it, sort the frontier and try the remaining
	//     cursors in order.

	// Post-process the named results on every return from this point on.
	defer func() {
		// If we are returning an error, there is no event to validate.
		if err != nil {
			return
		}
		// A validation failure replaces the nil error being returned.
		if err = e.validateTableIDs(); err != nil {
			return
		}
		// Force timestamps to be strictly increasing across calls.
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Consume any events already queued in the ordering first.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}

	// Check whether the current generation is exhausted and needs refreshing.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		// Emit exactly one sync event per consumed generation. emittedSync
		// starts out true (see NewReader), so this branch is not taken
		// before the first generation is read, while r.gen is still nil.
		if !r.emittedSync {
			r.emittedSync = true
			return syncEvent(r.gen.evTable, r.lastTs), nil
		}
		if r.gen != nil && r.spill == nil {
			// We have consumed a generation, nothing is left in the
			// frontier, and there is no spilled batch. The absence of a
			// spilled batch is taken to mean there is no further
			// generation, so we are done.
			return Event{}, io.EOF
		}

		// Read the next generation, handing back the spilled batch from
		// the previous read.
		r.gen, r.spill, err = readGeneration(r.r, r.spill)
		if err != nil {
			return Event{}, err
		}

		// Reset the CPU sample cursor to the start of the new generation.
		r.cpuSamples = r.gen.cpuSamples

		// Rebuild the frontier: one cursor per M, positioned on that M's
		// first event.
		for m, batches := range r.gen.batches {
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// No event could be read from this M's batches; leave it
				// out of the frontier.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}

		// The new generation has not had its sync event emitted yet.
		r.emittedSync = false
	}
	// tryAdvance offers the current event of frontier cursor i to the
	// ordering. It reports false (and leaves the frontier untouched) if the
	// ordering does not accept the event. On success it moves the cursor to
	// its next event and fixes up the frontier accordingly.
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]

		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}

		// The event was accepted; refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// The cursor has a new current event; restore heap order.
			heapUpdate(r.frontier, i)
		} else {
			// Nothing else to read for this M; drop its cursor.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}

	// Inject a CPU sample if it comes next: either the frontier is empty, or
	// the sample is strictly earlier than the event at the frontier's head.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}

	// Try to advance the head of the frontier.
	// NOTE(review): the head is presumably the cursor with the minimum
	// timestamp, given the heap helpers used to maintain r.frontier; confirm
	// against their implementation.
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// The head could not be advanced. Sort the whole frontier with the
		// cursor comparison function and try the remaining cursors in that
		// order, starting at index 1 since index 0 was just attempted.
		// NOTE(review): this relies on the sorted slice still being a valid
		// heap for the heap helpers, and on the sort keeping the rejected
		// cursor at index 0; confirm against compare.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// An advance succeeded, so the ordering must now have an event queued.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}
223
224 func dumpFrontier(frontier []*batchCursor) string {
225 var sb strings.Builder
226 for _, bc := range frontier {
227 spec := go122.Specs()[bc.ev.typ]
228 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
229 for i, arg := range spec.Args[1:] {
230 fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
231 }
232 fmt.Fprintf(&sb, "]\n")
233 }
234 return sb.String()
235 }
236
View as plain text