...

Source file src/google.golang.org/api/support/bundler/bundler.go

Documentation: google.golang.org/api/support/bundler

     1  // Copyright 2016 Google LLC.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package bundler supports bundling (batching) of items. Bundling amortizes an
     6  // action with fixed costs over multiple items. For example, if an API provides
     7  // an RPC that accepts a list of items as input, but clients would prefer
     8  // adding items one at a time, then a Bundler can accept individual items from
     9  // the client and bundle many of them into a single RPC.
    10  //
    11  // This package is experimental and subject to change without notice.
    12  package bundler
    13  
    14  import (
    15  	"context"
    16  	"errors"
    17  	"reflect"
    18  	"sync"
    19  	"time"
    20  
    21  	"golang.org/x/sync/semaphore"
    22  )
    23  
// mode records which of the two mutually exclusive entry points, Add or
// AddWait, a Bundler has been used with.
type mode int

// Default values that NewBundler assigns to the corresponding Bundler fields.
const (
	// DefaultDelayThreshold is the initial value of Bundler.DelayThreshold.
	DefaultDelayThreshold       = time.Second
	// DefaultBundleCountThreshold is the initial value of
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10
	// DefaultBundleByteThreshold is the initial value of
	// Bundler.BundleByteThreshold.
	DefaultBundleByteThreshold  = 1e6 // 1M
	// DefaultBufferedByteLimit is the initial value of
	// Bundler.BufferedByteLimit.
	DefaultBufferedByteLimit    = 1e9 // 1G
)

// Possible values of mode.
const (
	// none means neither Add nor AddWait has been called yet.
	none mode = iota
	// add means the Bundler is being used through Add.
	add
	// addWait means the Bundler is being used through AddWait.
	addWait
)
    38  
var (
	// ErrOverflow indicates that accepting an item would make the Bundler's
	// buffered bytes exceed its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")

	// errMixedMethods indicates that the mutually exclusive methods Add and
	// AddWait have both been called on the same Bundler.
	errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed")
)
    50  
