1 // Copyright 2016 Google LLC. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 // Package bundler supports bundling (batching) of items. Bundling amortizes an 6 // action with fixed costs over multiple items. For example, if an API provides 7 // an RPC that accepts a list of items as input, but clients would prefer 8 // adding items one at a time, then a Bundler can accept individual items from 9 // the client and bundle many of them into a single RPC. 10 // 11 // This package is experimental and subject to change without notice. 12 package bundler 13 14 import ( 15 "context" 16 "errors" 17 "reflect" 18 "sync" 19 "time" 20 21 "golang.org/x/sync/semaphore" 22 ) 23 24 type mode int 25 26 const ( 27 DefaultDelayThreshold = time.Second 28 DefaultBundleCountThreshold = 10 29 DefaultBundleByteThreshold = 1e6 // 1M 30 DefaultBufferedByteLimit = 1e9 // 1G 31 ) 32 33 const ( 34 none mode = iota 35 add 36 addWait 37 ) 38 39 var ( 40 // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit. 41 ErrOverflow = errors.New("bundler reached buffered byte limit") 42 43 // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size. 44 ErrOversizedItem = errors.New("item size exceeds bundle byte limit") 45 46 // errMixedMethods indicates that mutually exclusive methods has been 47 // called subsequently. 48 errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed") 49 ) 50 51 // A Bundler collects items added to it into a bundle until the bundle 52 // exceeds a given size, then calls a user-provided function to handle the 53 // bundle. 54 // 55 // The exported fields are only safe to modify prior to the first call to Add 56 // or AddWait. 57 type Bundler struct { 58 // Starting from the time that the first message is added to a bundle, once 59 // this delay has passed, handle the bundle. The default is DefaultDelayThreshold. 60 DelayThreshold time.Duration 61 62 // Once a bundle has this many items, handle the bundle. Since only one 63 // item at a time is added to a bundle, no bundle will exceed this 64 // threshold, so it also serves as a limit. The default is 65 // DefaultBundleCountThreshold. 66 BundleCountThreshold int 67 68 // Once the number of bytes in current bundle reaches this threshold, handle 69 // the bundle. The default is DefaultBundleByteThreshold. This triggers handling, 70 // but does not cap the total size of a bundle. 71 BundleByteThreshold int 72 73 // The maximum size of a bundle, in bytes. Zero means unlimited. 74 BundleByteLimit int 75 76 // The maximum number of bytes that the Bundler will keep in memory before 77 // returning ErrOverflow. The default is DefaultBufferedByteLimit. 78 BufferedByteLimit int 79 80 // The maximum number of handler invocations that can be running at once. 81 // The default is 1. 82 HandlerLimit int 83 84 handler func(interface{}) // called to handle a bundle 85 itemSliceZero reflect.Value // nil (zero value) for slice of items 86 87 mu sync.Mutex // guards access to fields below 88 flushTimer *time.Timer // implements DelayThreshold 89 handlerCount int // # of bundles currently being handled (i.e. handler is invoked on them) 90 sem *semaphore.Weighted // enforces BufferedByteLimit 91 semOnce sync.Once // guards semaphore initialization 92 // The current bundle we're adding items to. Not yet in the queue. 93 // Appended to the queue once the flushTimer fires or the bundle 94 // thresholds/limits are reached. If curBundle is nil and tail is 95 // not, we first try to add items to tail. Once tail is full or handled, 96 // we create a new curBundle for the incoming item. 97 curBundle *bundle 98 // The next bundle in the queue to be handled. Nil if the queue is 99 // empty. 100 head *bundle 101 // The last bundle in the queue to be handled. Nil if the queue is 102 // empty. If curBundle is nil and tail isn't, we attempt to add new 103 // items to the tail until if becomes full or has been passed to the 104 // handler. 105 tail *bundle 106 curFlush *sync.WaitGroup // counts outstanding bundles since last flush 107 prevFlush chan bool // signal used to wait for prior flush 108 109 // The first call to Add or AddWait, mode will be add or addWait respectively. 110 // If there wasn't call yet then mode is none. 111 mode mode 112 // TODO: consider alternative queue implementation for head/tail bundle. see: 113 // https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74 114 } 115 116 // A bundle is a group of items that were added individually and will be passed 117 // to a handler as a slice. 118 type bundle struct { 119 items reflect.Value // slice of T 120 size int // size in bytes of all items 121 next *bundle // bundles are handled in order as a linked list queue 122 flush *sync.WaitGroup // the counter that tracks flush completion 123 } 124 125 // add appends item to this bundle and increments the total size. It requires 126 // that b.mu is locked. 127 func (bu *bundle) add(item interface{}, size int) { 128 bu.items = reflect.Append(bu.items, reflect.ValueOf(item)) 129 bu.size += size 130 } 131 132 // NewBundler creates a new Bundler. 133 // 134 // itemExample is a value of the type that will be bundled. For example, if you 135 // want to create bundles of *Entry, you could pass &Entry{} for itemExample. 136 // 137 // handler is a function that will be called on each bundle. If itemExample is 138 // of type T, the argument to handler is of type []T. handler is always called 139 // sequentially for each bundle, and never in parallel. 140 // 141 // Configure the Bundler by setting its thresholds and limits before calling 142 // any of its methods. 143 func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler { 144 b := &Bundler{ 145 DelayThreshold: DefaultDelayThreshold, 146 BundleCountThreshold: DefaultBundleCountThreshold, 147 BundleByteThreshold: DefaultBundleByteThreshold, 148 BufferedByteLimit: DefaultBufferedByteLimit, 149 HandlerLimit: 1, 150 151 handler: handler, 152 itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))), 153 curFlush: &sync.WaitGroup{}, 154 } 155 return b 156 } 157 158 func (b *Bundler) initSemaphores() { 159 // Create the semaphores lazily, because the user may set limits 160 // after NewBundler. 161 b.semOnce.Do(func() { 162 b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit)) 163 }) 164 } 165 166 // enqueueCurBundle moves curBundle to the end of the queue. The bundle may be 167 // handled immediately if we are below HandlerLimit. It requires that b.mu is 168 // locked. 169 func (b *Bundler) enqueueCurBundle() { 170 // We don't require callers to check if there is a pending bundle. It 171 // may have already been appended to the queue. If so, return early. 172 if b.curBundle == nil { 173 return 174 } 175 // If we are below the HandlerLimit, the queue must be empty. Handle 176 // immediately with a new goroutine. 177 if b.handlerCount < b.HandlerLimit { 178 b.handlerCount++ 179 go b.handle(b.curBundle) 180 } else if b.tail != nil { 181 // There are bundles on the queue, so append to the end 182 b.tail.next = b.curBundle 183 b.tail = b.curBundle 184 } else { 185 // The queue is empty, so initialize the queue 186 b.head = b.curBundle 187 b.tail = b.curBundle 188 } 189 b.curBundle = nil 190 if b.flushTimer != nil { 191 b.flushTimer.Stop() 192 b.flushTimer = nil 193 } 194 } 195 196 // setMode sets the state of Bundler's mode. If mode was defined before 197 // and passed state is different from it then return an error. 198 func (b *Bundler) setMode(m mode) error { 199 b.mu.Lock() 200 defer b.mu.Unlock() 201 if b.mode == m || b.mode == none { 202 b.mode = m 203 return nil 204 } 205 return errMixedMethods 206 } 207 208 // canFit returns true if bu can fit an additional item of size bytes based 209 // on the limits of Bundler b. 210 func (b *Bundler) canFit(bu *bundle, size int) bool { 211 return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) && 212 (b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold) 213 } 214 215 // Add adds item to the current bundle. It marks the bundle for handling and 216 // starts a new one if any of the thresholds or limits are exceeded. 217 // The type of item must be assignable to the itemExample parameter of the NewBundler 218 // method, otherwise there will be a panic. 219 // 220 // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then 221 // the item can never be handled. Add returns ErrOversizedItem in this case. 222 // 223 // If adding the item would exceed the maximum memory allowed 224 // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for 225 // memory, Add returns ErrOverflow. 226 // 227 // Add never blocks. 228 func (b *Bundler) Add(item interface{}, size int) error { 229 if err := b.setMode(add); err != nil { 230 return err 231 } 232 // If this item exceeds the maximum size of a bundle, 233 // we can never send it. 234 if b.BundleByteLimit > 0 && size > b.BundleByteLimit { 235 return ErrOversizedItem 236 } 237 238 // If adding this item would exceed our allotted memory 239 // footprint, we can't accept it. 240 // (TryAcquire also returns false if anything is waiting on the semaphore, 241 // so calls to Add and AddWait shouldn't be mixed.) 242 b.initSemaphores() 243 if !b.sem.TryAcquire(int64(size)) { 244 return ErrOverflow 245 } 246 247 b.mu.Lock() 248 defer b.mu.Unlock() 249 return b.add(item, size) 250 } 251 252 // add adds item to the tail of the bundle queue or curBundle depending on space 253 // and nil-ness (see inline comments). It marks curBundle for handling (by 254 // appending it to the queue) if any of the thresholds or limits are exceeded. 255 // curBundle is lazily initialized. It requires that b.mu is locked. 256 func (b *Bundler) add(item interface{}, size int) error { 257 // If we don't have a curBundle, see if we can add to the queue tail. 258 if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) { 259 b.tail.add(item, size) 260 return nil 261 } 262 263 // If we can't fit in the existing curBundle, move it onto the queue. 264 if b.curBundle != nil && !b.canFit(b.curBundle, size) { 265 b.enqueueCurBundle() 266 } 267 268 // Create a curBundle if we don't have one. 269 if b.curBundle == nil { 270 b.curFlush.Add(1) 271 b.curBundle = &bundle{ 272 items: b.itemSliceZero, 273 flush: b.curFlush, 274 } 275 } 276 277 // Add the item. 278 b.curBundle.add(item, size) 279 280 // If curBundle is ready for handling, move it to the queue. 281 if b.curBundle.size >= b.BundleByteThreshold || 282 b.curBundle.items.Len() == b.BundleCountThreshold { 283 b.enqueueCurBundle() 284 } 285 286 // If we created a new bundle and it wasn't immediately handled, set a timer 287 if b.curBundle != nil && b.flushTimer == nil { 288 b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles) 289 } 290 291 return nil 292 } 293 294 // tryHandleBundles is the timer callback that handles or queues any current 295 // bundle after DelayThreshold time, even if the bundle isn't completely full. 296 func (b *Bundler) tryHandleBundles() { 297 b.mu.Lock() 298 b.enqueueCurBundle() 299 b.mu.Unlock() 300 } 301 302 // next returns the next bundle that is ready for handling and removes it from 303 // the internal queue. It requires that b.mu is locked. 304 func (b *Bundler) next() *bundle { 305 if b.head == nil { 306 return nil 307 } 308 out := b.head 309 b.head = b.head.next 310 if b.head == nil { 311 b.tail = nil 312 } 313 out.next = nil 314 return out 315 } 316 317 // handle calls the user-specified handler on the given bundle. handle is 318 // intended to be run as a goroutine. After the handler returns, we update the 319 // byte total. handle continues processing additional bundles that are ready. 320 // If no more bundles are ready, the handler count is decremented and the 321 // goroutine ends. 322 func (b *Bundler) handle(bu *bundle) { 323 for bu != nil { 324 b.handler(bu.items.Interface()) 325 bu = b.postHandle(bu) 326 } 327 } 328 329 func (b *Bundler) postHandle(bu *bundle) *bundle { 330 b.mu.Lock() 331 defer b.mu.Unlock() 332 333 b.sem.Release(int64(bu.size)) 334 bu.flush.Done() 335 336 bu = b.next() 337 if bu == nil { 338 b.handlerCount-- 339 } 340 return bu 341 } 342 343 // AddWait adds item to the current bundle. It marks the bundle for handling and 344 // starts a new one if any of the thresholds or limits are exceeded. 345 // 346 // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then 347 // the item can never be handled. AddWait returns ErrOversizedItem in this case. 348 // 349 // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), 350 // AddWait blocks until space is available or ctx is done. 351 // 352 // Calls to Add and AddWait should not be mixed on the same Bundler. 353 func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error { 354 if err := b.setMode(addWait); err != nil { 355 return err 356 } 357 // If this item exceeds the maximum size of a bundle, 358 // we can never send it. 359 if b.BundleByteLimit > 0 && size > b.BundleByteLimit { 360 return ErrOversizedItem 361 } 362 // If adding this item would exceed our allotted memory footprint, block 363 // until space is available. The semaphore is FIFO, so there will be no 364 // starvation. 365 b.initSemaphores() 366 if err := b.sem.Acquire(ctx, int64(size)); err != nil { 367 return err 368 } 369 370 b.mu.Lock() 371 defer b.mu.Unlock() 372 return b.add(item, size) 373 } 374 375 // Flush invokes the handler for all remaining items in the Bundler and waits 376 // for it to return. 377 func (b *Bundler) Flush() { 378 b.mu.Lock() 379 380 // If a curBundle is pending, move it to the queue. 381 b.enqueueCurBundle() 382 383 // Store a pointer to the WaitGroup that counts outstanding bundles 384 // in the current flush and create a new one to track the next flush. 385 wg := b.curFlush 386 b.curFlush = &sync.WaitGroup{} 387 388 // Flush must wait for all prior, outstanding flushes to complete. 389 // We use a channel to communicate completion between each flush in 390 // the sequence. 391 prev := b.prevFlush 392 next := make(chan bool) 393 b.prevFlush = next 394 395 b.mu.Unlock() 396 397 // Wait until the previous flush is finished. 398 if prev != nil { 399 <-prev 400 } 401 402 // Wait until this flush is finished. 403 wg.Wait() 404 405 // Allow the next flush to finish. 406 close(next) 407 } 408