...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/flow_controller.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"sync/atomic"
    20  
    21  	"golang.org/x/sync/semaphore"
    22  )
    23  
    24  // Flow controller for write API.  Adapted from pubsub.
    25  type flowController struct {
    26  	// The max number of pending write requests.
    27  	maxInsertCount int
    28  	// The max pending request bytes.
    29  	maxInsertBytes int
    30  
    31  	// Semaphores for governing pending inserts.
    32  	semInsertCount, semInsertBytes *semaphore.Weighted
    33  
    34  	countTracked int64 // Atomic.
    35  	bytesTracked int64 // Atomic.  Only tracked if bytes are bounded.
    36  }
    37  
    38  func newFlowController(maxInserts, maxInsertBytes int) *flowController {
    39  	fc := &flowController{
    40  		maxInsertCount: maxInserts,
    41  		maxInsertBytes: maxInsertBytes,
    42  		semInsertCount: nil,
    43  		semInsertBytes: nil,
    44  	}
    45  	if maxInserts > 0 {
    46  		fc.semInsertCount = semaphore.NewWeighted(int64(maxInserts))
    47  	}
    48  	if maxInsertBytes > 0 {
    49  		fc.semInsertBytes = semaphore.NewWeighted(int64(maxInsertBytes))
    50  	}
    51  	return fc
    52  }
    53  
    54  // copyFlowController is for creating a new flow controller based on
    55  // settings from another.  It does not copy flow state.
    56  func copyFlowController(in *flowController) *flowController {
    57  	var maxInserts, maxBytes int
    58  	if in != nil {
    59  		maxInserts = in.maxInsertCount
    60  		maxBytes = in.maxInsertBytes
    61  	}
    62  	return newFlowController(maxInserts, maxBytes)
    63  }
    64  
    65  // acquire blocks until one insert of size bytes can proceed or ctx is done.
    66  // It returns nil in the first case, or ctx.Err() in the second.
    67  //
    68  // acquire allows large messages to proceed by treating a size greater than maxSize
    69  // as if it were equal to maxSize.
    70  func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error {
    71  	if fc.semInsertCount != nil {
    72  		if err := fc.semInsertCount.Acquire(ctx, 1); err != nil {
    73  			return err
    74  		}
    75  	}
    76  	if fc.semInsertBytes != nil {
    77  		if err := fc.semInsertBytes.Acquire(ctx, fc.bound(sizeBytes)); err != nil {
    78  			if fc.semInsertCount != nil {
    79  				fc.semInsertCount.Release(1)
    80  			}
    81  			return err
    82  		}
    83  	}
    84  	atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
    85  	atomic.AddInt64(&fc.countTracked, 1)
    86  	return nil
    87  }
    88  
    89  // tryAcquire returns false if acquire would block. Otherwise, it behaves like
    90  // acquire and returns true.
    91  //
    92  // tryAcquire allows large inserts to proceed by treating a size greater than
    93  // maxSize as if it were equal to maxSize.
    94  func (fc *flowController) tryAcquire(sizeBytes int) bool {
    95  	if fc.semInsertCount != nil {
    96  		if !fc.semInsertCount.TryAcquire(1) {
    97  			return false
    98  		}
    99  	}
   100  	if fc.semInsertBytes != nil {
   101  		if !fc.semInsertBytes.TryAcquire(fc.bound(sizeBytes)) {
   102  			if fc.semInsertCount != nil {
   103  				fc.semInsertCount.Release(1)
   104  			}
   105  			return false
   106  		}
   107  	}
   108  	atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
   109  	atomic.AddInt64(&fc.countTracked, 1)
   110  	return true
   111  }
   112  
   113  func (fc *flowController) release(sizeBytes int) {
   114  	atomic.AddInt64(&fc.countTracked, -1)
   115  	atomic.AddInt64(&fc.bytesTracked, (0 - fc.bound(sizeBytes)))
   116  	if fc.semInsertCount != nil {
   117  		fc.semInsertCount.Release(1)
   118  	}
   119  	if fc.semInsertBytes != nil {
   120  		fc.semInsertBytes.Release(fc.bound(sizeBytes))
   121  	}
   122  }
   123  
   124  // bound normalizes input size to maxInsertBytes if it exceeds the limit.
   125  func (fc *flowController) bound(sizeBytes int) int64 {
   126  	if sizeBytes > fc.maxInsertBytes {
   127  		return int64(fc.maxInsertBytes)
   128  	}
   129  	return int64(sizeBytes)
   130  }
   131  
   132  func (fc *flowController) count() int {
   133  	return int(atomic.LoadInt64(&fc.countTracked))
   134  }
   135  
   136  func (fc *flowController) bytes() int {
   137  	return int(atomic.LoadInt64(&fc.bytesTracked))
   138  }
   139  

View as plain text