...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/v3_server.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"encoding/base64"
    21  	"encoding/binary"
    22  	"strconv"
    23  	"time"
    24  
    25  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    26  	"go.etcd.io/etcd/api/v3/membershippb"
    27  	"go.etcd.io/etcd/pkg/v3/traceutil"
    28  	"go.etcd.io/etcd/raft/v3"
    29  	"go.etcd.io/etcd/server/v3/auth"
    30  	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
    31  	"go.etcd.io/etcd/server/v3/lease"
    32  	"go.etcd.io/etcd/server/v3/lease/leasehttp"
    33  	"go.etcd.io/etcd/server/v3/mvcc"
    34  
    35  	"github.com/gogo/protobuf/proto"
    36  	"go.uber.org/zap"
    37  	"golang.org/x/crypto/bcrypt"
    38  )
    39  
const (
	// In the healthy case, there might be a small gap (10s of entries) between
	// the applied index and committed index.
	// However, if the committed entries are very heavy to apply, the gap might grow.
	// We should stop accepting new proposals if the gap grows to a certain point.
	maxGapBetweenApplyAndCommitIndex = 5000
	traceThreshold                   = 100 * time.Millisecond
	readIndexRetryTime               = 500 * time.Millisecond

	// The timeout for the node to catch up its applied index, and is used in
	// lease related operations, such as LeaseRenew and LeaseTimeToLive.
	applyTimeout = time.Second
)
    53  
// RaftKV is the set of key-value RPCs that are proposed through raft
// consensus (or, for reads, coordinated with it) before being applied.
type RaftKV interface {
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
    61  
// Lessor is the set of lease RPCs served by the etcd server.
type Lessor interface {
	// LeaseGrant sends LeaseGrant request to raft and apply it after committed.
	LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	// LeaseRenew renews the lease with given ID. The renewed TTL is returned,
	// or an error is returned.
	LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)

	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)

	// LeaseLeases lists all leases.
	LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
}
    78  
// Authenticator is the set of authentication and RBAC management RPCs
// (auth enable/disable/status, user login, and user/role administration)
// served by the etcd server.
type Authenticator interface {
	AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
	AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
	AuthStatus(ctx context.Context, r *pb.AuthStatusRequest) (*pb.AuthStatusResponse, error)
	Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
	UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
    98  
// Range reads the key range [r.Key, r.RangeEnd). For linearizable reads
// (the default) it first waits on the read-index protocol so the result
// reflects all previously committed writes; serializable reads skip that
// barrier and read the local store directly via doSerialize.
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	trace := traceutil.New("range",
		s.Logger(),
		traceutil.Field{Key: "range_begin", Value: string(r.Key)},
		traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
	)
	// Attach the trace to the context so downstream apply code can add steps.
	ctx = context.WithValue(ctx, traceutil.TraceKey, trace)

	var resp *pb.RangeResponse
	var err error
	// Deferred so it observes the FINAL values of resp/err: warns if the
	// request was expensive and logs the trace if it exceeded traceThreshold.
	defer func(start time.Time) {
		warnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
		if resp != nil {
			trace.AddField(
				traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
				traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
			)
		}
		trace.LogIfLong(traceThreshold)
	}(time.Now())

	if !r.Serializable {
		// Linearizable read: block until this node's applied state is at
		// least as new as the leader's commit index at request time.
		err = s.linearizableReadNotify(ctx)
		trace.Step("agreement among raft nodes before linearized reading")
		if err != nil {
			return nil, err
		}
	}
	// Permission check run by doSerialize before performing the read.
	chk := func(ai *auth.AuthInfo) error {
		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
	}

	get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
	if serr := s.doSerialize(ctx, chk, get); serr != nil {
		// Record the serialize error so the deferred warning sees it.
		err = serr
		return nil, err
	}
	return resp, err
}
   138  
   139  func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
   140  	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
   141  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
   142  	if err != nil {
   143  		return nil, err
   144  	}
   145  	return resp.(*pb.PutResponse), nil
   146  }
   147  
   148  func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
   149  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
   150  	if err != nil {
   151  		return nil, err
   152  	}
   153  	return resp.(*pb.DeleteRangeResponse), nil
   154  }
   155  
