     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package async
    19  import (
    20  	"sync"
    21  	"testing"
    22  	"time"
    23  )
    25  // Track calls to the managed function.
    26  type receiver struct {
    27  	lock    sync.Mutex
    28  	run     bool
    29  	retryFn func()
    30  }
    32  func (r *receiver) F() {
    33  	r.lock.Lock()
    34  	defer r.lock.Unlock()
    35  	r.run = true
    37  	if r.retryFn != nil {
    38  		r.retryFn()
    39  		r.retryFn = nil
    40  	}
    41  }
    43  func (r *receiver) reset() bool {
    44  	r.lock.Lock()
    45  	defer r.lock.Unlock()
    46  	was := r.run
    47  	r.run = false
    48  	return was
    49  }
    51  func (r *receiver) setRetryFn(retryFn func()) {
    52  	r.lock.Lock()
    53  	defer r.lock.Unlock()
    54  	r.retryFn = retryFn
    55  }
    57  // A single change event in the fake timer.
    58  type timerUpdate struct {
    59  	active bool
    60  	next   time.Duration // iff active == true
    61  }
    63  // Fake time.
    64  type fakeTimer struct {
    65  	c chan time.Time
    67  	lock    sync.Mutex
    68  	now     time.Time
    69  	timeout time.Time
    70  	active  bool
    72  	updated chan timerUpdate
    73  }
    75  func newFakeTimer() *fakeTimer {
    76  	ft := &fakeTimer{
    77  		now:     time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
    78  		c:       make(chan time.Time),
    79  		updated: make(chan timerUpdate),
    80  	}
    81  	return ft
    82  }
    84  func (ft *fakeTimer) C() <-chan time.Time {
    85  	return ft.c
    86  }
    88  func (ft *fakeTimer) Reset(in time.Duration) bool {
    89  	ft.lock.Lock()
    90  	defer ft.lock.Unlock()
    92  	was := ft.active
    93  	ft.active = true
    94  	ft.timeout = ft.now.Add(in)
    95  	ft.updated <- timerUpdate{
    96  		active: true,
    97  		next:   in,
    98  	}
    99  	return was
   100  }
   102  func (ft *fakeTimer) Stop() bool {
   103  	ft.lock.Lock()
   104  	defer ft.lock.Unlock()
   106  	was := ft.active
   107  	ft.active = false
   108  	ft.updated <- timerUpdate{
   109  		active: false,
   110  	}
   111  	return was
   112  }
   114  func (ft *fakeTimer) Now() time.Time {
   115  	ft.lock.Lock()
   116  	defer ft.lock.Unlock()
   118  	return ft.now
   119  }
   121  func (ft *fakeTimer) Remaining() time.Duration {
   122  	ft.lock.Lock()
   123  	defer ft.lock.Unlock()
   125  	return ft.timeout.Sub(ft.now)
   126  }
   128  func (ft *fakeTimer) Since(t time.Time) time.Duration {
   129  	ft.lock.Lock()
   130  	defer ft.lock.Unlock()
   132  	return ft.now.Sub(t)
   133  }
   135  func (ft *fakeTimer) Sleep(d time.Duration) {
   136  	// ft.advance grabs ft.lock
   137  	ft.advance(d)
   138  }
   140  // advance the current time.
   141  func (ft *fakeTimer) advance(d time.Duration) {
   142  	ft.lock.Lock()
   143  	defer ft.lock.Unlock()
   145  	ft.now = ft.now.Add(d)
   146  	if ft.active && !ft.now.Before(ft.timeout) {
   147  		ft.active = false
   148  		ft.c <- ft.timeout
   149  	}
   150  }
   152  // return the calling line number (for printing)
   153  // test the timer's state
   154  func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
   155  	if upd.active != active {
   156  		t.Fatalf("%s: expected timer active=%v", name, active)
   157  	}
   158  	if active && upd.next != next {
   159  		t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
   160  	}
   161  }
   163  // test and reset the receiver's state
   164  func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
   165  	triggered := receiver.reset()
   166  	if expected && !triggered {
   167  		t.Fatalf("%s: function should have been called", name)
   168  	} else if !expected && triggered {
   169  		t.Fatalf("%s: function should not have been called", name)
   170  	}
   171  }
   173  // Durations embedded in test cases depend on these.
   174  var minInterval = 1 * time.Second
   175  var maxInterval = 10 * time.Second
   177  func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
   178  	upd := <-timer.updated // wait for stop
   179  	checkReceiver(name, t, obj, expectCall)
   180  	checkReceiver(name, t, obj, false) // prove post-condition
   181  	checkTimer(name, t, upd, false, 0)
   182  	upd = <-timer.updated // wait for reset
   183  	checkTimer(name, t, upd, true, expectNext)
   184  }
   186  func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
   187  	waitForReset(name, t, timer, obj, true, maxInterval)
   188  }
   190  func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
   191  	// It will first get reset as with a normal run, and then get set again
   192  	waitForRun(name, t, timer, obj)
   193  	waitForReset(name, t, timer, obj, false, expectNext)
   194  }
   196  func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
   197  	waitForReset(name, t, timer, obj, false, expectNext)
   198  }
   200  func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
   201  	select {
   202  	case <-timer.c:
   203  		t.Fatalf("%s: unexpected timer tick", name)
   204  	case upd := <-timer.updated:
   205  		t.Fatalf("%s: unexpected timer update %v", name, upd)
   206  	default:
   207  	}
   208  	checkReceiver(name, t, obj, false)
   209  }
   211  func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
   212  	obj := &receiver{}
   213  	timer := newFakeTimer()
   214  	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
   215  	stop := make(chan struct{})
   217  	var upd timerUpdate
   219  	// Start.
   220  	go runner.Loop(stop)
   221  	upd = <-timer.updated // wait for initial time to be set to max
   222  	checkTimer("init", t, upd, true, maxInterval)
   223  	checkReceiver("init", t, obj, false)
   225  	// Run once, immediately.
   226  	// rel=0ms
   227  	runner.Run()
   228  	waitForRun("first run", t, timer, obj)
   230  	// Run again, before minInterval expires.
   231  	timer.advance(500 * time.Millisecond) // rel=500ms
   232  	runner.Run()
   233  	waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
   235  	// Run again, before minInterval expires.
   236  	timer.advance(499 * time.Millisecond) // rel=999ms
   237  	runner.Run()
   238  	waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
   240  	// Do the deferred run
   241  	timer.advance(1 * time.Millisecond) // rel=1000ms
   242  	waitForRun("second run", t, timer, obj)
   244  	// Try again immediately
   245  	runner.Run()
   246  	waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
   248  	// Run again, before minInterval expires.
   249  	timer.advance(1 * time.Millisecond) // rel=1ms
   250  	runner.Run()
   251  	waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
   253  	// Ensure that we don't run again early
   254  	timer.advance(998 * time.Millisecond) // rel=999ms
   255  	waitForNothing("premature", t, timer, obj)
   257  	// Do the deferred run
   258  	timer.advance(1 * time.Millisecond) // rel=1000ms
   259  	waitForRun("third run", t, timer, obj)
   261  	// Let minInterval pass, but there are no runs queued
   262  	timer.advance(1 * time.Second) // rel=1000ms
   263  	waitForNothing("minInterval", t, timer, obj)
   265  	// Let maxInterval pass
   266  	timer.advance(9 * time.Second) // rel=10000ms
   267  	waitForRun("maxInterval", t, timer, obj)
   269  	// Run again, before minInterval expires.
   270  	timer.advance(1 * time.Millisecond) // rel=1ms
   271  	runner.Run()
   272  	waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
   274  	// Let minInterval pass
   275  	timer.advance(999 * time.Millisecond) // rel=1000ms
   276  	waitForRun("fifth run", t, timer, obj)
   278  	// Clean up.
   279  	stop <- struct{}{}
   280  	// a message is sent to time.updated in func Stop() at the end of the child goroutine
   281  	// to terminate the child, a receive on time.updated is needed here
   282  	<-timer.updated
   283  }
   285  func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
   286  	obj := &receiver{}
   287  	timer := newFakeTimer()
   288  	runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
   289  	stop := make(chan struct{})
   291  	var upd timerUpdate
   293  	// Start.
   294  	go runner.Loop(stop)
   295  	upd = <-timer.updated // wait for initial time to be set to max
   296  	checkTimer("init", t, upd, true, maxInterval)
   297  	checkReceiver("init", t, obj, false)
   299  	// Run once, immediately.
   300  	// abs=0ms, rel=0ms
   301  	runner.Run()
   302  	waitForRun("first run", t, timer, obj)
   304  	// Run again, before minInterval expires, with burst.
   305  	timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
   306  	runner.Run()
   307  	waitForRun("second run", t, timer, obj)
   309  	// Run again, before minInterval expires.
   310  	timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
   311  	runner.Run()
   312  	waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
   314  	// Run again, before minInterval expires.
   315  	timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
   316  	runner.Run()
   317  	waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
   319  	// Run again, before minInterval expires.
   320  	timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
   321  	runner.Run()
   322  	waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
   324  	// Advance timer enough to replenish bursts, but not enough to be minInterval
   325  	// after the last run
   326  	timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
   327  	waitForNothing("not minInterval", t, timer, obj)
   328  	runner.Run()
   329  	waitForRun("third run", t, timer, obj)
   331  	// Run again, before minInterval expires.
   332  	timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
   333  	runner.Run()
   334  	waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
   336  	// Run again, before minInterval expires.
   337  	timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
   338  	runner.Run()
   339  	waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
   341  	// Advance and do the deferred run
   342  	timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
   343  	waitForRun("fourth run", t, timer, obj)
   345  	// Run again, once burst has fully replenished.
   346  	timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
   347  	runner.Run()
   348  	waitForRun("fifth run", t, timer, obj)
   349  	runner.Run()
   350  	waitForRun("sixth run", t, timer, obj)
   351  	runner.Run()
   352  	waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
   354  	// Wait until minInterval after the last run
   355  	timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
   356  	waitForRun("seventh run", t, timer, obj)
   358  	// Wait for maxInterval
   359  	timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
   360  	waitForRun("maxInterval", t, timer, obj)
   362  	// Clean up.
   363  	stop <- struct{}{}
   364  	// a message is sent to time.updated in func Stop() at the end of the child goroutine
   365  	// to terminate the child, a receive on time.updated is needed here
   366  	<-timer.updated
   367  }
   369  func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
   370  	obj := &receiver{}
   371  	timer := newFakeTimer()
   372  	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
   373  	stop := make(chan struct{})
   375  	var upd timerUpdate
   377  	// Start.
   378  	go runner.Loop(stop)
   379  	upd = <-timer.updated // wait for initial time to be set to max
   380  	checkTimer("init", t, upd, true, maxInterval)
   381  	checkReceiver("init", t, obj, false)
   383  	// Run once, immediately, and queue a retry
   384  	// rel=0ms
   385  	obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
   386  	runner.Run()
   387  	waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
   389  	// Nothing happens...
   390  	timer.advance(time.Second) // rel=1000ms
   391  	waitForNothing("minInterval, nothing queued", t, timer, obj)
   393  	// After retryInterval, function is called
   394  	timer.advance(4 * time.Second) // rel=5000ms
   395  	waitForRun("retry", t, timer, obj)
   397  	// Run again, before minInterval expires.
   398  	timer.advance(499 * time.Millisecond) // rel=499ms
   399  	runner.Run()
   400  	waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
   402  	// Do the deferred run, queue another retry after it returns
   403  	timer.advance(501 * time.Millisecond) // rel=1000ms
   404  	runner.RetryAfter(5 * time.Second)
   405  	waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
   407  	// Wait for minInterval to pass
   408  	timer.advance(time.Second) // rel=1000ms
   409  	waitForNothing("minInterval, nothing queued", t, timer, obj)
   411  	// Now do another run
   412  	runner.Run()
   413  	waitForRun("third run", t, timer, obj)
   415  	// Retry was cancelled because we already ran
   416  	timer.advance(4 * time.Second)
   417  	waitForNothing("retry cancelled", t, timer, obj)
   419  	// Run, queue a retry from a goroutine
   420  	obj.setRetryFn(func() {
   421  		go func() {
   422  			time.Sleep(100 * time.Millisecond)
   423  			runner.RetryAfter(5 * time.Second)
   424  		}()
   425  	})
   426  	runner.Run()
   427  	waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
   429  	// Call Run again before minInterval passes
   430  	timer.advance(100 * time.Millisecond) // rel=100ms
   431  	runner.Run()
   432  	waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
   434  	// Deferred run will run after minInterval passes
   435  	timer.advance(900 * time.Millisecond) // rel=1000ms
   436  	waitForRun("fifth run", t, timer, obj)
   438  	// Retry was cancelled because we already ran
   439  	timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
   440  	waitForNothing("retry cancelled", t, timer, obj)
   442  	// Rerun happens after maxInterval
   443  	timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
   444  	waitForNothing("premature", t, timer, obj)
   445  	timer.advance(time.Second) // rel=10s since run
   446  	waitForRun("maxInterval", t, timer, obj)
   448  	// Clean up.
   449  	stop <- struct{}{}
   450  	// a message is sent to time.updated in func Stop() at the end of the child goroutine
   451  	// to terminate the child, a receive on time.updated is needed here
   452  	<-timer.updated
   453  }

