...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/backend.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2017 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"fmt"
    19  	"os"
    20  	"time"
    21  
    22  	"go.etcd.io/etcd/raft/v3/raftpb"
    23  	"go.etcd.io/etcd/server/v3/config"
    24  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
    25  	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
    26  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    27  
    28  	"go.uber.org/zap"
    29  )
    30  
    31  func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
    32  	bcfg := backend.DefaultBackendConfig()
    33  	bcfg.Path = cfg.BackendPath()
    34  	bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
    35  	if cfg.BackendBatchLimit != 0 {
    36  		bcfg.BatchLimit = cfg.BackendBatchLimit
    37  		if cfg.Logger != nil {
    38  			cfg.Logger.Info("setting backend batch limit", zap.Int("batch limit", cfg.BackendBatchLimit))
    39  		}
    40  	}
    41  	if cfg.BackendBatchInterval != 0 {
    42  		bcfg.BatchInterval = cfg.BackendBatchInterval
    43  		if cfg.Logger != nil {
    44  			cfg.Logger.Info("setting backend batch interval", zap.Duration("batch interval", cfg.BackendBatchInterval))
    45  		}
    46  	}
    47  	bcfg.BackendFreelistType = cfg.BackendFreelistType
    48  	bcfg.Logger = cfg.Logger
    49  	if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
    50  		// permit 10% excess over quota for disarm
    51  		bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
    52  	}
    53  	bcfg.Mlock = cfg.ExperimentalMemoryMlock
    54  	bcfg.Hooks = hooks
    55  	return backend.New(bcfg)
    56  }
    57  
    58  // openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
    59  func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
    60  	snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
    61  	if err != nil {
    62  		return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
    63  	}
    64  	if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
    65  		return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
    66  	}
    67  	return openBackend(cfg, hooks), nil
    68  }
    69  
    70  // openBackend returns a backend using the current etcd db.
    71  func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
    72  	fn := cfg.BackendPath()
    73  
    74  	now, beOpened := time.Now(), make(chan backend.Backend)
    75  	go func() {
    76  		beOpened <- newBackend(cfg, hooks)
    77  	}()
    78  
    79  	select {
    80  	case be := <-beOpened:
    81  		cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now)))
    82  		return be
    83  
    84  	case <-time.After(10 * time.Second):
    85  		cfg.Logger.Info(
    86  			"db file is flocked by another process, or taking too long",
    87  			zap.String("path", fn),
    88  			zap.Duration("took", time.Since(now)),
    89  		)
    90  	}
    91  
    92  	return <-beOpened
    93  }
    94  
    95  // recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
    96  // before updating the backend db after persisting raft snapshot to disk,
    97  // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
    98  // case, replace the db with the snapshot db sent by the leader.
    99  func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
   100  	consistentIndex := uint64(0)
   101  	if beExist {
   102  		consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
   103  	}
   104  	if snapshot.Metadata.Index <= consistentIndex {
   105  		return oldbe, nil
   106  	}
   107  	oldbe.Close()
   108  	return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
   109  }
   110  

View as plain text