...

Source file src/go.etcd.io/etcd/server/v3/wal/wal.go

Documentation: go.etcd.io/etcd/server/v3/wal

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package wal
    16  
    17  import (
    18  	"bytes"
    19  	"errors"
    20  	"fmt"
    21  	"hash/crc32"
    22  	"io"
    23  	"os"
    24  	"path/filepath"
    25  	"strings"
    26  	"sync"
    27  	"time"
    28  
    29  	"go.etcd.io/etcd/client/pkg/v3/fileutil"
    30  	"go.etcd.io/etcd/pkg/v3/pbutil"
    31  	"go.etcd.io/etcd/raft/v3"
    32  	"go.etcd.io/etcd/raft/v3/raftpb"
    33  	"go.etcd.io/etcd/server/v3/wal/walpb"
    34  
    35  	"go.uber.org/zap"
    36  )
    37  
    38  const (
    39  	metadataType int64 = iota + 1
    40  	entryType
    41  	stateType
    42  	crcType
    43  	snapshotType
    44  
    45  	// warnSyncDuration is the amount of time allotted to an fsync before
    46  	// logging a warning
    47  	warnSyncDuration = time.Second
    48  )
    49  
    50  var (
    51  	// SegmentSizeBytes is the preallocated size of each wal segment file.
    52  	// The actual size might be larger than this. In general, the default
    53  	// value should be used, but this is defined as an exported variable
    54  	// so that tests can set a different segment size.
    55  	SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
    56  
    57  	ErrMetadataConflict = errors.New("wal: conflicting metadata found")
    58  	ErrFileNotFound     = errors.New("wal: file not found")
    59  	ErrCRCMismatch      = errors.New("wal: crc mismatch")
    60  	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
    61  	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
    62  	ErrSliceOutOfRange  = errors.New("wal: slice bounds out of range")
    63  	ErrDecoderNotFound  = errors.New("wal: decoder not found")
    64  	crcTable            = crc32.MakeTable(crc32.Castagnoli)
    65  )
    66  
