...
1
2
3
4
5
6
7 package iterator
8
9 import (
10 "container/heap"
11
12 "github.com/syndtr/goleveldb/leveldb/comparer"
13 "github.com/syndtr/goleveldb/leveldb/errors"
14 "github.com/syndtr/goleveldb/leveldb/util"
15 )
16
// dir is the positioning state of a mergedIterator. The numeric order of the
// constants matters: Valid, Key and Value treat every value greater than
// dirEOI as "positioned on an entry".
type dir int

const (
	// dirReleased marks an iterator whose Release method has run.
	dirReleased dir = -1
	// dirSOI is the zero value: the iterator sits before the first entry.
	dirSOI dir = 0
	// dirEOI means the iterator sits past the last entry.
	dirEOI dir = 1
	// dirBackward means the current entry was reached by backward iteration.
	dirBackward dir = 2
	// dirForward means the current entry was reached by forward iteration.
	dirForward dir = 3
)
26
27 type mergedIterator struct {
28 cmp comparer.Comparer
29 iters []Iterator
30 strict bool
31
32 keys [][]byte
33 index int
34 dir dir
35 err error
36 errf func(err error)
37 releaser util.Releaser
38
39 indexes []int
40 reverse bool
41 }
42
// assertKey returns key unchanged, panicking if it is nil. An empty but
// non-nil key is accepted.
func assertKey(key []byte) []byte {
	if key != nil {
		return key
	}
	panic("leveldb/iterator: nil key")
}
49
50 func (i *mergedIterator) iterErr(iter Iterator) bool {
51 if err := iter.Error(); err != nil {
52 if i.errf != nil {
53 i.errf(err)
54 }
55 if i.strict || !errors.IsCorrupted(err) {
56 i.err = err
57 return true
58 }
59 }
60 return false
61 }
62
63 func (i *mergedIterator) Valid() bool {
64 return i.err == nil && i.dir > dirEOI
65 }
66
67 func (i *mergedIterator) First() bool {
68 if i.err != nil {
69 return false
70 } else if i.dir == dirReleased {
71 i.err = ErrIterReleased
72 return false
73 }
74
75 h := i.indexHeap()
76 h.Reset(false)
77 for x, iter := range i.iters {
78 switch {
79 case iter.First():
80 i.keys[x] = assertKey(iter.Key())
81 h.Push(x)
82 case i.iterErr(iter):
83 return false
84 default:
85 i.keys[x] = nil
86 }
87 }
88 heap.Init(h)
89 i.dir = dirSOI
90 return i.next()
91 }
92
93 func (i *mergedIterator) Last() bool {
94 if i.err != nil {
95 return false
96 } else if i.dir == dirReleased {
97 i.err = ErrIterReleased
98 return false
99 }
100
101 h := i.indexHeap()
102 h.Reset(true)
103 for x, iter := range i.iters {
104 switch {
105 case iter.Last():
106 i.keys[x] = assertKey(iter.Key())
107 h.Push(x)
108 case i.iterErr(iter):
109 return false
110 default:
111 i.keys[x] = nil
112 }
113 }
114 heap.Init(h)
115 i.dir = dirEOI
116 return i.prev()
117 }
118
119 func (i *mergedIterator) Seek(key []byte) bool {
120 if i.err != nil {
121 return false
122 } else if i.dir == dirReleased {
123 i.err = ErrIterReleased
124 return false
125 }
126
127 h := i.indexHeap()
128 h.Reset(false)
129 for x, iter := range i.iters {
130 switch {
131 case iter.Seek(key):
132 i.keys[x] = assertKey(iter.Key())
133 h.Push(x)
134 case i.iterErr(iter):
135 return false
136 default:
137 i.keys[x] = nil
138 }
139 }
140 heap.Init(h)
141 i.dir = dirSOI
142 return i.next()
143 }
144
145 func (i *mergedIterator) next() bool {
146 h := i.indexHeap()
147 if h.Len() == 0 {
148 i.dir = dirEOI
149 return false
150 }
151 i.index = heap.Pop(h).(int)
152 i.dir = dirForward
153 return true
154 }
155
156 func (i *mergedIterator) Next() bool {
157 if i.dir == dirEOI || i.err != nil {
158 return false
159 } else if i.dir == dirReleased {
160 i.err = ErrIterReleased
161 return false
162 }
163
164 switch i.dir {
165 case dirSOI:
166 return i.First()
167 case dirBackward:
168 key := append([]byte(nil), i.keys[i.index]...)
169 if !i.Seek(key) {
170 return false
171 }
172 return i.Next()
173 }
174
175 x := i.index
176 iter := i.iters[x]
177 switch {
178 case iter.Next():
179 i.keys[x] = assertKey(iter.Key())
180 heap.Push(i.indexHeap(), x)
181 case i.iterErr(iter):
182 return false
183 default:
184 i.keys[x] = nil
185 }
186 return i.next()
187 }
188
189 func (i *mergedIterator) prev() bool {
190 h := i.indexHeap()
191 if h.Len() == 0 {
192 i.dir = dirSOI
193 return false
194 }
195 i.index = heap.Pop(h).(int)
196 i.dir = dirBackward
197 return true
198 }
199
200 func (i *mergedIterator) Prev() bool {
201 if i.dir == dirSOI || i.err != nil {
202 return false
203 } else if i.dir == dirReleased {
204 i.err = ErrIterReleased
205 return false
206 }
207
208 switch i.dir {
209 case dirEOI:
210 return i.Last()
211 case dirForward:
212 key := append([]byte(nil), i.keys[i.index]...)
213 h := i.indexHeap()
214 h.Reset(true)
215 for x, iter := range i.iters {
216 if x == i.index {
217 continue
218 }
219 seek := iter.Seek(key)
220 switch {
221 case seek && iter.Prev(), !seek && iter.Last():
222 i.keys[x] = assertKey(iter.Key())
223 h.Push(x)
224 case i.iterErr(iter):
225 return false
226 default:
227 i.keys[x] = nil
228 }
229 }
230 heap.Init(h)
231 }
232
233 x := i.index
234 iter := i.iters[x]
235 switch {
236 case iter.Prev():
237 i.keys[x] = assertKey(iter.Key())
238 heap.Push(i.indexHeap(), x)
239 case i.iterErr(iter):
240 return false
241 default:
242 i.keys[x] = nil
243 }
244 return i.prev()
245 }
246
247 func (i *mergedIterator) Key() []byte {
248 if i.err != nil || i.dir <= dirEOI {
249 return nil
250 }
251 return i.keys[i.index]
252 }
253
254 func (i *mergedIterator) Value() []byte {
255 if i.err != nil || i.dir <= dirEOI {
256 return nil
257 }
258 return i.iters[i.index].Value()
259 }
260
261 func (i *mergedIterator) Release() {
262 if i.dir != dirReleased {
263 i.dir = dirReleased
264 for _, iter := range i.iters {
265 iter.Release()
266 }
267 i.iters = nil
268 i.keys = nil
269 i.indexes = nil
270 if i.releaser != nil {
271 i.releaser.Release()
272 i.releaser = nil
273 }
274 }
275 }
276
277 func (i *mergedIterator) SetReleaser(releaser util.Releaser) {
278 if i.dir == dirReleased {
279 panic(util.ErrReleased)
280 }
281 if i.releaser != nil && releaser != nil {
282 panic(util.ErrHasReleaser)
283 }
284 i.releaser = releaser
285 }
286
287 func (i *mergedIterator) Error() error {
288 return i.err
289 }
290
291 func (i *mergedIterator) SetErrorCallback(f func(err error)) {
292 i.errf = f
293 }
294
295 func (i *mergedIterator) indexHeap() *indexHeap {
296 return (*indexHeap)(i)
297 }
298
299
300
301
302
303
304
305
306
307
308
309 func NewMergedIterator(iters []Iterator, cmp comparer.Comparer, strict bool) Iterator {
310 return &mergedIterator{
311 iters: iters,
312 cmp: cmp,
313 strict: strict,
314 keys: make([][]byte, len(iters)),
315 indexes: make([]int, 0, len(iters)),
316 }
317 }
318
319
320 type indexHeap mergedIterator
321
322 func (h *indexHeap) Len() int { return len(h.indexes) }
323 func (h *indexHeap) Less(i, j int) bool {
324 i, j = h.indexes[i], h.indexes[j]
325 r := h.cmp.Compare(h.keys[i], h.keys[j])
326 if h.reverse {
327 return r > 0
328 }
329 return r < 0
330 }
331
332 func (h *indexHeap) Swap(i, j int) {
333 h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i]
334 }
335
336 func (h *indexHeap) Push(value interface{}) {
337 h.indexes = append(h.indexes, value.(int))
338 }
339
340 func (h *indexHeap) Pop() interface{} {
341 e := len(h.indexes) - 1
342 popped := h.indexes[e]
343 h.indexes = h.indexes[:e]
344 return popped
345 }
346
347 func (h *indexHeap) Reset(reverse bool) {
348 h.reverse = reverse
349 h.indexes = h.indexes[:0]
350 }
351
View as plain text