...

Source file src/cloud.google.com/go/pubsub/internal/scheduler/receive_scheduler.go

Documentation: cloud.google.com/go/pubsub/internal/scheduler

     1  // Copyright 2019 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package scheduler
    16  
    17  import (
    18  	"errors"
    19  	"sync"
    20  )
    21  
    22  // ErrReceiveDraining indicates the scheduler has shutdown and is draining.
    23  var ErrReceiveDraining error = errors.New("pubsub: receive scheduler draining")
    24  
    25  // ReceiveScheduler is a scheduler which is designed for Pub/Sub's Receive flow.
    26  //
    27  // Each item is added with a given key. Items added to the empty string key are
    28  // handled in random order. Items added to any other key are handled
    29  // sequentially.
    30  type ReceiveScheduler struct {
    31  	// workers is a channel that represents workers. Rather than a pool, where
    32  	// worker are "removed" until the pool is empty, the channel is more like a
    33  	// set of work desks, where workers are "added" until all the desks are full.
    34  	//
    35  	// A worker taking an item from the unordered queue (key="") completes a
    36  	// single item and then goes back to the pool.
    37  	//
    38  	// A worker taking an item from an ordered queue (key="something") completes
    39  	// all work in that queue until the queue is empty, then deletes the queue,
    40  	// then goes back to the pool.
    41  	workers chan struct{}
    42  	done    chan struct{}
    43  
    44  	mu sync.Mutex
    45  	m  map[string][]func()
    46  }
    47  
    48  // NewReceiveScheduler creates a new ReceiveScheduler.
    49  //
    50  // The workers arg is the number of concurrent calls to handle. If the workers
    51  // arg is 0, then a healthy default of 10 workers is used. If less than 0, this
    52  // will be set to an large number, similar to PublishScheduler's handler limit.
    53  func NewReceiveScheduler(workers int) *ReceiveScheduler {
    54  	if workers == 0 {
    55  		workers = 10
    56  	} else if workers < 0 {
    57  		workers = 1e9
    58  	}
    59  
    60  	return &ReceiveScheduler{
    61  		workers: make(chan struct{}, workers),
    62  		done:    make(chan struct{}),
    63  		m:       make(map[string][]func()),
    64  	}
    65  }
    66  
    67  // Add adds the item to be handled. Add may block.
    68  //
    69  // Buffering happens above the ReceiveScheduler in the form of a flow controller
    70  // that requests batches of messages to pull. A backed up ReceiveScheduler.Add
    71  // call causes pushback to the pubsub service (less Receive calls on the
    72  // long-lived stream), which keeps memory footprint stable.
    73  func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error {
    74  	select {
    75  	case <-s.done:
    76  		return ErrReceiveDraining
    77  	default:
    78  	}
    79  	if key == "" {
    80  		// Spawn a worker.
    81  		s.workers <- struct{}{}
    82  		go func() {
    83  			// Unordered keys can be handled immediately.
    84  			handle(item)
    85  			<-s.workers
    86  		}()
    87  		return nil
    88  	}
    89  
    90  	// Add it to the queue. This has to happen before we enter the goroutine
    91  	// below to prevent a race from the next iteration of the key-loop
    92  	// adding another item before this one gets queued.
    93  
    94  	s.mu.Lock()
    95  	_, ok := s.m[key]
    96  	s.m[key] = append(s.m[key], func() {
    97  		handle(item)
    98  	})
    99  	s.mu.Unlock()
   100  	if ok {
   101  		// Someone is already working on this key.
   102  		return nil
   103  	}
   104  
   105  	// Spawn a worker.
   106  	s.workers <- struct{}{}
   107  
   108  	go func() {
   109  		defer func() { <-s.workers }()
   110  
   111  		// Key-Loop: loop through the available items in the key's queue.
   112  		for {
   113  			s.mu.Lock()
   114  			if len(s.m[key]) == 0 {
   115  				// We're done processing items - the queue is empty. Delete
   116  				// the queue from the map and free up the worker.
   117  				delete(s.m, key)
   118  				s.mu.Unlock()
   119  				return
   120  			}
   121  			// Pop an item from the queue.
   122  			next := s.m[key][0]
   123  			s.m[key] = s.m[key][1:]
   124  			s.mu.Unlock()
   125  
   126  			next() // Handle next in queue.
   127  		}
   128  	}()
   129  
   130  	return nil
   131  }
   132  
   133  // Shutdown begins flushing messages and stops accepting new Add calls. Shutdown
   134  // does not block, or wait for all messages to be flushed.
   135  func (s *ReceiveScheduler) Shutdown() {
   136  	select {
   137  	case <-s.done:
   138  	default:
   139  		close(s.done)
   140  	}
   141  }
   142  

View as plain text