...

Source file src/google.golang.org/api/support/bundler/bundler_test.go

Documentation: google.golang.org/api/support/bundler

     1  // Copyright 2016 Google LLC.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package bundler
     6  
     7  import (
     8  	"context"
     9  	"fmt"
    10  	"math"
    11  	"reflect"
    12  	"runtime"
    13  	"sort"
    14  	"sync"
    15  	"testing"
    16  	"time"
    17  )
    18  
    19  func TestBundlerCount1(t *testing.T) {
    20  	// Unbundled case: one item per bundle.
    21  	handler := &testHandler{}
    22  	b := NewBundler(int(0), handler.handleImmediate)
    23  	b.BundleCountThreshold = 1
    24  	b.DelayThreshold = time.Second
    25  
    26  	for i := 0; i < 3; i++ {
    27  		if err := b.Add(i, 1); err != nil {
    28  			t.Fatal(err)
    29  		}
    30  	}
    31  	b.Flush()
    32  	got := handler.bundles()
    33  	want := [][]int{{0}, {1}, {2}}
    34  	if !reflect.DeepEqual(got, want) {
    35  		t.Errorf("bundles: got %v, want %v", got, want)
    36  	}
    37  	// All bundles should have been handled "immediately": much less
    38  	// than the delay threshold of 1s.
    39  	tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
    40  	twant := []int{0, 0, 0}
    41  	if !reflect.DeepEqual(tgot, twant) {
    42  		t.Errorf("times: got %v, want %v", tgot, twant)
    43  	}
    44  }
    45  
    46  func TestBundlerCount3(t *testing.T) {
    47  	handler := &testHandler{}
    48  	b := NewBundler(int(0), handler.handleImmediate)
    49  	b.BundleCountThreshold = 3
    50  	b.DelayThreshold = 100 * time.Millisecond
    51  	// Add 8 items.
    52  	// The first two bundles of 3 should both be handled quickly.
    53  	// The third bundle of 2 should not be handled for about DelayThreshold ms.
    54  	for i := 0; i < 8; i++ {
    55  		if err := b.Add(i, 1); err != nil {
    56  			t.Fatal(err)
    57  		}
    58  	}
    59  	time.Sleep(5 * b.DelayThreshold)
    60  	// We should not need to close the bundler.
    61  
    62  	bgot := handler.bundles()
    63  	bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7}}
    64  	if !reflect.DeepEqual(bgot, bwant) {
    65  		t.Errorf("bundles: got %v, want %v", bgot, bwant)
    66  	}
    67  
    68  	tgot := quantizeTimes(handler.times(), b.DelayThreshold)
    69  	if len(tgot) != 3 || tgot[0] != 0 || tgot[1] != 0 || tgot[2] == 0 {
    70  		t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
    71  	}
    72  }
    73  
    74  // Test that items are handled correctly at roughly the right time with a "slow"
    75  // handler (takes 300 milliseconds) and that the last bundle is automatically
    76  // flushed.
    77  func TestBundlerCountSlowHandler(t *testing.T) {
    78  	handler := &testHandler{}
    79  	b := NewBundler(int(0), handler.handleSlow)
    80  	b.BundleCountThreshold = 3
    81  	b.DelayThreshold = 500 * time.Millisecond
    82  	// Add 10 items.
    83  	for i := 0; i < 10; i++ {
    84  		if err := b.Add(i, 1); err != nil {
    85  			t.Fatal(err)
    86  		}
    87  	}
    88  	time.Sleep(4 * 300 * time.Millisecond)
    89  	// We should not need to close the bundler.
    90  
    91  	bgot := handler.bundles()
    92  	bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7, 8}, {9}}
    93  	if !reflect.DeepEqual(bgot, bwant) {
    94  		t.Errorf("bundles: got %v, want %v", bgot, bwant)
    95  	}
    96  
    97  	tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
    98  	// Should handle new bundle every 300 milliseconds, and last incomplete
    99  	// bundle should get automatically flushed.
   100  	twant := []int{0, 3, 6, 9}
   101  	if !reflect.DeepEqual(tgot, twant) {
   102  		t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
   103  	}
   104  }
   105  
   106  func TestBundlerByteThreshold(t *testing.T) {
   107  	handler := &testHandler{}
   108  	b := NewBundler(int(0), handler.handleImmediate)
   109  	b.BundleCountThreshold = 10
   110  	b.BundleByteThreshold = 3
   111  	// Increase the limit beyond the number of bundles we expect (3)
   112  	// so that bundles get handled immediately after they cross the
   113  	// threshold. Otherwise, the test is non-deterministic. With the default
   114  	// HandlerLimit of 1, the 2nd and 3rd bundles may or may not be
   115  	// combined based on how long it takes to handle the 1st bundle.
   116  	b.HandlerLimit = 10
   117  	add := func(i interface{}, s int) {
   118  		if err := b.Add(i, s); err != nil {
   119  			t.Fatal(err)
   120  		}
   121  	}
   122  
   123  	add(1, 1)
   124  	add(2, 2)
   125  	// Hit byte threshold AND under HandlerLimit:
   126  	// bundle = 1, 2
   127  	add(3, 1)
   128  	add(4, 1)
   129  	add(5, 2)
   130  	// Passed byte threshold AND under byte limit AND under HandlerLimit:
   131  	// bundle = 3, 4, 5
   132  	add(6, 1)
   133  	b.Flush()
   134  	bgot := handler.bundles()
   135  	// We don't care about the order they were handled in. We just want
   136  	// to test that crossing the threshold triggered handling.
   137  	sort.Slice(bgot, func(i, j int) bool {
   138  		return bgot[i][0] < bgot[j][0]
   139  	})
   140  	bwant := [][]int{{1, 2}, {3, 4, 5}, {6}}
   141  	if !reflect.DeepEqual(bgot, bwant) {
   142  		t.Errorf("bundles: got %v, want %v", bgot, bwant)
   143  	}
   144  	tgot := quantizeTimes(handler.times(), b.DelayThreshold)
   145  	twant := []int{0, 0, 0}
   146  	if !reflect.DeepEqual(tgot, twant) {
   147  		t.Errorf("times: got %v, want %v", tgot, twant)
   148  	}
   149  }
   150  
   151  func TestBundlerLimit(t *testing.T) {
   152  	handler := &testHandler{}
   153  	b := NewBundler(int(0), handler.handleImmediate)
   154  	b.BundleCountThreshold = 10
   155  	b.BundleByteLimit = 3
   156  	add := func(i interface{}, s int) {
   157  		if err := b.Add(i, s); err != nil {
   158  			t.Fatal(err)
   159  		}
   160  	}
   161  
   162  	add(1, 1)
   163  	add(2, 2)
   164  	// Hit byte limit: bundle = 1, 2
   165  	add(3, 1)
   166  	add(4, 1)
   167  	add(5, 2)
   168  	// Exceeded byte limit: bundle = 3, 4
   169  	add(6, 2)
   170  	// Exceeded byte limit: bundle = 5
   171  	b.Flush()
   172  	bgot := handler.bundles()
   173  	bwant := [][]int{{1, 2}, {3, 4}, {5}, {6}}
   174  	if !reflect.DeepEqual(bgot, bwant) {
   175  		t.Errorf("bundles: got %v, want %v", bgot, bwant)
   176  	}
   177  	tgot := quantizeTimes(handler.times(), b.DelayThreshold)
   178  	twant := []int{0, 0, 0, 0}
   179  	if !reflect.DeepEqual(tgot, twant) {
   180  		t.Errorf("times: got %v, want %v", tgot, twant)
   181  	}
   182  }
   183  
   184  func TestAddWait(t *testing.T) {
   185  	var (
   186  		mu     sync.Mutex
   187  		events []string
   188  	)
   189  	event := func(s string) {
   190  		mu.Lock()
   191  		events = append(events, s)
   192  		mu.Unlock()
   193  	}
   194  
   195  	handlec := make(chan int)
   196  	done := make(chan struct{})
   197  	b := NewBundler(int(0), func(interface{}) {
   198  		<-handlec
   199  		event("handle")
   200  	})
   201  	b.BufferedByteLimit = 3
   202  	addw := func(sz int) {
   203  		if err := b.AddWait(context.Background(), 0, sz); err != nil {
   204  			t.Fatal(err)
   205  		}
   206  		event(fmt.Sprintf("addw(%d)", sz))
   207  	}
   208  
   209  	addw(2)
   210  	go func() {
   211  		addw(3) // blocks until first bundle is handled
   212  		close(done)
   213  	}()
   214  	// Give addw(3) a chance to finish
   215  	time.Sleep(100 * time.Millisecond)
   216  	handlec <- 1 // handle the first bundle
   217  	select {
   218  	case <-time.After(time.Second):
   219  		t.Fatal("timed out")
   220  	case <-done:
   221  	}
   222  	want := []string{"addw(2)", "handle", "addw(3)"}
   223  	if !reflect.DeepEqual(events, want) {
   224  		t.Errorf("got  %v\nwant%v", events, want)
   225  	}
   226  }
   227  
   228  func TestAddWaitCancel(t *testing.T) {
   229  	b := NewBundler(int(0), func(interface{}) {})
   230  	b.BufferedByteLimit = 3
   231  	ctx, cancel := context.WithCancel(context.Background())
   232  	go func() {
   233  		time.Sleep(100 * time.Millisecond)
   234  		cancel()
   235  	}()
   236  	err := b.AddWait(ctx, 0, 4)
   237  	if want := context.Canceled; err != want {
   238  		t.Fatalf("got %v, want %v", err, want)
   239  	}
   240  }
   241  
   242  func TestBundlerErrors(t *testing.T) {
   243  	// Use a handler that blocks forever, to force the bundler to run out of
   244  	// memory.
   245  	b := NewBundler(int(0), func(interface{}) { select {} })
   246  	b.BundleByteLimit = 3
   247  	b.BufferedByteLimit = 10
   248  
   249  	if got, want := b.Add(1, 4), ErrOversizedItem; got != want {
   250  		t.Fatalf("got %v, want %v", got, want)
   251  	}
   252  
   253  	for i := 0; i < 5; i++ {
   254  		if err := b.Add(i, 2); err != nil {
   255  			t.Fatal(err)
   256  		}
   257  	}
   258  	if got, want := b.Add(5, 1), ErrOverflow; got != want {
   259  		t.Fatalf("got %v, want %v", got, want)
   260  	}
   261  }
   262  
   263  func TestModeError(t *testing.T) {
   264  	// Call Add then AddWait.
   265  	b := NewBundler(int(0), func(interface{}) {})
   266  	b.BundleByteLimit = 4
   267  	b.BufferedByteLimit = 4
   268  	if err := b.Add(0, 2); err != nil {
   269  		t.Fatal(err)
   270  	}
   271  	if got, want := b.AddWait(context.Background(), 0, 2), errMixedMethods; got != want {
   272  		t.Fatalf("got %v, want %v", got, want)
   273  	}
   274  	// Call AddWait then Add on new Bundler.
   275  	b1 := NewBundler(int(0), func(interface{}) {})
   276  	b1.BundleByteLimit = 4
   277  	b1.BufferedByteLimit = 4
   278  	if err := b1.AddWait(context.Background(), 0, 2); err != nil {
   279  		t.Fatal(err)
   280  	}
   281  	if got, want := b1.Add(0, 2), errMixedMethods; got != want {
   282  		t.Fatalf("got %v, want %v", got, want)
   283  	}
   284  }
   285  
   286  // Check that no more than HandlerLimit handlers are active at once.
   287  func TestConcurrentHandlersMax(t *testing.T) {
   288  	const handlerLimit = 10
   289  	var (
   290  		mu          sync.Mutex
   291  		active      int
   292  		maxHandlers int
   293  	)
   294  	b := NewBundler(int(0), func(s interface{}) {
   295  		mu.Lock()
   296  		active++
   297  		if active > maxHandlers {
   298  			maxHandlers = active
   299  		}
   300  		if maxHandlers > handlerLimit {
   301  			t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
   302  		}
   303  		mu.Unlock()
   304  		time.Sleep(1 * time.Millisecond) // let the scheduler work
   305  		mu.Lock()
   306  		active--
   307  		mu.Unlock()
   308  	})
   309  	b.BundleCountThreshold = 5
   310  	b.HandlerLimit = 10
   311  	defer b.Flush()
   312  
   313  	more := 0 // extra iterations past saturation
   314  	for i := 0; more == 0 || i < more; i++ {
   315  		mu.Lock()
   316  		m := maxHandlers
   317  		mu.Unlock()
   318  		if m >= handlerLimit && more == 0 {
   319  			// Run past saturation to check that we don't exceed the max.
   320  			more = 2 * i
   321  		}
   322  		b.Add(i, 1)
   323  	}
   324  }
   325  
   326  // Check that Flush doesn't return until all prior items have been handled.
   327  func TestConcurrentFlush(t *testing.T) {
   328  	var (
   329  		mu    sync.Mutex
   330  		items = make(map[int]bool)
   331  	)
   332  	b := NewBundler(int(0), func(s interface{}) {
   333  		mu.Lock()
   334  		for _, i := range s.([]int) {
   335  			items[i] = true
   336  		}
   337  		mu.Unlock()
   338  		time.Sleep(10 * time.Millisecond)
   339  	})
   340  	b.BundleCountThreshold = 5
   341  	b.HandlerLimit = 10
   342  	defer b.Flush()
   343  
   344  	var wg sync.WaitGroup
   345  	defer wg.Wait()
   346  	for i := 0; i < 50; i++ {
   347  		b.Add(i, 1)
   348  		if i%100 == 0 {
   349  			i := i
   350  			wg.Add(1)
   351  			go func() {
   352  				defer wg.Done()
   353  				b.Flush()
   354  				mu.Lock()
   355  				defer mu.Unlock()
   356  				for j := 0; j <= i; j++ {
   357  					if !items[j] {
   358  						// Cannot use Fatal, since we're in a non-test goroutine.
   359  						t.Errorf("flush(%d): item %d not handled", i, j)
   360  						break
   361  					}
   362  				}
   363  			}()
   364  		}
   365  	}
   366  }
   367  
   368  // Test that time based flushes do not deadlock
   369  func TestBundlerTimeBasedFlushDeadlock(t *testing.T) {
   370  	const (
   371  		goroutines = 1e3
   372  		iterations = 1e3
   373  
   374  		N = goroutines * iterations
   375  	)
   376  
   377  	var wg sync.WaitGroup
   378  	wg.Add(N)
   379  
   380  	flush := func(i interface{}) {
   381  		time.Sleep(10 * time.Millisecond)
   382  		buf := i.([]int)
   383  		for i := 0; i < len(buf); i++ {
   384  			wg.Done()
   385  		}
   386  	}
   387  
   388  	b := NewBundler(int(0), flush)
   389  	b.DelayThreshold = 10 * time.Millisecond
   390  	b.HandlerLimit = 1
   391  
   392  	// high thresholds to ensure that we only hit time based flushes
   393  	b.BundleCountThreshold = math.MaxInt32
   394  	b.BundleByteThreshold = math.MaxInt32
   395  
   396  	ctx, cancel := context.WithCancel(context.Background())
   397  	time.AfterFunc(15*time.Second, cancel)
   398  
   399  	add := func(i int) {
   400  		for j := 0; j < iterations; j++ {
   401  			if err := b.AddWait(ctx, i, 1); err != nil {
   402  				t.Fatalf("timed out: %v", err)
   403  			}
   404  			runtime.Gosched()
   405  		}
   406  	}
   407  
   408  	for i := 0; i < goroutines; i++ {
   409  		go add(i)
   410  	}
   411  
   412  	// verify that we don't block forever
   413  	wg.Wait()
   414  }
   415  
   416  type testHandler struct {
   417  	mu sync.Mutex
   418  	b  [][]int
   419  	t  []time.Time
   420  }
   421  
   422  func (t *testHandler) bundles() [][]int {
   423  	t.mu.Lock()
   424  	defer t.mu.Unlock()
   425  	return t.b
   426  }
   427  
   428  func (t *testHandler) times() []time.Time {
   429  	t.mu.Lock()
   430  	defer t.mu.Unlock()
   431  	return t.t
   432  }
   433  
   434  // Handler takes no time beyond adding to a list
   435  func (t *testHandler) handleImmediate(b interface{}) {
   436  	t.mu.Lock()
   437  	defer t.mu.Unlock()
   438  	t.b = append(t.b, b.([]int))
   439  	t.t = append(t.t, time.Now())
   440  }
   441  
   442  // Handler takes 300 milliseconds
   443  func (t *testHandler) handleSlow(b interface{}) {
   444  	t.mu.Lock()
   445  	defer t.mu.Unlock()
   446  	t.b = append(t.b, b.([]int))
   447  	t.t = append(t.t, time.Now())
   448  	time.Sleep(300 * time.Millisecond)
   449  }
   450  
   451  // Handler takes one millisecond
   452  func (t *testHandler) handleQuick(b interface{}) {
   453  	t.mu.Lock()
   454  	defer t.mu.Unlock()
   455  	t.b = append(t.b, b.([]int))
   456  	t.t = append(t.t, time.Now())
   457  	time.Sleep(time.Millisecond)
   458  }
   459  
   460  // Round times to the nearest q and express them as the number of q
   461  // since the first time.
   462  // E.g. if q is 100ms, then a time within 50ms of the first time
   463  // will be represented as 0, a time 150 to 250ms of the first time
   464  // we be represented as 1, etc.
   465  func quantizeTimes(times []time.Time, q time.Duration) []int {
   466  	var rs []int
   467  	for _, t := range times {
   468  		d := t.Sub(times[0])
   469  		r := int((d + q/2) / q)
   470  		rs = append(rs, r)
   471  	}
   472  	return rs
   473  }
   474  
   475  func TestQuantizeTimes(t *testing.T) {
   476  	quantum := 100 * time.Millisecond
   477  	for _, test := range []struct {
   478  		millis []int // times in milliseconds
   479  		want   []int
   480  	}{
   481  		{[]int{10, 20, 30}, []int{0, 0, 0}},
   482  		{[]int{0, 49, 50, 90}, []int{0, 0, 1, 1}},
   483  		{[]int{0, 95, 170, 315}, []int{0, 1, 2, 3}},
   484  	} {
   485  		var times []time.Time
   486  		for _, ms := range test.millis {
   487  			times = append(times, time.Unix(0, int64(ms*1e6)))
   488  		}
   489  		got := quantizeTimes(times, quantum)
   490  		if !reflect.DeepEqual(got, test.want) {
   491  			t.Errorf("%v: got %v, want %v", test.millis, got, test.want)
   492  		}
   493  	}
   494  }
   495  
   496  // Measure the cost of adding a bunch of items only, though some handling may be
   497  // happening in the background
   498  func BenchmarkBundlerAdd(bench *testing.B) {
   499  	// Unbundled case: one item per bundle.
   500  	handler := &testHandler{}
   501  	b := NewBundler(int(0), handler.handleImmediate)
   502  	b.BundleCountThreshold = 1
   503  	b.DelayThreshold = time.Second
   504  
   505  	for i := 0; i < bench.N; i++ {
   506  		if err := b.Add(i, 1); err != nil {
   507  			bench.Fatal(err)
   508  		}
   509  	}
   510  }
   511  
   512  // Measure the cost of adding a bunch of items, and then waiting for them all to
   513  // be handled, when handling is immediate (no delay)
   514  func BenchmarkBundlerAddAndFlush(bench *testing.B) {
   515  	// Unbundled case: one item per bundle.
   516  	handler := &testHandler{}
   517  	b := NewBundler(int(0), handler.handleImmediate)
   518  	b.BundleCountThreshold = 1
   519  	b.DelayThreshold = time.Second
   520  
   521  	for i := 0; i < bench.N; i++ {
   522  		if err := b.Add(i, 1); err != nil {
   523  			bench.Fatal(err)
   524  		}
   525  	}
   526  	b.Flush()
   527  }
   528  
   529  // Measure the cost of adding a bunch of items, and then waiting for them all to
   530  // be handled, when handling a bundle (1 item only) takes one millisecond
   531  func BenchmarkBundlerAddAndFlushSlow1(bench *testing.B) {
   532  	// Unbundled case: one item per bundle.
   533  	handler := &testHandler{}
   534  	b := NewBundler(int(0), handler.handleQuick)
   535  	b.BundleCountThreshold = 1
   536  	b.DelayThreshold = time.Second
   537  
   538  	for i := 0; i < bench.N; i++ {
   539  		if err := b.Add(i, 1); err != nil {
   540  			bench.Fatal(err)
   541  		}
   542  	}
   543  	b.Flush()
   544  }
   545  
   546  // Measure the cost of adding a bunch of items, and then waiting for them all to
   547  // be handled, when handling a bundle (25 items) takes one millisecond
   548  func BenchmarkBundlerAddAndFlushSlow25(bench *testing.B) {
   549  	// More realistic: 25 items per bundle
   550  	handler := &testHandler{}
   551  	b := NewBundler(int(0), handler.handleQuick)
   552  	b.BundleCountThreshold = 25
   553  	b.DelayThreshold = time.Second
   554  
   555  	for i := 0; i < bench.N; i++ {
   556  		if err := b.Add(i, 1); err != nil {
   557  			bench.Fatal(err)
   558  		}
   559  	}
   560  	b.Flush()
   561  }
   562  

View as plain text