...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/ambex/ratelimit_test.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/ambex

     1  package ambex
     2  
     3  import (
     4  	"fmt"
     5  	"sync"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/stretchr/testify/assert"
    10  
    11  	"github.com/datawire/dlib/dlog"
    12  )
    13  
    14  type harness struct {
    15  	T               *testing.T
    16  	C               chan time.Time
    17  	version         int         // for generating versions
    18  	updates         chan Update // we push versions here
    19  	pushed          chan int    // versions that are pushed end up here
    20  	expectedVersion int
    21  
    22  	mutex sync.Mutex // to proect the clock and usage
    23  	usage int        // simulated memory usage
    24  	clock time.Time  // current simulated time
    25  }
    26  
    27  var drainTime = 10 * time.Minute
    28  
    29  func newHarness(t *testing.T) *harness {
    30  	C := make(chan time.Time)
    31  	h := &harness{t, C, 0, make(chan Update), make(chan int, 10000), 1, sync.Mutex{}, 0, time.Now()}
    32  	go func() {
    33  		assert.NoError(t, updaterWithTicker(dlog.NewTestContext(t, false), h.updates, h.getUsage, drainTime, &time.Ticker{C: C}, h.time))
    34  	}()
    35  	return h
    36  }
    37  
    38  func (h *harness) advance(d time.Duration) time.Time {
    39  	h.mutex.Lock()
    40  	result := h.clock.Add(d)
    41  	h.clock = result
    42  	h.mutex.Unlock()
    43  	return result
    44  }
    45  
    46  func (h *harness) time() time.Time {
    47  	return h.advance(0)
    48  }
    49  
    50  func (h *harness) getUsage() int {
    51  	h.mutex.Lock()
    52  	defer h.mutex.Unlock()
    53  	return h.usage
    54  }
    55  
    56  func (h *harness) setUsage(u int) {
    57  	h.mutex.Lock()
    58  	defer h.mutex.Unlock()
    59  	h.usage = u
    60  }
    61  
    62  // Simulate a timer tick after the given duration.
    63  func (h *harness) tick(d time.Duration) {
    64  	h.C <- h.advance(d)
    65  }
    66  
    67  // Simulate an update with the specified version after the specified duration.
    68  func (h *harness) update(d time.Duration) int {
    69  	h.version++
    70  	version := h.version
    71  	h.advance(d)
    72  	h.updates <- Update{fmt.Sprintf("%d", version), func() error {
    73  		h.pushed <- version
    74  		return nil
    75  	}}
    76  	return version
    77  }
    78  
    79  // Assert that a contiguous sequences of updates were pushed up to and including the specified
    80  // version.
    81  func (h *harness) expectUntil(version int) {
    82  	for {
    83  		select {
    84  		case v := <-h.pushed:
    85  			assert.Equal(h.T, h.expectedVersion, v)
    86  			if v < version {
    87  				h.expectedVersion++
    88  				continue
    89  			}
    90  
    91  			if v != version {
    92  				assert.Fail(h.T, fmt.Sprintf("expected version %d, got %d", version, v))
    93  			}
    94  			return
    95  		case <-time.After(1 * time.Second):
    96  			assert.Fail(h.T, fmt.Sprintf("expected version %d, but timed out", h.expectedVersion))
    97  			return
    98  		}
    99  	}
   100  
   101  }
   102  
   103  // Assert that the next version is exactly the supplied version.
   104  func (h *harness) expectExact(version int) {
   105  	select {
   106  	case v := <-h.pushed:
   107  		assert.Equal(h.T, version, v)
   108  		h.expectedVersion = version + 1
   109  	case <-time.After(1 * time.Second):
   110  		assert.Fail(h.T, fmt.Sprintf("expected version %d, but timed out", version))
   111  		return
   112  	}
   113  }
   114  
   115  // Assert that no version was pushed.
   116  func (h *harness) expectNone() {
   117  	select {
   118  	case v := <-h.pushed:
   119  		assert.Fail(h.T, fmt.Sprintf("expected no pushes, but go version %d", v))
   120  	case <-time.After(1 * time.Second):
   121  		return
   122  	}
   123  }
   124  
   125  // Check to see when memory usage is zero we don't throttle at all.
   126  func TestHappyPath(t *testing.T) {
   127  	h := newHarness(t)
   128  	var version int
   129  	for i := 0; i < 1000; i++ {
   130  		version = h.update(0)
   131  	}
   132  	h.expectUntil(version)
   133  }
   134  
   135  // Progress through the various levels of constraint and check that the correct updates are dropped.
   136  func TestConstrained(t *testing.T) {
   137  	h := newHarness(t)
   138  
   139  	h.setUsage(50)
   140  	for i := 0; i < 1000; i++ {
   141  		h.update(0)
   142  	}
   143  	// Above 50% memory usage we only allow for 120 stale configs in memory at a time. Our harness
   144  	// versions start counting at 1, so we should get up to version 120 before getting throttled.
   145  	h.expectUntil(120)
   146  	// Fast forward by drainTime and check that the most recent update made it eventually.
   147  	h.tick(drainTime)
   148  	h.expectExact(1000)
   149  
   150  	h.setUsage(60)
   151  	for i := 0; i < 1000; i++ {
   152  		h.update(0)
   153  	}
   154  	// Above 60% memory usage we only allow for 60 stale configs in memory at a time.
   155  	h.expectUntil(1059)
   156  	// Fast forward by drainTime and check that the most recent update made it eventually.
   157  	h.tick(drainTime)
   158  	h.expectExact(2000)
   159  
   160  	h.setUsage(70)
   161  	for i := 0; i < 1000; i++ {
   162  		h.update(0)
   163  	}
   164  	// Above 70% memory usage we only allow for 30 stale configs in memory at a time.
   165  	h.expectUntil(2029)
   166  	// Fast forward by drainTime and check that the most recent update made it eventually.
   167  	h.tick(drainTime)
   168  	h.expectExact(3000)
   169  
   170  	h.setUsage(80)
   171  	for i := 0; i < 1000; i++ {
   172  		h.update(0)
   173  	}
   174  	// Above 80% memory usage we only allow for 15 stale configs in memory at a time.
   175  	h.expectUntil(3014)
   176  	// Fast forward by drainTime and check that the most recent update made it eventually.
   177  	h.tick(drainTime)
   178  	h.expectExact(4000)
   179  
   180  	h.setUsage(90)
   181  	for i := 0; i < 1000; i++ {
   182  		h.update(0)
   183  	}
   184  	// Above 90% memory usage we only allow for 1 stale config in memory at a time.
   185  	h.expectNone()
   186  	// Fast forward by drainTime and check that the most recent update made it eventually.
   187  	h.tick(drainTime)
   188  	h.expectExact(5000)
   189  
   190  	// Check that we go back to passing through everything when usage drops again.
   191  	h.setUsage(25)
   192  	for i := 0; i < 1000; i++ {
   193  		h.update(0)
   194  	}
   195  	h.expectUntil(6000)
   196  }
   197  

View as plain text