...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "sync/atomic"
20
21 "golang.org/x/sync/semaphore"
22 )
23
24
25 type flowController struct {
26
27 maxInsertCount int
28
29 maxInsertBytes int
30
31
32 semInsertCount, semInsertBytes *semaphore.Weighted
33
34 countTracked int64
35 bytesTracked int64
36 }
37
38 func newFlowController(maxInserts, maxInsertBytes int) *flowController {
39 fc := &flowController{
40 maxInsertCount: maxInserts,
41 maxInsertBytes: maxInsertBytes,
42 semInsertCount: nil,
43 semInsertBytes: nil,
44 }
45 if maxInserts > 0 {
46 fc.semInsertCount = semaphore.NewWeighted(int64(maxInserts))
47 }
48 if maxInsertBytes > 0 {
49 fc.semInsertBytes = semaphore.NewWeighted(int64(maxInsertBytes))
50 }
51 return fc
52 }
53
54
55
56 func copyFlowController(in *flowController) *flowController {
57 var maxInserts, maxBytes int
58 if in != nil {
59 maxInserts = in.maxInsertCount
60 maxBytes = in.maxInsertBytes
61 }
62 return newFlowController(maxInserts, maxBytes)
63 }
64
65
66
67
68
69
70 func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error {
71 if fc.semInsertCount != nil {
72 if err := fc.semInsertCount.Acquire(ctx, 1); err != nil {
73 return err
74 }
75 }
76 if fc.semInsertBytes != nil {
77 if err := fc.semInsertBytes.Acquire(ctx, fc.bound(sizeBytes)); err != nil {
78 if fc.semInsertCount != nil {
79 fc.semInsertCount.Release(1)
80 }
81 return err
82 }
83 }
84 atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
85 atomic.AddInt64(&fc.countTracked, 1)
86 return nil
87 }
88
89
90
91
92
93
94 func (fc *flowController) tryAcquire(sizeBytes int) bool {
95 if fc.semInsertCount != nil {
96 if !fc.semInsertCount.TryAcquire(1) {
97 return false
98 }
99 }
100 if fc.semInsertBytes != nil {
101 if !fc.semInsertBytes.TryAcquire(fc.bound(sizeBytes)) {
102 if fc.semInsertCount != nil {
103 fc.semInsertCount.Release(1)
104 }
105 return false
106 }
107 }
108 atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
109 atomic.AddInt64(&fc.countTracked, 1)
110 return true
111 }
112
113 func (fc *flowController) release(sizeBytes int) {
114 atomic.AddInt64(&fc.countTracked, -1)
115 atomic.AddInt64(&fc.bytesTracked, (0 - fc.bound(sizeBytes)))
116 if fc.semInsertCount != nil {
117 fc.semInsertCount.Release(1)
118 }
119 if fc.semInsertBytes != nil {
120 fc.semInsertBytes.Release(fc.bound(sizeBytes))
121 }
122 }
123
124
125 func (fc *flowController) bound(sizeBytes int) int64 {
126 if sizeBytes > fc.maxInsertBytes {
127 return int64(fc.maxInsertBytes)
128 }
129 return int64(sizeBytes)
130 }
131
132 func (fc *flowController) count() int {
133 return int(atomic.LoadInt64(&fc.countTracked))
134 }
135
136 func (fc *flowController) bytes() int {
137 return int(atomic.LoadInt64(&fc.bytesTracked))
138 }
139
View as plain text