// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package managedwriter import ( "context" "sync/atomic" "golang.org/x/sync/semaphore" ) // Flow controller for write API. Adapted from pubsub. type flowController struct { // The max number of pending write requests. maxInsertCount int // The max pending request bytes. maxInsertBytes int // Semaphores for governing pending inserts. semInsertCount, semInsertBytes *semaphore.Weighted countTracked int64 // Atomic. bytesTracked int64 // Atomic. Only tracked if bytes are bounded. } func newFlowController(maxInserts, maxInsertBytes int) *flowController { fc := &flowController{ maxInsertCount: maxInserts, maxInsertBytes: maxInsertBytes, semInsertCount: nil, semInsertBytes: nil, } if maxInserts > 0 { fc.semInsertCount = semaphore.NewWeighted(int64(maxInserts)) } if maxInsertBytes > 0 { fc.semInsertBytes = semaphore.NewWeighted(int64(maxInsertBytes)) } return fc } // copyFlowController is for creating a new flow controller based on // settings from another. It does not copy flow state. func copyFlowController(in *flowController) *flowController { var maxInserts, maxBytes int if in != nil { maxInserts = in.maxInsertCount maxBytes = in.maxInsertBytes } return newFlowController(maxInserts, maxBytes) } // acquire blocks until one insert of size bytes can proceed or ctx is done. // It returns nil in the first case, or ctx.Err() in the second. // // acquire allows large messages to proceed by treating a size greater than maxSize // as if it were equal to maxSize. func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error { if fc.semInsertCount != nil { if err := fc.semInsertCount.Acquire(ctx, 1); err != nil { return err } } if fc.semInsertBytes != nil { if err := fc.semInsertBytes.Acquire(ctx, fc.bound(sizeBytes)); err != nil { if fc.semInsertCount != nil { fc.semInsertCount.Release(1) } return err } } atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes)) atomic.AddInt64(&fc.countTracked, 1) return nil } // tryAcquire returns false if acquire would block. Otherwise, it behaves like // acquire and returns true. // // tryAcquire allows large inserts to proceed by treating a size greater than // maxSize as if it were equal to maxSize. func (fc *flowController) tryAcquire(sizeBytes int) bool { if fc.semInsertCount != nil { if !fc.semInsertCount.TryAcquire(1) { return false } } if fc.semInsertBytes != nil { if !fc.semInsertBytes.TryAcquire(fc.bound(sizeBytes)) { if fc.semInsertCount != nil { fc.semInsertCount.Release(1) } return false } } atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes)) atomic.AddInt64(&fc.countTracked, 1) return true } func (fc *flowController) release(sizeBytes int) { atomic.AddInt64(&fc.countTracked, -1) atomic.AddInt64(&fc.bytesTracked, (0 - fc.bound(sizeBytes))) if fc.semInsertCount != nil { fc.semInsertCount.Release(1) } if fc.semInsertBytes != nil { fc.semInsertBytes.Release(fc.bound(sizeBytes)) } } // bound normalizes input size to maxInsertBytes if it exceeds the limit. func (fc *flowController) bound(sizeBytes int) int64 { if sizeBytes > fc.maxInsertBytes { return int64(fc.maxInsertBytes) } return int64(sizeBytes) } func (fc *flowController) count() int { return int(atomic.LoadInt64(&fc.countTracked)) } func (fc *flowController) bytes() int { return int(atomic.LoadInt64(&fc.bytesTracked)) }