1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package backend
16
17 import (
18 "bytes"
19 "math"
20 "sync"
21 "sync/atomic"
22 "time"
23
24 bolt "go.etcd.io/bbolt"
25 "go.uber.org/zap"
26 )
27
// BucketID is the integer identifier returned by Bucket.ID. It is used as
// the map key for per-bucket write buffers.
type BucketID int
29
// Bucket describes a named bucket that transactions in this package
// operate on.
type Bucket interface {
	// ID returns the integer identifier of the bucket.
	ID() BucketID
	// Name returns the bucket name as bytes; batchTx hands it to the bolt
	// transaction to look up, create, or delete the bucket.
	Name() []byte

	// String returns a printable form of the bucket; batchTx logs it via
	// zap.Stringer.
	String() string

	// IsSafeRangeBucket reports whether the bucket is a "safe range" bucket.
	// NOTE(review): the consumer of this flag is not visible in this file —
	// confirm its exact meaning against the read-buffer implementation.
	IsSafeRangeBucket() bool
}
44
// BatchTx is a write transaction that also exposes the ReadTx operations.
// NOTE(review): the "Unsafe" prefix presumably means the caller must already
// hold the transaction lock — confirm against callers.
type BatchTx interface {
	ReadTx
	// UnsafeCreateBucket creates the given bucket.
	UnsafeCreateBucket(bucket Bucket)
	// UnsafeDeleteBucket deletes the given bucket.
	UnsafeDeleteBucket(bucket Bucket)
	// UnsafePut writes key/value into the bucket.
	UnsafePut(bucket Bucket, key []byte, value []byte)
	// UnsafeSeqPut writes key/value into the bucket; the batchTx
	// implementation raises the bucket fill percent before the write.
	UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
	// UnsafeDelete removes key from the bucket.
	UnsafeDelete(bucket Bucket, key []byte)

	// Commit commits the pending writes.
	Commit()

	// CommitAndStop commits the pending writes; the batchTx implementation
	// does not begin a new transaction afterwards.
	CommitAndStop()
	// LockInsideApply acquires the transaction lock; see the implementation
	// for the call-site validation it performs.
	LockInsideApply()
	// LockOutsideApply acquires the transaction lock; see the implementation
	// for the call-site validation it performs.
	LockOutsideApply()
}
59
// batchTx batches write operations into a single bolt write transaction.
type batchTx struct {
	// Mutex guards the transaction; it is taken via lock() and released in
	// Unlock().
	sync.Mutex
	// tx is the current bolt write transaction; commit() replaces it with a
	// fresh one obtained from backend.begin(true).
	tx *bolt.Tx
	// backend is the owning backend (logger, batch limit, read transaction).
	backend *backend

	// pending counts the mutating operations issued since the last commit;
	// commit() resets it to zero.
	pending int
}
67
68
// Lock calls ValidateCalledInsideUnittest with the backend logger and then
// acquires the transaction mutex.
// NOTE(review): presumably this restricts direct Lock calls to unit tests —
// confirm against ValidateCalledInsideUnittest.
func (t *batchTx) Lock() {
	ValidateCalledInsideUnittest(t.backend.lg)
	t.lock()
}
73
// lock acquires the embedded mutex without any call-site validation.
func (t *batchTx) lock() {
	t.Mutex.Lock()
}
77
78 func (t *batchTx) LockInsideApply() {
79 t.lock()
80 if t.backend.txPostLockInsideApplyHook != nil {
81
82
83
84
85
86 ValidateCalledInsideApply(t.backend.lg)
87 t.backend.txPostLockInsideApplyHook()
88 }
89 }
90
// LockOutsideApply validates the call site with ValidateCalledOutSideApply
// and then acquires the transaction mutex.
func (t *batchTx) LockOutsideApply() {
	ValidateCalledOutSideApply(t.backend.lg)
	t.lock()
}
95
// Unlock releases the transaction mutex. Before releasing it, the pending
// operations are committed if their count has reached the backend's
// batchLimit.
func (t *batchTx) Unlock() {
	if t.pending >= t.backend.batchLimit {
		// Commit while still holding the mutex.
		t.commit(false)
	}
	t.Mutex.Unlock()
}
102
103
104
105
106
// RLock is not supported on batchTx and always panics.
// NOTE(review): presumably present only to satisfy the embedded ReadTx
// interface — confirm.
func (t *batchTx) RLock() {
	panic("unexpected RLock")
}
110
// RUnlock is not supported on batchTx and always panics.
// NOTE(review): presumably present only to satisfy the embedded ReadTx
// interface — confirm.
func (t *batchTx) RUnlock() {
	panic("unexpected RUnlock")
}
114
115 func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
116 _, err := t.tx.CreateBucket(bucket.Name())
117 if err != nil && err != bolt.ErrBucketExists {
118 t.backend.lg.Fatal(
119 "failed to create a bucket",
120 zap.Stringer("bucket-name", bucket),
121 zap.Error(err),
122 )
123 }
124 t.pending++
125 }
126
127 func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
128 err := t.tx.DeleteBucket(bucket.Name())
129 if err != nil && err != bolt.ErrBucketNotFound {
130 t.backend.lg.Fatal(
131 "failed to delete a bucket",
132 zap.Stringer("bucket-name", bucket),
133 zap.Error(err),
134 )
135 }
136 t.pending++
137 }
138
139
// UnsafePut writes key/value into the bucket using the default (non-sequential)
// write path of unsafePut.
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, false)
}
143
144
// UnsafeSeqPut writes key/value into the bucket using the sequential write
// path of unsafePut, which raises the bucket's fill percent before writing.
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, true)
}
148
149 func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
150 bucket := t.tx.Bucket(bucketType.Name())
151 if bucket == nil {
152 t.backend.lg.Fatal(
153 "failed to find a bucket",
154 zap.Stringer("bucket-name", bucketType),
155 zap.Stack("stack"),
156 )
157 }
158 if seq {
159
160
161 bucket.FillPercent = 0.9
162 }
163 if err := bucket.Put(key, value); err != nil {
164 t.backend.lg.Fatal(
165 "failed to write to a bucket",
166 zap.Stringer("bucket-name", bucketType),
167 zap.Error(err),
168 )
169 }
170 t.pending++
171 }
172
173
174 func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
175 bucket := t.tx.Bucket(bucketType.Name())
176 if bucket == nil {
177 t.backend.lg.Fatal(
178 "failed to find a bucket",
179 zap.Stringer("bucket-name", bucketType),
180 zap.Stack("stack"),
181 )
182 }
183 return unsafeRange(bucket.Cursor(), key, endKey, limit)
184 }
185
186 func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
187 if limit <= 0 {
188 limit = math.MaxInt64
189 }
190 var isMatch func(b []byte) bool
191 if len(endKey) > 0 {
192 isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
193 } else {
194 isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
195 limit = 1
196 }
197
198 for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
199 vs = append(vs, cv)
200 keys = append(keys, ck)
201 if limit == int64(len(keys)) {
202 break
203 }
204 }
205 return keys, vs
206 }
207
208
209 func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
210 bucket := t.tx.Bucket(bucketType.Name())
211 if bucket == nil {
212 t.backend.lg.Fatal(
213 "failed to find a bucket",
214 zap.Stringer("bucket-name", bucketType),
215 zap.Stack("stack"),
216 )
217 }
218 err := bucket.Delete(key)
219 if err != nil {
220 t.backend.lg.Fatal(
221 "failed to delete a key",
222 zap.Stringer("bucket-name", bucketType),
223 zap.Error(err),
224 )
225 }
226 t.pending++
227 }
228
229
// UnsafeForEach calls visitor for every key/value pair of the bucket in the
// current bolt write transaction, returning whatever unsafeForEach returns.
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
	return unsafeForEach(t.tx, bucket, visitor)
}
233
234 func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
235 if b := tx.Bucket(bucket.Name()); b != nil {
236 return b.ForEach(visitor)
237 }
238 return nil
239 }
240
241
// Commit takes the transaction lock, commits the pending writes (beginning a
// new transaction afterwards), and releases the lock.
func (t *batchTx) Commit() {
	t.lock()
	t.commit(false)
	t.Unlock()
}
247
248
// CommitAndStop takes the transaction lock, commits with stop=true so that
// commit does not begin a new transaction, and releases the lock.
func (t *batchTx) CommitAndStop() {
	t.lock()
	t.commit(true)
	t.Unlock()
}
254
255 func (t *batchTx) safePending() int {
256 t.Mutex.Lock()
257 defer t.Mutex.Unlock()
258 return t.pending
259 }
260
261 func (t *batchTx) commit(stop bool) {
262
263 if t.tx != nil {
264 if t.pending == 0 && !stop {
265 return
266 }
267
268 start := time.Now()
269
270
271 err := t.tx.Commit()
272
273
274 rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
275 spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
276 writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
277 commitSec.Observe(time.Since(start).Seconds())
278 atomic.AddInt64(&t.backend.commits, 1)
279
280 t.pending = 0
281 if err != nil {
282 t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
283 }
284 }
285 if !stop {
286 t.tx = t.backend.begin(true)
287 }
288 }
289
// batchTxBuffered is a batchTx that additionally records puts in a write
// buffer, which Unlock writes back into the backend's read-transaction
// buffer.
type batchTxBuffered struct {
	batchTx
	// buf holds the puts made through this transaction since the last
	// writeback.
	buf txWriteBuffer
	// pendingDeleteOperations counts key and bucket deletions since the last
	// commit; a non-zero count makes Unlock commit.
	pendingDeleteOperations int
}
295
296 func newBatchTxBuffered(backend *backend) *batchTxBuffered {
297 tx := &batchTxBuffered{
298 batchTx: batchTx{backend: backend},
299 buf: txWriteBuffer{
300 txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
301 bucket2seq: make(map[BucketID]bool),
302 },
303 }
304 tx.Commit()
305 return tx
306 }
307
308 func (t *batchTxBuffered) Unlock() {
309 if t.pending != 0 {
310 t.backend.readTx.Lock()
311
312 t.buf.writeback(&t.backend.readTx.buf)
313 t.backend.readTx.Unlock()
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334 if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
335 t.commit(false)
336 }
337 }
338 t.batchTx.Unlock()
339 }
340
// Commit takes the transaction lock, runs the buffered commit (which also
// refreshes the read transaction), and releases the lock via the buffered
// Unlock.
func (t *batchTxBuffered) Commit() {
	t.lock()
	t.commit(false)
	t.Unlock()
}
346
// CommitAndStop takes the transaction lock, runs the buffered commit with
// stop=true so that no new write or read transaction is begun, and releases
// the lock via the buffered Unlock.
func (t *batchTxBuffered) CommitAndStop() {
	t.lock()
	t.commit(true)
	t.Unlock()
}
352
// commit runs unsafeCommit while holding the backend read transaction's
// lock.
func (t *batchTxBuffered) commit(stop bool) {

	t.backend.readTx.Lock()
	t.unsafeCommit(stop)
	t.backend.readTx.Unlock()
}
359
360 func (t *batchTxBuffered) unsafeCommit(stop bool) {
361 if t.backend.hooks != nil {
362 t.backend.hooks.OnPreCommitUnsafe(t)
363 }
364 if t.backend.readTx.tx != nil {
365
366
367 go func(tx *bolt.Tx, wg *sync.WaitGroup) {
368 wg.Wait()
369 if err := tx.Rollback(); err != nil {
370 t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
371 }
372 }(t.backend.readTx.tx, t.backend.readTx.txWg)
373 t.backend.readTx.reset()
374 }
375
376 t.batchTx.commit(stop)
377 t.pendingDeleteOperations = 0
378
379 if !stop {
380 t.backend.readTx.tx = t.backend.begin(false)
381 }
382 }
383
// UnsafePut writes key/value through the underlying batchTx and also records
// the put in the write buffer.
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.batchTx.UnsafePut(bucket, key, value)
	t.buf.put(bucket, key, value)
}
388
// UnsafeSeqPut writes key/value through the underlying batchTx's sequential
// put and records it in the write buffer via putSeq.
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
	t.batchTx.UnsafeSeqPut(bucket, key, value)
	t.buf.putSeq(bucket, key, value)
}
393
// UnsafeDelete deletes key through the underlying batchTx and counts a
// pending delete operation. The write buffer is not touched.
func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
	t.batchTx.UnsafeDelete(bucketType, key)
	t.pendingDeleteOperations++
}
398
// UnsafeDeleteBucket deletes the bucket through the underlying batchTx and
// counts a pending delete operation. The write buffer is not touched.
func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
	t.batchTx.UnsafeDeleteBucket(bucket)
	t.pendingDeleteOperations++
}
403
View as plain text