1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "bytes"
19 "errors"
20 "fmt"
21
22 "github.com/google/btree"
23 "go.uber.org/zap"
24 )
25
26 var (
27 ErrRevisionNotFound = errors.New("mvcc: revision not found")
28 )
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 type keyIndex struct {
76 key []byte
77 modified revision
78 generations []generation
79 }
80
81
82 func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
83 rev := revision{main: main, sub: sub}
84
85 if !rev.GreaterThan(ki.modified) {
86 lg.Panic(
87 "'put' with an unexpected smaller revision",
88 zap.Int64("given-revision-main", rev.main),
89 zap.Int64("given-revision-sub", rev.sub),
90 zap.Int64("modified-revision-main", ki.modified.main),
91 zap.Int64("modified-revision-sub", ki.modified.sub),
92 )
93 }
94 if len(ki.generations) == 0 {
95 ki.generations = append(ki.generations, generation{})
96 }
97 g := &ki.generations[len(ki.generations)-1]
98 if len(g.revs) == 0 {
99 keysGauge.Inc()
100 g.created = rev
101 }
102 g.revs = append(g.revs, rev)
103 g.ver++
104 ki.modified = rev
105 }
106
107 func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
108 if len(ki.generations) != 0 {
109 lg.Panic(
110 "'restore' got an unexpected non-empty generations",
111 zap.Int("generations-size", len(ki.generations)),
112 )
113 }
114
115 ki.modified = modified
116 g := generation{created: created, ver: ver, revs: []revision{modified}}
117 ki.generations = append(ki.generations, g)
118 keysGauge.Inc()
119 }
120
121
122
123
124 func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
125 if ki.isEmpty() {
126 lg.Panic(
127 "'tombstone' got an unexpected empty keyIndex",
128 zap.String("key", string(ki.key)),
129 )
130 }
131 if ki.generations[len(ki.generations)-1].isEmpty() {
132 return ErrRevisionNotFound
133 }
134 ki.put(lg, main, sub)
135 ki.generations = append(ki.generations, generation{})
136 keysGauge.Dec()
137 return nil
138 }
139
140
141
142 func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
143 if ki.isEmpty() {
144 lg.Panic(
145 "'get' got an unexpected empty keyIndex",
146 zap.String("key", string(ki.key)),
147 )
148 }
149 g := ki.findGeneration(atRev)
150 if g.isEmpty() {
151 return revision{}, revision{}, 0, ErrRevisionNotFound
152 }
153
154 n := g.walk(func(rev revision) bool { return rev.main > atRev })
155 if n != -1 {
156 return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
157 }
158
159 return revision{}, revision{}, 0, ErrRevisionNotFound
160 }
161
162
163
164
165 func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
166 if ki.isEmpty() {
167 lg.Panic(
168 "'since' got an unexpected empty keyIndex",
169 zap.String("key", string(ki.key)),
170 )
171 }
172 since := revision{rev, 0}
173 var gi int
174
175 for gi = len(ki.generations) - 1; gi > 0; gi-- {
176 g := ki.generations[gi]
177 if g.isEmpty() {
178 continue
179 }
180 if since.GreaterThan(g.created) {
181 break
182 }
183 }
184
185 var revs []revision
186 var last int64
187 for ; gi < len(ki.generations); gi++ {
188 for _, r := range ki.generations[gi].revs {
189 if since.GreaterThan(r) {
190 continue
191 }
192 if r.main == last {
193
194
195 revs[len(revs)-1] = r
196 continue
197 }
198 revs = append(revs, r)
199 last = r.main
200 }
201 }
202 return revs
203 }
204
205
206
207
208
209 func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
210 if ki.isEmpty() {
211 lg.Panic(
212 "'compact' got an unexpected empty keyIndex",
213 zap.String("key", string(ki.key)),
214 )
215 }
216
217 genIdx, revIndex := ki.doCompact(atRev, available)
218
219 g := &ki.generations[genIdx]
220 if !g.isEmpty() {
221
222 if revIndex != -1 {
223 g.revs = g.revs[revIndex:]
224 }
225
226 if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
227 delete(available, g.revs[0])
228 genIdx++
229 }
230 }
231
232
233 ki.generations = ki.generations[genIdx:]
234 }
235
236
237 func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
238 if ki.isEmpty() {
239 return
240 }
241
242 genIdx, revIndex := ki.doCompact(atRev, available)
243 g := &ki.generations[genIdx]
244 if !g.isEmpty() {
245
246 if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
247 delete(available, g.revs[revIndex])
248 }
249 }
250 }
251
252 func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
253
254
255 f := func(rev revision) bool {
256 if rev.main <= atRev {
257 available[rev] = struct{}{}
258 return false
259 }
260 return true
261 }
262
263 genIdx, g := 0, &ki.generations[0]
264
265 for genIdx < len(ki.generations)-1 {
266 if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
267 break
268 }
269 genIdx++
270 g = &ki.generations[genIdx]
271 }
272
273 revIndex = g.walk(f)
274
275 return genIdx, revIndex
276 }
277
278 func (ki *keyIndex) isEmpty() bool {
279 return len(ki.generations) == 1 && ki.generations[0].isEmpty()
280 }
281
282
283
284
285 func (ki *keyIndex) findGeneration(rev int64) *generation {
286 lastg := len(ki.generations) - 1
287 cg := lastg
288
289 for cg >= 0 {
290 if len(ki.generations[cg].revs) == 0 {
291 cg--
292 continue
293 }
294 g := ki.generations[cg]
295 if cg != lastg {
296 if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {
297 return nil
298 }
299 }
300 if g.revs[0].main <= rev {
301 return &ki.generations[cg]
302 }
303 cg--
304 }
305 return nil
306 }
307
308 func (ki *keyIndex) Less(b btree.Item) bool {
309 return bytes.Compare(ki.key, b.(*keyIndex).key) == -1
310 }
311
312 func (ki *keyIndex) equal(b *keyIndex) bool {
313 if !bytes.Equal(ki.key, b.key) {
314 return false
315 }
316 if ki.modified != b.modified {
317 return false
318 }
319 if len(ki.generations) != len(b.generations) {
320 return false
321 }
322 for i := range ki.generations {
323 ag, bg := ki.generations[i], b.generations[i]
324 if !ag.equal(bg) {
325 return false
326 }
327 }
328 return true
329 }
330
331 func (ki *keyIndex) String() string {
332 var s string
333 for _, g := range ki.generations {
334 s += g.String()
335 }
336 return s
337 }
338
339
340 type generation struct {
341 ver int64
342 created revision
343 revs []revision
344 }
345
346 func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
347
348
349
350
351
352
353 func (g *generation) walk(f func(rev revision) bool) int {
354 l := len(g.revs)
355 for i := range g.revs {
356 ok := f(g.revs[l-i-1])
357 if !ok {
358 return l - i - 1
359 }
360 }
361 return -1
362 }
363
364 func (g *generation) String() string {
365 return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)
366 }
367
368 func (g generation) equal(b generation) bool {
369 if g.ver != b.ver {
370 return false
371 }
372 if len(g.revs) != len(b.revs) {
373 return false
374 }
375
376 for i := range g.revs {
377 ar, br := g.revs[i], b.revs[i]
378 if ar != br {
379 return false
380 }
381 }
382 return true
383 }
384
View as plain text