...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc/maintenance.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v3rpc
    16  
    17  import (
    18  	"context"
    19  	"crypto/sha256"
    20  	"io"
    21  	"time"
    22  
    23  	"github.com/dustin/go-humanize"
    24  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    25  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    26  	"go.etcd.io/etcd/api/v3/version"
    27  	"go.etcd.io/etcd/raft/v3"
    28  	"go.etcd.io/etcd/server/v3/auth"
    29  	"go.etcd.io/etcd/server/v3/etcdserver"
    30  	"go.etcd.io/etcd/server/v3/mvcc"
    31  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    32  
    33  	"go.uber.org/zap"
    34  )
    35  
    36  type KVGetter interface {
    37  	KV() mvcc.WatchableKV
    38  }
    39  
    40  type BackendGetter interface {
    41  	Backend() backend.Backend
    42  }
    43  
    44  type Alarmer interface {
    45  	// Alarms is implemented in Server interface located in etcdserver/server.go
    46  	// It returns a list of alarms present in the AlarmStore
    47  	Alarms() []*pb.AlarmMember
    48  	Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
    49  }
    50  
    51  type Downgrader interface {
    52  	Downgrade(ctx context.Context, dr *pb.DowngradeRequest) (*pb.DowngradeResponse, error)
    53  }
    54  
    55  type LeaderTransferrer interface {
    56  	MoveLeader(ctx context.Context, lead, target uint64) error
    57  }
    58  
    59  type AuthGetter interface {
    60  	AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
    61  	AuthStore() auth.AuthStore
    62  }
    63  
    64  type ClusterStatusGetter interface {
    65  	IsLearner() bool
    66  }
    67  
    68  type maintenanceServer struct {
    69  	lg     *zap.Logger
    70  	rg     etcdserver.RaftStatusGetter
    71  	hasher mvcc.HashStorage
    72  	kg     KVGetter
    73  	bg     BackendGetter
    74  	a      Alarmer
    75  	lt     LeaderTransferrer
    76  	hdr    header
    77  	cs     ClusterStatusGetter
    78  	d      Downgrader
    79  }
    80  
    81  func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
    82  	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s}
    83  	if srv.lg == nil {
    84  		srv.lg = zap.NewNop()
    85  	}
    86  	return &authMaintenanceServer{srv, s}
    87  }
    88  
    89  func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
    90  	ms.lg.Info("starting defragment")
    91  	err := ms.bg.Backend().Defrag()
    92  	if err != nil {
    93  		ms.lg.Warn("failed to defragment", zap.Error(err))
    94  		return nil, err
    95  	}
    96  	ms.lg.Info("finished defragment")
    97  	return &pb.DefragmentResponse{}, nil
    98  }
    99  
   100  // big enough size to hold >1 OS pages in the buffer
   101  const snapshotSendBufferSize = 32 * 1024
   102  
   103  func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
   104  	snap := ms.bg.Backend().Snapshot()
   105  	pr, pw := io.Pipe()
   106  
   107  	defer pr.Close()
   108  
   109  	go func() {
   110  		snap.WriteTo(pw)
   111  		if err := snap.Close(); err != nil {
   112  			ms.lg.Warn("failed to close snapshot", zap.Error(err))
   113  		}
   114  		pw.Close()
   115  	}()
   116  
   117  	// record SHA digest of snapshot data
   118  	// used for integrity checks during snapshot restore operation
   119  	h := sha256.New()
   120  
   121  	sent := int64(0)
   122  	total := snap.Size()
   123  	size := humanize.Bytes(uint64(total))
   124  
   125  	start := time.Now()
   126  	ms.lg.Info("sending database snapshot to client",
   127  		zap.Int64("total-bytes", total),
   128  		zap.String("size", size),
   129  	)
   130  	for total-sent > 0 {
   131  		// buffer just holds read bytes from stream
   132  		// response size is multiple of OS page size, fetched in boltdb
   133  		// e.g. 4*1024
   134  		// NOTE: srv.Send does not wait until the message is received by the client.
   135  		// Therefore the buffer can not be safely reused between Send operations
   136  		buf := make([]byte, snapshotSendBufferSize)
   137  
   138  		n, err := io.ReadFull(pr, buf)
   139  		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
   140  			return togRPCError(err)
   141  		}
   142  		sent += int64(n)
   143  
   144  		// if total is x * snapshotSendBufferSize. it is possible that
   145  		// resp.RemainingBytes == 0
   146  		// resp.Blob == zero byte but not nil
   147  		// does this make server response sent to client nil in proto
   148  		// and client stops receiving from snapshot stream before
   149  		// server sends snapshot SHA?
   150  		// No, the client will still receive non-nil response
   151  		// until server closes the stream with EOF
   152  		resp := &pb.SnapshotResponse{
   153  			RemainingBytes: uint64(total - sent),
   154  			Blob:           buf[:n],
   155  		}
   156  		if err = srv.Send(resp); err != nil {
   157  			return togRPCError(err)
   158  		}
   159  		h.Write(buf[:n])
   160  	}
   161  
   162  	// send SHA digest for integrity checks
   163  	// during snapshot restore operation
   164  	sha := h.Sum(nil)
   165  
   166  	ms.lg.Info("sending database sha256 checksum to client",
   167  		zap.Int64("total-bytes", total),
   168  		zap.Int("checksum-size", len(sha)),
   169  	)
   170  	hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
   171  	if err := srv.Send(hresp); err != nil {
   172  		return togRPCError(err)
   173  	}
   174  
   175  	ms.lg.Info("successfully sent database snapshot to client",
   176  		zap.Int64("total-bytes", total),
   177  		zap.String("size", size),
   178  		zap.String("took", humanize.Time(start)),
   179  	)
   180  	return nil
   181  }
   182  
   183  func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
   184  	h, rev, err := ms.hasher.Hash()
   185  	if err != nil {
   186  		return nil, togRPCError(err)
   187  	}
   188  	resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
   189  	ms.hdr.fill(resp.Header)
   190  	return resp, nil
   191  }
   192  
   193  func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
   194  	h, rev, err := ms.hasher.HashByRev(r.Revision)
   195  	if err != nil {
   196  		return nil, togRPCError(err)
   197  	}
   198  
   199  	resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision}
   200  	ms.hdr.fill(resp.Header)
   201  	return resp, nil
   202  }
   203  
   204  func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
   205  	resp, err := ms.a.Alarm(ctx, ar)
   206  	if err != nil {
   207  		return nil, togRPCError(err)
   208  	}
   209  	if resp.Header == nil {
   210  		resp.Header = &pb.ResponseHeader{}
   211  	}
   212  	ms.hdr.fill(resp.Header)
   213  	return resp, nil
   214  }
   215  
   216  func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
   217  	hdr := &pb.ResponseHeader{}
   218  	ms.hdr.fill(hdr)
   219  	resp := &pb.StatusResponse{
   220  		Header:           hdr,
   221  		Version:          version.Version,
   222  		Leader:           uint64(ms.rg.Leader()),
   223  		RaftIndex:        ms.rg.CommittedIndex(),
   224  		RaftAppliedIndex: ms.rg.AppliedIndex(),
   225  		RaftTerm:         ms.rg.Term(),
   226  		DbSize:           ms.bg.Backend().Size(),
   227  		DbSizeInUse:      ms.bg.Backend().SizeInUse(),
   228  		IsLearner:        ms.cs.IsLearner(),
   229  	}
   230  	if resp.Leader == raft.None {
   231  		resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())
   232  	}
   233  	for _, a := range ms.a.Alarms() {
   234  		resp.Errors = append(resp.Errors, a.String())
   235  	}
   236  	return resp, nil
   237  }
   238  
   239  func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
   240  	if ms.rg.ID() != ms.rg.Leader() {
   241  		return nil, rpctypes.ErrGRPCNotLeader
   242  	}
   243  
   244  	if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
   245  		return nil, togRPCError(err)
   246  	}
   247  	return &pb.MoveLeaderResponse{}, nil
   248  }
   249  
   250  func (ms *maintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
   251  	resp, err := ms.d.Downgrade(ctx, r)
   252  	if err != nil {
   253  		return nil, togRPCError(err)
   254  	}
   255  	resp.Header = &pb.ResponseHeader{}
   256  	ms.hdr.fill(resp.Header)
   257  	return resp, nil
   258  }
   259  
   260  type authMaintenanceServer struct {
   261  	*maintenanceServer
   262  	ag AuthGetter
   263  }
   264  
   265  func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
   266  	authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
   267  	if err != nil {
   268  		return err
   269  	}
   270  
   271  	return ams.ag.AuthStore().IsAdminPermitted(authInfo)
   272  }
   273  
   274  func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
   275  	if err := ams.isAuthenticated(ctx); err != nil {
   276  		return nil, err
   277  	}
   278  
   279  	return ams.maintenanceServer.Defragment(ctx, sr)
   280  }
   281  
   282  func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
   283  	if err := ams.isAuthenticated(srv.Context()); err != nil {
   284  		return err
   285  	}
   286  
   287  	return ams.maintenanceServer.Snapshot(sr, srv)
   288  }
   289  
   290  func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
   291  	if err := ams.isAuthenticated(ctx); err != nil {
   292  		return nil, err
   293  	}
   294  
   295  	return ams.maintenanceServer.Hash(ctx, r)
   296  }
   297  
   298  func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
   299  	if err := ams.isAuthenticated(ctx); err != nil {
   300  		return nil, err
   301  	}
   302  	return ams.maintenanceServer.HashKV(ctx, r)
   303  }
   304  
   305  func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
   306  	return ams.maintenanceServer.Status(ctx, ar)
   307  }
   308  
   309  func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
   310  	return ams.maintenanceServer.MoveLeader(ctx, tr)
   311  }
   312  
   313  func (ams *authMaintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
   314  	return ams.maintenanceServer.Downgrade(ctx, r)
   315  }
   316  

View as plain text