1 // Copyright 2017 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 // Modified by Boulder to provide a load-shedding mechanism. 5 6 // Package semaphore provides a weighted semaphore implementation. 7 package semaphore // import "golang.org/x/sync/semaphore" 8 9 import ( 10 "container/list" 11 "context" 12 "errors" 13 "sync" 14 ) 15 16 type waiter struct { 17 n int64 18 ready chan<- struct{} // Closed when semaphore acquired. 19 } 20 21 // ErrMaxWaiters is returned when Acquire is called, but there are more than 22 // maxWaiters waiters. 23 var ErrMaxWaiters = errors.New("too many waiters") 24 25 // NewWeighted creates a new weighted semaphore with the given 26 // maximum combined weight for concurrent access. 27 // maxWaiters provides a limit such that calls to Acquire 28 // will immediately error if the number of waiters is that high. 29 // A maxWaiters of zero means no limit. 30 func NewWeighted(n int64, maxWaiters int) *Weighted { 31 w := &Weighted{size: n, maxWaiters: maxWaiters} 32 return w 33 } 34 35 // Weighted provides a way to bound concurrent access to a resource. 36 // The callers can request access with a given weight. 37 type Weighted struct { 38 size int64 39 cur int64 40 mu sync.Mutex 41 waiters list.List 42 maxWaiters int 43 } 44 45 // Acquire acquires the semaphore with a weight of n, blocking until resources 46 // are available or ctx is done. On success, returns nil. On failure, returns 47 // ctx.Err() and leaves the semaphore unchanged. 48 // 49 // If ctx is already done, Acquire may still succeed without blocking. 50 // 51 // If there are maxWaiters waiters, Acquire will return an error immediately. 52 func (s *Weighted) Acquire(ctx context.Context, n int64) error { 53 s.mu.Lock() 54 if s.size-s.cur >= n && s.waiters.Len() == 0 { 55 s.cur += n 56 s.mu.Unlock() 57 return nil 58 } 59 60 if n > s.size { 61 // Don't make other Acquire calls block on one that's doomed to fail. 62 s.mu.Unlock() 63 <-ctx.Done() 64 return ctx.Err() 65 } 66 67 if s.maxWaiters > 0 && s.waiters.Len() >= s.maxWaiters { 68 return ErrMaxWaiters 69 } 70 71 ready := make(chan struct{}) 72 w := waiter{n: n, ready: ready} 73 elem := s.waiters.PushBack(w) 74 s.mu.Unlock() 75 76 select { 77 case <-ctx.Done(): 78 err := ctx.Err() 79 s.mu.Lock() 80 select { 81 case <-ready: 82 // Acquired the semaphore after we were canceled. Rather than trying to 83 // fix up the queue, just pretend we didn't notice the cancellation. 84 err = nil 85 default: 86 isFront := s.waiters.Front() == elem 87 s.waiters.Remove(elem) 88 // If we're at the front and there're extra tokens left, notify other waiters. 89 if isFront && s.size > s.cur { 90 s.notifyWaiters() 91 } 92 } 93 s.mu.Unlock() 94 return err 95 96 case <-ready: 97 return nil 98 } 99 } 100 101 // TryAcquire acquires the semaphore with a weight of n without blocking. 102 // On success, returns true. On failure, returns false and leaves the semaphore unchanged. 103 func (s *Weighted) TryAcquire(n int64) bool { 104 s.mu.Lock() 105 success := s.size-s.cur >= n && s.waiters.Len() == 0 106 if success { 107 s.cur += n 108 } 109 s.mu.Unlock() 110 return success 111 } 112 113 // Release releases the semaphore with a weight of n. 114 func (s *Weighted) Release(n int64) { 115 s.mu.Lock() 116 s.cur -= n 117 if s.cur < 0 { 118 s.mu.Unlock() 119 panic("semaphore: released more than held") 120 } 121 s.notifyWaiters() 122 s.mu.Unlock() 123 } 124 125 func (s *Weighted) NumWaiters() int { 126 s.mu.Lock() 127 defer s.mu.Unlock() 128 return s.waiters.Len() 129 } 130 131 func (s *Weighted) notifyWaiters() { 132 for { 133 next := s.waiters.Front() 134 if next == nil { 135 break // No more waiters blocked. 136 } 137 138 w := next.Value.(waiter) 139 if s.size-s.cur < w.n { 140 // Not enough tokens for the next waiter. We could keep going (to try to 141 // find a waiter with a smaller request), but under load that could cause 142 // starvation for large requests; instead, we leave all remaining waiters 143 // blocked. 144 // 145 // Consider a semaphore used as a read-write lock, with N tokens, N 146 // readers, and one writer. Each reader can Acquire(1) to obtain a read 147 // lock. The writer can Acquire(N) to obtain a write lock, excluding all 148 // of the readers. If we allow the readers to jump ahead in the queue, 149 // the writer will starve — there is always one token available for every 150 // reader. 151 break 152 } 153 154 s.cur += w.n 155 s.waiters.Remove(next) 156 close(w.ready) 157 } 158 } 159