// Txn applies a transactional request. Read-only txns are served locally
// (after a linearizable read barrier unless every op in both branches is
// serializable); any txn containing a write is proposed through raft.
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
	if isTxnReadonly(r) {
		trace := traceutil.New("transaction",
			s.Logger(),
			traceutil.Field{Key: "read_only", Value: true},
		)
		// Attach the trace so the apply layer can add steps to it.
		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
		if !isTxnSerializable(r) {
			// At least one range op requires linearizability; wait for the
			// read-index barrier before reading.
			err := s.linearizableReadNotify(ctx)
			trace.Step("agreement among raft nodes before linearized reading")
			if err != nil {
				return nil, err
			}
		}
		var resp *pb.TxnResponse
		var err error
		// Permission check covering every key touched by either txn branch.
		chk := func(ai *auth.AuthInfo) error {
			return checkTxnAuth(s.authStore, ai, r)
		}

		// Deferred so it observes the final resp/err values.
		defer func(start time.Time) {
			warnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
			trace.LogIfLong(traceThreshold)
		}(time.Now())

		get := func() { resp, _, err = s.applyV3Base.Txn(ctx, r) }
		if serr := s.doSerialize(ctx, chk, get); serr != nil {
			return nil, serr
		}
		return resp, err
	}

	// Write txn: record the start time for tracing and go through raft.
	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.TxnResponse), nil
}
   195  
   196  func isTxnSerializable(r *pb.TxnRequest) bool {
   197  	for _, u := range r.Success {
   198  		if r := u.GetRequestRange(); r == nil || !r.Serializable {
   199  			return false
   200  		}
   201  	}
   202  	for _, u := range r.Failure {
   203  		if r := u.GetRequestRange(); r == nil || !r.Serializable {
   204  			return false
   205  		}
   206  	}
   207  	return true
   208  }
   209  
   210  func isTxnReadonly(r *pb.TxnRequest) bool {
   211  	for _, u := range r.Success {
   212  		if r := u.GetRequestRange(); r == nil {
   213  			return false
   214  		}
   215  	}
   216  	for _, u := range r.Failure {
   217  		if r := u.GetRequestRange(); r == nil {
   218  			return false
   219  		}
   220  	}
   221  	return true
   222  }
   223  
// Compact proposes a compaction through raft and, for physical compactions,
// waits until the backend has actually deleted the keys and force-commits
// the result so it cannot roll back after a crash.
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
	startTime := time.Now()
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
	trace := traceutil.TODO()
	if result != nil && result.trace != nil {
		trace = result.trace
		defer func() {
			trace.LogIfLong(traceThreshold)
		}()
		// Rebase the trace's start time to the request start so the step
		// inserted below accounts for the raft round-trip.
		applyStart := result.trace.GetStartTime()
		result.trace.SetStartTime(startTime)
		trace.InsertStep(0, applyStart, "process raft request")
	}
	if r.Physical && result != nil && result.physc != nil {
		<-result.physc
		// The compaction is done deleting keys; the hash is now settled
		// but the data is not necessarily committed. If there's a crash,
		// the hash may revert to a hash prior to compaction completing
		// if the compaction resumes. Force the finished compaction to
		// commit so it won't resume following a crash.
		s.be.ForceCommit()
		trace.Step("physically apply compaction")
	}
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	// Normalize a possibly-nil response/header before stamping the revision.
	resp := result.resp.(*pb.CompactionResponse)
	if resp == nil {
		resp = &pb.CompactionResponse{}
	}
	if resp.Header == nil {
		resp.Header = &pb.ResponseHeader{}
	}
	// Report the store's current revision, not the compacted one.
	resp.Header.Revision = s.kv.Rev()
	trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
	return resp, nil
}
   264  
   265  func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
   266  	// no id given? choose one
   267  	for r.ID == int64(lease.NoLease) {
   268  		// only use positive int64 id's
   269  		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
   270  	}
   271  	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
   272  	if err != nil {
   273  		return nil, err
   274  	}
   275  	return resp.(*pb.LeaseGrantResponse), nil
   276  }
   277  
   278  func (s *EtcdServer) waitAppliedIndex() error {
   279  	select {
   280  	case <-s.ApplyWait():
   281  	case <-s.stopping:
   282  		return ErrStopped
   283  	case <-time.After(applyTimeout):
   284  		return ErrTimeoutWaitAppliedIndex
   285  	}
   286  
   287  	return nil
   288  }
   289  
   290  func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
   291  	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
   292  	if err != nil {
   293  		return nil, err
   294  	}
   295  	return resp.(*pb.LeaseRevokeResponse), nil
   296  }
   297  
