...

Source file src/k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner_test.go

Documentation: k8s.io/kubernetes/pkg/util/async

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package async
    18  
    19  import (
    20  	"sync"
    21  	"testing"
    22  	"time"
    23  )
    24  
    25  // Track calls to the managed function.
    26  type receiver struct {
    27  	lock    sync.Mutex
    28  	run     bool
    29  	retryFn func()
    30  }
    31  
    32  func (r *receiver) F() {
    33  	r.lock.Lock()
    34  	defer r.lock.Unlock()
    35  	r.run = true
    36  
    37  	if r.retryFn != nil {
    38  		r.retryFn()
    39  		r.retryFn = nil
    40  	}
    41  }
    42  
    43  func (r *receiver) reset() bool {
    44  	r.lock.Lock()
    45  	defer r.lock.Unlock()
    46  	was := r.run
    47  	r.run = false
    48  	return was
    49  }
    50  
    51  func (r *receiver) setRetryFn(retryFn func()) {
    52  	r.lock.Lock()
    53  	defer r.lock.Unlock()
    54  	r.retryFn = retryFn
    55  }
    56  
    57  // A single change event in the fake timer.
    58  type timerUpdate struct {
    59  	active bool
    60  	next   time.Duration // iff active == true
    61  }
    62  
    63  // Fake time.
    64  type fakeTimer struct {
    65  	c chan time.Time
    66  
    67  	lock    sync.Mutex
    68  	now     time.Time
    69  	timeout time.Time
    70  	active  bool
    71  
    72  	updated chan timerUpdate
    73  }
    74  
    75  func newFakeTimer() *fakeTimer {
    76  	ft := &fakeTimer{
    77  		now:     time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
    78  		c:       make(chan time.Time),
    79  		updated: make(chan timerUpdate),
    80  	}
    81  	return ft
    82  }
    83  
    84  func (ft *fakeTimer) C() <-chan time.Time {
    85  	return ft.c
    86  }
    87  
    88  func (ft *fakeTimer) Reset(in time.Duration) bool {
    89  	ft.lock.Lock()
    90  	defer ft.lock.Unlock()
    91  
    92  	was := ft.active
    93  	ft.active = true
    94  	ft.timeout = ft.now.Add(in)
    95  	ft.updated <- timerUpdate{
    96  		active: true,
    97  		next:   in,
    98  	}
    99  	return was
   100  }
   101  
   102  func (ft *fakeTimer) Stop() bool {
   103  	ft.lock.Lock()
   104  	defer ft.lock.Unlock()
   105  
   106  	was := ft.active
   107  	ft.active = false
   108  	ft.updated <- timerUpdate{
   109  		active: false,
   110  	}
   111  	return was
   112  }
   113  
   114  func (ft *fakeTimer) Now() time.Time {
   115  	ft.lock.Lock()
   116  	defer ft.lock.Unlock()
   117  
   118  	return ft.now
   119  }
   120  
   121  func (ft *fakeTimer) Remaining() time.Duration {
   122  	ft.lock.Lock()
   123  	defer ft.lock.Unlock()
   124  
   125  	return ft.timeout.Sub(ft.now)
   126  }
   127  
   128  func (ft *fakeTimer) Since(t time.Time) time.Duration {
   129  	ft.lock.Lock()
   130  	defer ft.lock.Unlock()
   131  
   132  	return ft.now.Sub(t)
   133  }
   134  
   135  func (ft *fakeTimer) Sleep(d time.Duration) {
   136  	// ft.advance grabs ft.lock
   137  	ft.advance(d)
   138  }
   139  
   140  // advance the current time.
   141  func (ft *fakeTimer) advance(d time.Duration) {
   142  	ft.lock.Lock()
   143  	defer ft.lock.Unlock()
   144  
   145  	ft.now = ft.now.Add(d)
   146  	if ft.active && !ft.now.Before(ft.timeout) {
   147  		ft.active = false
   148  		ft.c <- ft.timeout
   149  	}
   150  }
   151  
   152  // return the calling line number (for printing)
   153  // test the timer's state
   154  func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
   155  	if upd.active != active {
   156  		t.Fatalf("%s: expected timer active=%v", name, active)
   157  	}
   158  	if active && upd.next != next {
   159  		t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
   160  	}
   161  }
   162  
   163  // test and reset the receiver's state
   164  func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
   165  	triggered := receiver.reset()
   166  	if expected && !triggered {
   167  		t.Fatalf("%s: function should have been called", name)
   168  	} else if !expected && triggered {
   169  		t.Fatalf("%s: function should not have been called", name)
   170  	}
   171  }
   172  
   173  // Durations embedded in test cases depend on these.
   174  var minInterval = 1 * time.Second
   175  var maxInterval = 10 * time.Second
   176  
   177  func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
   178  	upd := <-timer.updated // wait for stop
   179  	checkReceiver(name, t, obj, expectCall)
   180  	checkReceiver(name, t, obj, false) // prove post-condition
   181  	checkTimer(name, t, upd, false, 0)
   182  	upd = <-timer.updated // wait for reset
   183  	checkTimer(name, t, upd, true, expectNext)
   184  }
   185  
   186  func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
   187  	waitForReset(name, t, timer, obj, true, maxInterval)
   188  }
   189  
   190  func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
   191  	// It will first get reset as with a normal run, and then get set again
   192  	waitForRun(name, t, timer, obj)
   193  	waitForReset(name, t, timer, obj, false, expectNext)
   194  }
   195  
   196  func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
   197  	waitForReset(name, t, timer, obj, false, expectNext)
   198  }
   199  
   200  func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
   201  	select {
   202  	case <-timer.c:
   203  		t.Fatalf("%s: unexpected timer tick", name)
   204  	case upd := <-timer.updated:
   205  		t.Fatalf("%s: unexpected timer update %v", name, upd)
   206  	default:
   207  	}
   208  	checkReceiver(name, t, obj, false)
   209  }
   210  
   211  func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
   212  	obj := &receiver{}
   213  	timer := newFakeTimer()
   214  	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
   215  	stop := make(chan struct{})
   216  
   217  	var upd timerUpdate
   218  
   219  	// Start.
   220  	go runner.Loop(stop)
   221  	upd = <-timer.updated // wait for initial time to be set to max
   222  	checkTimer("init", t, upd, true, maxInterval)
   223  	checkReceiver("init", t, obj, false)
   224  
   225  	// Run once, immediately.
   226  	// rel=0ms
   227  	runner.Run()
   228  	waitForRun("first run", t, timer, obj)
   229  
   230  	// Run again, before minInterval expires.
   231  	timer.advance(500 * time.Millisecond) // rel=500ms
   232  	runner.Run()
   233  	waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
   234  
   235  	// Run again, before minInterval expires.
   236  	timer.advance(499 * time.Millisecond) // rel=999ms
   237  	runner.Run()
   238  	waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
   239  
   240  	// Do the deferred run
   241  	timer.advance(1 * time.Millisecond) // rel=1000ms
   242  	waitForRun("second run", t, timer, obj)
   243  
   244  	// Try again immediately
   245  	runner.Run()
   246  	waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
   247  
   248  	// Run again, before minInterval expires.
   249  	timer.advance(1 * time.Millisecond) // rel=1ms
   250  	runner.Run()
   251  	waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
   252  
   253  	// Ensure that we don't run again early
   254  	timer.advance(998 * time.Millisecond) // rel=999ms
   255  	waitForNothing("premature", t, timer, obj)
   256  
   257  	// Do the deferred run
   258  	timer.advance(1 * time.Millisecond) // rel=1000ms
   259  	waitForRun("third run", t, timer, obj)
   260  
   261  	// Let minInterval pass, but there are no runs queued
   262  	timer.advance(1 * time.Second) // rel=1000ms
   263  	waitForNothing("minInterval", t, timer, obj)
   264  
   265  	// Let maxInterval pass
   266  	timer.advance(9 * time.Second) // rel=10000ms
   267  	waitForRun("maxInterval", t, timer, obj)
   268  
   269  	// Run again, before minInterval expires.
   270  	timer.advance(1 * time.Millisecond) // rel=1ms
   271  	runner.Run()
   272  	waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
   273  
   274  	// Let minInterval pass
   275  	timer.advance(999 * time.Millisecond) // rel=1000ms
   276  	waitForRun("fifth run", t, timer, obj)
   277  
   278  	// Clean up.
   279  	stop <- struct{}{}
   280  	// a message is sent to time.updated in func Stop() at the end of the child goroutine
   281  	// to terminate the child, a receive on time.updated is needed here
   282  	<-timer.updated
   283  }
   284  
   285  func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
   286  	obj := &receiver{}
   287  	timer := newFakeTimer()
   288  	runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
   289  	stop := make(chan struct{})
   290  
   291  	var upd timerUpdate
   292  
   293  	// Start.
   294  	go runner.Loop(stop)
   295  	upd = <-timer.updated // wait for initial time to be set to max
   296  	checkTimer("init", t, upd, true, maxInterval)
   297  	checkReceiver("init", t, obj, false)
   298  
   299  	// Run once, immediately.
   300  	// abs=0ms, rel=0ms
   301  	runner.Run()
   302  	waitForRun("first run", t, timer, obj)
   303  
   304  	// Run again, before minInterval expires, with burst.
   305  	timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
   306  	runner.Run()
   307  	waitForRun("second run", t, timer, obj)
   308  
   309  	// Run again, before minInterval expires.
   310  	timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
   311  	runner.Run()
   312  	waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
   313  
   314  	// Run again, before minInterval expires.
   315  	timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
   316  	runner.Run()
   317  	waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
   318  
   319  	// Run again, before minInterval expires.
   320  	timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
   321  	runner.Run()
   322  	waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
   323  
   324  	// Advance timer enough to replenish bursts, but not enough to be minInterval
   325  	// after the last run
   326  	timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
   327  	waitForNothing("not minInterval", t, timer, obj)
   328  	runner.Run()
   329  	waitForRun("third run", t, timer, obj)
   330  
   331  	// Run again, before minInterval expires.
   332  	timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
   333  	runner.Run()
   334  	waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
   335  
   336  	// Run again, before minInterval expires.
   337  	timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
   338  	runner.Run()
   339  	waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
   340  
   341  	// Advance and do the deferred run
   342  	timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
   343  	waitForRun("fourth run", t, timer, obj)
   344  
   345  	// Run again, once burst has fully replenished.
   346  	timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
   347  	runner.Run()
   348  	waitForRun("fifth run", t, timer, obj)
   349  	runner.Run()
   350  	waitForRun("sixth run", t, timer, obj)
   351  	runner.Run()
   352  	waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
   353  
   354  	// Wait until minInterval after the last run
   355  	timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
   356  	waitForRun("seventh run", t, timer, obj)
   357  
   358  	// Wait for maxInterval
   359  	timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
   360  	waitForRun("maxInterval", t, timer, obj)
   361  
   362  	// Clean up.
   363  	stop <- struct{}{}
   364  	// a message is sent to time.updated in func Stop() at the end of the child goroutine
   365  	// to terminate the child, a receive on time.updated is needed here
   366  	<-timer.updated
   367  }
   368  
   369  func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
   370  	obj := &receiver{}
   371  	timer := newFakeTimer()
   372  	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
   373  	stop := make(chan struct{})
   374  
   375  	var upd timerUpdate
   376  
   377  	// Start.
   378  	go runner.Loop(stop)
   379  	upd = <-timer.updated // wait for initial time to be set to max
   380  	checkTimer("init", t, upd, true, maxInterval)
   381  	checkReceiver("init", t, obj, false)
   382  
   383  	// Run once, immediately, and queue a retry
   384  	// rel=0ms
   385  	obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
   386  	runner.Run()
   387  	waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
   388  
   389  	// Nothing happens...
   390  	timer.advance(time.Second) // rel=1000ms
   391  	waitForNothing("minInterval, nothing queued", t, timer, obj)
   392  
   393  	// After retryInterval, function is called
   394  	timer.advance(4 * time.Second) // rel=5000ms
   395  	waitForRun("retry", t, timer, obj)
   396  
   397  	// Run again, before minInterval expires.
   398  	timer.advance(499 * time.Millisecond) // rel=499ms
   399  	runner.Run()
   400  	waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
   401  
   402  	// Do the deferred run, queue another retry after it returns
   403  	timer.advance(501 * time.Millisecond) // rel=1000ms
   404  	runner.RetryAfter(5 * time.Second)
   405  	waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
   406  
   407  	// Wait for minInterval to pass
   408  	timer.advance(time.Second) // rel=1000ms
   409  	waitForNothing("minInterval, nothing queued", t, timer, obj)
   410  
   411  	// Now do another run
   412  	runner.Run()
   413  	waitForRun("third run", t, timer, obj)
   414  
   415  	// Retry was cancelled because we already ran
   416  	timer.advance(4 * time.Second)
   417  	waitForNothing("retry cancelled", t, timer, obj)
   418  
   419  	// Run, queue a retry from a goroutine
   420  	obj.setRetryFn(func() {
   421  		go func() {
   422  			time.Sleep(100 * time.Millisecond)
   423  			runner.RetryAfter(5 * time.Second)
   424  		}()
   425  	})
   426  	runner.Run()
   427  	waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
   428  
   429  	// Call Run again before minInterval passes
   430  	timer.advance(100 * time.Millisecond) // rel=100ms
   431  	runner.Run()
   432  	waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
   433  
   434  	// Deferred run will run after minInterval passes
   435  	timer.advance(900 * time.Millisecond) // rel=1000ms
   436  	waitForRun("fifth run", t, timer, obj)
   437  
   438  	// Retry was cancelled because we already ran
   439  	timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
   440  	waitForNothing("retry cancelled", t, timer, obj)
   441  
   442  	// Rerun happens after maxInterval
   443  	timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
   444  	waitForNothing("premature", t, timer, obj)
   445  	timer.advance(time.Second) // rel=10s since run
   446  	waitForRun("maxInterval", t, timer, obj)
   447  
   448  	// Clean up.
   449  	stop <- struct{}{}
   450  	// a message is sent to time.updated in func Stop() at the end of the child goroutine
   451  	// to terminate the child, a receive on time.updated is needed here
   452  	<-timer.updated
   453  }
   454  

View as plain text