1 package perf
2
3 import (
4 "encoding/binary"
5 "errors"
6 "fmt"
7 "io"
8 "os"
9 "runtime"
10 "sync"
11
12 "github.com/cilium/ebpf"
13 "github.com/cilium/ebpf/internal"
14 "github.com/cilium/ebpf/internal/epoll"
15 "github.com/cilium/ebpf/internal/unix"
16 )
17
// Sentinel errors used by the perf reader.
var (
	// ErrClosed is the error that ReadInto, Pause and Resume wrap once the
	// Reader has been closed. It is the very same value as os.ErrClosed.
	ErrClosed error = os.ErrClosed

	// errEOR ("end of ring") is returned by readRecord when the header read
	// hits io.EOF. ReadInto treats it as "this ring is drained" and moves on
	// to the next ring instead of surfacing it to the caller.
	errEOR error = errors.New("end of ring")
)
22
// perfEventHeader is the fixed-size header that readRecord decodes in front
// of every record: a 32-bit type followed by two 16-bit fields, all in
// native byte order.
//
// NOTE(review): presumably mirrors the kernel's struct perf_event_header —
// confirm against the perf_event_open documentation.
type perfEventHeader struct {
	Type uint32 // record type; readRecord switches on this value
	Misc uint16 // decoded but not otherwise used in this file
	Size uint16 // decoded but not otherwise used in this file
}

// perfEventHeaderSize is the encoded size of a perfEventHeader in bytes. It
// is used to size and reslice the scratch buffer handed to readRecord.
var perfEventHeaderSize int = binary.Size(perfEventHeader{})
31
32 func cpuForEvent(event *unix.EpollEvent) int {
33 return int(event.Pad)
34 }
35
36
37
// Record is one entry read from a perf ring by Reader.
type Record struct {
	// CPU is the cpu of the ring the record was read from.
	CPU int

	// RawSample is the payload of a sample record. When a Record is reused
	// via ReadInto, its backing array is reused if it is large enough. For a
	// lost-samples record it is truncated to length zero.
	RawSample []byte

	// LostSamples is the counter carried by a lost-samples record. It is
	// reset to zero whenever a sample record is read.
	LostSamples uint64
}
51
52
53
54
55 func readRecord(rd io.Reader, rec *Record, buf []byte) error {
56
57 buf = buf[:perfEventHeaderSize]
58 _, err := io.ReadFull(rd, buf)
59 if errors.Is(err, io.EOF) {
60 return errEOR
61 } else if err != nil {
62 return fmt.Errorf("read perf event header: %v", err)
63 }
64
65 header := perfEventHeader{
66 internal.NativeEndian.Uint32(buf[0:4]),
67 internal.NativeEndian.Uint16(buf[4:6]),
68 internal.NativeEndian.Uint16(buf[6:8]),
69 }
70
71 switch header.Type {
72 case unix.PERF_RECORD_LOST:
73 rec.RawSample = rec.RawSample[:0]
74 rec.LostSamples, err = readLostRecords(rd)
75 return err
76
77 case unix.PERF_RECORD_SAMPLE:
78 rec.LostSamples = 0
79
80 rec.RawSample, err = readRawSample(rd, buf, rec.RawSample)
81 return err
82
83 default:
84 return &unknownEventError{header.Type}
85 }
86 }
87
88 func readLostRecords(rd io.Reader) (uint64, error) {
89
90 var lostHeader struct {
91 ID uint64
92 Lost uint64
93 }
94
95 err := binary.Read(rd, internal.NativeEndian, &lostHeader)
96 if err != nil {
97 return 0, fmt.Errorf("can't read lost records header: %v", err)
98 }
99
100 return lostHeader.Lost, nil
101 }
102
// perfEventSample is the fixed-size prefix of a sample record as decoded by
// readRawSample: a single 32-bit length of the payload that follows.
type perfEventSample struct {
	Size uint32
}

