...

Source file src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go

Documentation: github.com/syndtr/goleveldb/leveldb/storage

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reservefs.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package storage
     8  
     9  import (
    10  	"errors"
    11  	"fmt"
    12  	"io"
    13  	"io/ioutil"
    14  	"os"
    15  	"path/filepath"
    16  	"runtime"
    17  	"sort"
    18  	"strconv"
    19  	"strings"
    20  	"sync"
    21  	"time"
    22  )
    23  
    24  var (
    25  	errFileOpen = errors.New("leveldb/storage: file still open")
    26  	errReadOnly = errors.New("leveldb/storage: storage is read-only")
    27  )
    28  
    29  type fileLock interface {
    30  	release() error
    31  }
    32  
    33  type fileStorageLock struct {
    34  	fs *fileStorage
    35  }
    36  
    37  func (lock *fileStorageLock) Unlock() {
    38  	if lock.fs != nil {
    39  		lock.fs.mu.Lock()
    40  		defer lock.fs.mu.Unlock()
    41  		if lock.fs.slock == lock {
    42  			lock.fs.slock = nil
    43  		}
    44  	}
    45  }
    46  
    47  type int64Slice []int64
    48  
    49  func (p int64Slice) Len() int           { return len(p) }
    50  func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
    51  func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
    52  
    53  func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
    54  	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
    55  	if err != nil {
    56  		return err
    57  	}
    58  	n, err := f.Write(data)
    59  	if err == nil && n < len(data) {
    60  		err = io.ErrShortWrite
    61  	}
    62  	if err1 := f.Sync(); err == nil {
    63  		err = err1
    64  	}
    65  	if err1 := f.Close(); err == nil {
    66  		err = err1
    67  	}
    68  	return err
    69  }
    70  
    71  const logSizeThreshold = 1024 * 1024 // 1 MiB
    72  
    73  // fileStorage is a file-system backed storage.
    74  type fileStorage struct {
    75  	path     string
    76  	readOnly bool
    77  
    78  	mu      sync.Mutex
    79  	flock   fileLock
    80  	slock   *fileStorageLock
    81  	logw    *os.File
    82  	logSize int64
    83  	buf     []byte
    84  	// Opened file counter; if open < 0 means closed.
    85  	open int
    86  	day  int
    87  }
    88  
    89  // OpenFile returns a new filesystem-backed storage implementation with the given
    90  // path. This also acquire a file lock, so any subsequent attempt to open the
    91  // same path will fail.
    92  //
    93  // The storage must be closed after use, by calling Close method.
    94  func OpenFile(path string, readOnly bool) (Storage, error) {
    95  	if fi, err := os.Stat(path); err == nil {
    96  		if !fi.IsDir() {
    97  			return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path)
    98  		}
    99  	} else if os.IsNotExist(err) && !readOnly {
   100  		if err := os.MkdirAll(path, 0755); err != nil {
   101  			return nil, err
   102  		}
   103  	} else {
   104  		return nil, err
   105  	}
   106  
   107  	flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly)
   108  	if err != nil {
   109  		return nil, err
   110  	}
   111  
   112  	defer func() {
   113  		if err != nil {
   114  			if ferr := flock.release(); ferr != nil {
   115  				err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, ferr)
   116  			}
   117  		}
   118  	}()
   119  
   120  	var (
   121  		logw    *os.File
   122  		logSize int64
   123  	)
   124  	if !readOnly {
   125  		logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
   126  		if err != nil {
   127  			return nil, err
   128  		}
   129  		logSize, err = logw.Seek(0, os.SEEK_END)
   130  		if err != nil {
   131  			logw.Close()
   132  			return nil, err
   133  		}
   134  	}
   135  
   136  	fs := &fileStorage{
   137  		path:     path,
   138  		readOnly: readOnly,
   139  		flock:    flock,
   140  		logw:     logw,
   141  		logSize:  logSize,
   142  	}
   143  	runtime.SetFinalizer(fs, (*fileStorage).Close)
   144  	return fs, nil
   145  }
   146  
   147  func (fs *fileStorage) Lock() (Locker, error) {
   148  	fs.mu.Lock()
   149  	defer fs.mu.Unlock()
   150  	if fs.open < 0 {
   151  		return nil, ErrClosed
   152  	}
   153  	if fs.readOnly {
   154  		return &fileStorageLock{}, nil
   155  	}
   156  	if fs.slock != nil {
   157  		return nil, ErrLocked
   158  	}
   159  	fs.slock = &fileStorageLock{fs: fs}
   160  	return fs.slock, nil
   161  }
   162  
   163  func itoa(buf []byte, i int, wid int) []byte {
   164  	u := uint(i)
   165  	if u == 0 && wid <= 1 {
   166  		return append(buf, '0')
   167  	}
   168  
   169  	// Assemble decimal in reverse order.
   170  	var b [32]byte
   171  	bp := len(b)
   172  	for ; u > 0 || wid > 0; u /= 10 {
   173  		bp--
   174  		wid--
   175  		b[bp] = byte(u%10) + '0'
   176  	}
   177  	return append(buf, b[bp:]...)
   178  }
   179  
   180  func (fs *fileStorage) printDay(t time.Time) error {
   181  	if fs.day == t.Day() {
   182  		return nil
   183  	}
   184  	fs.day = t.Day()
   185  	_, err := fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n"))
   186  	return err
   187  }
   188  
   189  func (fs *fileStorage) doLog(t time.Time, str string) {
   190  	if fs.logSize > logSizeThreshold {
   191  		// Rotate log file.
   192  		fs.logw.Close()
   193  		fs.logw = nil
   194  		fs.logSize = 0
   195  		if err := rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old")); err != nil {
   196  			return
   197  		}
   198  	}
   199  	if fs.logw == nil {
   200  		var err error
   201  		fs.logw, err = os.OpenFile(filepath.Join(fs.path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
   202  		if err != nil {
   203  			return
   204  		}
   205  		// Force printDay on new log file.
   206  		fs.day = 0
   207  	}
   208  	if err := fs.printDay(t); err != nil {
   209  		return
   210  	}
   211  	hour, min, sec := t.Clock()
   212  	msec := t.Nanosecond() / 1e3
   213  	// time
   214  	fs.buf = itoa(fs.buf[:0], hour, 2)
   215  	fs.buf = append(fs.buf, ':')
   216  	fs.buf = itoa(fs.buf, min, 2)
   217  	fs.buf = append(fs.buf, ':')
   218  	fs.buf = itoa(fs.buf, sec, 2)
   219  	fs.buf = append(fs.buf, '.')
   220  	fs.buf = itoa(fs.buf, msec, 6)
   221  	fs.buf = append(fs.buf, ' ')
   222  	// write
   223  	fs.buf = append(fs.buf, []byte(str)...)
   224  	fs.buf = append(fs.buf, '\n')
   225  	n, _ := fs.logw.Write(fs.buf)
   226  	fs.logSize += int64(n)
   227  }
   228  
   229  func (fs *fileStorage) Log(str string) {
   230  	if !fs.readOnly {
   231  		t := time.Now()
   232  		fs.mu.Lock()
   233  		defer fs.mu.Unlock()
   234  		if fs.open < 0 {
   235  			return
   236  		}
   237  		fs.doLog(t, str)
   238  	}
   239  }
   240  
   241  func (fs *fileStorage) log(str string) {
   242  	if !fs.readOnly {
   243  		fs.doLog(time.Now(), str)
   244  	}
   245  }
   246  
   247  func (fs *fileStorage) setMeta(fd FileDesc) error {
   248  	content := fsGenName(fd) + "\n"
   249  	// Check and backup old CURRENT file.
   250  	currentPath := filepath.Join(fs.path, "CURRENT")
   251  	if _, err := os.Stat(currentPath); err == nil {
   252  		b, err := ioutil.ReadFile(currentPath)
   253  		if err != nil {
   254  			fs.log(fmt.Sprintf("backup CURRENT: %v", err))
   255  			return err
   256  		}
   257  		if string(b) == content {
   258  			// Content not changed, do nothing.
   259  			return nil
   260  		}
   261  		if err := writeFileSynced(currentPath+".bak", b, 0644); err != nil {
   262  			fs.log(fmt.Sprintf("backup CURRENT: %v", err))
   263  			return err
   264  		}
   265  	} else if !os.IsNotExist(err) {
   266  		return err
   267  	}
   268  	path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
   269  	if err := writeFileSynced(path, []byte(content), 0644); err != nil {
   270  		fs.log(fmt.Sprintf("create CURRENT.%d: %v", fd.Num, err))
   271  		return err
   272  	}
   273  	// Replace CURRENT file.
   274  	if err := rename(path, currentPath); err != nil {
   275  		fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
   276  		return err
   277  	}
   278  	// Sync root directory.
   279  	if err := syncDir(fs.path); err != nil {
   280  		fs.log(fmt.Sprintf("syncDir: %v", err))
   281  		return err
   282  	}
   283  	return nil
   284  }
   285  
   286  func (fs *fileStorage) SetMeta(fd FileDesc) error {
   287  	if !FileDescOk(fd) {
   288  		return ErrInvalidFile
   289  	}
   290  	if fs.readOnly {
   291  		return errReadOnly
   292  	}
   293  
   294  	fs.mu.Lock()
   295  	defer fs.mu.Unlock()
   296  	if fs.open < 0 {
   297  		return ErrClosed
   298  	}
   299  	return fs.setMeta(fd)
   300  }
   301  
   302  func (fs *fileStorage) GetMeta() (FileDesc, error) {
   303  	fs.mu.Lock()
   304  	defer fs.mu.Unlock()
   305  	if fs.open < 0 {
   306  		return FileDesc{}, ErrClosed
   307  	}
   308  	dir, err := os.Open(fs.path)
   309  	if err != nil {
   310  		return FileDesc{}, err
   311  	}
   312  	names, err := dir.Readdirnames(0)
   313  	// Close the dir first before checking for Readdirnames error.
   314  	if ce := dir.Close(); ce != nil {
   315  		fs.log(fmt.Sprintf("close dir: %v", ce))
   316  	}
   317  	if err != nil {
   318  		return FileDesc{}, err
   319  	}
   320  	// Try this in order:
   321  	// - CURRENT.[0-9]+ ('pending rename' file, descending order)
   322  	// - CURRENT
   323  	// - CURRENT.bak
   324  	//
   325  	// Skip corrupted file or file that point to a missing target file.
   326  	type currentFile struct {
   327  		name string
   328  		fd   FileDesc
   329  	}
   330  	tryCurrent := func(name string) (*currentFile, error) {
   331  		b, err := ioutil.ReadFile(filepath.Join(fs.path, name))
   332  		if err != nil {
   333  			if os.IsNotExist(err) {
   334  				err = os.ErrNotExist
   335  			}
   336  			return nil, err
   337  		}
   338  		var fd FileDesc
   339  		if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd) {
   340  			fs.log(fmt.Sprintf("%s: corrupted content: %q", name, b))
   341  			err := &ErrCorrupted{
   342  				Err: errors.New("leveldb/storage: corrupted or incomplete CURRENT file"),
   343  			}
   344  			return nil, err
   345  		}
   346  		if _, err := os.Stat(filepath.Join(fs.path, fsGenName(fd))); err != nil {
   347  			if os.IsNotExist(err) {
   348  				fs.log(fmt.Sprintf("%s: missing target file: %s", name, fd))
   349  				err = os.ErrNotExist
   350  			}
   351  			return nil, err
   352  		}
   353  		return &currentFile{name: name, fd: fd}, nil
   354  	}
   355  	tryCurrents := func(names []string) (*currentFile, error) {
   356  		var (
   357  			cur *currentFile
   358  			// Last corruption error.
   359  			lastCerr error
   360  		)
   361  		for _, name := range names {
   362  			var err error
   363  			cur, err = tryCurrent(name)
   364  			if err == nil {
   365  				break
   366  			} else if err == os.ErrNotExist {
   367  				// Fallback to the next file.
   368  			} else if isCorrupted(err) {
   369  				lastCerr = err
   370  				// Fallback to the next file.
   371  			} else {
   372  				// In case the error is due to permission, etc.
   373  				return nil, err
   374  			}
   375  		}
   376  		if cur == nil {
   377  			err := os.ErrNotExist
   378  			if lastCerr != nil {
   379  				err = lastCerr
   380  			}
   381  			return nil, err
   382  		}
   383  		return cur, nil
   384  	}
   385  
   386  	// Try 'pending rename' files.
   387  	var nums []int64
   388  	for _, name := range names {
   389  		if strings.HasPrefix(name, "CURRENT.") && name != "CURRENT.bak" {
   390  			i, err := strconv.ParseInt(name[8:], 10, 64)
   391  			if err == nil {
   392  				nums = append(nums, i)
   393  			}
   394  		}
   395  	}
   396  	var (
   397  		pendCur   *currentFile
   398  		pendErr   = os.ErrNotExist
   399  		pendNames []string
   400  	)
   401  	if len(nums) > 0 {
   402  		sort.Sort(sort.Reverse(int64Slice(nums)))
   403  		pendNames = make([]string, len(nums))
   404  		for i, num := range nums {
   405  			pendNames[i] = fmt.Sprintf("CURRENT.%d", num)
   406  		}
   407  		pendCur, pendErr = tryCurrents(pendNames)
   408  		if pendErr != nil && pendErr != os.ErrNotExist && !isCorrupted(pendErr) {
   409  			return FileDesc{}, pendErr
   410  		}
   411  	}
   412  
   413  	// Try CURRENT and CURRENT.bak.
   414  	curCur, curErr := tryCurrents([]string{"CURRENT", "CURRENT.bak"})
   415  	if curErr != nil && curErr != os.ErrNotExist && !isCorrupted(curErr) {
   416  		return FileDesc{}, curErr
   417  	}
   418  
   419  	// pendCur takes precedence, but guards against obsolete pendCur.
   420  	if pendCur != nil && (curCur == nil || pendCur.fd.Num > curCur.fd.Num) {
   421  		curCur = pendCur
   422  	}
   423  
   424  	if curCur != nil {
   425  		// Restore CURRENT file to proper state.
   426  		if !fs.readOnly && (curCur.name != "CURRENT" || len(pendNames) != 0) {
   427  			// Ignore setMeta errors, however don't delete obsolete files if we
   428  			// catch error.
   429  			if err := fs.setMeta(curCur.fd); err == nil {
   430  				// Remove 'pending rename' files.
   431  				for _, name := range pendNames {
   432  					if err := os.Remove(filepath.Join(fs.path, name)); err != nil {
   433  						fs.log(fmt.Sprintf("remove %s: %v", name, err))
   434  					}
   435  				}
   436  			}
   437  		}
   438  		return curCur.fd, nil
   439  	}
   440  
   441  	// Nothing found.
   442  	if isCorrupted(pendErr) {
   443  		return FileDesc{}, pendErr
   444  	}
   445  	return FileDesc{}, curErr
   446  }
   447  
   448  func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
   449  	fs.mu.Lock()
   450  	defer fs.mu.Unlock()
   451  	if fs.open < 0 {
   452  		return nil, ErrClosed
   453  	}
   454  	dir, err := os.Open(fs.path)
   455  	if err != nil {
   456  		return
   457  	}
   458  	names, err := dir.Readdirnames(0)
   459  	// Close the dir first before checking for Readdirnames error.
   460  	if cerr := dir.Close(); cerr != nil {
   461  		fs.log(fmt.Sprintf("close dir: %v", cerr))
   462  	}
   463  	if err == nil {
   464  		for _, name := range names {
   465  			if fd, ok := fsParseName(name); ok && fd.Type&ft != 0 {
   466  				fds = append(fds, fd)
   467  			}
   468  		}
   469  	}
   470  	return
   471  }
   472  
   473  func (fs *fileStorage) Open(fd FileDesc) (Reader, error) {
   474  	if !FileDescOk(fd) {
   475  		return nil, ErrInvalidFile
   476  	}
   477  
   478  	fs.mu.Lock()
   479  	defer fs.mu.Unlock()
   480  	if fs.open < 0 {
   481  		return nil, ErrClosed
   482  	}
   483  	of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_RDONLY, 0)
   484  	if err != nil {
   485  		if fsHasOldName(fd) && os.IsNotExist(err) {
   486  			of, err = os.OpenFile(filepath.Join(fs.path, fsGenOldName(fd)), os.O_RDONLY, 0)
   487  			if err == nil {
   488  				goto ok
   489  			}
   490  		}
   491  		return nil, err
   492  	}
   493  ok:
   494  	fs.open++
   495  	return &fileWrap{File: of, fs: fs, fd: fd}, nil
   496  }
   497  
   498  func (fs *fileStorage) Create(fd FileDesc) (Writer, error) {
   499  	if !FileDescOk(fd) {
   500  		return nil, ErrInvalidFile
   501  	}
   502  	if fs.readOnly {
   503  		return nil, errReadOnly
   504  	}
   505  
   506  	fs.mu.Lock()
   507  	defer fs.mu.Unlock()
   508  	if fs.open < 0 {
   509  		return nil, ErrClosed
   510  	}
   511  	of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
   512  	if err != nil {
   513  		return nil, err
   514  	}
   515  	fs.open++
   516  	return &fileWrap{File: of, fs: fs, fd: fd}, nil
   517  }
   518  
   519  func (fs *fileStorage) Remove(fd FileDesc) error {
   520  	if !FileDescOk(fd) {
   521  		return ErrInvalidFile
   522  	}
   523  	if fs.readOnly {
   524  		return errReadOnly
   525  	}
   526  
   527  	fs.mu.Lock()
   528  	defer fs.mu.Unlock()
   529  	if fs.open < 0 {
   530  		return ErrClosed
   531  	}
   532  	err := os.Remove(filepath.Join(fs.path, fsGenName(fd)))
   533  	if err != nil {
   534  		if fsHasOldName(fd) && os.IsNotExist(err) {
   535  			if e1 := os.Remove(filepath.Join(fs.path, fsGenOldName(fd))); !os.IsNotExist(e1) {
   536  				fs.log(fmt.Sprintf("remove %s: %v (old name)", fd, err))
   537  				err = e1
   538  			}
   539  		} else {
   540  			fs.log(fmt.Sprintf("remove %s: %v", fd, err))
   541  		}
   542  	}
   543  	return err
   544  }
   545  
   546  func (fs *fileStorage) Rename(oldfd, newfd FileDesc) error {
   547  	if !FileDescOk(oldfd) || !FileDescOk(newfd) {
   548  		return ErrInvalidFile
   549  	}
   550  	if oldfd == newfd {
   551  		return nil
   552  	}
   553  	if fs.readOnly {
   554  		return errReadOnly
   555  	}
   556  
   557  	fs.mu.Lock()
   558  	defer fs.mu.Unlock()
   559  	if fs.open < 0 {
   560  		return ErrClosed
   561  	}
   562  	return rename(filepath.Join(fs.path, fsGenName(oldfd)), filepath.Join(fs.path, fsGenName(newfd)))
   563  }
   564  
   565  func (fs *fileStorage) Close() error {
   566  	fs.mu.Lock()
   567  	defer fs.mu.Unlock()
   568  	if fs.open < 0 {
   569  		return ErrClosed
   570  	}
   571  	// Clear the finalizer.
   572  	runtime.SetFinalizer(fs, nil)
   573  
   574  	if fs.open > 0 {
   575  		fs.log(fmt.Sprintf("close: warning, %d files still open", fs.open))
   576  	}
   577  	fs.open = -1
   578  	if fs.logw != nil {
   579  		fs.logw.Close()
   580  	}
   581  	return fs.flock.release()
   582  }
   583  
   584  type fileWrap struct {
   585  	*os.File
   586  	fs     *fileStorage
   587  	fd     FileDesc
   588  	closed bool
   589  }
   590  
   591  func (fw *fileWrap) Sync() error {
   592  	if err := fw.File.Sync(); err != nil {
   593  		return err
   594  	}
   595  	if fw.fd.Type == TypeManifest {
   596  		// Also sync parent directory if file type is manifest.
   597  		// See: https://code.google.com/p/leveldb/issues/detail?id=190.
   598  		if err := syncDir(fw.fs.path); err != nil {
   599  			fw.fs.log(fmt.Sprintf("syncDir: %v", err))
   600  			return err
   601  		}
   602  	}
   603  	return nil
   604  }
   605  
   606  func (fw *fileWrap) Close() error {
   607  	fw.fs.mu.Lock()
   608  	defer fw.fs.mu.Unlock()
   609  	if fw.closed {
   610  		return ErrClosed
   611  	}
   612  	fw.closed = true
   613  	fw.fs.open--
   614  	err := fw.File.Close()
   615  	if err != nil {
   616  		fw.fs.log(fmt.Sprintf("close %s: %v", fw.fd, err))
   617  	}
   618  	return err
   619  }
   620  
   621  func fsGenName(fd FileDesc) string {
   622  	switch fd.Type {
   623  	case TypeManifest:
   624  		return fmt.Sprintf("MANIFEST-%06d", fd.Num)
   625  	case TypeJournal:
   626  		return fmt.Sprintf("%06d.log", fd.Num)
   627  	case TypeTable:
   628  		return fmt.Sprintf("%06d.ldb", fd.Num)
   629  	case TypeTemp:
   630  		return fmt.Sprintf("%06d.tmp", fd.Num)
   631  	default:
   632  		panic("invalid file type")
   633  	}
   634  }
   635  
   636  func fsHasOldName(fd FileDesc) bool {
   637  	return fd.Type == TypeTable
   638  }
   639  
   640  func fsGenOldName(fd FileDesc) string {
   641  	switch fd.Type {
   642  	case TypeTable:
   643  		return fmt.Sprintf("%06d.sst", fd.Num)
   644  	default:
   645  		return fsGenName(fd)
   646  	}
   647  }
   648  
   649  func fsParseName(name string) (fd FileDesc, ok bool) {
   650  	var tail string
   651  	_, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail)
   652  	if err == nil {
   653  		switch tail {
   654  		case "log":
   655  			fd.Type = TypeJournal
   656  		case "ldb", "sst":
   657  			fd.Type = TypeTable
   658  		case "tmp":
   659  			fd.Type = TypeTemp
   660  		default:
   661  			return
   662  		}
   663  		return fd, true
   664  	}
   665  	n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail)
   666  	if n == 1 {
   667  		fd.Type = TypeManifest
   668  		return fd, true
   669  	}
   670  	return
   671  }
   672  
   673  func fsParseNamePtr(name string, fd *FileDesc) bool {
   674  	_fd, ok := fsParseName(name)
   675  	if fd != nil {
   676  		*fd = _fd
   677  	}
   678  	return ok
   679  }
   680  

View as plain text