...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package cluster
15
import (
	"bytes"
	"context"
	"io"
	"os"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/hashicorp/memberlist"
	"github.com/prometheus/client_golang/prometheus"
)
27
28 func TestNormalMessagesGossiped(t *testing.T) {
29 var sent bool
30 c := newChannel(
31 func(_ []byte) { sent = true },
32 func() []*memberlist.Node { return nil },
33 func(_ *memberlist.Node, _ []byte) error { return nil },
34 )
35
36 c.Broadcast([]byte{})
37
38 if sent != true {
39 t.Fatalf("small message not sent")
40 }
41 }
42
43 func TestOversizedMessagesGossiped(t *testing.T) {
44 var sent bool
45 ctx, cancel := context.WithCancel(context.Background())
46 c := newChannel(
47 func(_ []byte) {},
48 func() []*memberlist.Node { return []*memberlist.Node{{}} },
49 func(_ *memberlist.Node, _ []byte) error { sent = true; cancel(); return nil },
50 )
51
52 f, err := os.Open("/dev/zero")
53 if err != nil {
54 t.Fatalf("failed to open /dev/zero: %v", err)
55 }
56 defer f.Close()
57
58 buf := new(bytes.Buffer)
59 toCopy := int64(800)
60 if n, err := io.CopyN(buf, f, toCopy); err != nil {
61 t.Fatalf("failed to copy bytes: %v", err)
62 } else if n != toCopy {
63 t.Fatalf("wanted to copy %d bytes, only copied %d", toCopy, n)
64 }
65
66 c.Broadcast(buf.Bytes())
67
68 <-ctx.Done()
69
70 if sent != true {
71 t.Fatalf("oversized message not sent")
72 }
73 }
74
75 func newChannel(
76 send func([]byte),
77 peers func() []*memberlist.Node,
78 sendOversize func(*memberlist.Node, []byte) error,
79 ) *Channel {
80 return NewChannel(
81 "test",
82 send,
83 peers,
84 sendOversize,
85 log.NewNopLogger(),
86 make(chan struct{}),
87 prometheus.NewRegistry(),
88 )
89 }
90
View as plain text