...

Source file src/github.com/letsencrypt/boulder/semaphore/semaphore.go

Documentation: github.com/letsencrypt/boulder/semaphore

     1  // Copyright 2017 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  // Modified by Boulder to provide a load-shedding mechanism.
     5  
     6  // Package semaphore provides a weighted semaphore implementation.
     7  package semaphore // import "golang.org/x/sync/semaphore"
     8  
     9  import (
    10  	"container/list"
    11  	"context"
    12  	"errors"
    13  	"sync"
    14  )
    15  
    16  type waiter struct {
    17  	n     int64
    18  	ready chan<- struct{} // Closed when semaphore acquired.
    19  }
    20  
    21  // ErrMaxWaiters is returned when Acquire is called, but there are more than
    22  // maxWaiters waiters.
    23  var ErrMaxWaiters = errors.New("too many waiters")
    24  
    25  // NewWeighted creates a new weighted semaphore with the given
    26  // maximum combined weight for concurrent access.
    27  // maxWaiters provides a limit such that calls to Acquire
    28  // will immediately error if the number of waiters is that high.
    29  // A maxWaiters of zero means no limit.
    30  func NewWeighted(n int64, maxWaiters int) *Weighted {
    31  	w := &Weighted{size: n, maxWaiters: maxWaiters}
    32  	return w
    33  }
    34  
    35  // Weighted provides a way to bound concurrent access to a resource.
    36  // The callers can request access with a given weight.
    37  type Weighted struct {
    38  	size       int64
    39  	cur        int64
    40  	mu         sync.Mutex
    41  	waiters    list.List
    42  	maxWaiters int
    43  }
    44  
    45  // Acquire acquires the semaphore with a weight of n, blocking until resources
    46  // are available or ctx is done. On success, returns nil. On failure, returns
    47  // ctx.Err() and leaves the semaphore unchanged.
    48  //
    49  // If ctx is already done, Acquire may still succeed without blocking.
    50  //
    51  // If there are maxWaiters waiters, Acquire will return an error immediately.
    52  func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    53  	s.mu.Lock()
    54  	if s.size-s.cur >= n && s.waiters.Len() == 0 {
    55  		s.cur += n
    56  		s.mu.Unlock()
    57  		return nil
    58  	}
    59  
    60  	if n > s.size {
    61  		// Don't make other Acquire calls block on one that's doomed to fail.
    62  		s.mu.Unlock()
    63  		<-ctx.Done()
    64  		return ctx.Err()
    65  	}
    66  
    67  	if s.maxWaiters > 0 && s.waiters.Len() >= s.maxWaiters {
    68  		return ErrMaxWaiters
    69  	}
    70  
    71  	ready := make(chan struct{})
    72  	w := waiter{n: n, ready: ready}
    73  	elem := s.waiters.PushBack(w)
    74  	s.mu.Unlock()
    75  
    76  	select {
    77  	case <-ctx.Done():
    78  		err := ctx.Err()
    79  		s.mu.Lock()
    80  		select {
    81  		case <-ready:
    82  			// Acquired the semaphore after we were canceled.  Rather than trying to
    83  			// fix up the queue, just pretend we didn't notice the cancellation.
    84  			err = nil
    85  		default:
    86  			isFront := s.waiters.Front() == elem
    87  			s.waiters.Remove(elem)
    88  			// If we're at the front and there're extra tokens left, notify other waiters.
    89  			if isFront && s.size > s.cur {
    90  				s.notifyWaiters()
    91  			}
    92  		}
    93  		s.mu.Unlock()
    94  		return err
    95  
    96  	case <-ready:
    97  		return nil
    98  	}
    99  }
   100  
   101  // TryAcquire acquires the semaphore with a weight of n without blocking.
   102  // On success, returns true. On failure, returns false and leaves the semaphore unchanged.
   103  func (s *Weighted) TryAcquire(n int64) bool {
   104  	s.mu.Lock()
   105  	success := s.size-s.cur >= n && s.waiters.Len() == 0
   106  	if success {
   107  		s.cur += n
   108  	}
   109  	s.mu.Unlock()
   110  	return success
   111  }
   112  
   113  // Release releases the semaphore with a weight of n.
   114  func (s *Weighted) Release(n int64) {
   115  	s.mu.Lock()
   116  	s.cur -= n
   117  	if s.cur < 0 {
   118  		s.mu.Unlock()
   119  		panic("semaphore: released more than held")
   120  	}
   121  	s.notifyWaiters()
   122  	s.mu.Unlock()
   123  }
   124  
   125  func (s *Weighted) NumWaiters() int {
   126  	s.mu.Lock()
   127  	defer s.mu.Unlock()
   128  	return s.waiters.Len()
   129  }
   130  
   131  func (s *Weighted) notifyWaiters() {
   132  	for {
   133  		next := s.waiters.Front()
   134  		if next == nil {
   135  			break // No more waiters blocked.
   136  		}
   137  
   138  		w := next.Value.(waiter)
   139  		if s.size-s.cur < w.n {
   140  			// Not enough tokens for the next waiter.  We could keep going (to try to
   141  			// find a waiter with a smaller request), but under load that could cause
   142  			// starvation for large requests; instead, we leave all remaining waiters
   143  			// blocked.
   144  			//
   145  			// Consider a semaphore used as a read-write lock, with N tokens, N
   146  			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
   147  			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
   148  			// of the readers.  If we allow the readers to jump ahead in the queue,
   149  			// the writer will starve — there is always one token available for every
   150  			// reader.
   151  			break
   152  		}
   153  
   154  		s.cur += w.n
   155  		s.waiters.Remove(next)
   156  		close(w.ready)
   157  	}
   158  }
   159  

View as plain text