...

Source file src/github.com/gorilla/websocket/conn_broadcast_test.go

Documentation: github.com/gorilla/websocket

     1  // Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package websocket
     6  
     7  import (
     8  	"io"
     9  	"io/ioutil"
    10  	"sync/atomic"
    11  	"testing"
    12  )
    13  
    14  // broadcastBench allows to run broadcast benchmarks.
    15  // In every broadcast benchmark we create many connections, then send the same
    16  // message into every connection and wait for all writes complete. This emulates
    17  // an application where many connections listen to the same data - i.e. PUB/SUB
    18  // scenarios with many subscribers in one channel.
    19  type broadcastBench struct {
    20  	w           io.Writer
    21  	closeCh     chan struct{}
    22  	doneCh      chan struct{}
    23  	count       int32
    24  	conns       []*broadcastConn
    25  	compression bool
    26  	usePrepared bool
    27  }
    28  
    29  type broadcastMessage struct {
    30  	payload  []byte
    31  	prepared *PreparedMessage
    32  }
    33  
    34  type broadcastConn struct {
    35  	conn  *Conn
    36  	msgCh chan *broadcastMessage
    37  }
    38  
    39  func newBroadcastConn(c *Conn) *broadcastConn {
    40  	return &broadcastConn{
    41  		conn:  c,
    42  		msgCh: make(chan *broadcastMessage, 1),
    43  	}
    44  }
    45  
    46  func newBroadcastBench(usePrepared, compression bool) *broadcastBench {
    47  	bench := &broadcastBench{
    48  		w:           ioutil.Discard,
    49  		doneCh:      make(chan struct{}),
    50  		closeCh:     make(chan struct{}),
    51  		usePrepared: usePrepared,
    52  		compression: compression,
    53  	}
    54  	bench.makeConns(10000)
    55  	return bench
    56  }
    57  
    58  func (b *broadcastBench) makeConns(numConns int) {
    59  	conns := make([]*broadcastConn, numConns)
    60  
    61  	for i := 0; i < numConns; i++ {
    62  		c := newTestConn(nil, b.w, true)
    63  		if b.compression {
    64  			c.enableWriteCompression = true
    65  			c.newCompressionWriter = compressNoContextTakeover
    66  		}
    67  		conns[i] = newBroadcastConn(c)
    68  		go func(c *broadcastConn) {
    69  			for {
    70  				select {
    71  				case msg := <-c.msgCh:
    72  					if msg.prepared != nil {
    73  						c.conn.WritePreparedMessage(msg.prepared)
    74  					} else {
    75  						c.conn.WriteMessage(TextMessage, msg.payload)
    76  					}
    77  					val := atomic.AddInt32(&b.count, 1)
    78  					if val%int32(numConns) == 0 {
    79  						b.doneCh <- struct{}{}
    80  					}
    81  				case <-b.closeCh:
    82  					return
    83  				}
    84  			}
    85  		}(conns[i])
    86  	}
    87  	b.conns = conns
    88  }
    89  
    90  func (b *broadcastBench) close() {
    91  	close(b.closeCh)
    92  }
    93  
    94  func (b *broadcastBench) broadcastOnce(msg *broadcastMessage) {
    95  	for _, c := range b.conns {
    96  		c.msgCh <- msg
    97  	}
    98  	<-b.doneCh
    99  }
   100  
   101  func BenchmarkBroadcast(b *testing.B) {
   102  	benchmarks := []struct {
   103  		name        string
   104  		usePrepared bool
   105  		compression bool
   106  	}{
   107  		{"NoCompression", false, false},
   108  		{"Compression", false, true},
   109  		{"NoCompressionPrepared", true, false},
   110  		{"CompressionPrepared", true, true},
   111  	}
   112  	payload := textMessages(1)[0]
   113  	for _, bm := range benchmarks {
   114  		b.Run(bm.name, func(b *testing.B) {
   115  			bench := newBroadcastBench(bm.usePrepared, bm.compression)
   116  			defer bench.close()
   117  			b.ResetTimer()
   118  			for i := 0; i < b.N; i++ {
   119  				message := &broadcastMessage{
   120  					payload: payload,
   121  				}
   122  				if bench.usePrepared {
   123  					pm, _ := NewPreparedMessage(TextMessage, message.payload)
   124  					message.prepared = pm
   125  				}
   126  				bench.broadcastOnce(message)
   127  			}
   128  			b.ReportAllocs()
   129  		})
   130  	}
   131  }
   132  

View as plain text