...

Source file src/github.com/syndtr/goleveldb/leveldb/session_util.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"fmt"
    11  	"sync/atomic"
    12  	"time"
    13  
    14  	"github.com/syndtr/goleveldb/leveldb/journal"
    15  	"github.com/syndtr/goleveldb/leveldb/storage"
    16  )
    17  
    18  // Logging.
    19  
    20  type dropper struct {
    21  	s  *session
    22  	fd storage.FileDesc
    23  }
    24  
    25  func (d dropper) Drop(err error) {
    26  	if e, ok := err.(*journal.ErrCorrupted); ok {
    27  		d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(int64(e.Size)), e.Reason)
    28  	} else {
    29  		d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
    30  	}
    31  }
    32  
    33  func (s *session) log(v ...interface{})                 { s.stor.Log(fmt.Sprint(v...)) }
    34  func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
    35  
    36  // File utils.
    37  
    38  func (s *session) newTemp() storage.FileDesc {
    39  	num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
    40  	return storage.FileDesc{Type: storage.TypeTemp, Num: num}
    41  }
    42  
    43  // Session state.
    44  
    45  const (
    46  	// maxCachedNumber represents the maximum number of version tasks
    47  	// that can be cached in the ref loop.
    48  	maxCachedNumber = 256
    49  
    50  	// maxCachedTime represents the maximum time for ref loop to cache
    51  	// a version task.
    52  	maxCachedTime = 5 * time.Minute
    53  )
    54  
    55  // vDelta indicates the change information between the next version
    56  // and the currently specified version
    57  type vDelta struct {
    58  	vid     int64
    59  	added   []int64
    60  	deleted []int64
    61  }
    62  
// vTask defines a version task for either reference or release.
//
// vid is the id of the version the task is about. files is walked by the
// reference loop to adjust the reference count of each contained file
// (presumably the version's table files, grouped per level — confirm against
// the code that builds the task). created is compared against maxCachedTime
// by the reference loop to decide how long the task may stay cached.
type vTask struct {
	vid     int64
	files   []tFiles
	created time.Time
}
    69  
