1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "bytes"
11 "fmt"
12 "sort"
13 "sync/atomic"
14
15 "github.com/syndtr/goleveldb/leveldb/cache"
16 "github.com/syndtr/goleveldb/leveldb/iterator"
17 "github.com/syndtr/goleveldb/leveldb/opt"
18 "github.com/syndtr/goleveldb/leveldb/storage"
19 "github.com/syndtr/goleveldb/leveldb/table"
20 "github.com/syndtr/goleveldb/leveldb/util"
21 )
22
23
24 type tFile struct {
25 fd storage.FileDesc
26 seekLeft int32
27 size int64
28 imin, imax internalKey
29 }
30
31
32 func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
33 return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
34 }
35
36
37 func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
38 return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
39 }
40
41
42 func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
43 return !t.after(icmp, umin) && !t.before(icmp, umax)
44 }
45
46
47 func (t *tFile) consumeSeek() int32 {
48 return atomic.AddInt32(&t.seekLeft, -1)
49 }
50
51
52 func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
53 f := &tFile{
54 fd: fd,
55 size: size,
56 imin: imin,
57 imax: imax,
58 }
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 f.seekLeft = int32(size / 16384)
74 if f.seekLeft < 100 {
75 f.seekLeft = 100
76 }
77
78 return f
79 }
80
81 func tableFileFromRecord(r atRecord) *tFile {
82 return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
83 }
84
85
86 type tFiles []*tFile
87
88 func (tf tFiles) Len() int { return len(tf) }
89 func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
90
91
92
93 func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
94 a, b := tf[i], tf[j]
95 n := icmp.Compare(a.imin, b.imin)
96 if n == 0 {
97 return a.fd.Num < b.fd.Num
98 }
99 return n < 0
100 }
101
102
103
104 func (tf tFiles) lessByNum(i, j int) bool {
105 return tf[i].fd.Num > tf[j].fd.Num
106 }
107
108
109 func (tf tFiles) sortByKey(icmp *iComparer) {
110 sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
111 }
112
113
114 func (tf tFiles) sortByNum() {
115 sort.Sort(&tFilesSortByNum{tFiles: tf})
116 }
117
118
119 func (tf tFiles) size() (sum int64) {
120 for _, t := range tf {
121 sum += t.size
122 }
123 return sum
124 }
125
126
127
128 func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
129 return sort.Search(len(tf), func(i int) bool {
130 return icmp.Compare(tf[i].imin, ikey) >= 0
131 })
132 }
133
134
135
136 func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
137 return sort.Search(len(tf), func(i int) bool {
138 return icmp.Compare(tf[i].imax, ikey) >= 0
139 })
140 }
141
142
143
144 func (tf tFiles) searchNumLess(num int64) int {
145 return sort.Search(len(tf), func(i int) bool {
146 return tf[i].fd.Num < num
147 })
148 }
149
150
151
152 func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int {
153 return sort.Search(len(tf), func(i int) bool {
154 return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0
155 })
156 }
157
158
159
160 func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int {
161 return sort.Search(len(tf), func(i int) bool {
162 return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0
163 })
164 }
165
166
167
168 func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
169 if unsorted {
170
171 for _, t := range tf {
172 if t.overlaps(icmp, umin, umax) {
173 return true
174 }
175 }
176 return false
177 }
178
179 i := 0
180 if len(umin) > 0 {
181
182 i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
183 }
184 if i >= len(tf) {
185
186 return false
187 }
188 return !tf[i].before(icmp, umax)
189 }
190
191
192
193
194
195
196 func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
197
198 if len(tf) == 0 {
199 return nil
200 }
201
202
203
204 if !overlapped {
205 var begin, end int
206
207 if umin != nil {
208 index := tf.searchMinUkey(icmp, umin)
209 if index == 0 {
210 begin = 0
211 } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 {
212
213 begin = index - 1
214 } else {
215 begin = index
216 }
217 }
218
219 if umax != nil {
220 index := tf.searchMaxUkey(icmp, umax)
221 if index == len(tf) {
222 end = len(tf)
223 } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 {
224
225 end = index + 1
226 } else {
227 end = index
228 }
229 } else {
230 end = len(tf)
231 }
232
233 if begin >= end {
234 return nil
235 }
236 dst = make([]*tFile, end-begin)
237 copy(dst, tf[begin:end])
238 return dst
239 }
240
241 dst = dst[:0]
242 for i := 0; i < len(tf); {
243 t := tf[i]
244 if t.overlaps(icmp, umin, umax) {
245 if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
246 umin = t.imin.ukey()
247 dst = dst[:0]
248 i = 0
249 continue
250 } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
251 umax = t.imax.ukey()
252
253 dst = dst[:0]
254 i = 0
255 continue
256 }
257
258 dst = append(dst, t)
259 }
260 i++
261 }
262
263 return dst
264 }
265
266
267 func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
268 for i, t := range tf {
269 if i == 0 {
270 imin, imax = t.imin, t.imax
271 continue
272 }
273 if icmp.Compare(t.imin, imin) < 0 {
274 imin = t.imin
275 }
276 if icmp.Compare(t.imax, imax) > 0 {
277 imax = t.imax
278 }
279 }
280
281 return
282 }
283
284
285 func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
286 if slice != nil {
287 var start, limit int
288 if slice.Start != nil {
289 start = tf.searchMax(icmp, internalKey(slice.Start))
290 }
291 if slice.Limit != nil {
292 limit = tf.searchMin(icmp, internalKey(slice.Limit))
293 } else {
294 limit = tf.Len()
295 }
296 tf = tf[start:limit]
297 }
298 return iterator.NewArrayIndexer(&tFilesArrayIndexer{
299 tFiles: tf,
300 tops: tops,
301 icmp: icmp,
302 slice: slice,
303 ro: ro,
304 })
305 }
306
307
308 type tFilesArrayIndexer struct {
309 tFiles
310 tops *tOps
311 icmp *iComparer
312 slice *util.Range
313 ro *opt.ReadOptions
314 }
315
316 func (a *tFilesArrayIndexer) Search(key []byte) int {
317 return a.searchMax(a.icmp, internalKey(key))
318 }
319
320 func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
321 if i == 0 || i == a.Len()-1 {
322 return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
323 }
324 return a.tops.newIterator(a.tFiles[i], nil, a.ro)
325 }
326
327
328 type tFilesSortByKey struct {
329 tFiles
330 icmp *iComparer
331 }
332
333 func (x *tFilesSortByKey) Less(i, j int) bool {
334 return x.lessByKey(x.icmp, i, j)
335 }
336
337
338 type tFilesSortByNum struct {
339 tFiles
340 }
341
342 func (x *tFilesSortByNum) Less(i, j int) bool {
343 return x.lessByNum(i, j)
344 }
345
346
347 type tOps struct {
348 s *session
349 noSync bool
350 evictRemoved bool
351 fileCache *cache.Cache
352 blockCache *cache.Cache
353 blockBuffer *util.BufferPool
354 }
355
356
357 func (t *tOps) create(tSize int) (*tWriter, error) {
358 fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
359 fw, err := t.s.stor.Create(fd)
360 if err != nil {
361 return nil, err
362 }
363 return &tWriter{
364 t: t,
365 fd: fd,
366 w: fw,
367 tw: table.NewWriter(fw, t.s.o.Options, t.blockBuffer, tSize),
368 }, nil
369 }
370
371
372 func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
373 w, err := t.create(0)
374 if err != nil {
375 return
376 }
377
378 defer func() {
379 if err != nil {
380 if derr := w.drop(); derr != nil {
381 err = fmt.Errorf("error createFrom (%v); error dropping (%v)", err, derr)
382 }
383 }
384 }()
385
386 for src.Next() {
387 err = w.append(src.Key(), src.Value())
388 if err != nil {
389 return
390 }
391 }
392 err = src.Error()
393 if err != nil {
394 return
395 }
396
397 n = w.tw.EntriesLen()
398 f, err = w.finish()
399 return
400 }
401
402
403
404 func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
405 ch = t.fileCache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
406 var r storage.Reader
407 r, err = t.s.stor.Open(f.fd)
408 if err != nil {
409 return 0, nil
410 }
411
412 var blockCache *cache.NamespaceGetter
413 if t.blockCache != nil {
414 blockCache = &cache.NamespaceGetter{Cache: t.blockCache, NS: uint64(f.fd.Num)}
415 }
416
417 var tr *table.Reader
418 tr, err = table.NewReader(r, f.size, f.fd, blockCache, t.blockBuffer, t.s.o.Options)
419 if err != nil {
420 _ = r.Close()
421 return 0, nil
422 }
423 return 1, tr
424
425 })
426 if ch == nil && err == nil {
427 err = ErrClosed
428 }
429 return
430 }
431
432
433
434 func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
435 ch, err := t.open(f)
436 if err != nil {
437 return nil, nil, err
438 }
439 defer ch.Release()
440 return ch.Value().(*table.Reader).Find(key, true, ro)
441 }
442
443
444 func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
445 ch, err := t.open(f)
446 if err != nil {
447 return nil, err
448 }
449 defer ch.Release()
450 return ch.Value().(*table.Reader).FindKey(key, true, ro)
451 }
452
453
454 func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
455 ch, err := t.open(f)
456 if err != nil {
457 return
458 }
459 defer ch.Release()
460 return ch.Value().(*table.Reader).OffsetOf(key)
461 }
462
463
464 func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
465 ch, err := t.open(f)
466 if err != nil {
467 return iterator.NewEmptyIterator(err)
468 }
469 iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
470 iter.SetReleaser(ch)
471 return iter
472 }
473
474
475
476 func (t *tOps) remove(fd storage.FileDesc) {
477 t.fileCache.Delete(0, uint64(fd.Num), func() {
478 if err := t.s.stor.Remove(fd); err != nil {
479 t.s.logf("table@remove removing @%d %q", fd.Num, err)
480 } else {
481 t.s.logf("table@remove removed @%d", fd.Num)
482 }
483 if t.evictRemoved && t.blockCache != nil {
484 t.blockCache.EvictNS(uint64(fd.Num))
485 }
486
487 t.s.reuseFileNum(fd.Num)
488 })
489 }
490
491
492
493 func (t *tOps) close() {
494 t.fileCache.Close(true)
495 if t.blockCache != nil {
496 t.blockCache.Close(false)
497 }
498 }
499
500
501 func newTableOps(s *session) *tOps {
502 var (
503 fileCacher cache.Cacher
504 blockCache *cache.Cache
505 blockBuffer *util.BufferPool
506 )
507 if s.o.GetOpenFilesCacheCapacity() > 0 {
508 fileCacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity())
509 }
510 if !s.o.GetDisableBlockCache() {
511 var blockCacher cache.Cacher
512 if s.o.GetBlockCacheCapacity() > 0 {
513 blockCacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
514 }
515 blockCache = cache.NewCache(blockCacher)
516 }
517 if !s.o.GetDisableBufferPool() {
518 blockBuffer = util.NewBufferPool(s.o.GetBlockSize() + 5)
519 }
520 return &tOps{
521 s: s,
522 noSync: s.o.GetNoSync(),
523 evictRemoved: s.o.GetBlockCacheEvictRemoved(),
524 fileCache: cache.NewCache(fileCacher),
525 blockCache: blockCache,
526 blockBuffer: blockBuffer,
527 }
528 }
529
530
531
532 type tWriter struct {
533 t *tOps
534
535 fd storage.FileDesc
536 w storage.Writer
537 tw *table.Writer
538
539 first, last []byte
540 }
541
542
543 func (w *tWriter) append(key, value []byte) error {
544 if w.first == nil {
545 w.first = append([]byte(nil), key...)
546 }
547 w.last = append(w.last[:0], key...)
548 return w.tw.Append(key, value)
549 }
550
551
552 func (w *tWriter) empty() bool {
553 return w.first == nil
554 }
555
556
557 func (w *tWriter) close() error {
558 if w.w != nil {
559 if err := w.w.Close(); err != nil {
560 return err
561 }
562 w.w = nil
563 }
564 return nil
565 }
566
567
568 func (w *tWriter) finish() (f *tFile, err error) {
569 defer func() {
570 if cerr := w.close(); cerr != nil {
571 if err == nil {
572 err = cerr
573 } else {
574 err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, cerr)
575 }
576 }
577 }()
578 err = w.tw.Close()
579 if err != nil {
580 return
581 }
582 if !w.t.noSync {
583 err = w.w.Sync()
584 if err != nil {
585 return
586 }
587 }
588 f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
589 return
590 }
591
592
593 func (w *tWriter) drop() error {
594 if err := w.close(); err != nil {
595 return err
596 }
597 w.tw = nil
598 w.first = nil
599 w.last = nil
600 if err := w.t.s.stor.Remove(w.fd); err != nil {
601 return err
602 }
603 w.t.s.reuseFileNum(w.fd.Num)
604 return nil
605 }
606
View as plain text