...

Source file src/github.com/docker/go-events/broadcast_test.go

Documentation: github.com/docker/go-events

     1  package events
     2  
     3  import (
     4  	"sync"
     5  	"testing"
     6  )
     7  
     8  func TestBroadcaster(t *testing.T) {
     9  	const nEvents = 1000
    10  	var sinks []Sink
    11  	b := NewBroadcaster()
    12  	for i := 0; i < 10; i++ {
    13  		sinks = append(sinks, newTestSink(t, nEvents))
    14  		b.Add(sinks[i])
    15  		b.Add(sinks[i]) // noop
    16  	}
    17  
    18  	var wg sync.WaitGroup
    19  	for i := 1; i <= nEvents; i++ {
    20  		wg.Add(1)
    21  		go func(event Event) {
    22  			if err := b.Write(event); err != nil {
    23  				t.Fatalf("error writing event %v: %v", event, err)
    24  			}
    25  			wg.Done()
    26  		}("event")
    27  	}
    28  
    29  	wg.Wait() // Wait until writes complete
    30  
    31  	for i := range sinks {
    32  		b.Remove(sinks[i])
    33  	}
    34  
    35  	// sending one more should trigger test failure if they weren't removed.
    36  	if err := b.Write("onemore"); err != nil {
    37  		t.Fatalf("unexpected error sending one more: %v", err)
    38  	}
    39  
    40  	// add them back to test closing.
    41  	for i := range sinks {
    42  		b.Add(sinks[i])
    43  	}
    44  
    45  	checkClose(t, b)
    46  
    47  	// Iterate through the sinks and check that they all have the expected length.
    48  	for _, sink := range sinks {
    49  		ts := sink.(*testSink)
    50  		ts.mu.Lock()
    51  		defer ts.mu.Unlock()
    52  
    53  		if len(ts.events) != nEvents {
    54  			t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
    55  		}
    56  
    57  		if !ts.closed {
    58  			t.Fatalf("sink should have been closed")
    59  		}
    60  	}
    61  }
    62  
    63  func BenchmarkBroadcast10(b *testing.B) {
    64  	benchmarkBroadcast(b, 10)
    65  }
    66  
    67  func BenchmarkBroadcast100(b *testing.B) {
    68  	benchmarkBroadcast(b, 100)
    69  }
    70  
    71  func BenchmarkBroadcast1000(b *testing.B) {
    72  	benchmarkBroadcast(b, 1000)
    73  }
    74  
    75  func BenchmarkBroadcast10000(b *testing.B) {
    76  	benchmarkBroadcast(b, 10000)
    77  }
    78  
    79  func benchmarkBroadcast(b *testing.B, nsinks int) {
    80  	// counter := metrics.NewCounter()
    81  	// metrics.DefaultRegistry.Register(fmt.Sprintf("nsinks: %v", nsinks), counter)
    82  	// go metrics.Log(metrics.DefaultRegistry, 500*time.Millisecond, log.New(os.Stderr, "metrics: ", log.LstdFlags))
    83  
    84  	b.StopTimer()
    85  	var sinks []Sink
    86  	for i := 0; i < nsinks; i++ {
    87  		// counter.Inc(1)
    88  		sinks = append(sinks, newTestSink(b, b.N))
    89  		// sinks = append(sinks, NewQueue(&testSink{t: b, expected: b.N}))
    90  	}
    91  	b.StartTimer()
    92  
    93  	// meter := metered{}
    94  	// NewQueue(meter.Egress(dst))
    95  
    96  	benchmarkSink(b, NewBroadcaster(sinks...))
    97  }
    98  

View as plain text