...

Source file src/github.com/docker/distribution/notifications/sinks_test.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"fmt"
     5  	"math/rand"
     6  	"reflect"
     7  	"sync"
     8  	"time"
     9  
    10  	"github.com/sirupsen/logrus"
    11  
    12  	"testing"
    13  )
    14  
    15  func TestBroadcaster(t *testing.T) {
    16  	const nEvents = 1000
    17  	var sinks []Sink
    18  
    19  	for i := 0; i < 10; i++ {
    20  		sinks = append(sinks, &testSink{})
    21  	}
    22  
    23  	b := NewBroadcaster(sinks...)
    24  
    25  	var block []Event
    26  	var wg sync.WaitGroup
    27  	for i := 1; i <= nEvents; i++ {
    28  		block = append(block, createTestEvent("push", "library/test", "blob"))
    29  
    30  		if i%10 == 0 && i > 0 {
    31  			wg.Add(1)
    32  			go func(block ...Event) {
    33  				if err := b.Write(block...); err != nil {
    34  					t.Errorf("error writing block of length %d: %v", len(block), err)
    35  				}
    36  				wg.Done()
    37  			}(block...)
    38  
    39  			block = nil
    40  		}
    41  	}
    42  
    43  	wg.Wait() // Wait until writes complete
    44  	if t.Failed() {
    45  		t.FailNow()
    46  	}
    47  	checkClose(t, b)
    48  
    49  	// Iterate through the sinks and check that they all have the expected length.
    50  	for _, sink := range sinks {
    51  		ts := sink.(*testSink)
    52  		ts.mu.Lock()
    53  		defer ts.mu.Unlock()
    54  
    55  		if len(ts.events) != nEvents {
    56  			t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
    57  		}
    58  
    59  		if !ts.closed {
    60  			t.Fatalf("sink should have been closed")
    61  		}
    62  	}
    63  
    64  }
    65  
    66  func TestEventQueue(t *testing.T) {
    67  	const nevents = 1000
    68  	var ts testSink
    69  	metrics := newSafeMetrics()
    70  	eq := newEventQueue(
    71  		// delayed sync simulates destination slower than channel comms
    72  		&delayedSink{
    73  			Sink:  &ts,
    74  			delay: time.Millisecond * 1,
    75  		}, metrics.eventQueueListener())
    76  
    77  	var wg sync.WaitGroup
    78  	var block []Event
    79  	for i := 1; i <= nevents; i++ {
    80  		block = append(block, createTestEvent("push", "library/test", "blob"))
    81  		if i%10 == 0 && i > 0 {
    82  			wg.Add(1)
    83  			go func(block ...Event) {
    84  				if err := eq.Write(block...); err != nil {
    85  					t.Errorf("error writing event block: %v", err)
    86  				}
    87  				wg.Done()
    88  			}(block...)
    89  
    90  			block = nil
    91  		}
    92  	}
    93  
    94  	wg.Wait()
    95  	if t.Failed() {
    96  		t.FailNow()
    97  	}
    98  	checkClose(t, eq)
    99  
   100  	ts.mu.Lock()
   101  	defer ts.mu.Unlock()
   102  	metrics.Lock()
   103  	defer metrics.Unlock()
   104  
   105  	if len(ts.events) != nevents {
   106  		t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
   107  	}
   108  
   109  	if !ts.closed {
   110  		t.Fatalf("sink should have been closed")
   111  	}
   112  
   113  	if metrics.Events != nevents {
   114  		t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
   115  	}
   116  
   117  	if metrics.Pending != 0 {
   118  		t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
   119  	}
   120  }
   121  
   122  func TestIgnoredSink(t *testing.T) {
   123  	blob := createTestEvent("push", "library/test", "blob")
   124  	manifest := createTestEvent("pull", "library/test", "manifest")
   125  
   126  	type testcase struct {
   127  		ignoreMediaTypes []string
   128  		ignoreActions    []string
   129  		expected         []Event
   130  	}
   131  
   132  	cases := []testcase{
   133  		{nil, nil, []Event{blob, manifest}},
   134  		{[]string{"other"}, []string{"other"}, []Event{blob, manifest}},
   135  		{[]string{"blob"}, []string{"other"}, []Event{manifest}},
   136  		{[]string{"blob", "manifest"}, []string{"other"}, nil},
   137  		{[]string{"other"}, []string{"push"}, []Event{manifest}},
   138  		{[]string{"other"}, []string{"pull"}, []Event{blob}},
   139  		{[]string{"other"}, []string{"pull", "push"}, nil},
   140  	}
   141  
   142  	for _, c := range cases {
   143  		ts := &testSink{}
   144  		s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)
   145  
   146  		if err := s.Write(blob, manifest); err != nil {
   147  			t.Fatalf("error writing event: %v", err)
   148  		}
   149  
   150  		ts.mu.Lock()
   151  		if !reflect.DeepEqual(ts.events, c.expected) {
   152  			t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected)
   153  		}
   154  		ts.mu.Unlock()
   155  	}
   156  }
   157  
   158  func TestRetryingSink(t *testing.T) {
   159  
   160  	// Make a sync that fails most of the time, ensuring that all the events
   161  	// make it through.
   162  	var ts testSink
   163  	flaky := &flakySink{
   164  		rate: 1.0, // start out always failing.
   165  		Sink: &ts,
   166  	}
   167  	s := newRetryingSink(flaky, 3, 10*time.Millisecond)
   168  
   169  	var wg sync.WaitGroup
   170  	var block []Event
   171  	for i := 1; i <= 100; i++ {
   172  		block = append(block, createTestEvent("push", "library/test", "blob"))
   173  
   174  		// Above 50, set the failure rate lower
   175  		if i > 50 {
   176  			s.mu.Lock()
   177  			flaky.rate = 0.90
   178  			s.mu.Unlock()
   179  		}
   180  
   181  		if i%10 == 0 && i > 0 {
   182  			wg.Add(1)
   183  			go func(block ...Event) {
   184  				defer wg.Done()
   185  				if err := s.Write(block...); err != nil {
   186  					t.Errorf("error writing event block: %v", err)
   187  				}
   188  			}(block...)
   189  
   190  			block = nil
   191  		}
   192  	}
   193  
   194  	wg.Wait()
   195  	if t.Failed() {
   196  		t.FailNow()
   197  	}
   198  	checkClose(t, s)
   199  
   200  	ts.mu.Lock()
   201  	defer ts.mu.Unlock()
   202  
   203  	if len(ts.events) != 100 {
   204  		t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
   205  	}
   206  }
   207  
   208  type testSink struct {
   209  	events []Event
   210  	mu     sync.Mutex
   211  	closed bool
   212  }
   213  
   214  func (ts *testSink) Write(events ...Event) error {
   215  	ts.mu.Lock()
   216  	defer ts.mu.Unlock()
   217  	ts.events = append(ts.events, events...)
   218  	return nil
   219  }
   220  
   221  func (ts *testSink) Close() error {
   222  	ts.mu.Lock()
   223  	defer ts.mu.Unlock()
   224  	ts.closed = true
   225  
   226  	logrus.Infof("closing testSink")
   227  	return nil
   228  }
   229  
   230  type delayedSink struct {
   231  	Sink
   232  	delay time.Duration
   233  }
   234  
   235  func (ds *delayedSink) Write(events ...Event) error {
   236  	time.Sleep(ds.delay)
   237  	return ds.Sink.Write(events...)
   238  }
   239  
   240  type flakySink struct {
   241  	Sink
   242  	rate float64
   243  }
   244  
   245  func (fs *flakySink) Write(events ...Event) error {
   246  	if rand.Float64() < fs.rate {
   247  		return fmt.Errorf("error writing %d events", len(events))
   248  	}
   249  
   250  	return fs.Sink.Write(events...)
   251  }
   252  
   253  func checkClose(t *testing.T, sink Sink) {
   254  	if err := sink.Close(); err != nil {
   255  		t.Fatalf("unexpected error closing: %v", err)
   256  	}
   257  
   258  	// second close should not crash but should return an error.
   259  	if err := sink.Close(); err == nil {
   260  		t.Fatalf("no error on double close")
   261  	}
   262  
   263  	// Write after closed should be an error
   264  	if err := sink.Write([]Event{}...); err == nil {
   265  		t.Fatalf("write after closed did not have an error")
   266  	} else if err != ErrSinkClosed {
   267  		t.Fatalf("error should be ErrSinkClosed")
   268  	}
   269  }
   270  

View as plain text