1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "fmt"
11 "sync/atomic"
12 "time"
13 "unsafe"
14
15 "github.com/syndtr/goleveldb/leveldb/iterator"
16 "github.com/syndtr/goleveldb/leveldb/opt"
17 "github.com/syndtr/goleveldb/leveldb/util"
18 )
19
// tSet pairs a table file with the level it was found on. get and
// sampleSeek use it to remember the first table probed by a lookup so that
// a seek can be charged to it, and it is what gets stored in version.cSeek.
type tSet struct {
	level int
	table *tFile
}
24
// version is a snapshot of the database's table files, one list per level,
// plus cached compaction hints. Instances are created by newVersion and
// populated by versionStaging.finish.
//
// Reference counting (ref/released) is driven by incref and releaseNB;
// release and versionReleaser.Release wrap releaseNB with s.vmu held.
// NOTE(review): incref does not lock s.vmu itself — presumably its callers
// hold it; confirm.
type version struct {
	id int64
	s  *session

	// levels[i] lists the tables on level i: level 0 is sorted by file
	// number and the other levels by key (see versionStaging.finish).
	levels []tFiles

	// Level with the highest compaction score and that score, as computed
	// by computeCompaction. A score >= 1 makes needCompaction report true.
	cLevel int
	cScore float64

	// cSeek holds a *tSet naming a table whose seek budget ran out. get and
	// sampleSeek only install it via CompareAndSwapPointer when it is nil.
	cSeek unsafe.Pointer

	// closing makes get return ErrClosed. ref is the reference count;
	// released is set once it drops to zero, after which incref panics.
	closing  bool
	ref      int
	released bool
}
43
44
45 func newVersion(s *session) *version {
46 id := atomic.AddInt64(&s.ntVersionID, 1)
47 nv := &version{s: s, id: id - 1}
48 return nv
49 }
50
51 func (v *version) incref() {
52 if v.released {
53 panic("already released")
54 }
55
56 v.ref++
57 if v.ref == 1 {
58 select {
59 case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
60
61 case <-v.s.closeC:
62 v.s.log("reference loop already exist")
63 }
64 }
65 }
66
67 func (v *version) releaseNB() {
68 v.ref--
69 if v.ref > 0 {
70 return
71 } else if v.ref < 0 {
72 panic("negative version ref")
73 }
74 select {
75 case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
76
77 case <-v.s.closeC:
78 v.s.log("reference loop already exist")
79 }
80
81 v.released = true
82 }
83
84 func (v *version) release() {
85 v.s.vmu.Lock()
86 v.releaseNB()
87 v.s.vmu.Unlock()
88 }
89
90 func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
91 ukey := ikey.ukey()
92
93
94 if aux != nil {
95 for _, t := range aux {
96 if t.overlaps(v.s.icmp, ukey, ukey) {
97 if !f(-1, t) {
98 return
99 }
100 }
101 }
102
103 if lf != nil && !lf(-1) {
104 return
105 }
106 }
107
108
109 for level, tables := range v.levels {
110 if len(tables) == 0 {
111 continue
112 }
113
114 if level == 0 {
115
116
117 for _, t := range tables {
118 if t.overlaps(v.s.icmp, ukey, ukey) {
119 if !f(level, t) {
120 return
121 }
122 }
123 }
124 } else {
125 if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
126 t := tables[i]
127 if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
128 if !f(level, t) {
129 return
130 }
131 }
132 }
133 }
134
135 if lf != nil && !lf(level) {
136 return
137 }
138 }
139 }
140
// get looks up ikey in this version. It consults aux tables first (reported
// as level -1 by walkOverlapping), then level 0, then the higher levels.
//
// Among aux and level-0 tables, the exact user-key match with the highest
// sequence number wins. The decision is made once that (pseudo-)level has
// been fully walked. On higher levels the first exact match decides
// immediately.
//
// Results:
//   - value and err == nil when the winning entry is a value. With noValue
//     only findKey is used, so value stays nil.
//   - err == ErrNotFound when nothing matches or the winning entry is a
//     deletion.
//   - err == ErrClosed if the version is closing.
//   - Otherwise the first error from reading a table or parsing its key.
//
// tcomp reports whether this lookup exhausted the seek budget of the first
// table probed and installed it as the version's seek-compaction candidate
// (v.cSeek).
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
	if v.closing {
		return nil, false, ErrClosed
	}

	ukey := ikey.ukey()
	sampleSeeks := !v.s.o.GetDisableSeeksCompaction()

	var (
		// tset is the first table (level >= 0) probed. tseek becomes true
		// once a second table is probed, i.e. the first probe was wasted.
		tset  *tSet
		tseek bool

		// Best candidate so far among aux (-1) and level-0 tables: the exact
		// user-key match with the highest sequence number.
		zfound bool
		zseq   uint64
		zkt    keyType
		zval   []byte
	)

	// Default result unless a value is found below.
	err = ErrNotFound

	// The walk stops at the first level that yields a decision for ukey.
	// NOTE(review): this relies on newer entries for a key never living on
	// a deeper level than older ones (assumed LSM invariant).
	v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
		// Seek sampling ignores aux tables (level -1).
		if sampleSeeks && level >= 0 && !tseek {
			if tset == nil {
				tset = &tSet{level, t}
			} else {
				tseek = true
			}
		}

		var (
			fikey, fval []byte
			ferr        error
		)
		if noValue {
			fikey, ferr = v.s.tops.findKey(t, ikey, ro)
		} else {
			fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
		}

		switch ferr {
		case nil:
		case ErrNotFound:
			// Nothing in this table; keep walking.
			return true
		default:
			err = ferr
			return false
		}

		if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
			// Only an exact user-key match counts.
			if v.s.icmp.uCompare(ukey, fukey) == 0 {
				// Aux and level-0 tables are all probed: keep the highest
				// sequence number and decide in the level callback below.
				if level <= 0 {
					if fseq >= zseq {
						zfound = true
						zseq = fseq
						zkt = fkt
						zval = fval
					}
				} else {
					// Higher level: at most one candidate, decide now.
					switch fkt {
					case keyTypeVal:
						value = fval
						err = nil
					case keyTypeDel:
						// Deleted: leave err as ErrNotFound.
					default:
						panic("leveldb: invalid internalKey type")
					}
					return false
				}
			}
		} else {
			err = fkerr
			return false
		}

		return true
	}, func(level int) bool {
		// After a whole (pseudo-)level is walked, resolve any pending
		// aux/level-0 match and stop the walk.
		if zfound {
			switch zkt {
			case keyTypeVal:
				value = zval
				err = nil
			case keyTypeDel:
				// Deleted: leave err as ErrNotFound.
			default:
				panic("leveldb: invalid internalKey type")
			}
			return false
		}

		return true
	})

	// Charge a seek to the first probed table only if a second table had to
	// be probed. Once its budget is exhausted, try to install it as the seek
	// compaction candidate (CAS: only succeeds while v.cSeek is nil).
	if tseek && tset.table.consumeSeek() <= 0 {
		tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
	}

	return
}
242
243 func (v *version) sampleSeek(ikey internalKey) (tcomp bool) {
244 var tset *tSet
245
246 v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool {
247 if tset == nil {
248 tset = &tSet{level, t}
249 return true
250 }
251 if tset.table.consumeSeek() <= 0 {
252 tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
253 }
254 return false
255 }, nil)
256
257 return
258 }
259
260 func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
261 strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
262 for level, tables := range v.levels {
263 if level == 0 {
264
265 for _, t := range tables {
266 its = append(its, v.s.tops.newIterator(t, slice, ro))
267 }
268 } else if len(tables) != 0 {
269 its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
270 }
271 }
272 return
273 }
274
275 func (v *version) newStaging() *versionStaging {
276 return &versionStaging{base: v}
277 }
278
279
280 func (v *version) spawn(r *sessionRecord, trivial bool) *version {
281 staging := v.newStaging()
282 staging.commit(r)
283 return staging.finish(trivial)
284 }
285
286 func (v *version) fillRecord(r *sessionRecord) {
287 for level, tables := range v.levels {
288 for _, t := range tables {
289 r.addTableFile(level, t)
290 }
291 }
292 }
293
294 func (v *version) tLen(level int) int {
295 if level < len(v.levels) {
296 return len(v.levels[level])
297 }
298 return 0
299 }
300
301 func (v *version) offsetOf(ikey internalKey) (n int64, err error) {
302 for level, tables := range v.levels {
303 for _, t := range tables {
304 if v.s.icmp.Compare(t.imax, ikey) <= 0 {
305
306 n += t.size
307 } else if v.s.icmp.Compare(t.imin, ikey) > 0 {
308
309 if level > 0 {
310
311
312
313 break
314 }
315 } else {
316
317
318 if m, err := v.s.tops.offsetOf(t, ikey); err == nil {
319 n += m
320 } else {
321 return 0, err
322 }
323 }
324 }
325 }
326
327 return
328 }
329
330 func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
331 if maxLevel > 0 {
332 if len(v.levels) == 0 {
333 return maxLevel
334 }
335 if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
336 var overlaps tFiles
337 for ; level < maxLevel; level++ {
338 if pLevel := level + 1; pLevel >= len(v.levels) {
339 return maxLevel
340 } else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
341 break
342 }
343 if gpLevel := level + 2; gpLevel < len(v.levels) {
344 overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
345 if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
346 break
347 }
348 }
349 }
350 }
351 }
352 return
353 }
354
355 func (v *version) computeCompaction() {
356
357 bestLevel := int(-1)
358 bestScore := float64(-1)
359
360 statFiles := make([]int, len(v.levels))
361 statSizes := make([]string, len(v.levels))
362 statScore := make([]string, len(v.levels))
363 statTotSize := int64(0)
364
365 for level, tables := range v.levels {
366 var score float64
367 size := tables.size()
368 if level == 0 {
369
370
371
372
373
374
375
376
377
378
379
380 score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
381 } else {
382 score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
383 }
384
385 if score > bestScore {
386 bestLevel = level
387 bestScore = score
388 }
389
390 statFiles[level] = len(tables)
391 statSizes[level] = shortenb(size)
392 statScore[level] = fmt.Sprintf("%.2f", score)
393 statTotSize += size
394 }
395
396 v.cLevel = bestLevel
397 v.cScore = bestScore
398
399 v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(statTotSize), statSizes, statScore)
400 }
401
402 func (v *version) needCompaction() bool {
403 return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
404 }
405
// tablesScratch holds the edits staged for a single level by
// versionStaging.commit. added maps a file number to the table record to
// add; deleted is the set of file numbers to drop from the base level.
// Both maps are allocated lazily by commit.
type tablesScratch struct {
	added   map[int64]atRecord
	deleted map[int64]struct{}
}
410
// versionStaging builds a new version from base plus the edits of one or
// more session records. Edits are staged by commit and materialized by
// finish.
type versionStaging struct {
	base   *version
	levels []tablesScratch // per-level staged edits, grown by getScratch
}
415
416 func (p *versionStaging) getScratch(level int) *tablesScratch {
417 if level >= len(p.levels) {
418 newLevels := make([]tablesScratch, level+1)
419 copy(newLevels, p.levels)
420 p.levels = newLevels
421 }
422 return &(p.levels[level])
423 }
424
425 func (p *versionStaging) commit(r *sessionRecord) {
426
427 for _, r := range r.deletedTables {
428 scratch := p.getScratch(r.level)
429 if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
430 if scratch.deleted == nil {
431 scratch.deleted = make(map[int64]struct{})
432 }
433 scratch.deleted[r.num] = struct{}{}
434 }
435 if scratch.added != nil {
436 delete(scratch.added, r.num)
437 }
438 }
439
440
441 for _, r := range r.addedTables {
442 scratch := p.getScratch(r.level)
443 if scratch.added == nil {
444 scratch.added = make(map[int64]atRecord)
445 }
446 scratch.added[r.num] = r
447 if scratch.deleted != nil {
448 delete(scratch.deleted, r.num)
449 }
450 }
451 }
452
// finish materializes the staged edits into a new version derived from
// p.base, then computes its compaction score.
//
// A level without staged edits shares the base slice. Otherwise the level
// is rebuilt from the base tables that are neither deleted nor replaced,
// plus the added tables. Level 0 is sorted by file number and the other
// levels by key.
//
// When trivial is true, the added tables are sorted on their own and spliced
// into the survivors at a binary-searched position, instead of resorting
// the whole level. NOTE(review): this assumes the added batch forms one
// contiguous run within the level (e.g. a compaction's output); confirm
// callers only pass trivial=true in that case.
//
// Trailing nil levels are trimmed from the result.
func (p *versionStaging) finish(trivial bool) *version {
	// Build new version.
	nv := newVersion(p.base.s)
	// Cover every level touched by either base or the staged edits.
	numLevel := len(p.levels)
	if len(p.base.levels) > numLevel {
		numLevel = len(p.base.levels)
	}
	nv.levels = make([]tFiles, numLevel)
	for level := 0; level < numLevel; level++ {
		var baseTabels tFiles
		if level < len(p.base.levels) {
			baseTabels = p.base.levels[level]
		}

		if level < len(p.levels) {
			scratch := p.levels[level]

			// Short circuit: no edits on this level, share the base slice.
			if len(scratch.added) == 0 && len(scratch.deleted) == 0 {
				nv.levels[level] = baseTabels
				continue
			}

			var nt tFiles
			// Preallocate when the expected table count is positive.
			if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
				nt = make(tFiles, 0, n)
			}

			// Keep base tables that are neither deleted nor replaced by an
			// added table with the same file number.
			for _, t := range baseTabels {
				if _, ok := scratch.deleted[t.fd.Num]; ok {
					continue
				}
				if _, ok := scratch.added[t.fd.Num]; ok {
					continue
				}
				nt = append(nt, t)
			}

			// Only deletions on this level: survivors keep their base order,
			// so no resort is needed.
			if len(scratch.added) == 0 {
				nv.levels[level] = nt
				continue
			}

			// Trivial path: sort only the added batch and splice it in at a
			// binary-searched index instead of resorting the whole level.
			// append(added, nt[index:]...) copies the tail into a fresh array
			// (added is built with no spare capacity), so the outer append
			// may safely overwrite nt from index onward.
			if trivial && len(scratch.added) > 0 {
				added := make(tFiles, 0, len(scratch.added))
				for _, r := range scratch.added {
					added = append(added, tableFileFromRecord(r))
				}
				if level == 0 {
					// Level 0: position chosen by searchNumLess on the file
					// number of the batch's last entry after sortByNum.
					added.sortByNum()
					index := nt.searchNumLess(added[len(added)-1].fd.Num)
					nt = append(nt[:index], append(added, nt[index:]...)...)
				} else {
					// Other levels: position chosen by searchMin on the
					// batch's largest key.
					added.sortByKey(p.base.s.icmp)
					_, amax := added.getRange(p.base.s.icmp)
					index := nt.searchMin(p.base.s.icmp, amax)
					nt = append(nt[:index], append(added, nt[index:]...)...)
				}
				nv.levels[level] = nt
				continue
			}

			// General path: append added tables and resort the whole level.
			for _, r := range scratch.added {
				nt = append(nt, tableFileFromRecord(r))
			}

			// An empty result leaves nv.levels[level] nil so the trimming
			// below can drop it.
			if len(nt) != 0 {
				// Sort tables.
				if level == 0 {
					nt.sortByNum()
				} else {
					nt.sortByKey(p.base.s.icmp)
				}

				nv.levels[level] = nt
			}
		} else {
			// No staged edits reach this level: reuse base as-is.
			nv.levels[level] = baseTabels
		}
	}

	// Trim trailing nil levels.
	n := len(nv.levels)
	for ; n > 0 && nv.levels[n-1] == nil; n-- {
	}
	nv.levels = nv.levels[:n]

	// Compute compaction score for new version.
	nv.computeCompaction()

	return nv
}
559
// versionReleaser wraps a version reference in a Release method that may be
// called more than once: only the first call drops the reference.
type versionReleaser struct {
	v    *version
	once bool // set by the first Release; read and written under v.s.vmu
}
564
565 func (vr *versionReleaser) Release() {
566 v := vr.v
567 v.s.vmu.Lock()
568 if !vr.once {
569 v.releaseNB()
570 vr.once = true
571 }
572 v.s.vmu.Unlock()
573 }
574
View as plain text