// A Bundler collects items added to it into a bundle until the bundle
// exceeds a given size, then calls a user-provided function to handle the
// bundle.
//
// The exported fields are only safe to modify prior to the first call to Add
// or AddWait.
type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in the current bundle reaches this threshold,
	// handle the bundle. The default is DefaultBundleByteThreshold. This
	// triggers handling, but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int

	// The maximum number of handler invocations that can be running at once.
	// The default is 1.
	HandlerLimit int

	handler       func(interface{}) // called to handle a bundle
	itemSliceZero reflect.Value     // nil (zero value) for slice of items

	mu           sync.Mutex          // guards access to fields below
	flushTimer   *time.Timer         // implements DelayThreshold
	handlerCount int                 // # of bundles currently being handled (i.e. handler is invoked on them)
	sem          *semaphore.Weighted // enforces BufferedByteLimit
	semOnce      sync.Once           // guards semaphore initialization
	// The current bundle we're adding items to. Not yet in the queue.
	// Appended to the queue once the flushTimer fires or the bundle
	// thresholds/limits are reached. If curBundle is nil and tail is
	// not, we first try to add items to tail. Once tail is full or handled,
	// we create a new curBundle for the incoming item.
	curBundle *bundle
	// The next bundle in the queue to be handled. Nil if the queue is
	// empty.
	head *bundle
	// The last bundle in the queue to be handled. Nil if the queue is
	// empty. If curBundle is nil and tail isn't, we attempt to add new
	// items to the tail until it becomes full or has been passed to the
	// handler.
	tail      *bundle
	curFlush  *sync.WaitGroup // counts outstanding bundles since last flush
	prevFlush chan bool       // signal used to wait for prior flush

	// mode is set to add or addWait by the first call to Add or AddWait,
	// respectively. It is none until one of them has been called.
	mode mode
	// TODO: consider alternative queue implementation for head/tail bundle. see:
	// https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
}
   115  
   116  // A bundle is a group of items that were added individually and will be passed
   117  // to a handler as a slice.
   118  type bundle struct {
   119  	items reflect.Value   // slice of T
   120  	size  int             // size in bytes of all items
   121  	next  *bundle         // bundles are handled in order as a linked list queue
   122  	flush *sync.WaitGroup // the counter that tracks flush completion
   123  }
   124  
   125  // add appends item to this bundle and increments the total size. It requires
   126  // that b.mu is locked.
   127  func (bu *bundle) add(item interface{}, size int) {
   128  	bu.items = reflect.Append(bu.items, reflect.ValueOf(item))
   129  	bu.size += size
   130  }
   131  
   132  // NewBundler creates a new Bundler.
   133  //
   134  // itemExample is a value of the type that will be bundled. For example, if you
   135  // want to create bundles of *Entry, you could pass &Entry{} for itemExample.
   136  //
   137  // handler is a function that will be called on each bundle. If itemExample is
   138  // of type T, the argument to handler is of type []T. handler is always called
   139  // sequentially for each bundle, and never in parallel.
   140  //
   141  // Configure the Bundler by setting its thresholds and limits before calling
   142  // any of its methods.
   143  func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
   144  	b := &Bundler{
   145  		DelayThreshold:       DefaultDelayThreshold,
   146  		BundleCountThreshold: DefaultBundleCountThreshold,
   147  		BundleByteThreshold:  DefaultBundleByteThreshold,
   148  		BufferedByteLimit:    DefaultBufferedByteLimit,
   149  		HandlerLimit:         1,
   150  
   151  		handler:       handler,
   152  		itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
   153  		curFlush:      &sync.WaitGroup{},
   154  	}
   155  	return b
   156  }
   157  
   158  func (b *Bundler) initSemaphores() {
   159  	// Create the semaphores lazily, because the user may set limits
   160  	// after NewBundler.
   161  	b.semOnce.Do(func() {
   162  		b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
   163  	})
   164  }
   165  
   166  // enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
   167  // handled immediately if we are below HandlerLimit. It requires that b.mu is
   168  // locked.
   169  func (b *Bundler) enqueueCurBundle() {
   170  	// We don't require callers to check if there is a pending bundle. It
   171  	// may have already been appended to the queue. If so, return early.
   172  	if b.curBundle == nil {
   173  		return
   174  	}
   175  	// If we are below the HandlerLimit, the queue must be empty. Handle
   176  	// immediately with a new goroutine.
   177  	if b.handlerCount < b.HandlerLimit {
   178  		b.handlerCount++
   179  		go b.handle(b.curBundle)
   180  	} else if b.tail != nil {
   181  		// There are bundles on the queue, so append to the end
   182  		b.tail.next = b.curBundle
   183  		b.tail = b.curBundle
   184  	} else {
   185  		// The queue is empty, so initialize the queue
   186  		b.head = b.curBundle
   187  		b.tail = b.curBundle
   188  	}
   189  	b.curBundle = nil
   190  	if b.flushTimer != nil {
   191  		b.flushTimer.Stop()
   192  		b.flushTimer = nil
   193  	}
   194  }
   195  
   196  // setMode sets the state of Bundler's mode. If mode was defined before
   197  // and passed state is different from it then return an error.
   198  func (b *Bundler) setMode(m mode) error {
   199  	b.mu.Lock()
   200  	defer b.mu.Unlock()
   201  	if b.mode == m || b.mode == none {
   202  		b.mode = m
   203  		return nil
   204  	}
   205  	return errMixedMethods
   206  }
   207  
   208  // canFit returns true if bu can fit an additional item of size bytes based
   209  // on the limits of Bundler b.
   210  func (b *Bundler) canFit(bu *bundle, size int) bool {
   211  	return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) &&
   212  		(b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold)
   213  }
   214  
   215  // Add adds item to the current bundle. It marks the bundle for handling and
   216  // starts a new one if any of the thresholds or limits are exceeded.
   217  // The type of item must be assignable to the itemExample parameter of the NewBundler
   218  // method, otherwise there will be a panic.
   219  //
   220  // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
   221  // the item can never be handled. Add returns ErrOversizedItem in this case.
   222  //
   223  // If adding the item would exceed the maximum memory allowed
   224  // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
   225  // memory, Add returns ErrOverflow.
   226  //
   227  // Add never blocks.
   228  func (b *Bundler) Add(item interface{}, size int) error {
   229  	if err := b.setMode(add); err != nil {
   230  		return err
   231  	}
   232  	// If this item exceeds the maximum size of a bundle,
   233  	// we can never send it.
   234  	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
   235  		return ErrOversizedItem
   236  	}
   237  
   238  	// If adding this item would exceed our allotted memory
   239  	// footprint, we can't accept it.
   240  	// (TryAcquire also returns false if anything is waiting on the semaphore,
   241  	// so calls to Add and AddWait shouldn't be mixed.)
   242  	b.initSemaphores()
   243  	if !b.sem.TryAcquire(int64(size)) {
   244  		return ErrOverflow
   245  	}
   246  
   247  	b.mu.Lock()
   248  	defer b.mu.Unlock()
   249  	return b.add(item, size)
   250  }
   251  
   252  // add adds item to the tail of the bundle queue or curBundle depending on space
   253  // and nil-ness (see inline comments). It marks curBundle for handling (by
   254  // appending it to the queue) if any of the thresholds or limits are exceeded.
   255  // curBundle is lazily initialized. It requires that b.mu is locked.
   256  func (b *Bundler) add(item interface{}, size int) error {
   257  	// If we don't have a curBundle, see if we can add to the queue tail.
   258  	if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) {
   259  		b.tail.add(item, size)
   260  		return nil
   261  	}
   262  
   263  	// If we can't fit in the existing curBundle, move it onto the queue.
   264  	if b.curBundle != nil && !b.canFit(b.curBundle, size) {
   265  		b.enqueueCurBundle()
   266  	}
   267  
   268  	// Create a curBundle if we don't have one.
   269  	if b.curBundle == nil {
   270  		b.curFlush.Add(1)
   271  		b.curBundle = &bundle{
   272  			items: b.itemSliceZero,
   273  			flush: b.curFlush,
   274  		}
   275  	}
   276  
   277  	// Add the item.
   278  	b.curBundle.add(item, size)
   279  
   280  	// If curBundle is ready for handling, move it to the queue.
   281  	if b.curBundle.size >= b.BundleByteThreshold ||
   282  		b.curBundle.items.Len() == b.BundleCountThreshold {
   283  		b.enqueueCurBundle()
   284  	}
   285  
   286  	// If we created a new bundle and it wasn't immediately handled, set a timer
   287  	if b.curBundle != nil && b.flushTimer == nil {
   288  		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles)
   289  	}
   290  
   291  	return nil
   292  }
   293  
   294  // tryHandleBundles is the timer callback that handles or queues any current
   295  // bundle after DelayThreshold time, even if the bundle isn't completely full.
   296  func (b *Bundler) tryHandleBundles() {
   297  	b.mu.Lock()
   298  	b.enqueueCurBundle()
   299  	b.mu.Unlock()
   300  }
   301  
   302  // next returns the next bundle that is ready for handling and removes it from
   303  // the internal queue. It requires that b.mu is locked.
   304  func (b *Bundler) next() *bundle {
   305  	if b.head == nil {
   306  		return nil
   307  	}
   308  	out := b.head
   309  	b.head = b.head.next
   310  	if b.head == nil {
   311  		b.tail = nil
   312  	}
   313  	out.next = nil
   314  	return out
   315  }
   316  
   317  // handle calls the user-specified handler on the given bundle. handle is
   318  // intended to be run as a goroutine. After the handler returns, we update the
   319  // byte total. handle continues processing additional bundles that are ready.
   320  // If no more bundles are ready, the handler count is decremented and the
   321  // goroutine ends.
   322  func (b *Bundler) handle(bu *bundle) {
   323  	for bu != nil {
   324  		b.handler(bu.items.Interface())
   325  		bu = b.postHandle(bu)
   326  	}
   327  }
   328  
   329  func (b *Bundler) postHandle(bu *bundle) *bundle {
   330  	b.mu.Lock()
   331  	defer b.mu.Unlock()
   332  
   333  	b.sem.Release(int64(bu.size))
   334  	bu.flush.Done()
   335  
   336  	bu = b.next()
   337  	if bu == nil {
   338  		b.handlerCount--
   339  	}
   340  	return bu
   341  }
   342  
   343  // AddWait adds item to the current bundle. It marks the bundle for handling and
   344  // starts a new one if any of the thresholds or limits are exceeded.
   345  //
   346  // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
   347  // the item can never be handled. AddWait returns ErrOversizedItem in this case.
   348  //
   349  // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
   350  // AddWait blocks until space is available or ctx is done.
   351  //
   352  // Calls to Add and AddWait should not be mixed on the same Bundler.
   353  func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
   354  	if err := b.setMode(addWait); err != nil {
   355  		return err
   356  	}
   357  	// If this item exceeds the maximum size of a bundle,
   358  	// we can never send it.
   359  	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
   360  		return ErrOversizedItem
   361  	}
   362  	// If adding this item would exceed our allotted memory footprint, block
   363  	// until space is available. The semaphore is FIFO, so there will be no
   364  	// starvation.
   365  	b.initSemaphores()
   366  	if err := b.sem.Acquire(ctx, int64(size)); err != nil {
   367  		return err
   368  	}
   369  
   370  	b.mu.Lock()
   371  	defer b.mu.Unlock()
   372  	return b.add(item, size)
   373  }
   374  
   375  // Flush invokes the handler for all remaining items in the Bundler and waits
   376  // for it to return.
   377  func (b *Bundler) Flush() {
   378  	b.mu.Lock()
   379  
   380  	// If a curBundle is pending, move it to the queue.
   381  	b.enqueueCurBundle()
   382  
   383  	// Store a pointer to the WaitGroup that counts outstanding bundles
   384  	// in the current flush and create a new one to track the next flush.
   385  	wg := b.curFlush
   386  	b.curFlush = &sync.WaitGroup{}
   387  
   388  	// Flush must wait for all prior, outstanding flushes to complete.
   389  	// We use a channel to communicate completion between each flush in
   390  	// the sequence.
   391  	prev := b.prevFlush
   392  	next := make(chan bool)
   393  	b.prevFlush = next
   394  
   395  	b.mu.Unlock()
   396  
   397  	// Wait until the previous flush is finished.
   398  	if prev != nil {
   399  		<-prev
   400  	}
   401  
   402  	// Wait until this flush is finished.
   403  	wg.Wait()
   404  
   405  	// Allow the next flush to finish.
   406  	close(next)
   407  }
   408  

View as plain text