// LeaseRenew renews the lease with the given ID. On the leader it renews
// locally (after confirming leadership and waiting for the applied index to
// catch up); on followers it forwards the renewal to the leader over HTTP,
// since renewals do not go through raft.
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
	if s.isLeader() {
		// If s.isLeader() returns true, but we fail to ensure the current
		// member's leadership, there are a couple of possibilities:
		//   1. current member gets stuck on writing WAL entries;
		//   2. current member is in network isolation status;
		//   3. current member isn't a leader anymore (possibly due to #1 above).
		// In such case, we just return error to client, so that the client can
		// switch to another member to continue the lease keep-alive operation.
		if !s.ensureLeadership() {
			return -1, lease.ErrNotPrimary
		}

		if err := s.waitAppliedIndex(); err != nil {
			// NOTE(review): other error paths in this function return -1 as
			// the TTL; this one returns 0 — confirm callers treat any non-nil
			// error uniformly regardless of the TTL value.
			return 0, err
		}

		ttl, err := s.lessor.Renew(id)
		if err == nil { // already requested to primary lessor(leader)
			return ttl, nil
		}
		// ErrNotPrimary means leadership was lost between the checks above
		// and the renew; fall through to forward the request to the leader.
		if err != lease.ErrNotPrimary {
			return -1, err
		}
	}

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	// renewals don't go through raft; forward to leader manually
	for cctx.Err() == nil {
		leader, lerr := s.waitLeader(cctx)
		if lerr != nil {
			return -1, lerr
		}
		// Try every advertised peer URL of the leader before backing off.
		for _, url := range leader.PeerURLs {
			lurl := url + leasehttp.LeasePrefix
			ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
			if err == nil || err == lease.ErrLeaseNotFound {
				return ttl, err
			}
		}
		// Throttle in case of e.g. connection problems.
		time.Sleep(50 * time.Millisecond)
	}

	if cctx.Err() == context.DeadlineExceeded {
		return -1, ErrTimeout
	}
	return -1, ErrCanceled
}
   349  
   350  func (s *EtcdServer) checkLeaseTimeToLive(ctx context.Context, leaseID lease.LeaseID) (uint64, error) {
   351  	rev := s.AuthStore().Revision()
   352  	if !s.AuthStore().IsAuthEnabled() {
   353  		return rev, nil
   354  	}
   355  	authInfo, err := s.AuthInfoFromCtx(ctx)
   356  	if err != nil {
   357  		return rev, err
   358  	}
   359  	if authInfo == nil {
   360  		return rev, auth.ErrUserEmpty
   361  	}
   362  
   363  	l := s.lessor.Lookup(leaseID)
   364  	if l != nil {
   365  		for _, key := range l.Keys() {
   366  			if err := s.AuthStore().IsRangePermitted(authInfo, []byte(key), []byte{}); err != nil {
   367  				return 0, err
   368  			}
   369  		}
   370  	}
   371  
   372  	return rev, nil
   373  }
   374  
