...

Source file src/cloud.google.com/go/pubsub/internal/scheduler/publish_scheduler.go

Documentation: cloud.google.com/go/pubsub/internal/scheduler

     1  // Copyright 2019 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package scheduler
    16  
    17  import (
    18  	"errors"
    19  	"reflect"
    20  	"sync"
    21  	"time"
    22  
    23  	"google.golang.org/api/support/bundler"
    24  )
    25  
    26  // PublishScheduler is a scheduler which is designed for Pub/Sub's Publish flow.
    27  // It bundles items before handling them. All items in this PublishScheduler use
    28  // the same handler.
    29  //
    30  // Each item is added with a given key. Items added to the empty string key are
    31  // handled in random order. Items added to any other key are handled
    32  // sequentially.
    33  type PublishScheduler struct {
    34  	// Settings passed down to each bundler that gets created.
    35  	DelayThreshold       time.Duration
    36  	BundleCountThreshold int
    37  	BundleByteThreshold  int
    38  	BundleByteLimit      int
    39  	BufferedByteLimit    int
    40  
    41  	mu          sync.Mutex
    42  	bundlers    sync.Map // keys -> *bundler.Bundler
    43  	outstanding sync.Map // keys -> num outstanding messages
    44  
    45  	keysMu sync.RWMutex
    46  	// keysWithErrors tracks ordering keys that cannot accept new messages.
    47  	// A bundler might not accept new messages if publishing has failed
    48  	// for a specific ordering key, and can be resumed with topic.ResumePublish().
    49  	keysWithErrors map[string]struct{}
    50  
    51  	// workers is a channel that represents workers. Rather than a pool, where
    52  	// worker are "removed" until the pool is empty, the channel is more like a
    53  	// set of work desks, where workers are "added" until all the desks are full.
    54  	//
    55  	// workers does not restrict the amount of goroutines in the bundlers.
    56  	// Rather, it acts as the flow control for completion of bundler work.
    57  	workers chan struct{}
    58  	handle  func(bundle interface{})
    59  	done    chan struct{}
    60  }
    61  
    62  // NewPublishScheduler returns a new PublishScheduler.
    63  //
    64  // The workers arg is the number of workers that will operate on the queue of
    65  // work. A reasonably large number of workers is highly recommended. If the
    66  // workers arg is 0, then a healthy default of 10 workers is used.
    67  //
    68  // The scheduler does not use a parent context. If it did, canceling that
    69  // context would immediately stop the scheduler without waiting for
    70  // undelivered messages.
    71  //
    72  // The scheduler should be stopped only with FlushAndStop.
    73  func NewPublishScheduler(workers int, handle func(bundle interface{})) *PublishScheduler {
    74  	if workers == 0 {
    75  		workers = 10
    76  	}
    77  
    78  	s := PublishScheduler{
    79  		keysWithErrors: make(map[string]struct{}),
    80  		workers:        make(chan struct{}, workers),
    81  		handle:         handle,
    82  		done:           make(chan struct{}),
    83  	}
    84  
    85  	return &s
    86  }
    87  
    88  // Add adds an item to the scheduler at a given key.
    89  //
    90  // Add never blocks. Buffering happens in the scheduler's publishers. There is
    91  // no flow control.
    92  //
    93  // Since ordered keys require only a single outstanding RPC at once, it is
    94  // possible to send ordered key messages to Topic.Publish (and subsequently to
    95  // PublishScheduler.Add) faster than the bundler can publish them to the
    96  // Pub/Sub service, resulting in a backed up queue of Pub/Sub bundles. Each
    97  // item in the bundler queue is a goroutine.
    98  func (s *PublishScheduler) Add(key string, item interface{}, size int) error {
    99  	select {
   100  	case <-s.done:
   101  		return errors.New("draining")
   102  	default:
   103  	}
   104  
   105  	s.mu.Lock()
   106  	defer s.mu.Unlock()
   107  	var b *bundler.Bundler
   108  	bInterface, ok := s.bundlers.Load(key)
   109  
   110  	if !ok {
   111  		s.outstanding.Store(key, 1)
   112  		b = bundler.NewBundler(item, func(bundle interface{}) {
   113  			s.workers <- struct{}{}
   114  			s.handle(bundle)
   115  			<-s.workers
   116  
   117  			nlen := reflect.ValueOf(bundle).Len()
   118  			s.mu.Lock()
   119  			outsInterface, _ := s.outstanding.Load(key)
   120  			s.outstanding.Store(key, outsInterface.(int)-nlen)
   121  			if v, _ := s.outstanding.Load(key); v == 0 {
   122  				s.outstanding.Delete(key)
   123  				s.bundlers.Delete(key)
   124  			}
   125  			s.mu.Unlock()
   126  		})
   127  		b.DelayThreshold = s.DelayThreshold
   128  		b.BundleCountThreshold = s.BundleCountThreshold
   129  		b.BundleByteThreshold = s.BundleByteThreshold
   130  		b.BundleByteLimit = s.BundleByteLimit
   131  		b.BufferedByteLimit = s.BufferedByteLimit
   132  
   133  		if b.BufferedByteLimit == 0 {
   134  			b.BufferedByteLimit = 1e9
   135  		}
   136  
   137  		if key == "" {
   138  			// There's no way to express "unlimited" in the bundler, so we use
   139  			// some high number.
   140  			b.HandlerLimit = 1e9
   141  		} else {
   142  			// HandlerLimit=1 causes the bundler to act as a sequential queue.
   143  			b.HandlerLimit = 1
   144  		}
   145  
   146  		s.bundlers.Store(key, b)
   147  	} else {
   148  		b = bInterface.(*bundler.Bundler)
   149  		oi, _ := s.outstanding.Load(key)
   150  		s.outstanding.Store(key, oi.(int)+1)
   151  	}
   152  
   153  	return b.Add(item, size)
   154  }
   155  
   156  // FlushAndStop begins flushing items from bundlers and from the scheduler. It
   157  // blocks until all items have been flushed.
   158  func (s *PublishScheduler) FlushAndStop() {
   159  	close(s.done)
   160  	s.bundlers.Range(func(_, bi interface{}) bool {
   161  		bi.(*bundler.Bundler).Flush()
   162  		return true
   163  	})
   164  }
   165  
   166  // Flush waits until all bundlers are sent.
   167  func (s *PublishScheduler) Flush() {
   168  	var wg sync.WaitGroup
   169  	s.bundlers.Range(func(_, bi interface{}) bool {
   170  		wg.Add(1)
   171  		go func(b *bundler.Bundler) {
   172  			defer wg.Done()
   173  			b.Flush()
   174  		}(bi.(*bundler.Bundler))
   175  		return true
   176  	})
   177  	wg.Wait()
   178  
   179  }
   180  
   181  // IsPaused checks if the bundler associated with an ordering keys is
   182  // paused.
   183  func (s *PublishScheduler) IsPaused(orderingKey string) bool {
   184  	s.keysMu.RLock()
   185  	defer s.keysMu.RUnlock()
   186  	_, ok := s.keysWithErrors[orderingKey]
   187  	return ok
   188  }
   189  
   190  // Pause pauses the bundler associated with the provided ordering key,
   191  // preventing it from accepting new messages. Any outstanding messages
   192  // that haven't been published will error. If orderingKey is empty,
   193  // this is a no-op.
   194  func (s *PublishScheduler) Pause(orderingKey string) {
   195  	if orderingKey != "" {
   196  		s.keysMu.Lock()
   197  		defer s.keysMu.Unlock()
   198  		s.keysWithErrors[orderingKey] = struct{}{}
   199  	}
   200  }
   201  
   202  // Resume resumes accepting message with the provided ordering key.
   203  func (s *PublishScheduler) Resume(orderingKey string) {
   204  	s.keysMu.Lock()
   205  	defer s.keysMu.Unlock()
   206  	delete(s.keysWithErrors, orderingKey)
   207  }
   208  

View as plain text