
Source file src/github.com/cilium/ebpf/perf/reader.go

Documentation: github.com/cilium/ebpf/perf

package perf

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"runtime"
	"sync"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/internal"
	"github.com/cilium/ebpf/internal/epoll"
	"github.com/cilium/ebpf/internal/unix"
)

var (
	ErrClosed = os.ErrClosed
	errEOR    = errors.New("end of ring")
)

var perfEventHeaderSize = binary.Size(perfEventHeader{})

// perfEventHeader must match 'struct perf_event_header' in <linux/perf_event.h>.
type perfEventHeader struct {
	Type uint32
	Misc uint16
	Size uint16
}

func cpuForEvent(event *unix.EpollEvent) int {
	return int(event.Pad)
}

// Record contains either a sample or a counter of the
// number of lost samples.
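//
// A minimal sketch of coping with the trailing garbage documented on
// RawSample, assuming record was returned by (*Reader).Read and the BPF
// program emits a fixed-size sample (the 16-byte size is a hypothetical
// value, not part of this package):
//
//	const sampleSize = 16
//	if len(record.RawSample) >= sampleSize {
//		sample := record.RawSample[:sampleSize]
//		_ = sample // decode the fixed-size sample here
//	}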
type Record struct {
	// The CPU this record was generated on.
	CPU int

	// The data submitted via bpf_perf_event_output.
	// Due to a kernel bug, this can contain between 0 and 7 bytes of trailing
	// garbage from the ring depending on the input sample's length.
	RawSample []byte

	// The number of samples which could not be output, since
	// the ring buffer was full.
	LostSamples uint64
}

// Read a record from a reader into rec. Tagging the record with a CPU
// is left to the caller.
//
// buf must be at least perfEventHeaderSize bytes long.
func readRecord(rd io.Reader, rec *Record, buf []byte) error {
	// Assert that the buffer is large enough.
	buf = buf[:perfEventHeaderSize]
	_, err := io.ReadFull(rd, buf)
	if errors.Is(err, io.EOF) {
		return errEOR
	} else if err != nil {
		return fmt.Errorf("read perf event header: %v", err)
	}

	header := perfEventHeader{
		internal.NativeEndian.Uint32(buf[0:4]),
		internal.NativeEndian.Uint16(buf[4:6]),
		internal.NativeEndian.Uint16(buf[6:8]),
	}

	switch header.Type {
	case unix.PERF_RECORD_LOST:
		rec.RawSample = rec.RawSample[:0]
		rec.LostSamples, err = readLostRecords(rd)
		return err

	case unix.PERF_RECORD_SAMPLE:
		rec.LostSamples = 0
		// We can reuse buf here because perfEventHeaderSize > perfEventSampleSize.
		rec.RawSample, err = readRawSample(rd, buf, rec.RawSample)
		return err

	default:
		return &unknownEventError{header.Type}
	}
}

func readLostRecords(rd io.Reader) (uint64, error) {
	// lostHeader must match 'struct perf_event_lost' in kernel sources.
	var lostHeader struct {
		ID   uint64
		Lost uint64
	}

	err := binary.Read(rd, internal.NativeEndian, &lostHeader)
	if err != nil {
		return 0, fmt.Errorf("can't read lost records header: %v", err)
	}

	return lostHeader.Lost, nil
}

var perfEventSampleSize = binary.Size(uint32(0))

// This must match 'struct perf_event_sample' in kernel sources.
type perfEventSample struct {
	Size uint32
}

func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
	buf = buf[:perfEventSampleSize]
	if _, err := io.ReadFull(rd, buf); err != nil {
		return nil, fmt.Errorf("read sample size: %v", err)
	}

	sample := perfEventSample{
		internal.NativeEndian.Uint32(buf),
	}

	var data []byte
	if size := int(sample.Size); cap(sampleBuf) < size {
		data = make([]byte, size)
	} else {
		data = sampleBuf[:size]
	}

	if _, err := io.ReadFull(rd, data); err != nil {
		return nil, fmt.Errorf("read sample: %v", err)
	}
	return data, nil
}

// Reader allows reading bpf_perf_event_output
// from user space.
type Reader struct {
	poller *epoll.Poller

	// mu protects read/write access to the Reader structure with the
	// exception of 'pauseFds', which is protected by 'pauseMu'.
	// If locking both 'mu' and 'pauseMu', 'mu' must be locked first.
	mu sync.Mutex

	// Closing a PERF_EVENT_ARRAY removes all event fds
	// stored in it, so we keep a reference alive.
	array       *ebpf.Map
	rings       []*perfEventRing
	epollEvents []unix.EpollEvent
	epollRings  []*perfEventRing
	eventHeader []byte

	// pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'.
	// These allow Pause/Resume to be executed independently of any ongoing
	// Read calls, which would otherwise need to be interrupted.
	pauseMu  sync.Mutex
	pauseFds []int
}

// ReaderOptions control the behaviour of the user
// space reader.
type ReaderOptions struct {
	// The number of written bytes required in any per CPU buffer before
	// Read will process data. Must be smaller than PerCPUBuffer.
	// The default is to start processing as soon as data is available.
	Watermark int
}

// NewReader creates a new reader with default options.
//
// array must be a PerfEventArray. perCPUBuffer gives the size of the
// per CPU buffer in bytes. It is rounded up to the nearest multiple
// of the current page size.
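//
// A minimal usage sketch, assuming "events" is an *ebpf.Map of type
// PerfEventArray defined elsewhere (not part of this file):
//
//	rd, err := perf.NewReader(events, os.Getpagesize())
//	if err != nil {
//		// handle error
//	}
//	defer rd.Close()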
func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) {
	return NewReaderWithOptions(array, perCPUBuffer, ReaderOptions{})
}

// NewReaderWithOptions creates a new reader with the given options.
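//
// A sketch with a non-zero watermark, so Read only wakes up once at least
// a page of data has accumulated in a per CPU buffer (the sizes are
// illustrative and "events" is an assumed PerfEventArray):
//
//	rd, err := perf.NewReaderWithOptions(events, 8*os.Getpagesize(), perf.ReaderOptions{
//		Watermark: os.Getpagesize(),
//	})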
func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) {
	if perCPUBuffer < 1 {
		return nil, errors.New("perCPUBuffer must be larger than 0")
	}

	var (
		fds      []int
		nCPU     = int(array.MaxEntries())
		rings    = make([]*perfEventRing, 0, nCPU)
		pauseFds = make([]int, 0, nCPU)
	)

	poller, err := epoll.New()
	if err != nil {
		return nil, err
	}

	defer func() {
		if err != nil {
			poller.Close()
			for _, fd := range fds {
				unix.Close(fd)
			}
			for _, ring := range rings {
				if ring != nil {
					ring.Close()
				}
			}
		}
	}()

	// bpf_perf_event_output checks which CPU an event is enabled on,
	// but doesn't allow using a wildcard like -1 to specify "all CPUs".
	// Hence we have to create a ring for each CPU.
	for i := 0; i < nCPU; i++ {
		ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark)
		if errors.Is(err, unix.ENODEV) {
			// The requested CPU is currently offline, skip it.
			rings = append(rings, nil)
			pauseFds = append(pauseFds, -1)
			continue
		}

		if err != nil {
			return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err)
		}
		rings = append(rings, ring)
		pauseFds = append(pauseFds, ring.fd)

		if err := poller.Add(ring.fd, i); err != nil {
			return nil, err
		}
	}

	array, err = array.Clone()
	if err != nil {
		return nil, err
	}

	pr = &Reader{
		array:       array,
		rings:       rings,
		poller:      poller,
		epollEvents: make([]unix.EpollEvent, len(rings)),
		epollRings:  make([]*perfEventRing, 0, len(rings)),
		eventHeader: make([]byte, perfEventHeaderSize),
		pauseFds:    pauseFds,
	}
	if err = pr.Resume(); err != nil {
		return nil, err
	}
	runtime.SetFinalizer(pr, (*Reader).Close)
	return pr, nil
}

// Close frees resources used by the reader.
//
// It interrupts calls to Read.
//
// Calls to perf_event_output from eBPF programs will return
// ENOENT after calling this method.
func (pr *Reader) Close() error {
	if err := pr.poller.Close(); err != nil {
		if errors.Is(err, os.ErrClosed) {
			return nil
		}
		return fmt.Errorf("close poller: %w", err)
	}

	// Trying to poll will now fail, so Read() can't block anymore. Acquire the
	// lock so that we can clean up.
	pr.mu.Lock()
	defer pr.mu.Unlock()

	for _, ring := range pr.rings {
		if ring != nil {
			ring.Close()
		}
	}
	pr.rings = nil
	pr.pauseFds = nil
	pr.array.Close()

	return nil
}

// Read the next record from the perf ring buffer.
//
// The function blocks until there are at least Watermark bytes in one
// of the per CPU buffers. Records from buffers below the Watermark
// are not returned.
//
// Records can contain between 0 and 7 bytes of trailing garbage from the ring
// depending on the input sample's length.
//
// Calling Close interrupts the function.
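//
// A sketch of a typical read loop (error handling abbreviated):
//
//	for {
//		record, err := rd.Read()
//		if errors.Is(err, perf.ErrClosed) {
//			return
//		}
//		if err != nil {
//			// handle error
//			continue
//		}
//		if record.LostSamples > 0 {
//			// The kernel dropped samples because the ring was
//			// full; record.RawSample is empty in this case.
//			continue
//		}
//		// process record.RawSample
//	}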
func (pr *Reader) Read() (Record, error) {
	var r Record
	return r, pr.ReadInto(&r)
}

// ReadInto is like Read except that it allows reusing Record and associated buffers.
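//
// A sketch that reuses a single Record across iterations; note that
// record.RawSample is only valid until the next call to ReadInto:
//
//	var record perf.Record
//	for {
//		if err := rd.ReadInto(&record); err != nil {
//			// handle error
//		}
//		// process record.RawSample before the next iteration
//	}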
func (pr *Reader) ReadInto(rec *Record) error {
	pr.mu.Lock()
	defer pr.mu.Unlock()

	if pr.rings == nil {
		return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
	}

	for {
		if len(pr.epollRings) == 0 {
			nEvents, err := pr.poller.Wait(pr.epollEvents)
			if err != nil {
				return err
			}

			for _, event := range pr.epollEvents[:nEvents] {
				ring := pr.rings[cpuForEvent(&event)]
				pr.epollRings = append(pr.epollRings, ring)

				// Read the current head pointer now, not every time
				// we read a record. This prevents a single fast producer
				// from keeping the reader busy.
				ring.loadHead()
			}
		}

		// Start at the last available event. The order in which we
		// process them doesn't matter, and starting at the back allows
		// resizing epollRings to keep track of processed rings.
		err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1])
		if err == errEOR {
			// We've emptied the current ring buffer, process
			// the next one.
			pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
			continue
		}

		return err
	}
}

// Pause stops all notifications from this Reader.
//
// While the Reader is paused, any attempts to write to the event buffer from
// BPF programs will return -ENOENT.
//
// Subsequent calls to Read will block until a call to Resume.
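//
// A sketch of pausing and resuming around a critical section
// (rd is assumed to come from NewReader):
//
//	if err := rd.Pause(); err != nil {
//		// handle error
//	}
//	// BPF writes to the event array now fail with -ENOENT.
//	if err := rd.Resume(); err != nil {
//		// handle error
//	}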
func (pr *Reader) Pause() error {
	pr.pauseMu.Lock()
	defer pr.pauseMu.Unlock()

	if pr.pauseFds == nil {
		return fmt.Errorf("%w", ErrClosed)
	}

	for i := range pr.pauseFds {
		if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) {
			return fmt.Errorf("couldn't delete event fd for CPU %d: %w", i, err)
		}
	}

	return nil
}

// Resume allows this perf reader to emit notifications.
//
// Subsequent calls to Read will block until the next event notification.
func (pr *Reader) Resume() error {
	pr.pauseMu.Lock()
	defer pr.pauseMu.Unlock()

	if pr.pauseFds == nil {
		return fmt.Errorf("%w", ErrClosed)
	}

	for i, fd := range pr.pauseFds {
		if fd == -1 {
			continue
		}

		if err := pr.array.Put(uint32(i), uint32(fd)); err != nil {
			return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err)
		}
	}

	return nil
}

// NB: Has to be preceded by a call to ring.loadHead.
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
	defer ring.writeTail()

	rec.CPU = ring.cpu
	return readRecord(ring, rec, pr.eventHeader)
}

type unknownEventError struct {
	eventType uint32
}

func (uev *unknownEventError) Error() string {
	return fmt.Sprintf("unknown event type: %d", uev.eventType)
}

// IsUnknownEvent returns true if the error occurred
// because an unknown event was submitted to the perf event ring.
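//
// A sketch of skipping over unknown record types in a read loop:
//
//	record, err := rd.Read()
//	for perf.IsUnknownEvent(err) {
//		record, err = rd.Read()
//	}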
func IsUnknownEvent(err error) bool {
	var uee *unknownEventError
	return errors.As(err, &uee)
}