// leaseTimeToLive answers a TTL query locally when this member is the
// leader (the primary lessor holds authoritative lease state); otherwise
// it forwards the query to the leader over HTTP until it succeeds or the
// request deadline expires.
func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
	if s.isLeader() {
		// Ensure local lease state is caught up before answering.
		if err := s.waitAppliedIndex(); err != nil {
			return nil, err
		}
		// primary; timetolive directly from leader
		le := s.lessor.Lookup(lease.LeaseID(r.ID))
		if le == nil {
			return nil, lease.ErrLeaseNotFound
		}
		// TODO: fill out ResponseHeader
		resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
		if r.Keys {
			// Include the keys attached to the lease only on request.
			ks := le.Keys()
			kbs := make([][]byte, len(ks))
			for i := range ks {
				kbs[i] = []byte(ks[i])
			}
			resp.Keys = kbs
		}
		return resp, nil
	}

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	// forward to leader
	for cctx.Err() == nil {
		leader, err := s.waitLeader(cctx)
		if err != nil {
			return nil, err
		}
		// Try each advertised peer URL of the leader in turn.
		for _, url := range leader.PeerURLs {
			lurl := url + leasehttp.LeaseInternalPrefix
			resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
			if err == nil {
				return resp.LeaseTimeToLiveResponse, nil
			}
			// A definitive not-found from the leader is final; other errors
			// (e.g. transport failures) retry on the next URL/iteration.
			if err == lease.ErrLeaseNotFound {
				return nil, err
			}
		}
	}

	if cctx.Err() == context.DeadlineExceeded {
		return nil, ErrTimeout
	}
	return nil, ErrCanceled
}
   424  
   425  func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
   426  	var rev uint64
   427  	var err error
   428  	if r.Keys {
   429  		// check RBAC permission only if Keys is true
   430  		rev, err = s.checkLeaseTimeToLive(ctx, lease.LeaseID(r.ID))
   431  		if err != nil {
   432  			return nil, err
   433  		}
   434  	}
   435  
   436  	resp, err := s.leaseTimeToLive(ctx, r)
   437  	if err != nil {
   438  		return nil, err
   439  	}
   440  
   441  	if r.Keys {
   442  		if s.AuthStore().IsAuthEnabled() && rev != s.AuthStore().Revision() {
   443  			return nil, auth.ErrAuthOldRevision
   444  		}
   445  	}
   446  	return resp, nil
   447  }
   448  
   449  // LeaseLeases is really ListLeases !???
   450  func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
   451  	ls := s.lessor.Leases()
   452  	lss := make([]*pb.LeaseStatus, len(ls))
   453  	for i := range ls {
   454  		lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
   455  	}
   456  	return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
   457  }
   458  
   459  func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
   460  	leader := s.cluster.Member(s.Leader())
   461  	for leader == nil {
   462  		// wait an election
   463  		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
   464  		select {
   465  		case <-time.After(dur):
   466  			leader = s.cluster.Member(s.Leader())
   467  		case <-s.stopping:
   468  			return nil, ErrStopped
   469  		case <-ctx.Done():
   470  			return nil, ErrNoLeader
   471  		}
   472  	}
   473  	if leader == nil || len(leader.PeerURLs) == 0 {
   474  		return nil, ErrNoLeader
   475  	}
   476  	return leader, nil
   477  }
   478  
   479  func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
   480  	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
   481  	if err != nil {
   482  		return nil, err
   483  	}
   484  	return resp.(*pb.AlarmResponse), nil
   485  }
   486  
   487  func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
   488  	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
   489  	if err != nil {
   490  		return nil, err
   491  	}
   492  	return resp.(*pb.AuthEnableResponse), nil
   493  }
   494  
   495  func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
   496  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
   497  	if err != nil {
   498  		return nil, err
   499  	}
   500  	return resp.(*pb.AuthDisableResponse), nil
   501  }
   502  
   503  func (s *EtcdServer) AuthStatus(ctx context.Context, r *pb.AuthStatusRequest) (*pb.AuthStatusResponse, error) {
   504  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthStatus: r})
   505  	if err != nil {
   506  		return nil, err
   507  	}
   508  	return resp.(*pb.AuthStatusResponse), nil
   509  }
   510  
