...

Source file src/github.com/syndtr/goleveldb/leveldb/session.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"fmt"
    11  	"io"
    12  	"os"
    13  	"sync"
    14  
    15  	"github.com/syndtr/goleveldb/leveldb/errors"
    16  	"github.com/syndtr/goleveldb/leveldb/journal"
    17  	"github.com/syndtr/goleveldb/leveldb/opt"
    18  	"github.com/syndtr/goleveldb/leveldb/storage"
    19  )
    20  
    21  // ErrManifestCorrupted records manifest corruption. This error will be
    22  // wrapped with errors.ErrCorrupted.
    23  type ErrManifestCorrupted struct {
    24  	Field  string
    25  	Reason string
    26  }
    27  
    28  func (e *ErrManifestCorrupted) Error() string {
    29  	return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
    30  }
    31  
    32  func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
    33  	return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
    34  }
    35  
    36  // session represent a persistent database session.
    37  type session struct {
    38  	// Need 64-bit alignment.
    39  	stNextFileNum    int64 // current unused file number
    40  	stJournalNum     int64 // current journal file number; need external synchronization
    41  	stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
    42  	stTempFileNum    int64
    43  	stSeqNum         uint64 // last mem compacted seq; need external synchronization
    44  
    45  	stor     *iStorage
    46  	storLock storage.Locker
    47  	o        *cachedOptions
    48  	icmp     *iComparer
    49  	tops     *tOps
    50  
    51  	manifest       *journal.Writer
    52  	manifestWriter storage.Writer
    53  	manifestFd     storage.FileDesc
    54  
    55  	stCompPtrs  []internalKey // compaction pointers; need external synchronization
    56  	stVersion   *version      // current version
    57  	ntVersionID int64         // next version id to assign
    58  	refCh       chan *vTask
    59  	relCh       chan *vTask
    60  	deltaCh     chan *vDelta
    61  	abandon     chan int64
    62  	closeC      chan struct{}
    63  	closeW      sync.WaitGroup
    64  	vmu         sync.Mutex
    65  
    66  	// Testing fields
    67  	fileRefCh chan chan map[int64]int // channel used to pass current reference stat
    68  }
    69  
    70  // Creates new initialized session instance.
    71  func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
    72  	if stor == nil {
    73  		return nil, os.ErrInvalid
    74  	}
    75  	storLock, err := stor.Lock()
    76  	if err != nil {
    77  		return
    78  	}
    79  	s = &session{
    80  		stor:      newIStorage(stor),
    81  		storLock:  storLock,
    82  		refCh:     make(chan *vTask),
    83  		relCh:     make(chan *vTask),
    84  		deltaCh:   make(chan *vDelta),
    85  		abandon:   make(chan int64),
    86  		fileRefCh: make(chan chan map[int64]int),
    87  		closeC:    make(chan struct{}),
    88  	}
    89  	s.setOptions(o)
    90  	s.tops = newTableOps(s)
    91  
    92  	s.closeW.Add(1)
    93  	go s.refLoop()
    94  	s.setVersion(nil, newVersion(s))
    95  	s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
    96  	return
    97  }
    98  
    99  // Close session.
   100  func (s *session) close() {
   101  	s.tops.close()
   102  	if s.manifest != nil {
   103  		s.manifest.Close()
   104  	}
   105  	if s.manifestWriter != nil {
   106  		s.manifestWriter.Close()
   107  	}
   108  	s.manifest = nil
   109  	s.manifestWriter = nil
   110  	s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionID})
   111  
   112  	// Close all background goroutines
   113  	close(s.closeC)
   114  	s.closeW.Wait()
   115  }
   116  
   117  // Release session lock.
   118  func (s *session) release() {
   119  	s.storLock.Unlock()
   120  }
   121  
   122  // Create a new database session; need external synchronization.
   123  func (s *session) create() error {
   124  	// create manifest
   125  	return s.newManifest(nil, nil)
   126  }
   127  
   128  // Recover a database session; need external synchronization.
   129  func (s *session) recover() (err error) {
   130  	defer func() {
   131  		if os.IsNotExist(err) {
   132  			// Don't return os.ErrNotExist if the underlying storage contains
   133  			// other files that belong to LevelDB. So the DB won't get trashed.
   134  			if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
   135  				err = &errors.ErrCorrupted{Err: errors.New("database entry point either missing or corrupted")}
   136  			}
   137  		}
   138  	}()
   139  
   140  	fd, err := s.stor.GetMeta()
   141  	if err != nil {
   142  		return
   143  	}
   144  
   145  	reader, err := s.stor.Open(fd)
   146  	if err != nil {
   147  		return
   148  	}
   149  	defer reader.Close()
   150  
   151  	var (
   152  		// Options.
   153  		strict = s.o.GetStrict(opt.StrictManifest)
   154  
   155  		jr      = journal.NewReader(reader, dropper{s, fd}, strict, true)
   156  		rec     = &sessionRecord{}
   157  		staging = s.stVersion.newStaging()
   158  	)
   159  	for {
   160  		var r io.Reader
   161  		r, err = jr.Next()
   162  		if err != nil {
   163  			if err == io.EOF {
   164  				err = nil
   165  				break
   166  			}
   167  			return errors.SetFd(err, fd)
   168  		}
   169  
   170  		err = rec.decode(r)
   171  		if err == nil {
   172  			// save compact pointers
   173  			for _, r := range rec.compPtrs {
   174  				s.setCompPtr(r.level, r.ikey)
   175  			}
   176  			// commit record to version staging
   177  			staging.commit(rec)
   178  		} else {
   179  			err = errors.SetFd(err, fd)
   180  			if strict || !errors.IsCorrupted(err) {
   181  				return
   182  			}
   183  			s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
   184  		}
   185  		rec.resetCompPtrs()
   186  		rec.resetAddedTables()
   187  		rec.resetDeletedTables()
   188  	}
   189  
   190  	switch {
   191  	case !rec.has(recComparer):
   192  		return newErrManifestCorrupted(fd, "comparer", "missing")
   193  	case rec.comparer != s.icmp.uName():
   194  		return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
   195  	case !rec.has(recNextFileNum):
   196  		return newErrManifestCorrupted(fd, "next-file-num", "missing")
   197  	case !rec.has(recJournalNum):
   198  		return newErrManifestCorrupted(fd, "journal-file-num", "missing")
   199  	case !rec.has(recSeqNum):
   200  		return newErrManifestCorrupted(fd, "seq-num", "missing")
   201  	}
   202  
   203  	s.manifestFd = fd
   204  	s.setVersion(rec, staging.finish(false))
   205  	s.setNextFileNum(rec.nextFileNum)
   206  	s.recordCommited(rec)
   207  	return nil
   208  }
   209  
   210  // Commit session; need external synchronization.
   211  func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
   212  	v := s.version()
   213  	defer v.release()
   214  
   215  	// spawn new version based on current version
   216  	nv := v.spawn(r, trivial)
   217  
   218  	// abandon useless version id to prevent blocking version processing loop.
   219  	defer func() {
   220  		if err != nil {
   221  			s.abandon <- nv.id
   222  			s.logf("commit@abandon useless vid D%d", nv.id)
   223  		}
   224  	}()
   225  
   226  	if s.manifest == nil {
   227  		// manifest journal writer not yet created, create one
   228  		err = s.newManifest(r, nv)
   229  	} else if s.manifest.Size() >= s.o.GetMaxManifestFileSize() {
   230  		// pass nil sessionRecord to avoid over-reference table file
   231  		err = s.newManifest(nil, nv)
   232  	} else {
   233  		err = s.flushManifest(r)
   234  	}
   235  
   236  	// finally, apply new version if no error rise
   237  	if err == nil {
   238  		s.setVersion(r, nv)
   239  	}
   240  
   241  	return
   242  }
   243  

View as plain text