...

Source file src/github.com/nozzle/throttler/throttler.go

Documentation: github.com/nozzle/throttler

     1  // Package throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines
     2  // with channels. The API is almost identical to Wait Groups, but it allows you to set
     3  // a max number of workers that can be running simultaneously. It uses channels internally
     4  // to block until a job completes by calling Done(err) or until all jobs have been completed.
     5  //
     6  // After exiting the loop where you are using Throttler, you can call the `Err` or `Errs` method to check
     7  // for errors. `Err` will return a single error representative of all the errors Throttler caught. The
     8  // `Errs` method will return all the errors as a slice of errors (`[]error`).
     9  //
    10  // Compare the Throttler example to the sync.WaitGroup example http://golang.org/pkg/sync/#example_WaitGroup
    11  //
    12  // See a fully functional example on the playground at http://bit.ly/throttler-v3
    13  package throttler
    14  
    15  import (
    16  	"fmt"
    17  	"math"
    18  	"sync"
    19  	"sync/atomic"
    20  )
    21  
    22  // Throttler stores all the information about the number of workers, the active workers and error information
    23  type Throttler struct {
    24  	maxWorkers    int32
    25  	workerCount   int32
    26  	batchingTotal int32
    27  	batchSize     int32
    28  	totalJobs     int32
    29  	jobsStarted   int32
    30  	jobsCompleted int32
    31  	doneChan      chan struct{}
    32  	errsMutex     *sync.Mutex
    33  	errs          []error
    34  	errorCount    int32
    35  }
    36  
    37  // New returns a Throttler that will govern the max number of workers and will
    38  // work with the total number of jobs. It panics if maxWorkers < 1.
    39  func New(maxWorkers, totalJobs int) *Throttler {
    40  	if maxWorkers < 1 {
    41  		panic("maxWorkers has to be at least 1")
    42  	}
    43  	return &Throttler{
    44  		maxWorkers: int32(maxWorkers),
    45  		batchSize:  1,
    46  		totalJobs:  int32(totalJobs),
    47  		doneChan:   make(chan struct{}, totalJobs),
    48  		errsMutex:  &sync.Mutex{},
    49  	}
    50  }
    51  
    52  // NewBatchedThrottler returns a Throttler (just like New), but also enables batching.
    53  func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler {
    54  	totalJobs := int(math.Ceil(float64(batchingTotal) / float64(batchSize)))
    55  	t := New(maxWorkers, totalJobs)
    56  	t.batchSize = int32(batchSize)
    57  	t.batchingTotal = int32(batchingTotal)
    58  	return t
    59  }
    60  
    61  // SetMaxWorkers lets you change the total number of workers that can run concurrently. NOTE: If
    62  // all workers are currently running, this setting is not guaranteed to take effect until one of them
    63  // completes and Throttle() is called again
    64  func (t *Throttler) SetMaxWorkers(maxWorkers int) {
    65  	if maxWorkers < 1 {
    66  		panic("maxWorkers has to be at least 1")
    67  	}
    68  	atomic.StoreInt32(&t.maxWorkers, int32(maxWorkers))
    69  }
    70  
    71  // Throttle works similarly to sync.WaitGroup, except inside your goroutine dispatch
    72  // loop rather than after. It will not block until the number of active workers
    73  // matches the max number of workers designated in the call to NewThrottler or
    74  // all of the jobs have been dispatched. It stops blocking when Done has been called
    75  // as many times as totalJobs.
    76  func (t *Throttler) Throttle() int {
    77  	if atomic.LoadInt32(&t.totalJobs) < 1 {
    78  		return int(atomic.LoadInt32(&t.errorCount))
    79  	}
    80  	atomic.AddInt32(&t.jobsStarted, 1)
    81  	atomic.AddInt32(&t.workerCount, 1)
    82  
    83  	// check to see if the current number of workers equals the max number of workers
    84  	// if they are equal, wait for one to finish before continuing
    85  	if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
    86  		atomic.AddInt32(&t.jobsCompleted, 1)
    87  		atomic.AddInt32(&t.workerCount, -1)
    88  		<-t.doneChan
    89  	}
    90  
    91  	// check to see if all of the jobs have been started, and if so, wait until all
    92  	// jobs have been completed before continuing
    93  	if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
    94  		for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
    95  			atomic.AddInt32(&t.jobsCompleted, 1)
    96  			<-t.doneChan
    97  		}
    98  	}
    99  
   100  	return int(atomic.LoadInt32(&t.errorCount))
   101  }
   102  
   103  // Done lets Throttler know that a job has been completed so that another worker
   104  // can be activated. If Done is called less times than totalJobs,
   105  // Throttle will block forever
   106  func (t *Throttler) Done(err error) {
   107  	if err != nil {
   108  		t.errsMutex.Lock()
   109  		t.errs = append(t.errs, err)
   110  		atomic.AddInt32(&t.errorCount, 1)
   111  		t.errsMutex.Unlock()
   112  	}
   113  	t.doneChan <- struct{}{}
   114  }
   115  
   116  // Err returns an error representative of all errors caught by throttler
   117  func (t *Throttler) Err() error {
   118  	t.errsMutex.Lock()
   119  	defer t.errsMutex.Unlock()
   120  	if atomic.LoadInt32(&t.errorCount) == 0 {
   121  		return nil
   122  	}
   123  	return multiError(t.errs)
   124  }
   125  
   126  // Errs returns a slice of any errors that were received from calling Done()
   127  func (t *Throttler) Errs() []error {
   128  	t.errsMutex.Lock()
   129  	defer t.errsMutex.Unlock()
   130  	return t.errs
   131  }
   132  
   133  type multiError []error
   134  
   135  func (te multiError) Error() string {
   136  	errString := te[0].Error()
   137  	if len(te) > 1 {
   138  		errString += fmt.Sprintf(" (and %d more errors)", len(te)-1)
   139  	}
   140  	return errString
   141  }
   142  
   143  // BatchStartIndex returns the starting index for the next batch. The job count isn't modified
   144  // until th.Throttle() is called, so if you don't call Throttle before executing this
   145  // again, it will return the same index as before
   146  func (t *Throttler) BatchStartIndex() int {
   147  	return int(atomic.LoadInt32(&t.jobsStarted) * atomic.LoadInt32(&t.batchSize))
   148  }
   149  
   150  // BatchEndIndex returns the ending index for the next batch. It either returns the full batch size
   151  // or the remaining amount of jobs. The job count isn't modified
   152  // until th.Throttle() is called, so if you don't call Throttle before executing this
   153  // again, it will return the same index as before.
   154  func (t *Throttler) BatchEndIndex() int {
   155  	end := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize)
   156  	if end > atomic.LoadInt32(&t.batchingTotal) {
   157  		end = atomic.LoadInt32(&t.batchingTotal)
   158  	}
   159  	return int(end)
   160  }
   161  
   162  // TotalJobs returns the total number of jobs throttler is performing
   163  func (t *Throttler) TotalJobs() int {
   164  	return int(atomic.LoadInt32(&t.totalJobs))
   165  }
   166  

View as plain text