...

Source file src/go.etcd.io/etcd/client/v3/maintenance.go

Documentation: go.etcd.io/etcd/client/v3

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package clientv3
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  
    22  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    23  	"go.uber.org/zap"
    24  	"google.golang.org/grpc"
    25  )
    26  
    27  type (
    28  	DefragmentResponse pb.DefragmentResponse
    29  	AlarmResponse      pb.AlarmResponse
    30  	AlarmMember        pb.AlarmMember
    31  	StatusResponse     pb.StatusResponse
    32  	HashKVResponse     pb.HashKVResponse
    33  	MoveLeaderResponse pb.MoveLeaderResponse
    34  )
    35  
// Maintenance groups the maintenance operations offered by the client: alarm
// listing and disarming, defragmentation, endpoint status and KV hash queries,
// snapshot streaming, and leadership transfer.
type Maintenance interface {
	// AlarmList gets all active alarms.
	AlarmList(ctx context.Context) (*AlarmResponse, error)

	// AlarmDisarm disarms a given alarm.
	AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)

	// Defragment releases wasted space from internal fragmentation on a given etcd member.
	// Defragment is only needed after deleting a large number of keys, when the
	// freed resources should be reclaimed.
	// Defragment is an expensive operation. Users should avoid defragmenting multiple
	// members at the same time.
	// To defragment multiple members in the cluster, users need to call Defragment
	// multiple times with different endpoints.
	Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)

	// Status gets the status of the endpoint.
	Status(ctx context.Context, endpoint string) (*StatusResponse, error)

	// HashKV returns a hash of the KV state at the time of the RPC.
	// If revision is zero, the hash is computed on all keys. If the revision
	// is non-zero, the hash is computed on all keys at or below the given revision.
	HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)

	// Snapshot provides a reader for a point-in-time snapshot of etcd.
	// If the context "ctx" is canceled or timed out, reading from the returned
	// "io.ReadCloser" will fail with an error (e.g. context.Canceled,
	// context.DeadlineExceeded).
	Snapshot(ctx context.Context) (io.ReadCloser, error)

	// MoveLeader requests the current leader to transfer its leadership to the
	// transferee. The request must be made to the leader.
	MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
}
    69  
