1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "sort"
19 "sync"
20
21 "github.com/google/btree"
22 "go.uber.org/zap"
23 )
24
25 type index interface {
26 Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
27 Range(key, end []byte, atRev int64) ([][]byte, []revision)
28 Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
29 CountRevisions(key, end []byte, atRev int64) int
30 Put(key []byte, rev revision)
31 Tombstone(key []byte, rev revision) error
32 RangeSince(key, end []byte, rev int64) []revision
33 Compact(rev int64) map[revision]struct{}
34 Keep(rev int64) map[revision]struct{}
35 Equal(b index) bool
36
37 Insert(ki *keyIndex)
38 KeyIndex(ki *keyIndex) *keyIndex
39 }
40
41 type treeIndex struct {
42 sync.RWMutex
43 tree *btree.BTree
44 lg *zap.Logger
45 }
46
47 func newTreeIndex(lg *zap.Logger) index {
48 return &treeIndex{
49 tree: btree.New(32),
50 lg: lg,
51 }
52 }
53
54 func (ti *treeIndex) Put(key []byte, rev revision) {
55 keyi := &keyIndex{key: key}
56
57 ti.Lock()
58 defer ti.Unlock()
59 item := ti.tree.Get(keyi)
60 if item == nil {
61 keyi.put(ti.lg, rev.main, rev.sub)
62 ti.tree.ReplaceOrInsert(keyi)
63 return
64 }
65 okeyi := item.(*keyIndex)
66 okeyi.put(ti.lg, rev.main, rev.sub)
67 }
68
69 func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
70 keyi := &keyIndex{key: key}
71 ti.RLock()
72 defer ti.RUnlock()
73 if keyi = ti.keyIndex(keyi); keyi == nil {
74 return revision{}, revision{}, 0, ErrRevisionNotFound
75 }
76 return keyi.get(ti.lg, atRev)
77 }
78
79 func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
80 ti.RLock()
81 defer ti.RUnlock()
82 return ti.keyIndex(keyi)
83 }
84
85 func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
86 if item := ti.tree.Get(keyi); item != nil {
87 return item.(*keyIndex)
88 }
89 return nil
90 }
91
92 func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex) bool) {
93 keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
94
95 ti.RLock()
96 defer ti.RUnlock()
97
98 ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
99 if len(endi.key) > 0 && !item.Less(endi) {
100 return false
101 }
102 if !f(item.(*keyIndex)) {
103 return false
104 }
105 return true
106 })
107 }
108
109 func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
110 if end == nil {
111 rev, _, _, err := ti.Get(key, atRev)
112 if err != nil {
113 return nil, 0
114 }
115 return []revision{rev}, 1
116 }
117 ti.visit(key, end, func(ki *keyIndex) bool {
118 if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
119 if limit <= 0 || len(revs) < limit {
120 revs = append(revs, rev)
121 }
122 total++
123 }
124 return true
125 })
126 return revs, total
127 }
128
129 func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
130 if end == nil {
131 _, _, _, err := ti.Get(key, atRev)
132 if err != nil {
133 return 0
134 }
135 return 1
136 }
137 total := 0
138 ti.visit(key, end, func(ki *keyIndex) bool {
139 if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
140 total++
141 }
142 return true
143 })
144 return total
145 }
146
147 func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
148 if end == nil {
149 rev, _, _, err := ti.Get(key, atRev)
150 if err != nil {
151 return nil, nil
152 }
153 return [][]byte{key}, []revision{rev}
154 }
155 ti.visit(key, end, func(ki *keyIndex) bool {
156 if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
157 revs = append(revs, rev)
158 keys = append(keys, ki.key)
159 }
160 return true
161 })
162 return keys, revs
163 }
164
165 func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
166 keyi := &keyIndex{key: key}
167
168 ti.Lock()
169 defer ti.Unlock()
170 item := ti.tree.Get(keyi)
171 if item == nil {
172 return ErrRevisionNotFound
173 }
174
175 ki := item.(*keyIndex)
176 return ki.tombstone(ti.lg, rev.main, rev.sub)
177 }
178
179
180
181
182 func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
183 keyi := &keyIndex{key: key}
184
185 ti.RLock()
186 defer ti.RUnlock()
187
188 if end == nil {
189 item := ti.tree.Get(keyi)
190 if item == nil {
191 return nil
192 }
193 keyi = item.(*keyIndex)
194 return keyi.since(ti.lg, rev)
195 }
196
197 endi := &keyIndex{key: end}
198 var revs []revision
199 ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
200 if len(endi.key) > 0 && !item.Less(endi) {
201 return false
202 }
203 curKeyi := item.(*keyIndex)
204 revs = append(revs, curKeyi.since(ti.lg, rev)...)
205 return true
206 })
207 sort.Sort(revisions(revs))
208
209 return revs
210 }
211
212 func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
213 available := make(map[revision]struct{})
214 ti.lg.Info("compact tree index", zap.Int64("revision", rev))
215 ti.Lock()
216 clone := ti.tree.Clone()
217 ti.Unlock()
218
219 clone.Ascend(func(item btree.Item) bool {
220 keyi := item.(*keyIndex)
221
222
223 ti.Lock()
224 keyi.compact(ti.lg, rev, available)
225 if keyi.isEmpty() {
226 item := ti.tree.Delete(keyi)
227 if item == nil {
228 ti.lg.Panic("failed to delete during compaction")
229 }
230 }
231 ti.Unlock()
232 return true
233 })
234 return available
235 }
236
237
238 func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
239 available := make(map[revision]struct{})
240 ti.RLock()
241 defer ti.RUnlock()
242 ti.tree.Ascend(func(i btree.Item) bool {
243 keyi := i.(*keyIndex)
244 keyi.keep(rev, available)
245 return true
246 })
247 return available
248 }
249
250 func (ti *treeIndex) Equal(bi index) bool {
251 b := bi.(*treeIndex)
252
253 if ti.tree.Len() != b.tree.Len() {
254 return false
255 }
256
257 equal := true
258
259 ti.tree.Ascend(func(item btree.Item) bool {
260 aki := item.(*keyIndex)
261 bki := b.tree.Get(item).(*keyIndex)
262 if !aki.equal(bki) {
263 equal = false
264 return false
265 }
266 return true
267 })
268
269 return equal
270 }
271
272 func (ti *treeIndex) Insert(ki *keyIndex) {
273 ti.Lock()
274 defer ti.Unlock()
275 ti.tree.ReplaceOrInsert(ki)
276 }
277
View as plain text