...
1
2
3
4
5 package websocket
6
7 import (
8 "io"
9 "io/ioutil"
10 "sync/atomic"
11 "testing"
12 )
13
14
15
16
17
18
19 type broadcastBench struct {
20 w io.Writer
21 closeCh chan struct{}
22 doneCh chan struct{}
23 count int32
24 conns []*broadcastConn
25 compression bool
26 usePrepared bool
27 }
28
29 type broadcastMessage struct {
30 payload []byte
31 prepared *PreparedMessage
32 }
33
34 type broadcastConn struct {
35 conn *Conn
36 msgCh chan *broadcastMessage
37 }
38
39 func newBroadcastConn(c *Conn) *broadcastConn {
40 return &broadcastConn{
41 conn: c,
42 msgCh: make(chan *broadcastMessage, 1),
43 }
44 }
45
46 func newBroadcastBench(usePrepared, compression bool) *broadcastBench {
47 bench := &broadcastBench{
48 w: ioutil.Discard,
49 doneCh: make(chan struct{}),
50 closeCh: make(chan struct{}),
51 usePrepared: usePrepared,
52 compression: compression,
53 }
54 bench.makeConns(10000)
55 return bench
56 }
57
58 func (b *broadcastBench) makeConns(numConns int) {
59 conns := make([]*broadcastConn, numConns)
60
61 for i := 0; i < numConns; i++ {
62 c := newTestConn(nil, b.w, true)
63 if b.compression {
64 c.enableWriteCompression = true
65 c.newCompressionWriter = compressNoContextTakeover
66 }
67 conns[i] = newBroadcastConn(c)
68 go func(c *broadcastConn) {
69 for {
70 select {
71 case msg := <-c.msgCh:
72 if msg.prepared != nil {
73 c.conn.WritePreparedMessage(msg.prepared)
74 } else {
75 c.conn.WriteMessage(TextMessage, msg.payload)
76 }
77 val := atomic.AddInt32(&b.count, 1)
78 if val%int32(numConns) == 0 {
79 b.doneCh <- struct{}{}
80 }
81 case <-b.closeCh:
82 return
83 }
84 }
85 }(conns[i])
86 }
87 b.conns = conns
88 }
89
90 func (b *broadcastBench) close() {
91 close(b.closeCh)
92 }
93
94 func (b *broadcastBench) broadcastOnce(msg *broadcastMessage) {
95 for _, c := range b.conns {
96 c.msgCh <- msg
97 }
98 <-b.doneCh
99 }
100
101 func BenchmarkBroadcast(b *testing.B) {
102 benchmarks := []struct {
103 name string
104 usePrepared bool
105 compression bool
106 }{
107 {"NoCompression", false, false},
108 {"Compression", false, true},
109 {"NoCompressionPrepared", true, false},
110 {"CompressionPrepared", true, true},
111 }
112 payload := textMessages(1)[0]
113 for _, bm := range benchmarks {
114 b.Run(bm.name, func(b *testing.B) {
115 bench := newBroadcastBench(bm.usePrepared, bm.compression)
116 defer bench.close()
117 b.ResetTimer()
118 for i := 0; i < b.N; i++ {
119 message := &broadcastMessage{
120 payload: payload,
121 }
122 if bench.usePrepared {
123 pm, _ := NewPreparedMessage(TextMessage, message.payload)
124 message.prepared = pm
125 }
126 bench.broadcastOnce(message)
127 }
128 b.ReportAllocs()
129 })
130 }
131 }
132
View as plain text