...

Source file src/github.com/cilium/ebpf/ringbuf/reader.go

Documentation: github.com/cilium/ebpf/ringbuf

     1  package ringbuf
     2  
     3  import (
     4  	"encoding/binary"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  	"os"
     9  	"sync"
    10  
    11  	"github.com/cilium/ebpf"
    12  	"github.com/cilium/ebpf/internal"
    13  	"github.com/cilium/ebpf/internal/epoll"
    14  	"github.com/cilium/ebpf/internal/unix"
    15  )
    16  
    17  var (
    18  	ErrClosed  = os.ErrClosed
    19  	errEOR     = errors.New("end of ring")
    20  	errDiscard = errors.New("sample discarded")
    21  	errBusy    = errors.New("sample not committed yet")
    22  )
    23  
    24  var ringbufHeaderSize = binary.Size(ringbufHeader{})
    25  
    26  // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
    27  type ringbufHeader struct {
    28  	Len   uint32
    29  	PgOff uint32
    30  }
    31  
    32  func (rh *ringbufHeader) isBusy() bool {
    33  	return rh.Len&unix.BPF_RINGBUF_BUSY_BIT != 0
    34  }
    35  
    36  func (rh *ringbufHeader) isDiscard() bool {
    37  	return rh.Len&unix.BPF_RINGBUF_DISCARD_BIT != 0
    38  }
    39  
    40  func (rh *ringbufHeader) dataLen() int {
    41  	return int(rh.Len & ^uint32(unix.BPF_RINGBUF_BUSY_BIT|unix.BPF_RINGBUF_DISCARD_BIT))
    42  }
    43  
    44  type Record struct {
    45  	RawSample []byte
    46  }
    47  
    48  // Read a record from an event ring.
    49  //
    50  // buf must be at least ringbufHeaderSize bytes long.
    51  func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {
    52  	rd.loadConsumer()
    53  
    54  	buf = buf[:ringbufHeaderSize]
    55  	if _, err := io.ReadFull(rd, buf); err == io.EOF {
    56  		return errEOR
    57  	} else if err != nil {
    58  		return fmt.Errorf("read event header: %w", err)
    59  	}
    60  
    61  	header := ringbufHeader{
    62  		internal.NativeEndian.Uint32(buf[0:4]),
    63  		internal.NativeEndian.Uint32(buf[4:8]),
    64  	}
    65  
    66  	if header.isBusy() {
    67  		// the next sample in the ring is not committed yet so we
    68  		// exit without storing the reader/consumer position
    69  		// and start again from the same position.
    70  		return errBusy
    71  	}
    72  
    73  	/* read up to 8 byte alignment */
    74  	dataLenAligned := uint64(internal.Align(header.dataLen(), 8))
    75  
    76  	if header.isDiscard() {
    77  		// when the record header indicates that the data should be
    78  		// discarded, we skip it by just updating the consumer position
    79  		// to the next record instead of normal Read() to avoid allocating data
    80  		// and reading/copying from the ring (which normally keeps track of the
    81  		// consumer position).
    82  		rd.skipRead(dataLenAligned)
    83  		rd.storeConsumer()
    84  
    85  		return errDiscard
    86  	}
    87  
    88  	if cap(rec.RawSample) < int(dataLenAligned) {
    89  		rec.RawSample = make([]byte, dataLenAligned)
    90  	} else {
    91  		rec.RawSample = rec.RawSample[:dataLenAligned]
    92  	}
    93  
    94  	if _, err := io.ReadFull(rd, rec.RawSample); err != nil {
    95  		return fmt.Errorf("read sample: %w", err)
    96  	}
    97  
    98  	rd.storeConsumer()
    99  	rec.RawSample = rec.RawSample[:header.dataLen()]
   100  	return nil
   101  }
   102  
   103  // Reader allows reading bpf_ringbuf_output
   104  // from user space.
   105  type Reader struct {
   106  	poller *epoll.Poller
   107  
   108  	// mu protects read/write access to the Reader structure
   109  	mu          sync.Mutex
   110  	ring        *ringbufEventRing
   111  	epollEvents []unix.EpollEvent
   112  	header      []byte
   113  	haveData    bool
   114  }
   115  
   116  // NewReader creates a new BPF ringbuf reader.
   117  func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
   118  	if ringbufMap.Type() != ebpf.RingBuf {
   119  		return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
   120  	}
   121  
   122  	maxEntries := int(ringbufMap.MaxEntries())
   123  	if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 {
   124  		return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
   125  	}
   126  
   127  	poller, err := epoll.New()
   128  	if err != nil {
   129  		return nil, err
   130  	}
   131  
   132  	if err := poller.Add(ringbufMap.FD(), 0); err != nil {
   133  		poller.Close()
   134  		return nil, err
   135  	}
   136  
   137  	ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
   138  	if err != nil {
   139  		poller.Close()
   140  		return nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
   141  	}
   142  
   143  	return &Reader{
   144  		poller:      poller,
   145  		ring:        ring,
   146  		epollEvents: make([]unix.EpollEvent, 1),
   147  		header:      make([]byte, ringbufHeaderSize),
   148  	}, nil
   149  }
   150  
   151  // Close frees resources used by the reader.
   152  //
   153  // It interrupts calls to Read.
   154  func (r *Reader) Close() error {
   155  	if err := r.poller.Close(); err != nil {
   156  		if errors.Is(err, os.ErrClosed) {
   157  			return nil
   158  		}
   159  		return err
   160  	}
   161  
   162  	// Acquire the lock. This ensures that Read isn't running.
   163  	r.mu.Lock()
   164  	defer r.mu.Unlock()
   165  
   166  	if r.ring != nil {
   167  		r.ring.Close()
   168  		r.ring = nil
   169  	}
   170  
   171  	return nil
   172  }
   173  
   174  // Read the next record from the BPF ringbuf.
   175  //
   176  // Calling Close interrupts the function.
   177  func (r *Reader) Read() (Record, error) {
   178  	var rec Record
   179  	return rec, r.ReadInto(&rec)
   180  }
   181  
   182  // ReadInto is like Read except that it allows reusing Record and associated buffers.
   183  func (r *Reader) ReadInto(rec *Record) error {
   184  	r.mu.Lock()
   185  	defer r.mu.Unlock()
   186  
   187  	if r.ring == nil {
   188  		return fmt.Errorf("ringbuffer: %w", ErrClosed)
   189  	}
   190  
   191  	for {
   192  		if !r.haveData {
   193  			_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)])
   194  			if err != nil {
   195  				return err
   196  			}
   197  			r.haveData = true
   198  		}
   199  
   200  		for {
   201  			err := readRecord(r.ring, rec, r.header)
   202  			if err == errBusy || err == errDiscard {
   203  				continue
   204  			}
   205  			if err == errEOR {
   206  				r.haveData = false
   207  				break
   208  			}
   209  
   210  			return err
   211  		}
   212  	}
   213  }
   214  

View as plain text