...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package backend
16
17 import (
18 "math"
19 "sync"
20
21 bolt "go.etcd.io/bbolt"
22 )
23
24
25
26
27
28 type ReadTx interface {
29 Lock()
30 Unlock()
31 RLock()
32 RUnlock()
33
34 UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
35 UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
36 }
37
38
// baseReadTx holds the state shared by readTx and concurrentReadTx (both
// embed it) and provides their UnsafeRange and UnsafeForEach implementations.
type baseReadTx struct {

	// mu is the lock readTx exposes through Lock/Unlock/RLock/RUnlock.
	// NOTE(review): presumably it guards buf — confirm against callers.
	mu sync.RWMutex
	// buf is consulted by UnsafeRange and UnsafeForEach in addition to tx.
	buf txReadBuffer

	// txMu is taken around accesses to tx and buckets in UnsafeRange and
	// UnsafeForEach.
	txMu *sync.RWMutex
	// tx is the underlying bolt transaction; readTx.reset sets it to nil.
	tx *bolt.Tx
	// buckets caches the result of tx.Bucket per bucket ID (nil results are
	// cached as well); readTx.reset replaces it with an empty map.
	buckets map[BucketID]*bolt.Bucket

	// txWg is decremented by concurrentReadTx.RUnlock.
	// NOTE(review): presumably it lets the owner of tx wait for outstanding
	// readers before ending the transaction — the Add/Wait side is not
	// visible in this file.
	txWg *sync.WaitGroup
}
52
53 func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
54 dups := make(map[string]struct{})
55 getDups := func(k, v []byte) error {
56 dups[string(k)] = struct{}{}
57 return nil
58 }
59 visitNoDup := func(k, v []byte) error {
60 if _, ok := dups[string(k)]; ok {
61 return nil
62 }
63 return visitor(k, v)
64 }
65 if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
66 return err
67 }
68 baseReadTx.txMu.Lock()
69 err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
70 baseReadTx.txMu.Unlock()
71 if err != nil {
72 return err
73 }
74 return baseReadTx.buf.ForEach(bucket, visitor)
75 }
76
77 func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
78 if endKey == nil {
79
80 limit = 1
81 }
82 if limit <= 0 {
83 limit = math.MaxInt64
84 }
85 if limit > 1 && !bucketType.IsSafeRangeBucket() {
86 panic("do not use unsafeRange on non-keys bucket")
87 }
88 keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
89 if int64(len(keys)) == limit {
90 return keys, vals
91 }
92
93
94 bn := bucketType.ID()
95 baseReadTx.txMu.RLock()
96 bucket, ok := baseReadTx.buckets[bn]
97 baseReadTx.txMu.RUnlock()
98 lockHeld := false
99 if !ok {
100 baseReadTx.txMu.Lock()
101 lockHeld = true
102 bucket = baseReadTx.tx.Bucket(bucketType.Name())
103 baseReadTx.buckets[bn] = bucket
104 }
105
106
107 if bucket == nil {
108 if lockHeld {
109 baseReadTx.txMu.Unlock()
110 }
111 return keys, vals
112 }
113 if !lockHeld {
114 baseReadTx.txMu.Lock()
115 }
116 c := bucket.Cursor()
117 baseReadTx.txMu.Unlock()
118
119 k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
120 return append(k2, keys...), append(v2, vals...)
121 }
122
// readTx is a read transaction built on baseReadTx whose Lock/Unlock and
// RLock/RUnlock methods operate on the embedded mu.
type readTx struct {
	baseReadTx
}
126
127 func (rt *readTx) Lock() { rt.mu.Lock() }
128 func (rt *readTx) Unlock() { rt.mu.Unlock() }
129 func (rt *readTx) RLock() { rt.mu.RLock() }
130 func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
131
132 func (rt *readTx) reset() {
133 rt.buf.reset()
134 rt.buckets = make(map[BucketID]*bolt.Bucket)
135 rt.tx = nil
136 rt.txWg = new(sync.WaitGroup)
137 }
138
// concurrentReadTx is a read transaction built on baseReadTx whose Lock,
// Unlock and RLock methods are no-ops and whose RUnlock calls txWg.Done().
type concurrentReadTx struct {
	baseReadTx
}
142
143 func (rt *concurrentReadTx) Lock() {}
144 func (rt *concurrentReadTx) Unlock() {}
145
146
147 func (rt *concurrentReadTx) RLock() {}
148
149
150 func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
151
View as plain text