...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "errors"
20 "sync/atomic"
21
22 "golang.org/x/sync/semaphore"
23 )
24
25
26
// LimitExceededBehavior selects what a flow controller does when reserving
// capacity for a message would exceed one of its configured limits.
type LimitExceededBehavior int

const (
	// FlowControlIgnore turns flow control off: acquire and release return
	// immediately without consulting any limit. It is the zero value.
	FlowControlIgnore LimitExceededBehavior = iota

	// FlowControlBlock makes acquire wait until enough capacity is available
	// or the supplied context is done.
	FlowControlBlock

	// FlowControlSignalError makes acquire fail right away, without waiting,
	// with ErrFlowControllerMaxOutstandingMessages or
	// ErrFlowControllerMaxOutstandingBytes when capacity is not available.
	FlowControlSignalError
)
37
38
39
// flowControllerPurpose identifies what a flow controller is used for. The
// controller consults it to decide which metrics receive its outstanding
// message and byte totals.
type flowControllerPurpose int

const (
	// flowControllerPurposeSubscription is the zero value; totals are
	// recorded against OutstandingMessages and OutstandingBytes.
	flowControllerPurposeSubscription flowControllerPurpose = iota

	// flowControllerPurposeTopic records totals against
	// PublisherOutstandingMessages and PublisherOutstandingBytes.
	flowControllerPurposeTopic
)
46
47
48 type FlowControlSettings struct {
49
50
51 MaxOutstandingMessages int
52
53
54
55 MaxOutstandingBytes int
56
57
58
59
60
61 LimitExceededBehavior LimitExceededBehavior
62 }
63
// Sentinel errors returned by the flow controller in FlowControlSignalError
// mode. Compare against them with errors.Is rather than by message text.
var (
	// ErrFlowControllerMaxOutstandingMessages reports that a message slot
	// could not be reserved because the outstanding-message limit is reached.
	ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")

	// ErrFlowControllerMaxOutstandingBytes reports that the message's bytes
	// could not be reserved because the outstanding-bytes limit would be
	// exceeded.
	ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
)
71
72
73 type flowController struct {
74 maxCount int
75 maxSize int
76 semCount, semSize *semaphore.Weighted
77
78
79
80
81 countRemaining int64
82
83 bytesRemaining int64
84 limitBehavior LimitExceededBehavior
85 purpose flowControllerPurpose
86 }
87
88
89
90
91
92 func newFlowController(fc FlowControlSettings) flowController {
93 f := flowController{
94 maxCount: fc.MaxOutstandingMessages,
95 maxSize: fc.MaxOutstandingBytes,
96 semCount: nil,
97 semSize: nil,
98 limitBehavior: fc.LimitExceededBehavior,
99 }
100 if fc.MaxOutstandingMessages > 0 {
101 f.semCount = semaphore.NewWeighted(int64(fc.MaxOutstandingMessages))
102 }
103 if fc.MaxOutstandingBytes > 0 {
104 f.semSize = semaphore.NewWeighted(int64(fc.MaxOutstandingBytes))
105 }
106 return f
107 }
108
109 func newTopicFlowController(fc FlowControlSettings) flowController {
110 f := newFlowController(fc)
111 f.purpose = flowControllerPurposeTopic
112 return f
113 }
114
115 func newSubscriptionFlowController(fc FlowControlSettings) flowController {
116 f := newFlowController(fc)
117 f.purpose = flowControllerPurposeSubscription
118 return f
119 }
120
121
122
123
124
125
126 func (f *flowController) acquire(ctx context.Context, size int) error {
127 switch f.limitBehavior {
128 case FlowControlIgnore:
129 return nil
130 case FlowControlBlock:
131 if f.semCount != nil {
132 if err := f.semCount.Acquire(ctx, 1); err != nil {
133 return err
134 }
135 }
136 if f.semSize != nil {
137 if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
138 if f.semCount != nil {
139 f.semCount.Release(1)
140 }
141 return err
142 }
143 }
144 case FlowControlSignalError:
145 if f.semCount != nil {
146 if !f.semCount.TryAcquire(1) {
147 return ErrFlowControllerMaxOutstandingMessages
148 }
149 }
150 if f.semSize != nil {
151
152 if !f.semSize.TryAcquire(int64(size)) {
153 if f.semCount != nil {
154 f.semCount.Release(1)
155 }
156 return ErrFlowControllerMaxOutstandingBytes
157 }
158 }
159 }
160
161 if f.semCount != nil {
162 outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
163 f.recordOutstandingMessages(ctx, outstandingMessages)
164 }
165
166 if f.semSize != nil {
167 outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
168 f.recordOutstandingBytes(ctx, outstandingBytes)
169 }
170 return nil
171 }
172
173
174 func (f *flowController) release(ctx context.Context, size int) {
175 if f.limitBehavior == FlowControlIgnore {
176 return
177 }
178
179 if f.semCount != nil {
180 outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
181 f.recordOutstandingMessages(ctx, outstandingMessages)
182 f.semCount.Release(1)
183 }
184 if f.semSize != nil {
185 outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
186 f.recordOutstandingBytes(ctx, outstandingBytes)
187 f.semSize.Release(f.bound(size))
188 }
189 }
190
191 func (f *flowController) bound(size int) int64 {
192 if size > f.maxSize {
193 return int64(f.maxSize)
194 }
195 return int64(size)
196 }
197
198
199
200 func (f *flowController) count() int {
201 return int(atomic.LoadInt64(&f.countRemaining))
202 }
203
204 func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) {
205 if f.purpose == flowControllerPurposeTopic {
206 recordStat(ctx, PublisherOutstandingMessages, n)
207 return
208 }
209
210 recordStat(ctx, OutstandingMessages, n)
211 }
212
213 func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) {
214 if f.purpose == flowControllerPurposeTopic {
215 recordStat(ctx, PublisherOutstandingBytes, n)
216 return
217 }
218
219 recordStat(ctx, OutstandingBytes, n)
220 }
221
View as plain text