
Source file src/github.com/datawire/ambassador/v2/pkg/ambex/ratelimit.go

Documentation: github.com/datawire/ambassador/v2/pkg/ambex

package ambex

import (
	"context"
	"os"
	"strconv"
	"time"

	"github.com/datawire/ambassador/v2/pkg/debug"
	"github.com/datawire/dlib/dlog"
)

// An Update encapsulates everything needed to perform an update (of Envoy configuration). The
// Version string is for logging purposes; the Update func does the actual work of updating.
type Update struct {
	Version string
	Update  func() error
}

// MemoryGetter is a function type for fetching memory usage as a percentage.
type MemoryGetter func() int
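
A MemoryGetter only needs to report current memory usage as a whole-number percentage. The sketch below is a hypothetical implementation, not part of this file: it compares the Go heap against an assumed fixed limit, whereas the getter Ambassador actually wires in is defined elsewhere and may measure container memory instead.

// Hypothetical MemoryGetter, for illustration only: reports heap usage as a
// percentage of an assumed 512 MiB limit. Requires importing "runtime".
func heapPercent() int {
	const assumedLimitBytes = 512 << 20 // illustrative limit, not a real setting
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return int(m.HeapAlloc * 100 / assumedLimitBytes)
}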

// The Updater function will run forever (or until the ctx is canceled) and look for updates on the
// incoming channel. If memory usage is constrained, as reported by the getUsage function, updates
// will be rate limited to guarantee that there are only so many stale configs in memory at a
// time. The function assumes updates are cumulative and will drop old queued updates if a new
// update arrives.
func Updater(ctx context.Context, updates <-chan Update, getUsage MemoryGetter) error {
	drainTime := GetAmbassadorDrainTime(ctx)
	ticker := time.NewTicker(drainTime)
	defer ticker.Stop()
	return updaterWithTicker(ctx, updates, getUsage, drainTime, ticker, time.Now)
}
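
A rough sketch of how a caller might wire this up, assuming it runs inside (or imports) this package; the channel plumbing and the trivial memory getter below are illustrative assumptions, not APIs defined in this file.

// Illustrative wiring only: feed Updater a stream of Update values and a
// MemoryGetter, then let it decide when each one may actually be applied.
func runUpdaterSketch(ctx context.Context) error {
	updates := make(chan Update)

	// Assumed stand-in for a real memory getter.
	getUsage := func() int { return 0 }

	go func() {
		// Each Update supersedes the previous one; Updater drops stale
		// queued entries itself, so senders never need to.
		updates <- Update{
			Version: "v1",
			Update: func() error {
				// Assumed: push the new Envoy configuration here.
				return nil
			},
		}
	}()

	return Updater(ctx, updates, getUsage)
}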

type debugInfo struct {
	Times              []time.Time `json:"times"`
	StaleCount         int         `json:"staleCount"`
	StaleMax           int         `json:"staleMax"`
	Synced             bool        `json:"synced"`
	DisableRatelimiter bool        `json:"disableRatelimiter"`
}

func updaterWithTicker(ctx context.Context, updates <-chan Update, getUsage MemoryGetter,
	drainTime time.Duration, ticker *time.Ticker, clock func() time.Time) error {

	dbg := debug.FromContext(ctx)
	info := dbg.Value("envoyReconfigs")

	// Is the rate-limiter meant to be active at all?
	disableRatelimiter, err := strconv.ParseBool(os.Getenv("AMBASSADOR_AMBEX_NO_RATELIMIT"))

	if err != nil {
		disableRatelimiter = false
	}

	if disableRatelimiter {
		dlog.Info(ctx, "snapshot ratelimiter DISABLED")
	}

	// This slice holds the times of any updates we have made. This lets us compute how many stale
	// configs are being held in memory, since we can filter this list down to just those times that
	// are between now - drain-time and now, i.e. we keep only the events that are more recent than
	// drain-time ago.
	updateTimes := []time.Time{}

	// This variable holds the most recent desired configuration.
	var latest Update
	gotFirst := false
	pushed := false
	for {
		// The basic idea here is that we wake up whenever we either a) get a new snapshot to update,
		// or b) the timer ticks. In case a) we update the "latest" variable so that it always holds
		// the most recent desired Update. In either case, we filter the list of updateTimes so we
		// know exactly how many updates are in memory, and then based on that we decide whether we
		// can do another reconfig or whether we should wait until the next tick or update, whichever
		// happens first.

		var now time.Time
		tick := false
		select {
		case up := <-updates:
			latest = up
			pushed = false
			gotFirst = true
			now = clock()
		case now = <-ticker.C:
			if pushed {
				continue
			}
			tick = true
		case <-ctx.Done():
			return nil
		}

		// Remove updates that happened longer than drain-time ago.
		updateTimes = gcUpdateTimes(updateTimes, now, drainTime)

		usagePercent := getUsage()

		if disableRatelimiter {
			usagePercent = 0
		}

		var maxStaleReconfigs int
		switch {
		case usagePercent >= 90:
			// With the default 10 minute drain time this works out to an average of one reconfig
			// every 10 minutes. This will guarantee the minimum possible memory usage due to stale
			// configs.
			maxStaleReconfigs = 1
		case usagePercent >= 80:
			// With the default 10 minute drain time this works out to one reconfig every 40
			// seconds on average within the window. (They could all happen in one burst.)
			maxStaleReconfigs = 15
		case usagePercent >= 70:
			// With the default 10 minute drain time this works out to one reconfig every 20
			// seconds on average within the window. (They could all happen in one burst.)
			maxStaleReconfigs = 30
		case usagePercent >= 60:
			// With the default 10 minute drain time this works out to one reconfig every 10
			// seconds on average within the window. (They could all happen in one burst.)
			maxStaleReconfigs = 60
		case usagePercent >= 50:
			// With the default 10 minute drain time this works out to one reconfig every 5 seconds
			// on average within the window. (They could all happen in one burst.)
			maxStaleReconfigs = 120
		default:
			// Zero means no limit. This is what we want by default when memory usage is in the 0 to
			// 50 range.
			maxStaleReconfigs = 0
		}

		staleReconfigs := len(updateTimes)

		info.Store(debugInfo{updateTimes, staleReconfigs, maxStaleReconfigs, pushed, disableRatelimiter})

		// Decide if we have enough capacity left to perform a reconfig.
		if maxStaleReconfigs > 0 && staleReconfigs >= maxStaleReconfigs {
			if !tick {
				dlog.Warnf(ctx, "Memory Usage: throttling reconfig %+v due to constrained memory with %d stale reconfigs (%d max)",
					latest.Version, staleReconfigs, maxStaleReconfigs)
			}
			continue
		}

		// This is just in case we get a timer tick before the first update actually arrives.
		if !gotFirst {
			continue
		}

		// This is going to do the actual work of pushing an update.
		err := latest.Update()
		if err != nil {
			return err
		}

		// Since we just pushed an update, we add the current time to the set of update times.
		updateTimes = append(updateTimes, now)
		dlog.Infof(ctx, "Pushing snapshot %+v", latest.Version)
		pushed = true

		info.Store(debugInfo{updateTimes, staleReconfigs, maxStaleReconfigs, pushed, disableRatelimiter})
	}
}
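
To make the budgets above concrete: within the drain window, the average spacing between reconfigs is simply drainTime divided by maxStaleReconfigs. The helper below is illustrative only (not part of this file) and just checks the numbers quoted in the comments for the default 600-second drain time.

// Illustrative only: average reconfig spacing inside the drain window.
func averageSpacing(drainTime time.Duration, maxStaleReconfigs int) time.Duration {
	if maxStaleReconfigs <= 0 {
		return 0 // a zero budget means "no limit", so there is no spacing to compute
	}
	return drainTime / time.Duration(maxStaleReconfigs)
}

// With the default 600s drain time:
//   averageSpacing(600*time.Second, 120) == 5s   (usage >= 50%)
//   averageSpacing(600*time.Second, 60)  == 10s  (usage >= 60%)
//   averageSpacing(600*time.Second, 30)  == 20s  (usage >= 70%)
//   averageSpacing(600*time.Second, 15)  == 40s  (usage >= 80%)
//   averageSpacing(600*time.Second, 1)   == 10m  (usage >= 90%)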

// The gcUpdateTimes function filters out timestamps that should have drained by now.
func gcUpdateTimes(updateTimes []time.Time, now time.Time, drainTime time.Duration) []time.Time {
	result := []time.Time{}
	for _, ut := range updateTimes {
		if ut.Add(drainTime).After(now) {
			result = append(result, ut)
		}
	}
	return result
}
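
For example, with a 10-minute drain time only timestamps newer than ten minutes ago survive the filter. The snippet below uses made-up timestamps purely for illustration.

// Illustrative use of gcUpdateTimes; not part of this file.
func gcExample() {
	now := time.Now()
	updateTimes := []time.Time{
		now.Add(-15 * time.Minute), // older than the 10m drain time: dropped
		now.Add(-5 * time.Minute),  // within the window: kept
		now.Add(-1 * time.Minute),  // within the window: kept
	}
	kept := gcUpdateTimes(updateTimes, now, 10*time.Minute)
	_ = kept // len(kept) == 2
}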

// The GetAmbassadorDrainTime function returns the AMBASSADOR_DRAIN_TIME env var as a time.Duration.
func GetAmbassadorDrainTime(ctx context.Context) time.Duration {
	s := os.Getenv("AMBASSADOR_DRAIN_TIME")
	if s == "" {
		s = "600"
	}
	i, err := strconv.Atoi(s)
	if err != nil {
		dlog.Printf(ctx, "Error parsing AMBASSADOR_DRAIN_TIME: %v", err)
		i = 600
	}

	return time.Duration(i) * time.Second
}
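
For instance, setting AMBASSADOR_DRAIN_TIME=300 before startup yields a 5-minute drain window, while an unset or unparseable value falls back to the 600-second default. A small illustrative check (not part of this file):

// Illustrative only: demonstrates the env-var parsing and its fallback.
func drainTimeExample(ctx context.Context) {
	os.Setenv("AMBASSADOR_DRAIN_TIME", "300")
	_ = GetAmbassadorDrainTime(ctx) // 5m0s

	os.Setenv("AMBASSADOR_DRAIN_TIME", "not-a-number")
	_ = GetAmbassadorDrainTime(ctx) // logs a parse error, returns 10m0s
}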