...

Source file src/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go

Documentation: github.com/syndtr/goleveldb/leveldb/iterator

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package iterator
     8  
     9  import (
    10  	"container/heap"
    11  
    12  	"github.com/syndtr/goleveldb/leveldb/comparer"
    13  	"github.com/syndtr/goleveldb/leveldb/errors"
    14  	"github.com/syndtr/goleveldb/leveldb/util"
    15  )
    16  
    17  type dir int
    18  
    19  const (
    20  	dirReleased dir = iota - 1
    21  	dirSOI
    22  	dirEOI
    23  	dirBackward
    24  	dirForward
    25  )
    26  
    27  type mergedIterator struct {
    28  	cmp    comparer.Comparer
    29  	iters  []Iterator
    30  	strict bool
    31  
    32  	keys     [][]byte
    33  	index    int
    34  	dir      dir
    35  	err      error
    36  	errf     func(err error)
    37  	releaser util.Releaser
    38  
    39  	indexes []int // the heap of iterator indexes
    40  	reverse bool  //nolint: structcheck // if true, indexes is a max-heap
    41  }
    42  
    43  func assertKey(key []byte) []byte {
    44  	if key == nil {
    45  		panic("leveldb/iterator: nil key")
    46  	}
    47  	return key
    48  }
    49  
    50  func (i *mergedIterator) iterErr(iter Iterator) bool {
    51  	if err := iter.Error(); err != nil {
    52  		if i.errf != nil {
    53  			i.errf(err)
    54  		}
    55  		if i.strict || !errors.IsCorrupted(err) {
    56  			i.err = err
    57  			return true
    58  		}
    59  	}
    60  	return false
    61  }
    62  
    63  func (i *mergedIterator) Valid() bool {
    64  	return i.err == nil && i.dir > dirEOI
    65  }
    66  
    67  func (i *mergedIterator) First() bool {
    68  	if i.err != nil {
    69  		return false
    70  	} else if i.dir == dirReleased {
    71  		i.err = ErrIterReleased
    72  		return false
    73  	}
    74  
    75  	h := i.indexHeap()
    76  	h.Reset(false)
    77  	for x, iter := range i.iters {
    78  		switch {
    79  		case iter.First():
    80  			i.keys[x] = assertKey(iter.Key())
    81  			h.Push(x)
    82  		case i.iterErr(iter):
    83  			return false
    84  		default:
    85  			i.keys[x] = nil
    86  		}
    87  	}
    88  	heap.Init(h)
    89  	i.dir = dirSOI
    90  	return i.next()
    91  }
    92  
    93  func (i *mergedIterator) Last() bool {
    94  	if i.err != nil {
    95  		return false
    96  	} else if i.dir == dirReleased {
    97  		i.err = ErrIterReleased
    98  		return false
    99  	}
   100  
   101  	h := i.indexHeap()
   102  	h.Reset(true)
   103  	for x, iter := range i.iters {
   104  		switch {
   105  		case iter.Last():
   106  			i.keys[x] = assertKey(iter.Key())
   107  			h.Push(x)
   108  		case i.iterErr(iter):
   109  			return false
   110  		default:
   111  			i.keys[x] = nil
   112  		}
   113  	}
   114  	heap.Init(h)
   115  	i.dir = dirEOI
   116  	return i.prev()
   117  }
   118  
   119  func (i *mergedIterator) Seek(key []byte) bool {
   120  	if i.err != nil {
   121  		return false
   122  	} else if i.dir == dirReleased {
   123  		i.err = ErrIterReleased
   124  		return false
   125  	}
   126  
   127  	h := i.indexHeap()
   128  	h.Reset(false)
   129  	for x, iter := range i.iters {
   130  		switch {
   131  		case iter.Seek(key):
   132  			i.keys[x] = assertKey(iter.Key())
   133  			h.Push(x)
   134  		case i.iterErr(iter):
   135  			return false
   136  		default:
   137  			i.keys[x] = nil
   138  		}
   139  	}
   140  	heap.Init(h)
   141  	i.dir = dirSOI
   142  	return i.next()
   143  }
   144  
   145  func (i *mergedIterator) next() bool {
   146  	h := i.indexHeap()
   147  	if h.Len() == 0 {
   148  		i.dir = dirEOI
   149  		return false
   150  	}
   151  	i.index = heap.Pop(h).(int)
   152  	i.dir = dirForward
   153  	return true
   154  }
   155  
   156  func (i *mergedIterator) Next() bool {
   157  	if i.dir == dirEOI || i.err != nil {
   158  		return false
   159  	} else if i.dir == dirReleased {
   160  		i.err = ErrIterReleased
   161  		return false
   162  	}
   163  
   164  	switch i.dir {
   165  	case dirSOI:
   166  		return i.First()
   167  	case dirBackward:
   168  		key := append([]byte(nil), i.keys[i.index]...)
   169  		if !i.Seek(key) {
   170  			return false
   171  		}
   172  		return i.Next()
   173  	}
   174  
   175  	x := i.index
   176  	iter := i.iters[x]
   177  	switch {
   178  	case iter.Next():
   179  		i.keys[x] = assertKey(iter.Key())
   180  		heap.Push(i.indexHeap(), x)
   181  	case i.iterErr(iter):
   182  		return false
   183  	default:
   184  		i.keys[x] = nil
   185  	}
   186  	return i.next()
   187  }
   188  
   189  func (i *mergedIterator) prev() bool {
   190  	h := i.indexHeap()
   191  	if h.Len() == 0 {
   192  		i.dir = dirSOI
   193  		return false
   194  	}
   195  	i.index = heap.Pop(h).(int)
   196  	i.dir = dirBackward
   197  	return true
   198  }
   199  
   200  func (i *mergedIterator) Prev() bool {
   201  	if i.dir == dirSOI || i.err != nil {
   202  		return false
   203  	} else if i.dir == dirReleased {
   204  		i.err = ErrIterReleased
   205  		return false
   206  	}
   207  
   208  	switch i.dir {
   209  	case dirEOI:
   210  		return i.Last()
   211  	case dirForward:
   212  		key := append([]byte(nil), i.keys[i.index]...)
   213  		h := i.indexHeap()
   214  		h.Reset(true)
   215  		for x, iter := range i.iters {
   216  			if x == i.index {
   217  				continue
   218  			}
   219  			seek := iter.Seek(key)
   220  			switch {
   221  			case seek && iter.Prev(), !seek && iter.Last():
   222  				i.keys[x] = assertKey(iter.Key())
   223  				h.Push(x)
   224  			case i.iterErr(iter):
   225  				return false
   226  			default:
   227  				i.keys[x] = nil
   228  			}
   229  		}
   230  		heap.Init(h)
   231  	}
   232  
   233  	x := i.index
   234  	iter := i.iters[x]
   235  	switch {
   236  	case iter.Prev():
   237  		i.keys[x] = assertKey(iter.Key())
   238  		heap.Push(i.indexHeap(), x)
   239  	case i.iterErr(iter):
   240  		return false
   241  	default:
   242  		i.keys[x] = nil
   243  	}
   244  	return i.prev()
   245  }
   246  
   247  func (i *mergedIterator) Key() []byte {
   248  	if i.err != nil || i.dir <= dirEOI {
   249  		return nil
   250  	}
   251  	return i.keys[i.index]
   252  }
   253  
   254  func (i *mergedIterator) Value() []byte {
   255  	if i.err != nil || i.dir <= dirEOI {
   256  		return nil
   257  	}
   258  	return i.iters[i.index].Value()
   259  }
   260  
   261  func (i *mergedIterator) Release() {
   262  	if i.dir != dirReleased {
   263  		i.dir = dirReleased
   264  		for _, iter := range i.iters {
   265  			iter.Release()
   266  		}
   267  		i.iters = nil
   268  		i.keys = nil
   269  		i.indexes = nil
   270  		if i.releaser != nil {
   271  			i.releaser.Release()
   272  			i.releaser = nil
   273  		}
   274  	}
   275  }
   276  
   277  func (i *mergedIterator) SetReleaser(releaser util.Releaser) {
   278  	if i.dir == dirReleased {
   279  		panic(util.ErrReleased)
   280  	}
   281  	if i.releaser != nil && releaser != nil {
   282  		panic(util.ErrHasReleaser)
   283  	}
   284  	i.releaser = releaser
   285  }
   286  
   287  func (i *mergedIterator) Error() error {
   288  	return i.err
   289  }
   290  
   291  func (i *mergedIterator) SetErrorCallback(f func(err error)) {
   292  	i.errf = f
   293  }
   294  
   295  func (i *mergedIterator) indexHeap() *indexHeap {
   296  	return (*indexHeap)(i)
   297  }
   298  
   299  // NewMergedIterator returns an iterator that merges its input. Walking the
   300  // resultant iterator will return all key/value pairs of all input iterators
   301  // in strictly increasing key order, as defined by cmp.
   302  // The input's key ranges may overlap, but there are assumed to be no duplicate
   303  // keys: if iters[i] contains a key k then iters[j] will not contain that key k.
   304  // None of the iters may be nil.
   305  //
   306  // If strict is true the any 'corruption errors' (i.e errors.IsCorrupted(err) == true)
   307  // won't be ignored and will halt 'merged iterator', otherwise the iterator will
   308  // continue to the next 'input iterator'.
   309  func NewMergedIterator(iters []Iterator, cmp comparer.Comparer, strict bool) Iterator {
   310  	return &mergedIterator{
   311  		iters:   iters,
   312  		cmp:     cmp,
   313  		strict:  strict,
   314  		keys:    make([][]byte, len(iters)),
   315  		indexes: make([]int, 0, len(iters)),
   316  	}
   317  }
   318  
   319  // indexHeap implements heap.Interface.
   320  type indexHeap mergedIterator
   321  
   322  func (h *indexHeap) Len() int { return len(h.indexes) }
   323  func (h *indexHeap) Less(i, j int) bool {
   324  	i, j = h.indexes[i], h.indexes[j]
   325  	r := h.cmp.Compare(h.keys[i], h.keys[j])
   326  	if h.reverse {
   327  		return r > 0
   328  	}
   329  	return r < 0
   330  }
   331  
   332  func (h *indexHeap) Swap(i, j int) {
   333  	h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i]
   334  }
   335  
   336  func (h *indexHeap) Push(value interface{}) {
   337  	h.indexes = append(h.indexes, value.(int))
   338  }
   339  
   340  func (h *indexHeap) Pop() interface{} {
   341  	e := len(h.indexes) - 1
   342  	popped := h.indexes[e]
   343  	h.indexes = h.indexes[:e]
   344  	return popped
   345  }
   346  
   347  func (h *indexHeap) Reset(reverse bool) {
   348  	h.reverse = reverse
   349  	h.indexes = h.indexes[:0]
   350  }
   351  

View as plain text