// perfEventSampleSize is the encoded size in bytes of a perfEventSample, i.e.
// of its single uint32 field.
var perfEventSampleSize int = binary.Size(perfEventSample{})
109
110 func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
111 buf = buf[:perfEventSampleSize]
112 if _, err := io.ReadFull(rd, buf); err != nil {
113 return nil, fmt.Errorf("read sample size: %v", err)
114 }
115
116 sample := perfEventSample{
117 internal.NativeEndian.Uint32(buf),
118 }
119
120 var data []byte
121 if size := int(sample.Size); cap(sampleBuf) < size {
122 data = make([]byte, size)
123 } else {
124 data = sampleBuf[:size]
125 }
126
127 if _, err := io.ReadFull(rd, data); err != nil {
128 return nil, fmt.Errorf("read sample: %v", err)
129 }
130 return data, nil
131 }
132
133
134
135 type Reader struct {
136 poller *epoll.Poller
137
138
139
140
141 mu sync.Mutex
142
143
144
145 array *ebpf.Map
146 rings []*perfEventRing
147 epollEvents []unix.EpollEvent
148 epollRings []*perfEventRing
149 eventHeader []byte
150
151
152
153
154 pauseMu sync.Mutex
155 pauseFds []int
156 }
157
158
159
// ReaderOptions carries optional settings for NewReaderWithOptions. The zero
// value is what NewReader uses.
type ReaderOptions struct {
	// Watermark is forwarded unchanged to newPerfEventRing for every per-CPU
	// ring.
	//
	// NOTE(review): presumably the number of bytes that must be written to a
	// ring before the reader is woken up — confirm against perfEventRing.
	Watermark int
}
166
167
168
169
170
171
172 func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) {
173 return NewReaderWithOptions(array, perCPUBuffer, ReaderOptions{})
174 }
175
176
177 func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) {
178 if perCPUBuffer < 1 {
179 return nil, errors.New("perCPUBuffer must be larger than 0")
180 }
181
182 var (
183 fds []int
184 nCPU = int(array.MaxEntries())
185 rings = make([]*perfEventRing, 0, nCPU)
186 pauseFds = make([]int, 0, nCPU)
187 )
188
189 poller, err := epoll.New()
190 if err != nil {
191 return nil, err
192 }
193
194 defer func() {
195 if err != nil {
196 poller.Close()
197 for _, fd := range fds {
198 unix.Close(fd)
199 }
200 for _, ring := range rings {
201 if ring != nil {
202 ring.Close()
203 }
204 }
205 }
206 }()
207
208
209
210
211 for i := 0; i < nCPU; i++ {
212 ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark)
213 if errors.Is(err, unix.ENODEV) {
214
215 rings = append(rings, nil)
216 pauseFds = append(pauseFds, -1)
217 continue
218 }
219
220 if err != nil {
221 return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err)
222 }
223 rings = append(rings, ring)
224 pauseFds = append(pauseFds, ring.fd)
225
226 if err := poller.Add(ring.fd, i); err != nil {
227 return nil, err
228 }
229 }
230
231 array, err = array.Clone()
232 if err != nil {
233 return nil, err
234 }
235
236 pr = &Reader{
237 array: array,
238 rings: rings,
239 poller: poller,
240 epollEvents: make([]unix.EpollEvent, len(rings)),
241 epollRings: make([]*perfEventRing, 0, len(rings)),
242 eventHeader: make([]byte, perfEventHeaderSize),
243 pauseFds: pauseFds,
244 }
245 if err = pr.Resume(); err != nil {
246 return nil, err
247 }
248 runtime.SetFinalizer(pr, (*Reader).Close)
249 return pr, nil
250 }
251
252
253
254
255
256
257
258 func (pr *Reader) Close() error {
259 if err := pr.poller.Close(); err != nil {
260 if errors.Is(err, os.ErrClosed) {
261 return nil
262 }
263 return fmt.Errorf("close poller: %w", err)
264 }
265
266
267
268 pr.mu.Lock()
269 defer pr.mu.Unlock()
270
271 for _, ring := range pr.rings {
272 if ring != nil {
273 ring.Close()
274 }
275 }
276 pr.rings = nil
277 pr.pauseFds = nil
278 pr.array.Close()
279
280 return nil
281 }
282
283
284
285
286
287
288
289
290
291
292
293 func (pr *Reader) Read() (Record, error) {
294 var r Record
295 return r, pr.ReadInto(&r)
296 }
297
298
299 func (pr *Reader) ReadInto(rec *Record) error {
300 pr.mu.Lock()
301 defer pr.mu.Unlock()
302
303 if pr.rings == nil {
304 return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
305 }
306
307 for {
308 if len(pr.epollRings) == 0 {
309 nEvents, err := pr.poller.Wait(pr.epollEvents)
310 if err != nil {
311 return err
312 }
313
314 for _, event := range pr.epollEvents[:nEvents] {
315 ring := pr.rings[cpuForEvent(&event)]
316 pr.epollRings = append(pr.epollRings, ring)
317
318
319
320
321 ring.loadHead()
322 }
323 }
324
325
326
327
328 err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1])
329 if err == errEOR {
330
331
332 pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
333 continue
334 }
335
336 return err
337 }
338 }
339
340
341
342
343
344
345
346 func (pr *Reader) Pause() error {
347 pr.pauseMu.Lock()
348 defer pr.pauseMu.Unlock()
349
350 if pr.pauseFds == nil {
351 return fmt.Errorf("%w", ErrClosed)
352 }
353
354 for i := range pr.pauseFds {
355 if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) {
356 return fmt.Errorf("could't delete event fd for CPU %d: %w", i, err)
357 }
358 }
359
360 return nil
361 }
362
363
364
365
366 func (pr *Reader) Resume() error {
367 pr.pauseMu.Lock()
368 defer pr.pauseMu.Unlock()
369
370 if pr.pauseFds == nil {
371 return fmt.Errorf("%w", ErrClosed)
372 }
373
374 for i, fd := range pr.pauseFds {
375 if fd == -1 {
376 continue
377 }
378
379 if err := pr.array.Put(uint32(i), uint32(fd)); err != nil {
380 return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err)
381 }
382 }
383
384 return nil
385 }
386
387
388 func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
389 defer ring.writeTail()
390
391 rec.CPU = ring.cpu
392 return readRecord(ring, rec, pr.eventHeader)
393 }
394
// unknownEventError is the error readRecord returns for a record whose type
// is neither a sample nor a lost-samples record. Use IsUnknownEvent to detect
// it.
type unknownEventError struct {
	// eventType is the Type field of the offending record header.
	eventType uint32
}

// Error implements the error interface.
func (e *unknownEventError) Error() string {
	return fmt.Sprintf("unknown event type: %d", e.eventType)
}
402
403
404
405 func IsUnknownEvent(err error) bool {
406 var uee *unknownEventError
407 return errors.As(err, &uee)
408 }
409
View as plain text