// maintenance is the Maintenance implementation in this file; it issues its
// requests through pb.MaintenanceClient values.
type maintenance struct {
	// lg receives the progress and failure messages logged while a snapshot
	// is being streamed.
	lg       *zap.Logger
	// dial returns a maintenance client for the given endpoint together with
	// a cleanup function that the caller runs when the call is finished. It
	// is used by the calls that name an endpoint (Defragment, Status, HashKV).
	dial     func(endpoint string) (pb.MaintenanceClient, func(), error)
	// remote is the client used by the calls that do not name an endpoint
	// (AlarmList, AlarmDisarm, Snapshot, MoveLeader).
	remote   pb.MaintenanceClient
	// callOpts are passed along with every RPC issued by this type.
	callOpts []grpc.CallOption
}
    76  
    77  func NewMaintenance(c *Client) Maintenance {
    78  	api := &maintenance{
    79  		lg: c.lg,
    80  		dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
    81  			conn, err := c.Dial(endpoint)
    82  			if err != nil {
    83  				return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
    84  			}
    85  
    86  			//get token with established connection
    87  			dctx := c.ctx
    88  			cancel := func() {}
    89  			if c.cfg.DialTimeout > 0 {
    90  				dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
    91  			}
    92  			err = c.getToken(dctx)
    93  			cancel()
    94  			if err != nil {
    95  				conn.Close()
    96  				return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err)
    97  			}
    98  			cancel = func() { conn.Close() }
    99  			return RetryMaintenanceClient(c, conn), cancel, nil
   100  		},
   101  		remote: RetryMaintenanceClient(c, c.conn),
   102  	}
   103  	if c != nil {
   104  		api.callOpts = c.callOpts
   105  	}
   106  	return api
   107  }
   108  
   109  func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
   110  	api := &maintenance{
   111  		lg: c.lg,
   112  		dial: func(string) (pb.MaintenanceClient, func(), error) {
   113  			return remote, func() {}, nil
   114  		},
   115  		remote: remote,
   116  	}
   117  	if c != nil {
   118  		api.callOpts = c.callOpts
   119  	}
   120  	return api
   121  }
   122  
   123  func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
   124  	req := &pb.AlarmRequest{
   125  		Action:   pb.AlarmRequest_GET,
   126  		MemberID: 0,                 // all
   127  		Alarm:    pb.AlarmType_NONE, // all
   128  	}
   129  	resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
   130  	if err == nil {
   131  		return (*AlarmResponse)(resp), nil
   132  	}
   133  	return nil, toErr(ctx, err)
   134  }
   135  
   136  func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
   137  	req := &pb.AlarmRequest{
   138  		Action:   pb.AlarmRequest_DEACTIVATE,
   139  		MemberID: am.MemberID,
   140  		Alarm:    am.Alarm,
   141  	}
   142  
   143  	if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
   144  		ar, err := m.AlarmList(ctx)
   145  		if err != nil {
   146  			return nil, toErr(ctx, err)
   147  		}
   148  		ret := AlarmResponse{}
   149  		for _, am := range ar.Alarms {
   150  			dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
   151  			if derr != nil {
   152  				return nil, toErr(ctx, derr)
   153  			}
   154  			ret.Alarms = append(ret.Alarms, dresp.Alarms...)
   155  		}
   156  		return &ret, nil
   157  	}
   158  
   159  	resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
   160  	if err == nil {
   161  		return (*AlarmResponse)(resp), nil
   162  	}
   163  	return nil, toErr(ctx, err)
   164  }
   165  
   166  func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
   167  	remote, cancel, err := m.dial(endpoint)
   168  	if err != nil {
   169  		return nil, toErr(ctx, err)
   170  	}
   171  	defer cancel()
   172  	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
   173  	if err != nil {
   174  		return nil, toErr(ctx, err)
   175  	}
   176  	return (*DefragmentResponse)(resp), nil
   177  }
   178  
   179  func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
   180  	remote, cancel, err := m.dial(endpoint)
   181  	if err != nil {
   182  		return nil, toErr(ctx, err)
   183  	}
   184  	defer cancel()
   185  	resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
   186  	if err != nil {
   187  		return nil, toErr(ctx, err)
   188  	}
   189  	return (*StatusResponse)(resp), nil
   190  }
   191  
   192  func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
   193  	remote, cancel, err := m.dial(endpoint)
   194  	if err != nil {
   195  
   196  		return nil, toErr(ctx, err)
   197  	}
   198  	defer cancel()
   199  	resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
   200  	if err != nil {
   201  		return nil, toErr(ctx, err)
   202  	}
   203  	return (*HashKVResponse)(resp), nil
   204  }
   205  
   206  func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
   207  	ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
   208  	if err != nil {
   209  		return nil, toErr(ctx, err)
   210  	}
   211  
   212  	m.lg.Info("opened snapshot stream; downloading")
   213  	pr, pw := io.Pipe()
   214  	go func() {
   215  		for {
   216  			resp, err := ss.Recv()
   217  			if err != nil {
   218  				switch err {
   219  				case io.EOF:
   220  					m.lg.Info("completed snapshot read; closing")
   221  				default:
   222  					m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
   223  				}
   224  				pw.CloseWithError(err)
   225  				return
   226  			}
   227  
   228  			// can "resp == nil && err == nil"
   229  			// before we receive snapshot SHA digest?
   230  			// No, server sends EOF with an empty response
   231  			// after it sends SHA digest at the end
   232  
   233  			if _, werr := pw.Write(resp.Blob); werr != nil {
   234  				pw.CloseWithError(werr)
   235  				return
   236  			}
   237  		}
   238  	}()
   239  	return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
   240  }
   241  
   242  type snapshotReadCloser struct {
   243  	ctx context.Context
   244  	io.ReadCloser
   245  }
   246  
   247  func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
   248  	n, err = rc.ReadCloser.Read(p)
   249  	return n, toErr(rc.ctx, err)
   250  }
   251  
   252  func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
   253  	resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
   254  	return (*MoveLeaderResponse)(resp), toErr(ctx, err)
   255  }
   256  

View as plain text