// Authenticate validates a user's password and issues an auth token. The
// password check happens locally; only a password-free internal request is
// proposed through raft, so plaintext passwords never enter the WAL. The
// check/propose pair retries until the auth store revision is stable across
// both steps.
func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
	// Ensure the local auth store reflects all committed changes before
	// checking the password.
	if err := s.linearizableReadNotify(ctx); err != nil {
		return nil, err
	}

	lg := s.Logger()

	// fix https://nvd.nist.gov/vuln/detail/CVE-2021-28235
	// Scrub the plaintext password from the request object before it can be
	// logged or otherwise retained after this call returns.
	defer func() {
		if r != nil {
			r.Password = ""
		}
	}()

	var resp proto.Message
	for {
		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
		if err != nil {
			if err != auth.ErrAuthNotEnabled {
				lg.Warn(
					"invalid authentication was requested",
					zap.String("user", r.Name),
					zap.Error(err),
				)
			}
			return nil, err
		}

		st, err := s.AuthStore().GenTokenPrefix()
		if err != nil {
			return nil, err
		}

		// internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it.
		// In addition, it will let a WAL entry not record password as a plain text.
		internalReq := &pb.InternalAuthenticateRequest{
			Name:        r.Name,
			SimpleToken: st,
		}

		resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
		if err != nil {
			return nil, err
		}
		// If the auth store changed between CheckPassword and the raft
		// apply, the password check may be stale; redo the whole sequence.
		if checkedRevision == s.AuthStore().Revision() {
			break
		}

		lg.Info("revision when password checked became stale; retrying")
	}

	return resp.(*pb.AuthenticateResponse), nil
}
   564  
   565  func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
   566  	if r.Options == nil || !r.Options.NoPassword {
   567  		hashedPassword, err := bcrypt.GenerateFromPassword([]byte(r.Password), s.authStore.BcryptCost())
   568  		if err != nil {
   569  			return nil, err
   570  		}
   571  		r.HashedPassword = base64.StdEncoding.EncodeToString(hashedPassword)
   572  		r.Password = ""
   573  	}
   574  
   575  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
   576  	if err != nil {
   577  		return nil, err
   578  	}
   579  	return resp.(*pb.AuthUserAddResponse), nil
   580  }
   581  
   582  func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
   583  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
   584  	if err != nil {
   585  		return nil, err
   586  	}
   587  	return resp.(*pb.AuthUserDeleteResponse), nil
   588  }
   589  
   590  func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
   591  	if r.Password != "" {
   592  		hashedPassword, err := bcrypt.GenerateFromPassword([]byte(r.Password), s.authStore.BcryptCost())
   593  		if err != nil {
   594  			return nil, err
   595  		}
   596  		r.HashedPassword = base64.StdEncoding.EncodeToString(hashedPassword)
   597  		r.Password = ""
   598  	}
   599  
   600  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
   601  	if err != nil {
   602  		return nil, err
   603  	}
   604  	return resp.(*pb.AuthUserChangePasswordResponse), nil
   605  }
   606  
   607  func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
   608  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
   609  	if err != nil {
   610  		return nil, err
   611  	}
   612  	return resp.(*pb.AuthUserGrantRoleResponse), nil
   613  }
   614  
   615  func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
   616  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
   617  	if err != nil {
   618  		return nil, err
   619  	}
   620  	return resp.(*pb.AuthUserGetResponse), nil
   621  }
   622  
   623  func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
   624  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
   625  	if err != nil {
   626  		return nil, err
   627  	}
   628  	return resp.(*pb.AuthUserListResponse), nil
   629  }
   630  
   631  func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
   632  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
   633  	if err != nil {
   634  		return nil, err
   635  	}
   636  	return resp.(*pb.AuthUserRevokeRoleResponse), nil
   637  }
   638  
   639  func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
   640  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
   641  	if err != nil {
   642  		return nil, err
   643  	}
   644  	return resp.(*pb.AuthRoleAddResponse), nil
   645  }
   646  
   647  func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
   648  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
   649  	if err != nil {
   650  		return nil, err
   651  	}
   652  	return resp.(*pb.AuthRoleGrantPermissionResponse), nil
   653  }
   654  
   655  func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
   656  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
   657  	if err != nil {
   658  		return nil, err
   659  	}
   660  	return resp.(*pb.AuthRoleGetResponse), nil
   661  }
   662  
   663  func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
   664  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
   665  	if err != nil {
   666  		return nil, err
   667  	}
   668  	return resp.(*pb.AuthRoleListResponse), nil
   669  }
   670  
   671  func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
   672  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
   673  	if err != nil {
   674  		return nil, err
   675  	}
   676  	return resp.(*pb.AuthRoleRevokePermissionResponse), nil
   677  }
   678  
   679  func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
   680  	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
   681  	if err != nil {
   682  		return nil, err
   683  	}
   684  	return resp.(*pb.AuthRoleDeleteResponse), nil
   685  }
   686  
