
Source file src/go.etcd.io/etcd/client/v3/lease.go

Documentation: go.etcd.io/etcd/client/v3

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

type (
	LeaseRevokeResponse pb.LeaseRevokeResponse
	LeaseID             int64
)

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	ID    LeaseID
	TTL   int64
	Error string
}

// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	ID  LeaseID
	TTL int64
}

// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type LeaseTimeToLiveResponse struct {
	*pb.ResponseHeader
	ID LeaseID `json:"id"`

	// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. An expired lease returns -1.
	TTL int64 `json:"ttl"`

	// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
	GrantedTTL int64 `json:"granted-ttl"`

	// Keys is the list of keys attached to this lease.
	Keys [][]byte `json:"keys"`
}

// LeaseStatus represents a lease status.
type LeaseStatus struct {
	ID LeaseID `json:"id"`
	// TODO: TTL int64
}

// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
type LeaseLeasesResponse struct {
	*pb.ResponseHeader
	Leases []LeaseStatus `json:"leases"`
}

const (
	// defaultTTL is the assumed lease TTL used for the first keepalive
	// deadline before the actual TTL is known to the client.
	defaultTTL = 5 * time.Second
	// NoLease is a lease ID for the absence of a lease.
	NoLease LeaseID = 0

	// retryConnWait is how long to wait before retrying a request after an error
	retryConnWait = 500 * time.Millisecond
)

// LeaseResponseChSize is the size of the buffer used to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16

// ErrKeepAliveHalted is returned if the client keep-alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	Reason error
}

func (e ErrKeepAliveHalted) Error() string {
	s := "etcdclient: leases keep alive halted"
	if e.Reason != nil {
		s += ": " + e.Reason.Error()
	}
	return s
}
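
// A caller can detect a halted keep-alive loop with errors.As. This is an
// illustrative sketch, not part of the original file; "err" is assumed to
// come from a Lease method such as KeepAlive:
//
//	var halted ErrKeepAliveHalted
//	if errors.As(err, &halted) {
//		log.Printf("lease keepalive halted: %v", halted.Reason)
//	}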

type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
	// to the channel are not consumed promptly, the channel may become full. When full, the lease
	// client continues sending keep alive requests to the etcd server, but drops responses
	// until there is capacity on the channel to send more.
	//
	// If the client keep-alive loop halts with an unexpected error (e.g. "etcdserver: no leader")
	// or is canceled by the caller (e.g. context.Canceled), KeepAlive returns an ErrKeepAliveHalted
	// error containing the error reason.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if the underlying keep
	// alive stream is interrupted in some way the client cannot handle itself, or
	// if the given context "ctx" is canceled or timed out.
	//
	// TODO(v4.0): post errors to last keep alive message before closing
	// (see https://github.com/etcd-io/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
	//
	// In most cases, KeepAlive should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
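
// Usage sketch (illustrative, not part of the original file): grant a lease
// and keep it alive until the channel closes. A configured *Client named
// "cli" is assumed; *Client itself implements Lease.
//
//	grant, err := cli.Grant(context.TODO(), 10) // 10-second TTL
//	if err != nil {
//		return err
//	}
//	ch, err := cli.KeepAlive(context.TODO(), grant.ID)
//	if err != nil {
//		return err // e.g. ErrKeepAliveHalted
//	}
//	for ka := range ch {
//		_ = ka.TTL // refreshed TTL; channel closes on expiry, revoke, or cancel
//	}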

type lessor struct {
	mu sync.Mutex // guards all fields

	// donec is closed and loopErr is set when recvKeepAliveLoop stops
	donec   chan struct{}
	loopErr error

	remote pb.LeaseClient

	stream       pb.Lease_LeaseKeepAliveClient
	streamCancel context.CancelFunc

	stopCtx    context.Context
	stopCancel context.CancelFunc

	keepAlives map[LeaseID]*keepAlive

	// firstKeepAliveTimeout is the timeout for the first keepalive request
	// before the actual TTL is known to the lease client
	firstKeepAliveTimeout time.Duration

	// firstKeepAliveOnce ensures the stream starts after the first KeepAlive call.
	firstKeepAliveOnce sync.Once

	callOpts []grpc.CallOption

	lg *zap.Logger
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response is received
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

func NewLease(c *Client) Lease {
	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
}

func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
	l := &lessor{
		donec:                 make(chan struct{}),
		keepAlives:            make(map[LeaseID]*keepAlive),
		remote:                remote,
		firstKeepAliveTimeout: keepAliveTimeout,
	}
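	// NewLease passes c.cfg.DialTimeout+time.Second, so a value of exactly
	// one second implies an unset DialTimeout; fall back to the default TTL
	// rather than an unrealistically short first-keepalive deadline.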
	if l.firstKeepAliveTimeout == time.Second {
		l.firstKeepAliveTimeout = defaultTTL
	}
	if c != nil {
		l.callOpts = c.callOpts
		l.lg = c.lg
	}
	reqLeaderCtx := WithRequireLeader(context.Background())
	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
	return l
}

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
	r := &pb.LeaseGrantRequest{TTL: ttl}
	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseGrantResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			Error:          resp.Error,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}
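
// Sketch (illustrative, not part of the original file): attach a key to a
// newly granted lease using the WithLease put option; "cli" is an assumed
// *Client.
//
//	grant, err := cli.Grant(context.TODO(), 5)
//	if err != nil {
//		return err
//	}
//	_, err = cli.Put(context.TODO(), "key", "value", clientv3.WithLease(grant.ID))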

func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
	r := &pb.LeaseRevokeRequest{ID: int64(id)}
	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
	if err == nil {
		return (*LeaseRevokeResponse)(resp), nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
	r := toLeaseTimeToLiveRequest(id, opts...)
	resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}
	gresp := &LeaseTimeToLiveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
		GrantedTTL:     resp.GrantedTTL,
		Keys:           resp.Keys,
	}
	return gresp, nil
}
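
// Sketch (illustrative): check a lease's remaining TTL and list its attached
// keys; "cli" and a previously granted "id" (LeaseID) are assumed.
//
//	resp, err := cli.TimeToLive(context.TODO(), id, clientv3.WithAttachedKeys())
//	if err == nil && resp.TTL == -1 {
//		// the lease has already expired
//	}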

func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
	if err == nil {
		leases := make([]LeaseStatus, len(resp.Leases))
		for i := range resp.Leases {
			leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
		}
		return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	if ctx.Done() != nil {
		go l.keepAliveCtxCloser(ctx, id, ka.donec)
	}
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}

func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
	for {
		resp, err := l.keepAliveOnce(ctx, id)
		if err == nil {
			if resp.TTL <= 0 {
				err = rpctypes.ErrLeaseNotFound
			}
			return resp, err
		}
		if isHaltErr(ctx, err) {
			return nil, toErr(ctx, err)
		}
	}
}
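
// Sketch (illustrative): renew a lease on the caller's own schedule with
// KeepAliveOnce instead of the managed KeepAlive loop; "cli" and "id" are
// assumed as above.
//
//	ticker := time.NewTicker(2 * time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		if _, err := cli.KeepAliveOnce(context.TODO(), id); err != nil {
//			break // e.g. rpctypes.ErrLeaseNotFound once the lease is gone
//		}
//	}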

func (l *lessor) Close() error {
	l.stopCancel()
	// close for synchronous teardown if the stream goroutines never launched
	l.firstKeepAliveOnce.Do(func() { close(l.donec) })
	<-l.donec
	return nil
}

func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
	select {
	case <-donec:
		return
	case <-l.donec:
		return
	case <-ctx.Done():
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[id]
	if !ok {
		return
	}

	// close channel and remove context if still associated with keep alive
	for i, c := range ka.ctxs {
		if c == ctx {
			close(ka.chs[i])
			ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
			ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
			break
		}
	}
	// remove if there are no more listeners
	if len(ka.chs) == 0 {
		delete(l.keepAlives, id)
	}
}

// closeRequireLeader scans keepAlives for contexts that require a leader
// and closes the associated channels.
func (l *lessor) closeRequireLeader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, ka := range l.keepAlives {
		reqIdxs := 0
		// find all require-leader channels, close them, and mark as nil
		for i, ctx := range ka.ctxs {
			md, ok := metadata.FromOutgoingContext(ctx)
			if !ok {
				continue
			}
			ks := md[rpctypes.MetadataRequireLeaderKey]
			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
				continue
			}
			close(ka.chs[i])
			ka.chs[i] = nil
			reqIdxs++
		}
		if reqIdxs == 0 {
			continue
		}
		// remove all channels that required a leader from the keepalive
		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
		newCtxs := make([]context.Context, len(newChs))
		newIdx := 0
		for i := range ka.chs {
			if ka.chs[i] == nil {
				continue
			}
			// pair the surviving channel with its own context; indexing both
			// by "i" keeps channels and contexts aligned
			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
			newIdx++
		}
		ka.chs, ka.ctxs = newChs, newCtxs
	}
}

func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKeepAliveResponse, ferr error) {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}

	defer func() {
		if err := stream.CloseSend(); err != nil {
			if ferr == nil {
				ferr = toErr(ctx, err)
			}
			return
		}
	}()

	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
	if err != nil {
		return nil, toErr(ctx, err)
	}

	resp, rerr := stream.Recv()
	if rerr != nil {
		return nil, toErr(ctx, rerr)
	}

	karesp = &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	return karesp, nil
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		stream, err := l.resetRecv()
		if err != nil {
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}

					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
						l.closeRequireLeader()
					}
					break
				}

				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
	sctx, cancel := context.WithCancel(l.stopCtx)
	stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
	if err != nil {
		cancel()
		return nil, err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stream != nil && l.streamCancel != nil {
		l.streamCancel()
	}

	l.streamCancel = cancel
	l.stream = stream

	go l.sendKeepAliveLoop(stream)
	return stream, nil
}

// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[karesp.ID]
	if !ok {
		return
	}

	if karesp.TTL <= 0 {
		// lease expired; close all keep alive channels
		delete(l.keepAlives, karesp.ID)
		ka.close()
		return
	}

	// send update to all channels
	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
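	// e.g. a 60-second TTL schedules the next keepalive roughly 20 seconds
	// out, while deadlineLoop allows the full 60 seconds before reaping.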
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
			if l.lg != nil {
				l.lg.Warn("lease keepalive response queue is full; dropping response send",
					zap.Int("queue-size", len(ch)),
					zap.Int("queue-capacity", cap(ch)),
				)
			}
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}

// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
	for {
		select {
		case <-time.After(time.Second):
		case <-l.donec:
			return
		}
		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.deadline.Before(now) {
				// waited too long for response; lease may be expired
				ka.close()
				delete(l.keepAlives, id)
			}
		}
		l.mu.Unlock()
	}
}

// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
	for {
		var tosend []LeaseID

		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.nextKeepAlive.Before(now) {
				tosend = append(tosend, id)
			}
		}
		l.mu.Unlock()

		for _, id := range tosend {
			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
			if err := stream.Send(r); err != nil {
				// TODO do something with this error?
				return
			}
		}

		select {
		case <-time.After(retryConnWait):
		case <-stream.Context().Done():
			return
		case <-l.donec:
			return
		case <-l.stopCtx.Done():
			return
		}
	}
}

func (ka *keepAlive) close() {
	close(ka.donec)
	for _, ch := range ka.chs {
		close(ch)
	}
}
