...

Source file src/cloud.google.com/go/pubsub/flow_controller.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2017 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"sync/atomic"
    21  
    22  	"golang.org/x/sync/semaphore"
    23  )
    24  
    25  // LimitExceededBehavior configures the behavior that flowController can use in case
    26  // the flow control limits are exceeded.
    27  type LimitExceededBehavior int
    28  
    29  const (
    30  	// FlowControlIgnore disables flow control.
    31  	FlowControlIgnore LimitExceededBehavior = iota
    32  	// FlowControlBlock signals to wait until the request can be made without exceeding the limit.
    33  	FlowControlBlock
    34  	// FlowControlSignalError signals an error to the caller of acquire.
    35  	FlowControlSignalError
    36  )
    37  
    38  // flowControllerPurpose indicates whether a flowController is for a topic or a
    39  // subscription.
    40  type flowControllerPurpose int
    41  
    42  const (
    43  	flowControllerPurposeSubscription flowControllerPurpose = iota
    44  	flowControllerPurposeTopic
    45  )
    46  
    47  // FlowControlSettings controls flow control for messages while publishing or subscribing.
    48  type FlowControlSettings struct {
    49  	// MaxOutstandingMessages is the maximum number of buffered messages to be published.
    50  	// If less than or equal to zero, this is disabled.
    51  	MaxOutstandingMessages int
    52  
    53  	// MaxOutstandingBytes is the maximum size of buffered messages to be published.
    54  	// If less than or equal to zero, this is disabled.
    55  	MaxOutstandingBytes int
    56  
    57  	// LimitExceededBehavior configures the behavior when trying to publish
    58  	// additional messages while the flow controller is full. The available options
    59  	// are Ignore (disable, default), Block, and SignalError (publish
    60  	// results will return an error).
    61  	LimitExceededBehavior LimitExceededBehavior
    62  }
    63  
    64  var (
    65  	// ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages.
    66  	ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")
    67  
    68  	// ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes.
    69  	ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
    70  )
    71  
    72  // flowController implements flow control for publishing and subscribing.
    73  type flowController struct {
    74  	maxCount          int
    75  	maxSize           int                 // max total size of messages
    76  	semCount, semSize *semaphore.Weighted // enforces max number and size of messages
    77  	// Number of calls to acquire - number of calls to release. This can go
    78  	// negative if semCount == nil and a large acquire is followed by multiple
    79  	// small releases.
    80  	// Atomic.
    81  	countRemaining int64
    82  	// Number of outstanding bytes remaining. Atomic.
    83  	bytesRemaining int64
    84  	limitBehavior  LimitExceededBehavior
    85  	purpose        flowControllerPurpose
    86  }
    87  
    88  // newFlowController creates a new flowController that ensures no more than
    89  // maxCount messages or maxSize bytes are outstanding at once. If maxCount or
    90  // maxSize is < 1, then an unlimited number of messages or bytes is permitted,
    91  // respectively.
    92  func newFlowController(fc FlowControlSettings) flowController {
    93  	f := flowController{
    94  		maxCount:      fc.MaxOutstandingMessages,
    95  		maxSize:       fc.MaxOutstandingBytes,
    96  		semCount:      nil,
    97  		semSize:       nil,
    98  		limitBehavior: fc.LimitExceededBehavior,
    99  	}
   100  	if fc.MaxOutstandingMessages > 0 {
   101  		f.semCount = semaphore.NewWeighted(int64(fc.MaxOutstandingMessages))
   102  	}
   103  	if fc.MaxOutstandingBytes > 0 {
   104  		f.semSize = semaphore.NewWeighted(int64(fc.MaxOutstandingBytes))
   105  	}
   106  	return f
   107  }
   108  
   109  func newTopicFlowController(fc FlowControlSettings) flowController {
   110  	f := newFlowController(fc)
   111  	f.purpose = flowControllerPurposeTopic
   112  	return f
   113  }
   114  
   115  func newSubscriptionFlowController(fc FlowControlSettings) flowController {
   116  	f := newFlowController(fc)
   117  	f.purpose = flowControllerPurposeSubscription
   118  	return f
   119  }
   120  
   121  // acquire allocates space for a message: the message count and its size.
   122  //
   123  // In FlowControlSignalError mode, large messages greater than maxSize
   124  // will be result in an error. In other modes, large messages will be treated
   125  // as if it were equal to maxSize.
   126  func (f *flowController) acquire(ctx context.Context, size int) error {
   127  	switch f.limitBehavior {
   128  	case FlowControlIgnore:
   129  		return nil
   130  	case FlowControlBlock:
   131  		if f.semCount != nil {
   132  			if err := f.semCount.Acquire(ctx, 1); err != nil {
   133  				return err
   134  			}
   135  		}
   136  		if f.semSize != nil {
   137  			if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
   138  				if f.semCount != nil {
   139  					f.semCount.Release(1)
   140  				}
   141  				return err
   142  			}
   143  		}
   144  	case FlowControlSignalError:
   145  		if f.semCount != nil {
   146  			if !f.semCount.TryAcquire(1) {
   147  				return ErrFlowControllerMaxOutstandingMessages
   148  			}
   149  		}
   150  		if f.semSize != nil {
   151  			// Try to acquire the full size of the message here.
   152  			if !f.semSize.TryAcquire(int64(size)) {
   153  				if f.semCount != nil {
   154  					f.semCount.Release(1)
   155  				}
   156  				return ErrFlowControllerMaxOutstandingBytes
   157  			}
   158  		}
   159  	}
   160  
   161  	if f.semCount != nil {
   162  		outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
   163  		f.recordOutstandingMessages(ctx, outstandingMessages)
   164  	}
   165  
   166  	if f.semSize != nil {
   167  		outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
   168  		f.recordOutstandingBytes(ctx, outstandingBytes)
   169  	}
   170  	return nil
   171  }
   172  
   173  // release notes that one message of size bytes is no longer outstanding.
   174  func (f *flowController) release(ctx context.Context, size int) {
   175  	if f.limitBehavior == FlowControlIgnore {
   176  		return
   177  	}
   178  
   179  	if f.semCount != nil {
   180  		outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
   181  		f.recordOutstandingMessages(ctx, outstandingMessages)
   182  		f.semCount.Release(1)
   183  	}
   184  	if f.semSize != nil {
   185  		outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
   186  		f.recordOutstandingBytes(ctx, outstandingBytes)
   187  		f.semSize.Release(f.bound(size))
   188  	}
   189  }
   190  
   191  func (f *flowController) bound(size int) int64 {
   192  	if size > f.maxSize {
   193  		return int64(f.maxSize)
   194  	}
   195  	return int64(size)
   196  }
   197  
   198  // count returns the number of outstanding messages.
   199  // if maxCount is 0, this will always return 0.
   200  func (f *flowController) count() int {
   201  	return int(atomic.LoadInt64(&f.countRemaining))
   202  }
   203  
   204  func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) {
   205  	if f.purpose == flowControllerPurposeTopic {
   206  		recordStat(ctx, PublisherOutstandingMessages, n)
   207  		return
   208  	}
   209  
   210  	recordStat(ctx, OutstandingMessages, n)
   211  }
   212  
   213  func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) {
   214  	if f.purpose == flowControllerPurposeTopic {
   215  		recordStat(ctx, PublisherOutstandingBytes, n)
   216  		return
   217  	}
   218  
   219  	recordStat(ctx, OutstandingBytes, n)
   220  }
   221  

View as plain text