// raftRequestOnce proposes r through raft exactly once and returns the
// applied response. If the caller stashed a start time in the context
// (traceutil.StartTimeKey), the apply-side trace is rebased onto it so the
// raft round-trip shows up as an explicit step.
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, r)
	if err != nil {
		return nil, err
	}
	// Distinguish transport/propose errors (err) from apply errors (result.err).
	if result.err != nil {
		return nil, result.err
	}
	if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil {
		applyStart := result.trace.GetStartTime()
		// The trace object is created in apply. Here reset the start time to trace
		// the raft request time by the difference between the request start time
		// and apply start time
		result.trace.SetStartTime(startTime)
		result.trace.InsertStep(0, applyStart, "process raft request")
		result.trace.LogIfLong(traceThreshold)
	}
	return result.resp, nil
}
   706  
// raftRequest proposes r through raft. It is currently a straight alias of
// raftRequestOnce; any retry policy would live here.
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	return s.raftRequestOnce(ctx, r)
}
   710  
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
	trace := traceutil.Get(ctx)
	ai, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return err
	}
	if ai == nil {
		// chk expects non-nil AuthInfo; use empty credentials
		ai = &auth.AuthInfo{}
	}
	if err = chk(ai); err != nil {
		return err
	}
	trace.Step("get authentication metadata")
	// fetch response for serialized request
	get()
	// check for stale token revision in case the auth store was updated while
	// the request has been handled. Note that get() has already run by this
	// point; the stale-revision error discards its result rather than
	// preventing the read.
	if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
		return auth.ErrAuthOldRevision
	}
	return nil
}
   735  
// processInternalRaftRequestOnce marshals r, proposes it to raft, and waits
// for the local apply to publish the result. It applies backpressure when
// the applied index lags too far behind the committed index, and registers
// a wait-channel keyed by the request ID so the apply loop can hand back
// the applyResult.
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	// Backpressure: refuse new proposals while apply is too far behind commit.
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}

	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}

	// check authinfo if it is not InternalAuthenticateRequest
	if r.Authenticate == nil {
		authInfo, err := s.AuthInfoFromCtx(ctx)
		if err != nil {
			return nil, err
		}
		if authInfo != nil {
			r.Header.Username = authInfo.Username
			r.Header.AuthRevision = authInfo.Revision
		}
	}

	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}

	if len(data) > int(s.Cfg.MaxRequestBytes) {
		return nil, ErrRequestTooLarge
	}

	// Prefer an explicit request ID if the caller set one; otherwise use the
	// generated header ID. The apply loop triggers this ID when done.
	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	ch := s.w.Register(id)

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	start := time.Now()
	err = s.r.Propose(cctx, data)
	if err != nil {
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, err
	}
	proposalsPending.Inc()
	defer proposalsPending.Dec()

	select {
	case x := <-ch:
		// Apply loop delivered the result for this request ID.
		return x.(*applyResult), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		return nil, ErrStopped
	}
}
   798  
// Watchable returns a watchable interface attached to the etcdserver,
// backed by the server's mvcc key-value store.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
   801  
