...
1 package ringbuf
2
3 import (
4 "encoding/binary"
5 "errors"
6 "fmt"
7 "io"
8 "os"
9 "sync"
10
11 "github.com/cilium/ebpf"
12 "github.com/cilium/ebpf/internal"
13 "github.com/cilium/ebpf/internal/epoll"
14 "github.com/cilium/ebpf/internal/unix"
15 )
16
// Sentinel errors used by the ring buffer reader.
var (
	// ErrClosed is an alias for os.ErrClosed. ReadInto wraps it in the error
	// it returns once the Reader's ring has been released by Close.
	ErrClosed = os.ErrClosed

	// errEOR is returned by readRecord when reading a record header hits
	// io.EOF.
	errEOR = errors.New("end of ring")

	// errDiscard is returned by readRecord after it has skipped a record
	// whose header has the discard bit set.
	errDiscard = errors.New("sample discarded")

	// errBusy is returned by readRecord when the next record's header has
	// the busy bit set.
	errBusy = errors.New("sample not committed yet")
)
23
// ringbufHeader is the fixed-size header that readRecord decodes in front of
// every record. readRecord fills it from two consecutive native-endian 32-bit
// words, so the field order here must match that layout.
type ringbufHeader struct {
	// Len is the first header word: the payload length combined with the
	// busy and discard flag bits (see isBusy, isDiscard and dataLen).
	Len uint32

	// PgOff is the second header word.
	PgOff uint32
}

// ringbufHeaderSize is the encoded size of a ringbufHeader in bytes, as
// computed by binary.Size.
var ringbufHeaderSize = binary.Size(ringbufHeader{})
31
32 func (rh *ringbufHeader) isBusy() bool {
33 return rh.Len&unix.BPF_RINGBUF_BUSY_BIT != 0
34 }
35
36 func (rh *ringbufHeader) isDiscard() bool {
37 return rh.Len&unix.BPF_RINGBUF_DISCARD_BIT != 0
38 }
39
40 func (rh *ringbufHeader) dataLen() int {
41 return int(rh.Len & ^uint32(unix.BPF_RINGBUF_BUSY_BIT|unix.BPF_RINGBUF_DISCARD_BIT))
42 }
43
// Record is a single sample read from the ring buffer.
type Record struct {
	// RawSample holds the sample's payload bytes. When a Record is passed to
	// ReadInto repeatedly, the slice's backing array is reused whenever its
	// capacity is large enough for the next sample.
	RawSample []byte
}
47
48
49
50
51 func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {
52 rd.loadConsumer()
53
54 buf = buf[:ringbufHeaderSize]
55 if _, err := io.ReadFull(rd, buf); err == io.EOF {
56 return errEOR
57 } else if err != nil {
58 return fmt.Errorf("read event header: %w", err)
59 }
60
61 header := ringbufHeader{
62 internal.NativeEndian.Uint32(buf[0:4]),
63 internal.NativeEndian.Uint32(buf[4:8]),
64 }
65
66 if header.isBusy() {
67
68
69
70 return errBusy
71 }
72
73
74 dataLenAligned := uint64(internal.Align(header.dataLen(), 8))
75
76 if header.isDiscard() {
77
78
79
80
81
82 rd.skipRead(dataLenAligned)
83 rd.storeConsumer()
84
85 return errDiscard
86 }
87
88 if cap(rec.RawSample) < int(dataLenAligned) {
89 rec.RawSample = make([]byte, dataLenAligned)
90 } else {
91 rec.RawSample = rec.RawSample[:dataLenAligned]
92 }
93
94 if _, err := io.ReadFull(rd, rec.RawSample); err != nil {
95 return fmt.Errorf("read sample: %w", err)
96 }
97
98 rd.storeConsumer()
99 rec.RawSample = rec.RawSample[:header.dataLen()]
100 return nil
101 }
102
103
104
105 type Reader struct {
106 poller *epoll.Poller
107
108
109 mu sync.Mutex
110 ring *ringbufEventRing
111 epollEvents []unix.EpollEvent
112 header []byte
113 haveData bool
114 }
115
116
117 func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
118 if ringbufMap.Type() != ebpf.RingBuf {
119 return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
120 }
121
122 maxEntries := int(ringbufMap.MaxEntries())
123 if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 {
124 return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
125 }
126
127 poller, err := epoll.New()
128 if err != nil {
129 return nil, err
130 }
131
132 if err := poller.Add(ringbufMap.FD(), 0); err != nil {
133 poller.Close()
134 return nil, err
135 }
136
137 ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
138 if err != nil {
139 poller.Close()
140 return nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
141 }
142
143 return &Reader{
144 poller: poller,
145 ring: ring,
146 epollEvents: make([]unix.EpollEvent, 1),
147 header: make([]byte, ringbufHeaderSize),
148 }, nil
149 }
150
151
152
153
154 func (r *Reader) Close() error {
155 if err := r.poller.Close(); err != nil {
156 if errors.Is(err, os.ErrClosed) {
157 return nil
158 }
159 return err
160 }
161
162
163 r.mu.Lock()
164 defer r.mu.Unlock()
165
166 if r.ring != nil {
167 r.ring.Close()
168 r.ring = nil
169 }
170
171 return nil
172 }
173
174
175
176
177 func (r *Reader) Read() (Record, error) {
178 var rec Record
179 return rec, r.ReadInto(&rec)
180 }
181
182
183 func (r *Reader) ReadInto(rec *Record) error {
184 r.mu.Lock()
185 defer r.mu.Unlock()
186
187 if r.ring == nil {
188 return fmt.Errorf("ringbuffer: %w", ErrClosed)
189 }
190
191 for {
192 if !r.haveData {
193 _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)])
194 if err != nil {
195 return err
196 }
197 r.haveData = true
198 }
199
200 for {
201 err := readRecord(r.ring, rec, r.header)
202 if err == errBusy || err == errDiscard {
203 continue
204 }
205 if err == errEOR {
206 r.haveData = false
207 break
208 }
209
210 return err
211 }
212 }
213 }
214
View as plain text