// WAL is a logical representation of the stable storage.
// WAL is either in read mode or append mode but not both.
// A newly created WAL is in append mode, and ready for appending records.
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
	lg *zap.Logger // logger; Create and openAtIndex substitute a no-op logger when nil is passed

	dir string // the living directory of the underlay files

	// dirFile is a fd for the wal directory for syncing on Rename.
	// It is set by Open and renameWAL; OpenForRead does not set it.
	dirFile *os.File

	metadata []byte           // metadata recorded at the head of each WAL
	state    raftpb.HardState // hardstate recorded at the head of WAL

	start     walpb.Snapshot // snapshot to start reading
	decoder   *decoder       // decoder to decode records; set to nil once ReadAll completes
	readClose func() error   // closer for decode reader; nil when opened for write

	unsafeNoSync bool // if set, do not fsync

	// mu is held by ReadAll, Save, SaveSnapshot, ReleaseLockTo and Close.
	mu      sync.Mutex
	enti    uint64   // index of the last entry saved to the wal
	encoder *encoder // encoder to encode records

	// locks holds one slot per opened segment; the last slot is the segment
	// being appended to (see tail). In read-only mode the slots are nil.
	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
	fp    *filePipeline          // source of new segment files for cut; nil in read-only mode
}
    96  
    97  // Create creates a WAL ready for appending records. The given metadata is
    98  // recorded at the head of each WAL file, and can be retrieved with ReadAll
    99  // after the file is Open.
   100  func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
   101  	if Exist(dirpath) {
   102  		return nil, os.ErrExist
   103  	}
   104  
   105  	if lg == nil {
   106  		lg = zap.NewNop()
   107  	}
   108  
   109  	// keep temporary wal directory so WAL initialization appears atomic
   110  	tmpdirpath := filepath.Clean(dirpath) + ".tmp"
   111  	if fileutil.Exist(tmpdirpath) {
   112  		if err := os.RemoveAll(tmpdirpath); err != nil {
   113  			return nil, err
   114  		}
   115  	}
   116  	defer os.RemoveAll(tmpdirpath)
   117  
   118  	if err := fileutil.CreateDirAll(lg, tmpdirpath); err != nil {
   119  		lg.Warn(
   120  			"failed to create a temporary WAL directory",
   121  			zap.String("tmp-dir-path", tmpdirpath),
   122  			zap.String("dir-path", dirpath),
   123  			zap.Error(err),
   124  		)
   125  		return nil, err
   126  	}
   127  
   128  	p := filepath.Join(tmpdirpath, walName(0, 0))
   129  	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
   130  	if err != nil {
   131  		lg.Warn(
   132  			"failed to flock an initial WAL file",
   133  			zap.String("path", p),
   134  			zap.Error(err),
   135  		)
   136  		return nil, err
   137  	}
   138  	if _, err = f.Seek(0, io.SeekEnd); err != nil {
   139  		lg.Warn(
   140  			"failed to seek an initial WAL file",
   141  			zap.String("path", p),
   142  			zap.Error(err),
   143  		)
   144  		return nil, err
   145  	}
   146  	if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
   147  		lg.Warn(
   148  			"failed to preallocate an initial WAL file",
   149  			zap.String("path", p),
   150  			zap.Int64("segment-bytes", SegmentSizeBytes),
   151  			zap.Error(err),
   152  		)
   153  		return nil, err
   154  	}
   155  
   156  	w := &WAL{
   157  		lg:       lg,
   158  		dir:      dirpath,
   159  		metadata: metadata,
   160  	}
   161  	w.encoder, err = newFileEncoder(f.File, 0)
   162  	if err != nil {
   163  		return nil, err
   164  	}
   165  	w.locks = append(w.locks, f)
   166  	if err = w.saveCrc(0); err != nil {
   167  		return nil, err
   168  	}
   169  	if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
   170  		return nil, err
   171  	}
   172  	if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
   173  		return nil, err
   174  	}
   175  
   176  	logDirPath := w.dir
   177  	if w, err = w.renameWAL(tmpdirpath); err != nil {
   178  		lg.Warn(
   179  			"failed to rename the temporary WAL directory",
   180  			zap.String("tmp-dir-path", tmpdirpath),
   181  			zap.String("dir-path", logDirPath),
   182  			zap.Error(err),
   183  		)
   184  		return nil, err
   185  	}
   186  
   187  	var perr error
   188  	defer func() {
   189  		if perr != nil {
   190  			w.cleanupWAL(lg)
   191  		}
   192  	}()
   193  
   194  	// directory was renamed; sync parent dir to persist rename
   195  	pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
   196  	if perr != nil {
   197  		lg.Warn(
   198  			"failed to open the parent data directory",
   199  			zap.String("parent-dir-path", filepath.Dir(w.dir)),
   200  			zap.String("dir-path", w.dir),
   201  			zap.Error(perr),
   202  		)
   203  		return nil, perr
   204  	}
   205  	dirCloser := func() error {
   206  		if perr = pdir.Close(); perr != nil {
   207  			lg.Warn(
   208  				"failed to close the parent data directory file",
   209  				zap.String("parent-dir-path", filepath.Dir(w.dir)),
   210  				zap.String("dir-path", w.dir),
   211  				zap.Error(perr),
   212  			)
   213  			return perr
   214  		}
   215  		return nil
   216  	}
   217  	start := time.Now()
   218  	if perr = fileutil.Fsync(pdir); perr != nil {
   219  		dirCloser()
   220  		lg.Warn(
   221  			"failed to fsync the parent data directory file",
   222  			zap.String("parent-dir-path", filepath.Dir(w.dir)),
   223  			zap.String("dir-path", w.dir),
   224  			zap.Error(perr),
   225  		)
   226  		return nil, perr
   227  	}
   228  	walFsyncSec.Observe(time.Since(start).Seconds())
   229  	if err = dirCloser(); err != nil {
   230  		return nil, err
   231  	}
   232  
   233  	return w, nil
   234  }
   235  
   236  func (w *WAL) SetUnsafeNoFsync() {
   237  	w.unsafeNoSync = true
   238  }
   239  
   240  func (w *WAL) cleanupWAL(lg *zap.Logger) {
   241  	var err error
   242  	if err = w.Close(); err != nil {
   243  		lg.Panic("failed to close WAL during cleanup", zap.Error(err))
   244  	}
   245  	brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999"))
   246  	if err = os.Rename(w.dir, brokenDirName); err != nil {
   247  		lg.Panic(
   248  			"failed to rename WAL during cleanup",
   249  			zap.Error(err),
   250  			zap.String("source-path", w.dir),
   251  			zap.String("rename-path", brokenDirName),
   252  		)
   253  	}
   254  }
   255  
   256  func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
   257  	if err := os.RemoveAll(w.dir); err != nil {
   258  		return nil, err
   259  	}
   260  	// On non-Windows platforms, hold the lock while renaming. Releasing
   261  	// the lock and trying to reacquire it quickly can be flaky because
   262  	// it's possible the process will fork to spawn a process while this is
   263  	// happening. The fds are set up as close-on-exec by the Go runtime,
   264  	// but there is a window between the fork and the exec where another
   265  	// process holds the lock.
   266  	if err := os.Rename(tmpdirpath, w.dir); err != nil {
   267  		if _, ok := err.(*os.LinkError); ok {
   268  			return w.renameWALUnlock(tmpdirpath)
   269  		}
   270  		return nil, err
   271  	}
   272  	w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
   273  	df, err := fileutil.OpenDir(w.dir)
   274  	w.dirFile = df
   275  	return w, err
   276  }
   277  
   278  func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
   279  	// rename of directory with locked files doesn't work on windows/cifs;
   280  	// close the WAL to release the locks so the directory can be renamed.
   281  	w.lg.Info(
   282  		"closing WAL to release flock and retry directory renaming",
   283  		zap.String("from", tmpdirpath),
   284  		zap.String("to", w.dir),
   285  	)
   286  	w.Close()
   287  
   288  	if err := os.Rename(tmpdirpath, w.dir); err != nil {
   289  		return nil, err
   290  	}
   291  
   292  	// reopen and relock
   293  	newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
   294  	if oerr != nil {
   295  		return nil, oerr
   296  	}
   297  	if _, _, _, err := newWAL.ReadAll(); err != nil {
   298  		newWAL.Close()
   299  		return nil, err
   300  	}
   301  	return newWAL, nil
   302  }
   303  
   304  // Open opens the WAL at the given snap.
   305  // The snap SHOULD have been previously saved to the WAL, or the following
   306  // ReadAll will fail.
   307  // The returned WAL is ready to read and the first record will be the one after
   308  // the given snap. The WAL cannot be appended to before reading out all of its
   309  // previous records.
   310  func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
   311  	w, err := openAtIndex(lg, dirpath, snap, true)
   312  	if err != nil {
   313  		return nil, err
   314  	}
   315  	if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
   316  		return nil, err
   317  	}
   318  	return w, nil
   319  }
   320  
   321  // OpenForRead only opens the wal files for read.
   322  // Write on a read only wal panics.
   323  func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
   324  	return openAtIndex(lg, dirpath, snap, false)
   325  }
   326  
   327  func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
   328  	if lg == nil {
   329  		lg = zap.NewNop()
   330  	}
   331  	names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
   332  	if err != nil {
   333  		return nil, err
   334  	}
   335  
   336  	rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
   337  	if err != nil {
   338  		return nil, err
   339  	}
   340  
   341  	// create a WAL ready for reading
   342  	w := &WAL{
   343  		lg:        lg,
   344  		dir:       dirpath,
   345  		start:     snap,
   346  		decoder:   newDecoder(rs...),
   347  		readClose: closer,
   348  		locks:     ls,
   349  	}
   350  
   351  	if write {
   352  		// write reuses the file descriptors from read; don't close so
   353  		// WAL can append without dropping the file lock
   354  		w.readClose = nil
   355  		if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
   356  			closer()
   357  			return nil, err
   358  		}
   359  		w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
   360  	}
   361  
   362  	return w, nil
   363  }
   364  
   365  func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
   366  	names, err := readWALNames(lg, dirpath)
   367  	if err != nil {
   368  		return nil, -1, err
   369  	}
   370  
   371  	nameIndex, ok := searchIndex(lg, names, snap.Index)
   372  	if !ok || !isValidSeq(lg, names[nameIndex:]) {
   373  		err = ErrFileNotFound
   374  		return nil, -1, err
   375  	}
   376  
   377  	return names, nameIndex, nil
   378  }
   379  
   380  func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) {
   381  	rcs := make([]io.ReadCloser, 0)
   382  	rs := make([]fileutil.FileReader, 0)
   383  	ls := make([]*fileutil.LockedFile, 0)
   384  	for _, name := range names[nameIndex:] {
   385  		p := filepath.Join(dirpath, name)
   386  		var f *os.File
   387  		if write {
   388  			l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
   389  			if err != nil {
   390  				closeAll(lg, rcs...)
   391  				return nil, nil, nil, err
   392  			}
   393  			ls = append(ls, l)
   394  			rcs = append(rcs, l)
   395  			f = l.File
   396  		} else {
   397  			rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
   398  			if err != nil {
   399  				closeAll(lg, rcs...)
   400  				return nil, nil, nil, err
   401  			}
   402  			ls = append(ls, nil)
   403  			rcs = append(rcs, rf)
   404  			f = rf
   405  		}
   406  		fileReader := fileutil.NewFileReader(f)
   407  		rs = append(rs, fileReader)
   408  	}
   409  
   410  	closer := func() error { return closeAll(lg, rcs...) }
   411  
   412  	return rs, ls, closer, nil
   413  }
   414  
   415  // ReadAll reads out records of the current WAL.
   416  // If opened in write mode, it must read out all records until EOF. Or an error
   417  // will be returned.
   418  // If opened in read mode, it will try to read all records if possible.
   419  // If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
   420  // If loaded snap doesn't match with the expected one, it will return
   421  // all the records and error ErrSnapshotMismatch.
   422  // TODO: detect not-last-snap error.
   423  // TODO: maybe loose the checking of match.
   424  // After ReadAll, the WAL will be ready for appending new records.
   425  //
   426  // ReadAll suppresses WAL entries that got overridden (i.e. a newer entry with the same index
   427  // exists in the log). Such a situation can happen in cases described in figure 7. of the
   428  // RAFT paper (http://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14.pdf).
   429  //
   430  // ReadAll may return uncommitted yet entries, that are subject to be overriden.
   431  // Do not apply entries that have index > state.commit, as they are subject to change.
   432  func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
   433  	w.mu.Lock()
   434  	defer w.mu.Unlock()
   435  
   436  	rec := &walpb.Record{}
   437  
   438  	if w.decoder == nil {
   439  		return nil, state, nil, ErrDecoderNotFound
   440  	}
   441  	decoder := w.decoder
   442  
   443  	var match bool
   444  	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
   445  		switch rec.Type {
   446  		case entryType:
   447  			e := mustUnmarshalEntry(rec.Data)
   448  			// 0 <= e.Index-w.start.Index - 1 < len(ents)
   449  			if e.Index > w.start.Index {
   450  				// prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
   451  				up := e.Index - w.start.Index - 1
   452  				if up > uint64(len(ents)) {
   453  					// return error before append call causes runtime panic
   454  					return nil, state, nil, ErrSliceOutOfRange
   455  				}
   456  				// The line below is potentially overriding some 'uncommitted' entries.
   457  				ents = append(ents[:up], e)
   458  			}
   459  			w.enti = e.Index
   460  
   461  		case stateType:
   462  			state = mustUnmarshalState(rec.Data)
   463  
   464  		case metadataType:
   465  			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
   466  				state.Reset()
   467  				return nil, state, nil, ErrMetadataConflict
   468  			}
   469  			metadata = rec.Data
   470  
   471  		case crcType:
   472  			crc := decoder.crc.Sum32()
   473  			// current crc of decoder must match the crc of the record.
   474  			// do no need to match 0 crc, since the decoder is a new one at this case.
   475  			if crc != 0 && rec.Validate(crc) != nil {
   476  				state.Reset()
   477  				return nil, state, nil, ErrCRCMismatch
   478  			}
   479  			decoder.updateCRC(rec.Crc)
   480  
   481  		case snapshotType:
   482  			var snap walpb.Snapshot
   483  			pbutil.MustUnmarshal(&snap, rec.Data)
   484  			if snap.Index == w.start.Index {
   485  				if snap.Term != w.start.Term {
   486  					state.Reset()
   487  					return nil, state, nil, ErrSnapshotMismatch
   488  				}
   489  				match = true
   490  			}
   491  
   492  		default:
   493  			state.Reset()
   494  			return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
   495  		}
   496  	}
   497  
   498  	switch w.tail() {
   499  	case nil:
   500  		// We do not have to read out all entries in read mode.
   501  		// The last record maybe a partial written one, so
   502  		// ErrunexpectedEOF might be returned.
   503  		if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
   504  			state.Reset()
   505  			return nil, state, nil, err
   506  		}
   507  	default:
   508  		// We must read all the entries if WAL is opened in write mode.
   509  		if !errors.Is(err, io.EOF) {
   510  			state.Reset()
   511  			return nil, state, nil, err
   512  		}
   513  		// decodeRecord() will return io.EOF if it detects a zero record,
   514  		// but this zero record may be followed by non-zero records from
   515  		// a torn write. Overwriting some of these non-zero records, but
   516  		// not all, will cause CRC errors on WAL open. Since the records
   517  		// were never fully synced to disk in the first place, it's safe
   518  		// to zero them out to avoid any CRC errors from new writes.
   519  		if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
   520  			return nil, state, nil, err
   521  		}
   522  		if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
   523  			return nil, state, nil, err
   524  		}
   525  	}
   526  
   527  	err = nil
   528  	if !match {
   529  		err = ErrSnapshotNotFound
   530  	}
   531  
   532  	// close decoder, disable reading
   533  	if w.readClose != nil {
   534  		w.readClose()
   535  		w.readClose = nil
   536  	}
   537  	w.start = walpb.Snapshot{}
   538  
   539  	w.metadata = metadata
   540  
   541  	if w.tail() != nil {
   542  		// create encoder (chain crc with the decoder), enable appending
   543  		w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
   544  		if err != nil {
   545  			return
   546  		}
   547  	}
   548  	w.decoder = nil
   549  
   550  	return metadata, state, ents, err
   551  }
   552  
   553  // ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
   554  // Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
   555  func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
   556  	var snaps []walpb.Snapshot
   557  	var state raftpb.HardState
   558  	var err error
   559  
   560  	rec := &walpb.Record{}
   561  	names, err := readWALNames(lg, walDir)
   562  	if err != nil {
   563  		return nil, err
   564  	}
   565  
   566  	// open wal files in read mode, so that there is no conflict
   567  	// when the same WAL is opened elsewhere in write mode
   568  	rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
   569  	if err != nil {
   570  		return nil, err
   571  	}
   572  	defer func() {
   573  		if closer != nil {
   574  			closer()
   575  		}
   576  	}()
   577  
   578  	// create a new decoder from the readers on the WAL files
   579  	decoder := newDecoder(rs...)
   580  
   581  	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
   582  		switch rec.Type {
   583  		case snapshotType:
   584  			var loadedSnap walpb.Snapshot
   585  			pbutil.MustUnmarshal(&loadedSnap, rec.Data)
   586  			snaps = append(snaps, loadedSnap)
   587  		case stateType:
   588  			state = mustUnmarshalState(rec.Data)
   589  		case crcType:
   590  			crc := decoder.crc.Sum32()
   591  			// current crc of decoder must match the crc of the record.
   592  			// do no need to match 0 crc, since the decoder is a new one at this case.
   593  			if crc != 0 && rec.Validate(crc) != nil {
   594  				return nil, ErrCRCMismatch
   595  			}
   596  			decoder.updateCRC(rec.Crc)
   597  		}
   598  	}
   599  	// We do not have to read out all the WAL entries
   600  	// as the decoder is opened in read mode.
   601  	if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
   602  		return nil, err
   603  	}
   604  
   605  	// filter out any snaps that are newer than the committed hardstate
   606  	n := 0
   607  	for _, s := range snaps {
   608  		if s.Index <= state.Commit {
   609  			snaps[n] = s
   610  			n++
   611  		}
   612  	}
   613  	snaps = snaps[:n:n]
   614  	return snaps, nil
   615  }
   616  
   617  // Verify reads through the given WAL and verifies that it is not corrupted.
   618  // It creates a new decoder to read through the records of the given WAL.
   619  // It does not conflict with any open WAL, but it is recommended not to
   620  // call this function after opening the WAL for writing.
   621  // If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
   622  // If the loaded snap doesn't match with the expected one, it will
   623  // return error ErrSnapshotMismatch.
   624  func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardState, error) {
   625  	var metadata []byte
   626  	var err error
   627  	var match bool
   628  	var state raftpb.HardState
   629  
   630  	rec := &walpb.Record{}
   631  
   632  	if lg == nil {
   633  		lg = zap.NewNop()
   634  	}
   635  	names, nameIndex, err := selectWALFiles(lg, walDir, snap)
   636  	if err != nil {
   637  		return nil, err
   638  	}
   639  
   640  	// open wal files in read mode, so that there is no conflict
   641  	// when the same WAL is opened elsewhere in write mode
   642  	rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
   643  	if err != nil {
   644  		return nil, err
   645  	}
   646  	defer func() {
   647  		if closer != nil {
   648  			closer()
   649  		}
   650  	}()
   651  
   652  	// create a new decoder from the readers on the WAL files
   653  	decoder := newDecoder(rs...)
   654  
   655  	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
   656  		switch rec.Type {
   657  		case metadataType:
   658  			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
   659  				return nil, ErrMetadataConflict
   660  			}
   661  			metadata = rec.Data
   662  		case crcType:
   663  			crc := decoder.crc.Sum32()
   664  			// Current crc of decoder must match the crc of the record.
   665  			// We need not match 0 crc, since the decoder is a new one at this point.
   666  			if crc != 0 && rec.Validate(crc) != nil {
   667  				return nil, ErrCRCMismatch
   668  			}
   669  			decoder.updateCRC(rec.Crc)
   670  		case snapshotType:
   671  			var loadedSnap walpb.Snapshot
   672  			pbutil.MustUnmarshal(&loadedSnap, rec.Data)
   673  			if loadedSnap.Index == snap.Index {
   674  				if loadedSnap.Term != snap.Term {
   675  					return nil, ErrSnapshotMismatch
   676  				}
   677  				match = true
   678  			}
   679  		// We ignore all entry and state type records as these
   680  		// are not necessary for validating the WAL contents
   681  		case entryType:
   682  		case stateType:
   683  			pbutil.MustUnmarshal(&state, rec.Data)
   684  		default:
   685  			return nil, fmt.Errorf("unexpected block type %d", rec.Type)
   686  		}
   687  	}
   688  
   689  	// We do not have to read out all the WAL entries
   690  	// as the decoder is opened in read mode.
   691  	if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
   692  		return nil, err
   693  	}
   694  
   695  	if !match {
   696  		return nil, ErrSnapshotNotFound
   697  	}
   698  
   699  	return &state, nil
   700  }
   701  
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
//
// The new segment is named from the current sequence number plus one and the
// last saved entry index plus one. Its header consists of a crc record
// carrying the previous segment's running CRC, the metadata record, and the
// current hard state (skipped by saveState when empty).
//
// NOTE(review): callers visible in this file invoke cut with w.mu held (Save);
// cut itself does not lock.
func (w *WAL) cut() error {
	// close old wal file; truncate to avoid wasting space if an early cut
	off, serr := w.tail().Seek(0, io.SeekCurrent)
	if serr != nil {
		return serr
	}

	if err := w.tail().Truncate(off); err != nil {
		return err
	}

	if err := w.sync(); err != nil {
		return err
	}

	// final name of the new segment; computed before the tail changes below
	fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))

	// obtain the next segment file from the file pipeline; it is renamed to
	// fpath further down (presumably it starts out as a temp file — confirm
	// against filePipeline)
	newTail, err := w.fp.Open()
	if err != nil {
		return err
	}

	// update writer and save the previous crc
	w.locks = append(w.locks, newTail)
	prevCrc := w.encoder.crc.Sum32()
	// from here on w.tail() is newTail, so the encoder writes to the new file
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}

	if err = w.saveCrc(prevCrc); err != nil {
		return err
	}

	if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
		return err
	}

	if err = w.saveState(&w.state); err != nil {
		return err
	}

	// atomically move temp wal file to wal file
	if err = w.sync(); err != nil {
		return err
	}

	// remember the write position so it can be restored after the reopen
	off, err = w.tail().Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}

	if err = os.Rename(newTail.Name(), fpath); err != nil {
		return err
	}
	// fsync the directory so the rename itself is persisted
	start := time.Now()
	if err = fileutil.Fsync(w.dirFile); err != nil {
		return err
	}
	walFsyncSec.Observe(time.Since(start).Seconds())

	// reopen newTail with its new path so calls to Name() match the wal filename format
	// NOTE(review): the error from this Close is discarded.
	newTail.Close()

	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
		return err
	}
	if _, err = newTail.Seek(off, io.SeekStart); err != nil {
		return err
	}

	// swap the reopened handle into the tail slot
	w.locks[len(w.locks)-1] = newTail

	// rebuild the encoder on the reopened file, continuing the running CRC
	prevCrc = w.encoder.crc.Sum32()
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}

	w.lg.Info("created a new WAL segment", zap.String("path", fpath))
	return nil
}
   788  
   789  func (w *WAL) sync() error {
   790  	if w.encoder != nil {
   791  		if err := w.encoder.flush(); err != nil {
   792  			return err
   793  		}
   794  	}
   795  
   796  	if w.unsafeNoSync {
   797  		return nil
   798  	}
   799  
   800  	start := time.Now()
   801  	err := fileutil.Fdatasync(w.tail().File)
   802  
   803  	took := time.Since(start)
   804  	if took > warnSyncDuration {
   805  		w.lg.Warn(
   806  			"slow fdatasync",
   807  			zap.Duration("took", took),
   808  			zap.Duration("expected-duration", warnSyncDuration),
   809  		)
   810  	}
   811  	walFsyncSec.Observe(took.Seconds())
   812  
   813  	return err
   814  }
   815  
   816  func (w *WAL) Sync() error {
   817  	return w.sync()
   818  }
   819  
   820  // ReleaseLockTo releases the locks, which has smaller index than the given index
   821  // except the largest one among them.
   822  // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
   823  // lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
   824  func (w *WAL) ReleaseLockTo(index uint64) error {
   825  	w.mu.Lock()
   826  	defer w.mu.Unlock()
   827  
   828  	if len(w.locks) == 0 {
   829  		return nil
   830  	}
   831  
   832  	var smaller int
   833  	found := false
   834  	for i, l := range w.locks {
   835  		_, lockIndex, err := parseWALName(filepath.Base(l.Name()))
   836  		if err != nil {
   837  			return err
   838  		}
   839  		if lockIndex >= index {
   840  			smaller = i - 1
   841  			found = true
   842  			break
   843  		}
   844  	}
   845  
   846  	// if no lock index is greater than the release index, we can
   847  	// release lock up to the last one(excluding).
   848  	if !found {
   849  		smaller = len(w.locks) - 1
   850  	}
   851  
   852  	if smaller <= 0 {
   853  		return nil
   854  	}
   855  
   856  	for i := 0; i < smaller; i++ {
   857  		if w.locks[i] == nil {
   858  			continue
   859  		}
   860  		w.locks[i].Close()
   861  	}
   862  	w.locks = w.locks[smaller:]
   863  
   864  	return nil
   865  }
   866  
   867  // Close closes the current WAL file and directory.
   868  func (w *WAL) Close() error {
   869  	w.mu.Lock()
   870  	defer w.mu.Unlock()
   871  
   872  	if w.fp != nil {
   873  		w.fp.Close()
   874  		w.fp = nil
   875  	}
   876  
   877  	if w.tail() != nil {
   878  		if err := w.sync(); err != nil {
   879  			return err
   880  		}
   881  	}
   882  	for _, l := range w.locks {
   883  		if l == nil {
   884  			continue
   885  		}
   886  		if err := l.Close(); err != nil {
   887  			w.lg.Error("failed to close WAL", zap.Error(err))
   888  		}
   889  	}
   890  
   891  	return w.dirFile.Close()
   892  }
   893  
   894  func (w *WAL) saveEntry(e *raftpb.Entry) error {
   895  	// TODO: add MustMarshalTo to reduce one allocation.
   896  	b := pbutil.MustMarshal(e)
   897  	rec := &walpb.Record{Type: entryType, Data: b}
   898  	if err := w.encoder.encode(rec); err != nil {
   899  		return err
   900  	}
   901  	w.enti = e.Index
   902  	return nil
   903  }
   904  
   905  func (w *WAL) saveState(s *raftpb.HardState) error {
   906  	if raft.IsEmptyHardState(*s) {
   907  		return nil
   908  	}
   909  	w.state = *s
   910  	b := pbutil.MustMarshal(s)
   911  	rec := &walpb.Record{Type: stateType, Data: b}
   912  	return w.encoder.encode(rec)
   913  }
   914  
   915  func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
   916  	w.mu.Lock()
   917  	defer w.mu.Unlock()
   918  
   919  	// short cut, do not call sync
   920  	if raft.IsEmptyHardState(st) && len(ents) == 0 {
   921  		return nil
   922  	}
   923  
   924  	mustSync := raft.MustSync(st, w.state, len(ents))
   925  
   926  	// TODO(xiangli): no more reference operator
   927  	for i := range ents {
   928  		if err := w.saveEntry(&ents[i]); err != nil {
   929  			return err
   930  		}
   931  	}
   932  	if err := w.saveState(&st); err != nil {
   933  		return err
   934  	}
   935  
   936  	curOff, err := w.tail().Seek(0, io.SeekCurrent)
   937  	if err != nil {
   938  		return err
   939  	}
   940  	if curOff < SegmentSizeBytes {
   941  		if mustSync {
   942  			// gofail: var walBeforeSync struct{}
   943  			err = w.sync()
   944  			// gofail: var walAfterSync struct{}
   945  			return err
   946  		}
   947  		return nil
   948  	}
   949  
   950  	return w.cut()
   951  }
   952  
   953  func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
   954  	if err := walpb.ValidateSnapshotForWrite(&e); err != nil {
   955  		return err
   956  	}
   957  
   958  	b := pbutil.MustMarshal(&e)
   959  
   960  	w.mu.Lock()
   961  	defer w.mu.Unlock()
   962  
   963  	rec := &walpb.Record{Type: snapshotType, Data: b}
   964  	if err := w.encoder.encode(rec); err != nil {
   965  		return err
   966  	}
   967  	// update enti only when snapshot is ahead of last index
   968  	if w.enti < e.Index {
   969  		w.enti = e.Index
   970  	}
   971  	return w.sync()
   972  }
   973  
   974  func (w *WAL) saveCrc(prevCrc uint32) error {
   975  	return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
   976  }
   977  
   978  func (w *WAL) tail() *fileutil.LockedFile {
   979  	if len(w.locks) > 0 {
   980  		return w.locks[len(w.locks)-1]
   981  	}
   982  	return nil
   983  }
   984  
   985  func (w *WAL) seq() uint64 {
   986  	t := w.tail()
   987  	if t == nil {
   988  		return 0
   989  	}
   990  	seq, _, err := parseWALName(filepath.Base(t.Name()))
   991  	if err != nil {
   992  		w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
   993  	}
   994  	return seq
   995  }
   996  
   997  func closeAll(lg *zap.Logger, rcs ...io.ReadCloser) error {
   998  	stringArr := make([]string, 0)
   999  	for _, f := range rcs {
  1000  		if err := f.Close(); err != nil {
  1001  			lg.Warn("failed to close: ", zap.Error(err))
  1002  			stringArr = append(stringArr, err.Error())
  1003  		}
  1004  	}
  1005  	if len(stringArr) == 0 {
  1006  		return nil
  1007  	}
  1008  	return errors.New(strings.Join(stringArr, ", "))
  1009  }
  1010  

View as plain text