// linearizableReadLoop serves batched linearizable reads: each wakeup on
// s.readwaitc triggers one ReadIndex round with the leader, waits for the
// local applied index to reach the confirmed index, then unblocks every
// reader that was waiting on the previous notifier.
func (s *EtcdServer) linearizableReadLoop() {
	for {
		requestId := s.reqIDGen.Next()
		leaderChangedNotifier := s.LeaderChangedNotify()
		select {
		case <-leaderChangedNotifier:
			// Leadership changed; restart with a fresh notifier/request ID.
			continue
		case <-s.readwaitc:
		case <-s.stopping:
			return
		}

		// as a single loop is can unlock multiple reads, it is not very useful
		// to propagate the trace from Txn or Range.
		trace := traceutil.New("linearizableReadLoop", s.Logger())

		// Swap in a fresh notifier so readers arriving after this point wait
		// for the NEXT round; this round serves everyone already registered.
		nextnr := newNotifier()
		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
		if isStopped(err) {
			return
		}
		if err != nil {
			// Propagate the failure to all waiting readers for this round.
			nr.notify(err)
			continue
		}

		trace.Step("read index received")

		trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})

		appliedIndex := s.getAppliedIndex()
		trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})

		// Wait until local apply catches up with the confirmed index so the
		// subsequent local reads are linearizable.
		if appliedIndex < confirmedIndex {
			select {
			case <-s.applyWait.Wait(confirmedIndex):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before confirmedIndex
		nr.notify(nil)
		trace.Step("applied index is now lower than readState.Index")

		trace.LogAllStepsIfLong(traceThreshold)
	}
}
   854  
   855  func isStopped(err error) bool {
   856  	return err == raft.ErrStopped || err == ErrStopped
   857  }
   858  
   859  func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
   860  	err := s.sendReadIndex(requestId)
   861  	if err != nil {
   862  		return 0, err
   863  	}
   864  
   865  	lg := s.Logger()
   866  	errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
   867  	defer errorTimer.Stop()
   868  	retryTimer := time.NewTimer(readIndexRetryTime)
   869  	defer retryTimer.Stop()
   870  
   871  	firstCommitInTermNotifier := s.FirstCommitInTermNotify()
   872  
   873  	for {
   874  		select {
   875  		case rs := <-s.r.readStateC:
   876  			requestIdBytes := uint64ToBigEndianBytes(requestId)
   877  			gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
   878  			if !gotOwnResponse {
   879  				// a previous request might time out. now we should ignore the response of it and
   880  				// continue waiting for the response of the current requests.
   881  				responseId := uint64(0)
   882  				if len(rs.RequestCtx) == 8 {
   883  					responseId = binary.BigEndian.Uint64(rs.RequestCtx)
   884  				}
   885  				lg.Warn(
   886  					"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
   887  					zap.Uint64("sent-request-id", requestId),
   888  					zap.Uint64("received-request-id", responseId),
   889  				)
   890  				slowReadIndex.Inc()
   891  				continue
   892  			}
   893  			return rs.Index, nil
   894  		case <-leaderChangedNotifier:
   895  			readIndexFailed.Inc()
   896  			// return a retryable error.
   897  			return 0, ErrLeaderChanged
   898  		case <-firstCommitInTermNotifier:
   899  			firstCommitInTermNotifier = s.FirstCommitInTermNotify()
   900  			lg.Info("first commit in current term: resending ReadIndex request")
   901  			err := s.sendReadIndex(requestId)
   902  			if err != nil {
   903  				return 0, err
   904  			}
   905  			retryTimer.Reset(readIndexRetryTime)
   906  			continue
   907  		case <-retryTimer.C:
   908  			lg.Warn(
   909  				"waiting for ReadIndex response took too long, retrying",
   910  				zap.Uint64("sent-request-id", requestId),
   911  				zap.Duration("retry-timeout", readIndexRetryTime),
   912  			)
   913  			err := s.sendReadIndex(requestId)
   914  			if err != nil {
   915  				return 0, err
   916  			}
   917  			retryTimer.Reset(readIndexRetryTime)
   918  			continue
   919  		case <-errorTimer.C:
   920  			lg.Warn(
   921  				"timed out waiting for read index response (local node might have slow network)",
   922  				zap.Duration("timeout", s.Cfg.ReqTimeout()),
   923  			)
   924  			slowReadIndex.Inc()
   925  			return 0, ErrTimeout
   926  		case <-s.stopping:
   927  			return 0, ErrStopped
   928  		}
   929  	}
   930  }
   931  
   932  func uint64ToBigEndianBytes(number uint64) []byte {
   933  	byteResult := make([]byte, 8)
   934  	binary.BigEndian.PutUint64(byteResult, number)
   935  	return byteResult
   936  }
   937  
   938  func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
   939  	ctxToSend := uint64ToBigEndianBytes(requestIndex)
   940  
   941  	cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
   942  	err := s.r.ReadIndex(cctx, ctxToSend)
   943  	cancel()
   944  	if err == raft.ErrStopped {
   945  		return err
   946  	}
   947  	if err != nil {
   948  		lg := s.Logger()
   949  		lg.Warn("failed to get read index from Raft", zap.Error(err))
   950  		readIndexFailed.Inc()
   951  		return err
   952  	}
   953  	return nil
   954  }
   955  
