...

Source file src/github.com/cilium/ebpf/ringbuf/reader_test.go

Documentation: github.com/cilium/ebpf/ringbuf

     1  package ringbuf
     2  
     3  import (
     4  	"errors"
     5  	"syscall"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/cilium/ebpf"
    10  	"github.com/cilium/ebpf/asm"
    11  	"github.com/cilium/ebpf/internal/testutils"
    12  	"github.com/google/go-cmp/cmp"
    13  )
    14  
    15  func TestRingbufReader(t *testing.T) {
    16  	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
    17  
    18  	readerTests := []struct {
    19  		name     string
    20  		messages []int
    21  		want     map[int][]byte
    22  	}{
    23  		{
    24  			name:     "send one short sample",
    25  			messages: []int{5},
    26  			want: map[int][]byte{
    27  				5: {1, 2, 3, 4, 4},
    28  			},
    29  		},
    30  		{
    31  			name:     "send three short samples, the second is discarded",
    32  			messages: []int{5, 10, 15},
    33  			want: map[int][]byte{
    34  				5:  {1, 2, 3, 4, 4},
    35  				15: {1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2},
    36  			},
    37  		},
    38  	}
    39  	for _, tt := range readerTests {
    40  		t.Run(tt.name, func(t *testing.T) {
    41  			prog, events := mustOutputSamplesProg(t, 0, tt.messages...)
    42  
    43  			rd, err := NewReader(events)
    44  			if err != nil {
    45  				t.Fatal(err)
    46  			}
    47  			defer rd.Close()
    48  
    49  			ret, _, err := prog.Test(make([]byte, 14))
    50  			testutils.SkipIfNotSupported(t, err)
    51  			if err != nil {
    52  				t.Fatal(err)
    53  			}
    54  
    55  			if errno := syscall.Errno(-int32(ret)); errno != 0 {
    56  				t.Fatal("Expected 0 as return value, got", errno)
    57  			}
    58  
    59  			raw := make(map[int][]byte)
    60  
    61  			for len(raw) < len(tt.want) {
    62  				record, err := rd.Read()
    63  				if err != nil {
    64  					t.Fatal("Can't read samples:", err)
    65  				}
    66  				raw[len(record.RawSample)] = record.RawSample
    67  			}
    68  
    69  			if diff := cmp.Diff(tt.want, raw); diff != "" {
    70  				t.Errorf("Read samples mismatch (-want +got):\n%s", diff)
    71  			}
    72  		})
    73  	}
    74  }
    75  
    76  func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map, error) {
    77  	events, err := ebpf.NewMap(&ebpf.MapSpec{
    78  		Type:       ebpf.RingBuf,
    79  		MaxEntries: 4096,
    80  	})
    81  	if err != nil {
    82  		return nil, nil, err
    83  	}
    84  
    85  	var maxSampleSize int
    86  	for _, sampleSize := range sampleSizes {
    87  		if sampleSize > maxSampleSize {
    88  			maxSampleSize = sampleSize
    89  		}
    90  	}
    91  
    92  	insns := asm.Instructions{
    93  		asm.LoadImm(asm.R0, 0x0102030404030201, asm.DWord),
    94  		asm.Mov.Reg(asm.R9, asm.R1),
    95  	}
    96  
    97  	bufDwords := (maxSampleSize / 8) + 1
    98  	for i := 0; i < bufDwords; i++ {
    99  		insns = append(insns,
   100  			asm.StoreMem(asm.RFP, int16(i+1)*-8, asm.R0, asm.DWord),
   101  		)
   102  	}
   103  
   104  	for sampleIdx, sampleSize := range sampleSizes {
   105  		insns = append(insns,
   106  			asm.LoadMapPtr(asm.R1, events.FD()),
   107  			asm.Mov.Imm(asm.R2, int32(sampleSize)),
   108  			asm.Mov.Imm(asm.R3, int32(0)),
   109  			asm.FnRingbufReserve.Call(),
   110  			asm.JEq.Imm(asm.R0, 0, "exit"),
   111  			asm.Mov.Reg(asm.R5, asm.R0),
   112  		)
   113  		for i := 0; i < sampleSize; i++ {
   114  			insns = append(insns,
   115  				asm.LoadMem(asm.R4, asm.RFP, int16(i+1)*-1, asm.Byte),
   116  				asm.StoreMem(asm.R5, int16(i), asm.R4, asm.Byte),
   117  			)
   118  		}
   119  
   120  		// discard every even sample
   121  		if sampleIdx&1 != 0 {
   122  			insns = append(insns,
   123  				asm.Mov.Reg(asm.R1, asm.R5),
   124  				asm.Mov.Imm(asm.R2, flags),
   125  				asm.FnRingbufDiscard.Call(),
   126  			)
   127  		} else {
   128  			insns = append(insns,
   129  				asm.Mov.Reg(asm.R1, asm.R5),
   130  				asm.Mov.Imm(asm.R2, flags),
   131  				asm.FnRingbufSubmit.Call(),
   132  			)
   133  		}
   134  	}
   135  
   136  	insns = append(insns,
   137  		asm.Mov.Imm(asm.R0, int32(0)).WithSymbol("exit"),
   138  		asm.Return(),
   139  	)
   140  
   141  	prog, err := ebpf.NewProgram(&ebpf.ProgramSpec{
   142  		License:      "MIT",
   143  		Type:         ebpf.XDP,
   144  		Instructions: insns,
   145  	})
   146  	if err != nil {
   147  		events.Close()
   148  		return nil, nil, err
   149  	}
   150  
   151  	return prog, events, nil
   152  }
   153  
   154  func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map) {
   155  	tb.Helper()
   156  
   157  	prog, events, err := outputSamplesProg(flags, sampleSizes...)
   158  	if err != nil {
   159  		tb.Fatal(err)
   160  	}
   161  
   162  	tb.Cleanup(func() {
   163  		prog.Close()
   164  		events.Close()
   165  	})
   166  
   167  	return prog, events
   168  }
   169  
   170  func TestReaderBlocking(t *testing.T) {
   171  	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
   172  
   173  	prog, events := mustOutputSamplesProg(t, 0, 5)
   174  	ret, _, err := prog.Test(make([]byte, 14))
   175  	testutils.SkipIfNotSupported(t, err)
   176  	if err != nil {
   177  		t.Fatal(err)
   178  	}
   179  
   180  	if errno := syscall.Errno(-int32(ret)); errno != 0 {
   181  		t.Fatal("Expected 0 as return value, got", errno)
   182  	}
   183  
   184  	rd, err := NewReader(events)
   185  	if err != nil {
   186  		t.Fatal(err)
   187  	}
   188  	defer rd.Close()
   189  
   190  	errs := make(chan error)
   191  	go func() {
   192  		for {
   193  			_, err := rd.Read()
   194  			errs <- err
   195  		}
   196  	}()
   197  
   198  	if err := <-errs; err != nil {
   199  		t.Fatal("Can't read first sample", err)
   200  	}
   201  
   202  	select {
   203  	case err := <-errs:
   204  		t.Fatal("Read returns error instead of blocking:", err)
   205  	case <-time.After(100 * time.Millisecond):
   206  	}
   207  
   208  	// Close should interrupt blocking Read
   209  	if err := rd.Close(); err != nil {
   210  		t.Fatal(err)
   211  	}
   212  
   213  	select {
   214  	case err := <-errs:
   215  		if !errors.Is(err, ErrClosed) {
   216  			t.Fatal("Read from RingbufReader that got closed does return ErrClosed")
   217  		}
   218  	case <-time.After(time.Second):
   219  		t.Fatal("Close doesn't interrupt Read")
   220  	}
   221  
   222  	// And we should be able to call it multiple times
   223  	if err := rd.Close(); err != nil {
   224  		t.Fatal(err)
   225  	}
   226  
   227  	if _, err := rd.Read(); !errors.Is(err, ErrClosed) {
   228  		t.Fatal("Second Read on a closed RingbufReader doesn't return ErrClosed")
   229  	}
   230  }
   231  
   232  func BenchmarkReader(b *testing.B) {
   233  	testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")
   234  
   235  	readerBenchmarks := []struct {
   236  		name  string
   237  		flags int32
   238  	}{
   239  		{
   240  			name: "normal epoll with timeout -1",
   241  		},
   242  	}
   243  
   244  	for _, bm := range readerBenchmarks {
   245  		b.Run(bm.name, func(b *testing.B) {
   246  			prog, events := mustOutputSamplesProg(b, bm.flags, 80)
   247  
   248  			rd, err := NewReader(events)
   249  			if err != nil {
   250  				b.Fatal(err)
   251  			}
   252  			defer rd.Close()
   253  
   254  			buf := make([]byte, 14)
   255  
   256  			b.ResetTimer()
   257  			b.ReportAllocs()
   258  
   259  			for i := 0; i < b.N; i++ {
   260  				ret, _, err := prog.Test(buf)
   261  				if err != nil {
   262  					b.Fatal(err)
   263  				} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
   264  					b.Fatal("Expected 0 as return value, got", errno)
   265  				}
   266  				_, err = rd.Read()
   267  				if err != nil {
   268  					b.Fatal("Can't read samples:", err)
   269  				}
   270  			}
   271  		})
   272  	}
   273  }
   274  
   275  func BenchmarkReadInto(b *testing.B) {
   276  	testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")
   277  
   278  	prog, events := mustOutputSamplesProg(b, 0, 80)
   279  
   280  	rd, err := NewReader(events)
   281  	if err != nil {
   282  		b.Fatal(err)
   283  	}
   284  	defer rd.Close()
   285  
   286  	buf := make([]byte, 14)
   287  
   288  	b.ResetTimer()
   289  	b.ReportAllocs()
   290  
   291  	var rec Record
   292  	for i := 0; i < b.N; i++ {
   293  		ret, _, err := prog.Test(buf)
   294  		if err != nil {
   295  			b.Fatal(err)
   296  		} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
   297  			b.Fatal("Expected 0 as return value, got", errno)
   298  		}
   299  
   300  		if err := rd.ReadInto(&rec); err != nil {
   301  			b.Fatal("Can't read samples:", err)
   302  		}
   303  	}
   304  }
   305  

View as plain text