// refLoop maintains the reference counters of table files. It loops until
// s.closeC is signalled, at which point it calls s.closeW.Done and returns.
// On every iteration it first runs processTasks and then serves exactly one
// of the following events:
//
//   - s.refCh:     a version task asking for a reference; it is cached in ref.
//   - s.deltaCh:   the file changes relative to a version; cached in deltas,
//     or applied at once if that version was already converted
//     to full file references.
//   - s.relCh:     a version task asking for a release.
//   - s.abandon:   a version id to be skipped while advancing next.
//   - timer.C:     wake-up that only serves to re-run processTasks.
//   - s.fileRefCh: a request for a copy of the current file reference map.
//
// Local state:
//
//   - fileRef:    reference count per table file number.
//   - ref:        cached reference tasks, keyed by version id.
//   - deltas:     cached deltas, keyed by version id.
//   - referenced: ids of versions that were converted to full file
//     references and still await their release.
//   - released:   deltas (possibly nil) of versions released while still
//     cached, waiting to be processed in order.
//   - abandoned:  version ids to skip.
//   - next, last: the next version id to process and the highest version id
//     seen on s.refCh.
//
// The loop panics on a duplicate reference request, on a delta or release for
// a version it does not know, and when a file reference count turns negative.
func (s *session) refLoop() {
	var (
		fileRef    = make(map[int64]int)    // Table file reference counter
		ref        = make(map[int64]*vTask) // Current referencing version store
		deltas     = make(map[int64]*vDelta)
		referenced = make(map[int64]struct{})
		released   = make(map[int64]*vDelta)  // Released version that waiting for processing
		abandoned  = make(map[int64]struct{}) // Abandoned version id
		next, last int64
	)
	// addFileRef adds ref to the reference counter of the file with the
	// specified number and returns the new counter value. A counter that
	// reaches zero is removed from the map; a negative result panics.
	addFileRef := func(fnum int64, ref int) int {
		ref += fileRef[fnum]
		if ref > 0 {
			fileRef[fnum] = ref
		} else if ref == 0 {
			delete(fileRef, fnum)
		} else {
			panic(fmt.Sprintf("negative ref: %v", fnum))
		}
		return ref
	}
	// skipAbandoned reports whether the version id `next` was abandoned and,
	// if so, forgets it. The caller is responsible for advancing next.
	skipAbandoned := func() bool {
		if _, exist := abandoned[next]; exist {
			delete(abandoned, next)
			return true
		}
		return false
	}
	// applyDelta applies a version change to the current file references:
	// added files gain one reference, deleted files lose one. A deleted file
	// whose counter drops to zero is passed to s.tops.remove.
	applyDelta := func(d *vDelta) {
		for _, t := range d.added {
			addFileRef(t, 1)
		}
		for _, t := range d.deleted {
			if addFileRef(t, -1) == 0 {
				s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
			}
		}
	}

	// The timer is created already expired and its first tick is consumed
	// here; processTasks re-arms it on every call.
	timer := time.NewTimer(0)
	<-timer.C // discard the initial tick
	defer timer.Stop()

	// processTasks processes version tasks in strict order.
	//
	// If we want to use delta to reduce the cost of file references and dereferences,
	// we must strictly follow the id of the version, otherwise some files that are
	// being referenced will be deleted.
	//
	// In addition, some db operations (such as iterators) may cause a version to be
	// referenced for a long time. In order to prevent such operations from blocking
	// the entire processing queue, we will properly convert some of the version tasks
	// into full file references and releases.
	processTasks := func() {
		// Re-arm the timer so that the main loop runs processTasks again
		// after at most maxCachedTime, even without any channel activity.
		timer.Reset(maxCachedTime)
		// Make sure we don't cache too many version tasks.
		for {
			// Skip any abandoned version number to prevent blocking processing.
			if skipAbandoned() {
				next++
				continue
			}
			// Don't bother the version that has been released.
			if _, exist := released[next]; exist {
				break
			}
			// Ensure the specified version has been referenced.
			if _, exist := ref[next]; !exist {
				break
			}
			// Keep the task cached while the backlog is small and the
			// task is still young.
			if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
				break
			}
			// Convert version task into full file references and releases mode.
			// Reference version(i+1) first and wait version(i) to release.
			// FileRef(i+1) = FileRef(i) + Delta(i)
			for _, tt := range ref[next].files {
				for _, t := range tt {
					addFileRef(t.fd.Num, 1)
				}
			}
			// Note, if some compactions take a long time, even more than 5 minutes,
			// we may miss the corresponding delta information here.
			// Fortunately it will not affect the correctness of the file reference,
			// and we can apply the delta once we receive it.
			if d := deltas[next]; d != nil {
				applyDelta(d)
			}
			// From now on the version is tracked in referenced; its release
			// request dereferences every file of the task individually.
			referenced[next] = struct{}{}
			delete(ref, next)
			delete(deltas, next)
			next++
		}

		// Use delta information to process all released versions.
		for {
			if skipAbandoned() {
				next++
				continue
			}
			if d, exist := released[next]; exist {
				// A nil entry means no delta was received for the version
				// before it was released.
				if d != nil {
					applyDelta(d)
				}
				delete(released, next)
				next++
				continue
			}
			return
		}
	}

	for {
		processTasks()

		select {
		case t := <-s.refCh:
			// Reference request: cache the task and track the highest id.
			if _, exist := ref[t.vid]; exist {
				panic("duplicate reference request")
			}
			ref[t.vid] = t
			if t.vid > last {
				last = t.vid
			}

		case d := <-s.deltaCh:
			// Delta for a version that is no longer cached.
			if _, exist := ref[d.vid]; !exist {
				if _, exist2 := referenced[d.vid]; !exist2 {
					panic("invalid release request")
				}
				// The reference opt is already expired, apply
				// delta here.
				applyDelta(d)
				continue
			}
			deltas[d.vid] = d

		case t := <-s.relCh:
			// The version was converted to full file references: drop one
			// reference per file and remove files that are no longer used.
			if _, exist := referenced[t.vid]; exist {
				for _, tt := range t.files {
					for _, t := range tt {
						if addFileRef(t.fd.Num, -1) == 0 {
							s.tops.remove(t.fd)
						}
					}
				}
				delete(referenced, t.vid)
				continue
			}
			if _, exist := ref[t.vid]; !exist {
				panic("invalid release request")
			}
			// Still cached: queue its delta (nil if none arrived yet) for
			// in-order processing by processTasks.
			released[t.vid] = deltas[t.vid]
			delete(deltas, t.vid)
			delete(ref, t.vid)

		case id := <-s.abandon:
			// Ids below next have already been passed and are ignored.
			if id >= next {
				abandoned[id] = struct{}{}
			}

		case <-timer.C:
			// Nothing to do here; the next iteration runs processTasks.

		case r := <-s.fileRefCh:
			// Reply with a copy so the receiver never shares fileRef.
			ref := make(map[int64]int)
			for f, c := range fileRef {
				ref[f] = c
			}
			r <- ref

		case <-s.closeC:
			s.closeW.Done()
			return
		}
	}
}
   250  
   251  // Get current version. This will incr version ref, must call
   252  // version.release (exactly once) after use.
   253  func (s *session) version() *version {
   254  	s.vmu.Lock()
   255  	defer s.vmu.Unlock()
   256  	s.stVersion.incref()
   257  	return s.stVersion
   258  }
   259  
   260  func (s *session) tLen(level int) int {
   261  	s.vmu.Lock()
   262  	defer s.vmu.Unlock()
   263  	return s.stVersion.tLen(level)
   264  }
   265  
   266  // Set current version to v.
   267  func (s *session) setVersion(r *sessionRecord, v *version) {
   268  	s.vmu.Lock()
   269  	defer s.vmu.Unlock()
   270  	// Hold by session. It is important to call this first before releasing
   271  	// current version, otherwise the still used files might get released.
   272  	v.incref()
   273  	if s.stVersion != nil {
   274  		if r != nil {
   275  			var (
   276  				added   = make([]int64, 0, len(r.addedTables))
   277  				deleted = make([]int64, 0, len(r.deletedTables))
   278  			)
   279  			for _, t := range r.addedTables {
   280  				added = append(added, t.num)
   281  			}
   282  			for _, t := range r.deletedTables {
   283  				deleted = append(deleted, t.num)
   284  			}
   285  			select {
   286  			case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
   287  			case <-v.s.closeC:
   288  				s.log("reference loop already exist")
   289  			}
   290  		}
   291  		// Release current version.
   292  		s.stVersion.releaseNB()
   293  	}
   294  	s.stVersion = v
   295  }
   296  
   297  // Get current unused file number.
   298  func (s *session) nextFileNum() int64 {
   299  	return atomic.LoadInt64(&s.stNextFileNum)
   300  }
   301  
   302  // Set current unused file number to num.
   303  func (s *session) setNextFileNum(num int64) {
   304  	atomic.StoreInt64(&s.stNextFileNum, num)
   305  }
   306  
   307  // Mark file number as used.
   308  func (s *session) markFileNum(num int64) {
   309  	nextFileNum := num + 1
   310  	for {
   311  		old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum
   312  		if old > x {
   313  			x = old
   314  		}
   315  		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
   316  			break
   317  		}
   318  	}
   319  }
   320  
   321  // Allocate a file number.
   322  func (s *session) allocFileNum() int64 {
   323  	return atomic.AddInt64(&s.stNextFileNum, 1) - 1
   324  }
   325  
   326  // Reuse given file number.
   327  func (s *session) reuseFileNum(num int64) {
   328  	for {
   329  		old, x := atomic.LoadInt64(&s.stNextFileNum), num
   330  		if old != x+1 {
   331  			x = old
   332  		}
   333  		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
   334  			break
   335  		}
   336  	}
   337  }
   338  
   339  // Set compaction ptr at given level; need external synchronization.
   340  func (s *session) setCompPtr(level int, ik internalKey) {
   341  	if level >= len(s.stCompPtrs) {
   342  		newCompPtrs := make([]internalKey, level+1)
   343  		copy(newCompPtrs, s.stCompPtrs)
   344  		s.stCompPtrs = newCompPtrs
   345  	}
   346  	s.stCompPtrs[level] = append(internalKey{}, ik...)
   347  }
   348  
   349  // Get compaction ptr at given level; need external synchronization.
   350  func (s *session) getCompPtr(level int) internalKey {
   351  	if level >= len(s.stCompPtrs) {
   352  		return nil
   353  	}
   354  	return s.stCompPtrs[level]
   355  }
   356  
   357  // Manifest related utils.
   358  
   359  // Fill given session record obj with current states; need external
   360  // synchronization.
   361  func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
   362  	r.setNextFileNum(s.nextFileNum())
   363  
   364  	if snapshot {
   365  		if !r.has(recJournalNum) {
   366  			r.setJournalNum(s.stJournalNum)
   367  		}
   368  
   369  		if !r.has(recSeqNum) {
   370  			r.setSeqNum(s.stSeqNum)
   371  		}
   372  
   373  		for level, ik := range s.stCompPtrs {
   374  			if ik != nil {
   375  				r.addCompPtr(level, ik)
   376  			}
   377  		}
   378  
   379  		r.setComparer(s.icmp.uName())
   380  	}
   381  }
   382  
   383  // Mark if record has been committed, this will update session state;
   384  // need external synchronization.
   385  func (s *session) recordCommited(rec *sessionRecord) {
   386  	if rec.has(recJournalNum) {
   387  		s.stJournalNum = rec.journalNum
   388  	}
   389  
   390  	if rec.has(recPrevJournalNum) {
   391  		s.stPrevJournalNum = rec.prevJournalNum
   392  	}
   393  
   394  	if rec.has(recSeqNum) {
   395  		s.stSeqNum = rec.seqNum
   396  	}
   397  
   398  	for _, r := range rec.compPtrs {
   399  		s.setCompPtr(r.level, r.ikey)
   400  	}
   401  }
   402  
   403  // Create a new manifest file; need external synchronization.
   404  func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
   405  	fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
   406  	writer, err := s.stor.Create(fd)
   407  	if err != nil {
   408  		return
   409  	}
   410  	jw := journal.NewWriter(writer)
   411  
   412  	if v == nil {
   413  		v = s.version()
   414  		defer v.release()
   415  	}
   416  	if rec == nil {
   417  		rec = &sessionRecord{}
   418  	}
   419  	s.fillRecord(rec, true)
   420  	v.fillRecord(rec)
   421  
   422  	defer func() {
   423  		if err == nil {
   424  			s.recordCommited(rec)
   425  			if s.manifest != nil {
   426  				s.manifest.Close()
   427  			}
   428  			if s.manifestWriter != nil {
   429  				s.manifestWriter.Close()
   430  			}
   431  			if !s.manifestFd.Zero() {
   432  				err = s.stor.Remove(s.manifestFd)
   433  			}
   434  			s.manifestFd = fd
   435  			s.manifestWriter = writer
   436  			s.manifest = jw
   437  		} else {
   438  			writer.Close()
   439  			if rerr := s.stor.Remove(fd); err != nil {
   440  				err = fmt.Errorf("newManifest error: %v, cleanup error (%v)", err, rerr)
   441  			}
   442  			s.reuseFileNum(fd.Num)
   443  		}
   444  	}()
   445  
   446  	w, err := jw.Next()
   447  	if err != nil {
   448  		return
   449  	}
   450  	err = rec.encode(w)
   451  	if err != nil {
   452  		return
   453  	}
   454  	err = jw.Flush()
   455  	if err != nil {
   456  		return
   457  	}
   458  	if !s.o.GetNoSync() {
   459  		err = writer.Sync()
   460  		if err != nil {
   461  			return
   462  		}
   463  	}
   464  	err = s.stor.SetMeta(fd)
   465  	return
   466  }
   467  
   468  // Flush record to disk.
   469  func (s *session) flushManifest(rec *sessionRecord) (err error) {
   470  	s.fillRecord(rec, false)
   471  	w, err := s.manifest.Next()
   472  	if err != nil {
   473  		return
   474  	}
   475  	err = rec.encode(w)
   476  	if err != nil {
   477  		return
   478  	}
   479  	err = s.manifest.Flush()
   480  	if err != nil {
   481  		return
   482  	}
   483  	if !s.o.GetNoSync() {
   484  		err = s.manifestWriter.Sync()
   485  		if err != nil {
   486  			return
   487  		}
   488  	}
   489  	s.recordCommited(rec)
   490  	return
   491  }
   492  

View as plain text