1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "math/rand"
11 "runtime"
12 "sync"
13 "sync/atomic"
14
15 "github.com/syndtr/goleveldb/leveldb/iterator"
16 "github.com/syndtr/goleveldb/leveldb/opt"
17 "github.com/syndtr/goleveldb/leveldb/util"
18 )
19
20 type memdbReleaser struct {
21 once sync.Once
22 m *memDB
23 }
24
25 func (mr *memdbReleaser) Release() {
26 mr.once.Do(func() {
27 mr.m.decref()
28 })
29 }
30
31 func (db *DB) newRawIterator(auxm *memDB, auxt tFiles, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
32 strict := opt.GetStrict(db.s.o.Options, ro, opt.StrictReader)
33 em, fm := db.getMems()
34 v := db.s.version()
35
36 tableIts := v.getIterators(slice, ro)
37 n := len(tableIts) + len(auxt) + 3
38 its := make([]iterator.Iterator, 0, n)
39
40 if auxm != nil {
41 ami := auxm.NewIterator(slice)
42 ami.SetReleaser(&memdbReleaser{m: auxm})
43 its = append(its, ami)
44 }
45 for _, t := range auxt {
46 its = append(its, v.s.tops.newIterator(t, slice, ro))
47 }
48
49 emi := em.NewIterator(slice)
50 emi.SetReleaser(&memdbReleaser{m: em})
51 its = append(its, emi)
52 if fm != nil {
53 fmi := fm.NewIterator(slice)
54 fmi.SetReleaser(&memdbReleaser{m: fm})
55 its = append(its, fmi)
56 }
57 its = append(its, tableIts...)
58 mi := iterator.NewMergedIterator(its, db.s.icmp, strict)
59 mi.SetReleaser(&versionReleaser{v: v})
60 return mi
61 }
62
63 func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter {
64 var islice *util.Range
65 if slice != nil {
66 islice = &util.Range{}
67 if slice.Start != nil {
68 islice.Start = makeInternalKey(nil, slice.Start, keyMaxSeq, keyTypeSeek)
69 }
70 if slice.Limit != nil {
71 islice.Limit = makeInternalKey(nil, slice.Limit, keyMaxSeq, keyTypeSeek)
72 }
73 }
74 rawIter := db.newRawIterator(auxm, auxt, islice, ro)
75 iter := &dbIter{
76 db: db,
77 icmp: db.s.icmp,
78 iter: rawIter,
79 seq: seq,
80 strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
81 disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0,
82 key: make([]byte, 0),
83 value: make([]byte, 0),
84 }
85 if !iter.disableSampling {
86 iter.samplingGap = db.iterSamplingRate()
87 }
88 atomic.AddInt32(&db.aliveIters, 1)
89 runtime.SetFinalizer(iter, (*dbIter).Release)
90 return iter
91 }
92
93 func (db *DB) iterSamplingRate() int {
94 return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
95 }
96
97 type dir int
98
99 const (
100 dirReleased dir = iota - 1
101 dirSOI
102 dirEOI
103 dirBackward
104 dirForward
105 )
106
107
108 type dbIter struct {
109 db *DB
110 icmp *iComparer
111 iter iterator.Iterator
112 seq uint64
113 strict bool
114 disableSampling bool
115
116 samplingGap int
117 dir dir
118 key []byte
119 value []byte
120 err error
121 releaser util.Releaser
122 }
123
124 func (i *dbIter) sampleSeek() {
125 if i.disableSampling {
126 return
127 }
128
129 ikey := i.iter.Key()
130 i.samplingGap -= len(ikey) + len(i.iter.Value())
131 for i.samplingGap < 0 {
132 i.samplingGap += i.db.iterSamplingRate()
133 i.db.sampleSeek(ikey)
134 }
135 }
136
137 func (i *dbIter) setErr(err error) {
138 i.err = err
139 i.key = nil
140 i.value = nil
141 }
142
143 func (i *dbIter) iterErr() {
144 if err := i.iter.Error(); err != nil {
145 i.setErr(err)
146 }
147 }
148
149 func (i *dbIter) Valid() bool {
150 return i.err == nil && i.dir > dirEOI
151 }
152
153 func (i *dbIter) First() bool {
154 if i.err != nil {
155 return false
156 } else if i.dir == dirReleased {
157 i.err = ErrIterReleased
158 return false
159 }
160
161 if i.iter.First() {
162 i.dir = dirSOI
163 return i.next()
164 }
165 i.dir = dirEOI
166 i.iterErr()
167 return false
168 }
169
170 func (i *dbIter) Last() bool {
171 if i.err != nil {
172 return false
173 } else if i.dir == dirReleased {
174 i.err = ErrIterReleased
175 return false
176 }
177
178 if i.iter.Last() {
179 return i.prev()
180 }
181 i.dir = dirSOI
182 i.iterErr()
183 return false
184 }
185
186 func (i *dbIter) Seek(key []byte) bool {
187 if i.err != nil {
188 return false
189 } else if i.dir == dirReleased {
190 i.err = ErrIterReleased
191 return false
192 }
193
194 ikey := makeInternalKey(nil, key, i.seq, keyTypeSeek)
195 if i.iter.Seek(ikey) {
196 i.dir = dirSOI
197 return i.next()
198 }
199 i.dir = dirEOI
200 i.iterErr()
201 return false
202 }
203
204 func (i *dbIter) next() bool {
205 for {
206 if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
207 i.sampleSeek()
208 if seq <= i.seq {
209 switch kt {
210 case keyTypeDel:
211
212 i.key = append(i.key[:0], ukey...)
213 i.dir = dirForward
214 case keyTypeVal:
215 if i.dir == dirSOI || i.icmp.uCompare(ukey, i.key) > 0 {
216 i.key = append(i.key[:0], ukey...)
217 i.value = append(i.value[:0], i.iter.Value()...)
218 i.dir = dirForward
219 return true
220 }
221 }
222 }
223 } else if i.strict {
224 i.setErr(kerr)
225 break
226 }
227 if !i.iter.Next() {
228 i.dir = dirEOI
229 i.iterErr()
230 break
231 }
232 }
233 return false
234 }
235
236 func (i *dbIter) Next() bool {
237 if i.dir == dirEOI || i.err != nil {
238 return false
239 } else if i.dir == dirReleased {
240 i.err = ErrIterReleased
241 return false
242 }
243
244 if !i.iter.Next() || (i.dir == dirBackward && !i.iter.Next()) {
245 i.dir = dirEOI
246 i.iterErr()
247 return false
248 }
249 return i.next()
250 }
251
252 func (i *dbIter) prev() bool {
253 i.dir = dirBackward
254 del := true
255 if i.iter.Valid() {
256 for {
257 if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
258 i.sampleSeek()
259 if seq <= i.seq {
260 if !del && i.icmp.uCompare(ukey, i.key) < 0 {
261 return true
262 }
263 del = (kt == keyTypeDel)
264 if !del {
265 i.key = append(i.key[:0], ukey...)
266 i.value = append(i.value[:0], i.iter.Value()...)
267 }
268 }
269 } else if i.strict {
270 i.setErr(kerr)
271 return false
272 }
273 if !i.iter.Prev() {
274 break
275 }
276 }
277 }
278 if del {
279 i.dir = dirSOI
280 i.iterErr()
281 return false
282 }
283 return true
284 }
285
286 func (i *dbIter) Prev() bool {
287 if i.dir == dirSOI || i.err != nil {
288 return false
289 } else if i.dir == dirReleased {
290 i.err = ErrIterReleased
291 return false
292 }
293
294 switch i.dir {
295 case dirEOI:
296 return i.Last()
297 case dirForward:
298 for i.iter.Prev() {
299 if ukey, _, _, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
300 i.sampleSeek()
301 if i.icmp.uCompare(ukey, i.key) < 0 {
302 goto cont
303 }
304 } else if i.strict {
305 i.setErr(kerr)
306 return false
307 }
308 }
309 i.dir = dirSOI
310 i.iterErr()
311 return false
312 }
313
314 cont:
315 return i.prev()
316 }
317
318 func (i *dbIter) Key() []byte {
319 if i.err != nil || i.dir <= dirEOI {
320 return nil
321 }
322 return i.key
323 }
324
325 func (i *dbIter) Value() []byte {
326 if i.err != nil || i.dir <= dirEOI {
327 return nil
328 }
329 return i.value
330 }
331
332 func (i *dbIter) Release() {
333 if i.dir != dirReleased {
334
335 runtime.SetFinalizer(i, nil)
336
337 if i.releaser != nil {
338 i.releaser.Release()
339 i.releaser = nil
340 }
341
342 i.dir = dirReleased
343 i.key = nil
344 i.value = nil
345 i.iter.Release()
346 i.iter = nil
347 atomic.AddInt32(&i.db.aliveIters, -1)
348 i.db = nil
349 }
350 }
351
352 func (i *dbIter) SetReleaser(releaser util.Releaser) {
353 if i.dir == dirReleased {
354 panic(util.ErrReleased)
355 }
356 if i.releaser != nil && releaser != nil {
357 panic(util.ErrHasReleaser)
358 }
359 i.releaser = releaser
360 }
361
362 func (i *dbIter) Error() error {
363 return i.err
364 }
365
View as plain text