...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package scheduler
16
17 import (
18 "errors"
19 "reflect"
20 "sync"
21 "time"
22
23 "google.golang.org/api/support/bundler"
24 )
25
26
27
28
29
30
31
32
// PublishScheduler groups added items into per-key bundles and passes every
// completed bundle to one shared handler function.
//
// The exported threshold and limit fields are copied onto a bundler at the
// moment Add creates it, so they should be set before the first Add for a key.
type PublishScheduler struct {
	// Bundler settings. Add copies each value onto the identically named
	// field of every bundler it creates; a BufferedByteLimit of 0 is
	// replaced with 1e9 at that point.
	DelayThreshold       time.Duration
	BundleCountThreshold int
	BundleByteThreshold  int
	BundleByteLimit      int
	BufferedByteLimit    int

	// mu serializes bundler creation and removal together with updates to
	// the outstanding counters.
	mu sync.Mutex

	// bundlers maps a key to its *bundler.Bundler.
	bundlers sync.Map

	// outstanding maps a key to an int counter that Add increments and
	// that is reduced by the bundle length after the handler returns.
	outstanding sync.Map

	// keysMu guards keysWithErrors.
	keysMu sync.RWMutex

	// keysWithErrors is the set of ordering keys currently marked as
	// paused (see Pause, Resume and IsPaused).
	keysWithErrors map[string]struct{}

	// workers limits how many handle calls run at the same time: a token
	// is sent before each call and received back afterwards, so at most
	// cap(workers) calls are in flight.
	workers chan struct{}

	// handle is invoked with each completed bundle.
	handle func(bundle interface{})

	// done is closed by FlushAndStop; once it is closed Add rejects new
	// items.
	done chan struct{}
}
61
62
63
64
65
66
67
68
69
70
71
72
73 func NewPublishScheduler(workers int, handle func(bundle interface{})) *PublishScheduler {
74 if workers == 0 {
75 workers = 10
76 }
77
78 s := PublishScheduler{
79 keysWithErrors: make(map[string]struct{}),
80 workers: make(chan struct{}, workers),
81 handle: handle,
82 done: make(chan struct{}),
83 }
84
85 return &s
86 }
87
88
89
90
91
92
93
94
95
96
97
98 func (s *PublishScheduler) Add(key string, item interface{}, size int) error {
99 select {
100 case <-s.done:
101 return errors.New("draining")
102 default:
103 }
104
105 s.mu.Lock()
106 defer s.mu.Unlock()
107 var b *bundler.Bundler
108 bInterface, ok := s.bundlers.Load(key)
109
110 if !ok {
111 s.outstanding.Store(key, 1)
112 b = bundler.NewBundler(item, func(bundle interface{}) {
113 s.workers <- struct{}{}
114 s.handle(bundle)
115 <-s.workers
116
117 nlen := reflect.ValueOf(bundle).Len()
118 s.mu.Lock()
119 outsInterface, _ := s.outstanding.Load(key)
120 s.outstanding.Store(key, outsInterface.(int)-nlen)
121 if v, _ := s.outstanding.Load(key); v == 0 {
122 s.outstanding.Delete(key)
123 s.bundlers.Delete(key)
124 }
125 s.mu.Unlock()
126 })
127 b.DelayThreshold = s.DelayThreshold
128 b.BundleCountThreshold = s.BundleCountThreshold
129 b.BundleByteThreshold = s.BundleByteThreshold
130 b.BundleByteLimit = s.BundleByteLimit
131 b.BufferedByteLimit = s.BufferedByteLimit
132
133 if b.BufferedByteLimit == 0 {
134 b.BufferedByteLimit = 1e9
135 }
136
137 if key == "" {
138
139
140 b.HandlerLimit = 1e9
141 } else {
142
143 b.HandlerLimit = 1
144 }
145
146 s.bundlers.Store(key, b)
147 } else {
148 b = bInterface.(*bundler.Bundler)
149 oi, _ := s.outstanding.Load(key)
150 s.outstanding.Store(key, oi.(int)+1)
151 }
152
153 return b.Add(item, size)
154 }
155
156
157
158 func (s *PublishScheduler) FlushAndStop() {
159 close(s.done)
160 s.bundlers.Range(func(_, bi interface{}) bool {
161 bi.(*bundler.Bundler).Flush()
162 return true
163 })
164 }
165
166
167 func (s *PublishScheduler) Flush() {
168 var wg sync.WaitGroup
169 s.bundlers.Range(func(_, bi interface{}) bool {
170 wg.Add(1)
171 go func(b *bundler.Bundler) {
172 defer wg.Done()
173 b.Flush()
174 }(bi.(*bundler.Bundler))
175 return true
176 })
177 wg.Wait()
178
179 }
180
181
182
183 func (s *PublishScheduler) IsPaused(orderingKey string) bool {
184 s.keysMu.RLock()
185 defer s.keysMu.RUnlock()
186 _, ok := s.keysWithErrors[orderingKey]
187 return ok
188 }
189
190
191
192
193
194 func (s *PublishScheduler) Pause(orderingKey string) {
195 if orderingKey != "" {
196 s.keysMu.Lock()
197 defer s.keysMu.Unlock()
198 s.keysWithErrors[orderingKey] = struct{}{}
199 }
200 }
201
202
203 func (s *PublishScheduler) Resume(orderingKey string) {
204 s.keysMu.Lock()
205 defer s.keysMu.Unlock()
206 delete(s.keysWithErrors, orderingKey)
207 }
208
View as plain text