// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
	"errors"
	"sync"
)

// ErrReceiveDraining indicates the scheduler has shut down and is draining.
var ErrReceiveDraining error = errors.New("pubsub: receive scheduler draining")

// ReceiveScheduler is a scheduler which is designed for Pub/Sub's Receive flow.
//
// Each item is added with a given key. Items added to the empty string key are
// handled in random order. Items added to any other key are handled
// sequentially.
type ReceiveScheduler struct {
	// workers is a channel that represents workers. Rather than a pool, where
	// workers are "removed" until the pool is empty, the channel is more like
	// a set of work desks, where workers are "added" until all the desks are
	// full.
	//
	// A worker taking an item from the unordered queue (key="") completes a
	// single item and then goes back to the pool.
	//
	// A worker taking an item from an ordered queue (key="something")
	// completes all work in that queue until the queue is empty, then deletes
	// the queue, then goes back to the pool.
	workers chan struct{}
	done    chan struct{}

	mu sync.Mutex
	m  map[string][]func()
}

// NewReceiveScheduler creates a new ReceiveScheduler.
//
// The workers arg is the number of concurrent calls to handle. If the workers
// arg is 0, then a healthy default of 10 workers is used. If less than 0, this
// will be set to a large number, similar to PublishScheduler's handler limit.
func NewReceiveScheduler(workers int) *ReceiveScheduler {
	if workers == 0 {
		workers = 10
	} else if workers < 0 {
		workers = 1e9
	}

	return &ReceiveScheduler{
		workers: make(chan struct{}, workers),
		done:    make(chan struct{}),
		m:       make(map[string][]func()),
	}
}
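// Usage sketch (illustrative only, not part of the package API; it assumes
// the caller imports this package as "scheduler" and has "fmt" and "sync"
// available). Items that share a non-empty key are handled one at a time in
// FIFO order, while empty-key items may be handled concurrently:
//
//	s := scheduler.NewReceiveScheduler(2) // at most 2 concurrent handlers
//	var wg sync.WaitGroup
//	wg.Add(4)
//	handle := func(item interface{}) {
//		fmt.Println(item)
//		wg.Done()
//	}
//	s.Add("ordering-key-a", 1, handle) // handled before item 2
//	s.Add("ordering-key-a", 2, handle)
//	s.Add("", 3, handle) // unordered; may interleave with anything
//	s.Add("", 4, handle)
//	wg.Wait()
//	s.Shutdown()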
// Add adds the item to be handled. Add may block.
//
// Buffering happens above the ReceiveScheduler in the form of a flow
// controller that requests batches of messages to pull. A backed-up
// ReceiveScheduler.Add call causes pushback to the pubsub service (fewer
// Receive calls on the long-lived stream), which keeps the memory footprint
// stable.
func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error {
	// Fail fast if the scheduler is already draining.
	select {
	case <-s.done:
		return ErrReceiveDraining
	default:
	}

	if key == "" {
		// Spawn a worker.
		s.workers <- struct{}{}
		go func() {
			// Unordered keys can be handled immediately.
			handle(item)
			<-s.workers
		}()
		return nil
	}

	// Add it to the queue. This has to happen before we enter the goroutine
	// below to prevent a race from the next iteration of the key-loop
	// adding another item before this one gets queued.
	s.mu.Lock()
	_, ok := s.m[key]
	s.m[key] = append(s.m[key], func() {
		handle(item)
	})
	s.mu.Unlock()
	if ok {
		// Someone is already working on this key.
		return nil
	}

	// Spawn a worker.
	s.workers <- struct{}{}

	go func() {
		defer func() { <-s.workers }()

		// Key-Loop: loop through the available items in the key's queue.
		for {
			s.mu.Lock()
			if len(s.m[key]) == 0 {
				// We're done processing items - the queue is empty. Delete
				// the queue from the map and free up the worker.
				delete(s.m, key)
				s.mu.Unlock()
				return
			}
			// Pop an item from the queue.
			next := s.m[key][0]
			s.m[key] = s.m[key][1:]
			s.mu.Unlock()

			next() // Handle next in queue.
		}
	}()

	return nil
}

// Shutdown begins flushing messages and stops accepting new Add calls.
// Shutdown does not block or wait for all messages to be flushed.
func (s *ReceiveScheduler) Shutdown() {
	// Only close done once; repeated Shutdown calls are no-ops.
	select {
	case <-s.done:
	default:
		close(s.done)
	}
}
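// Drain sketch (illustrative only; the comparison below relies on
// ErrReceiveDraining being the sentinel error defined above). Once Shutdown
// has been called, Add refuses new work, but items already queued may still
// be handled by in-flight workers:
//
//	s := scheduler.NewReceiveScheduler(0) // 0 selects the default of 10 workers
//	s.Add("key", "accepted", func(item interface{}) { /* handle */ })
//	s.Shutdown()
//	err := s.Add("key", "rejected", func(item interface{}) {})
//	if err == scheduler.ErrReceiveDraining {
//		// The scheduler is draining; stop feeding it new items.
//	}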