// LinearizableReadNotify blocks until the linearizable read loop has
// confirmed the leader's current index and the local applied index has
// caught up to it, or until ctx is done or the server stops. A nil return
// means a subsequent local read observes at least the state committed
// before this call started.
func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
	return s.linearizableReadNotify(ctx)
}
   959  
   960  func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
   961  	s.readMu.RLock()
   962  	nc := s.readNotifier
   963  	s.readMu.RUnlock()
   964  
   965  	// signal linearizable loop for current notify if it hasn't been already
   966  	select {
   967  	case s.readwaitc <- struct{}{}:
   968  	default:
   969  	}
   970  
   971  	// wait for read state notification
   972  	select {
   973  	case <-nc.c:
   974  		return nc.err
   975  	case <-ctx.Done():
   976  		return ctx.Err()
   977  	case <-s.done:
   978  		return ErrStopped
   979  	}
   980  }
   981  
   982  func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
   983  	authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
   984  	if authInfo != nil || err != nil {
   985  		return authInfo, err
   986  	}
   987  	if !s.Cfg.ClientCertAuthEnabled {
   988  		return nil, nil
   989  	}
   990  	authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
   991  	return authInfo, nil
   992  }
   993  
// Downgrade dispatches a DowngradeRequest by its action: VALIDATE checks
// whether the cluster can be downgraded to the requested version, ENABLE
// starts the downgrade, and CANCEL aborts an in-flight downgrade. Any
// other action yields ErrUnknownMethod.
func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
	switch r.Action {
	case pb.DowngradeRequest_VALIDATE:
		return s.downgradeValidate(ctx, r.Version)
	case pb.DowngradeRequest_ENABLE:
		return s.downgradeEnable(ctx, r)
	case pb.DowngradeRequest_CANCEL:
		return s.downgradeCancel(ctx)
	default:
		return nil, ErrUnknownMethod
	}
}
  1006  
  1007  func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
  1008  	resp := &pb.DowngradeResponse{}
  1009  
  1010  	targetVersion, err := convertToClusterVersion(v)
  1011  	if err != nil {
  1012  		return nil, err
  1013  	}
  1014  
  1015  	// gets leaders commit index and wait for local store to finish applying that index
  1016  	// to avoid using stale downgrade information
  1017  	err = s.linearizableReadNotify(ctx)
  1018  	if err != nil {
  1019  		return nil, err
  1020  	}
  1021  
  1022  	cv := s.ClusterVersion()
  1023  	if cv == nil {
  1024  		return nil, ErrClusterVersionUnavailable
  1025  	}
  1026  	resp.Version = cv.String()
  1027  
  1028  	allowedTargetVersion := membership.AllowedDowngradeVersion(cv)
  1029  	if !targetVersion.Equal(*allowedTargetVersion) {
  1030  		return nil, ErrInvalidDowngradeTargetVersion
  1031  	}
  1032  
  1033  	downgradeInfo := s.cluster.DowngradeInfo()
  1034  	if downgradeInfo.Enabled {
  1035  		// Todo: return the downgrade status along with the error msg
  1036  		return nil, ErrDowngradeInProcess
  1037  	}
  1038  	return resp, nil
  1039  }
  1040  
  1041  func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
  1042  	// validate downgrade capability before starting downgrade
  1043  	v := r.Version
  1044  	lg := s.Logger()
  1045  	if resp, err := s.downgradeValidate(ctx, v); err != nil {
  1046  		lg.Warn("reject downgrade request", zap.Error(err))
  1047  		return resp, err
  1048  	}
  1049  	targetVersion, err := convertToClusterVersion(v)
  1050  	if err != nil {
  1051  		lg.Warn("reject downgrade request", zap.Error(err))
  1052  		return nil, err
  1053  	}
  1054  
  1055  	raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
  1056  	_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
  1057  	if err != nil {
  1058  		lg.Warn("reject downgrade request", zap.Error(err))
  1059  		return nil, err
  1060  	}
  1061  	resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
  1062  	return &resp, nil
  1063  }
  1064  
  1065  func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
  1066  	// gets leaders commit index and wait for local store to finish applying that index
  1067  	// to avoid using stale downgrade information
  1068  	if err := s.linearizableReadNotify(ctx); err != nil {
  1069  		return nil, err
  1070  	}
  1071  
  1072  	downgradeInfo := s.cluster.DowngradeInfo()
  1073  	if !downgradeInfo.Enabled {
  1074  		return nil, ErrNoInflightDowngrade
  1075  	}
  1076  
  1077  	raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
  1078  	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
  1079  	if err != nil {
  1080  		return nil, err
  1081  	}
  1082  	resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
  1083  	return &resp, nil
  1084  }
  1085  

View as plain text