1 // Copyright 2019 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package scheduler 16 17 import ( 18 "errors" 19 "sync" 20 ) 21 22 // ErrReceiveDraining indicates the scheduler has shutdown and is draining. 23 var ErrReceiveDraining error = errors.New("pubsub: receive scheduler draining") 24 25 // ReceiveScheduler is a scheduler which is designed for Pub/Sub's Receive flow. 26 // 27 // Each item is added with a given key. Items added to the empty string key are 28 // handled in random order. Items added to any other key are handled 29 // sequentially. 30 type ReceiveScheduler struct { 31 // workers is a channel that represents workers. Rather than a pool, where 32 // worker are "removed" until the pool is empty, the channel is more like a 33 // set of work desks, where workers are "added" until all the desks are full. 34 // 35 // A worker taking an item from the unordered queue (key="") completes a 36 // single item and then goes back to the pool. 37 // 38 // A worker taking an item from an ordered queue (key="something") completes 39 // all work in that queue until the queue is empty, then deletes the queue, 40 // then goes back to the pool. 41 workers chan struct{} 42 done chan struct{} 43 44 mu sync.Mutex 45 m map[string][]func() 46 } 47 48 // NewReceiveScheduler creates a new ReceiveScheduler. 49 // 50 // The workers arg is the number of concurrent calls to handle. If the workers 51 // arg is 0, then a healthy default of 10 workers is used. If less than 0, this 52 // will be set to an large number, similar to PublishScheduler's handler limit. 53 func NewReceiveScheduler(workers int) *ReceiveScheduler { 54 if workers == 0 { 55 workers = 10 56 } else if workers < 0 { 57 workers = 1e9 58 } 59 60 return &ReceiveScheduler{ 61 workers: make(chan struct{}, workers), 62 done: make(chan struct{}), 63 m: make(map[string][]func()), 64 } 65 } 66 67 // Add adds the item to be handled. Add may block. 68 // 69 // Buffering happens above the ReceiveScheduler in the form of a flow controller 70 // that requests batches of messages to pull. A backed up ReceiveScheduler.Add 71 // call causes pushback to the pubsub service (less Receive calls on the 72 // long-lived stream), which keeps memory footprint stable. 73 func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error { 74 select { 75 case <-s.done: 76 return ErrReceiveDraining 77 default: 78 } 79 if key == "" { 80 // Spawn a worker. 81 s.workers <- struct{}{} 82 go func() { 83 // Unordered keys can be handled immediately. 84 handle(item) 85 <-s.workers 86 }() 87 return nil 88 } 89 90 // Add it to the queue. This has to happen before we enter the goroutine 91 // below to prevent a race from the next iteration of the key-loop 92 // adding another item before this one gets queued. 93 94 s.mu.Lock() 95 _, ok := s.m[key] 96 s.m[key] = append(s.m[key], func() { 97 handle(item) 98 }) 99 s.mu.Unlock() 100 if ok { 101 // Someone is already working on this key. 102 return nil 103 } 104 105 // Spawn a worker. 106 s.workers <- struct{}{} 107 108 go func() { 109 defer func() { <-s.workers }() 110 111 // Key-Loop: loop through the available items in the key's queue. 112 for { 113 s.mu.Lock() 114 if len(s.m[key]) == 0 { 115 // We're done processing items - the queue is empty. Delete 116 // the queue from the map and free up the worker. 117 delete(s.m, key) 118 s.mu.Unlock() 119 return 120 } 121 // Pop an item from the queue. 122 next := s.m[key][0] 123 s.m[key] = s.m[key][1:] 124 s.mu.Unlock() 125 126 next() // Handle next in queue. 127 } 128 }() 129 130 return nil 131 } 132 133 // Shutdown begins flushing messages and stops accepting new Add calls. Shutdown 134 // does not block, or wait for all messages to be flushed. 135 func (s *ReceiveScheduler) Shutdown() { 136 select { 137 case <-s.done: 138 default: 139 close(s.done) 140 } 141 } 142