...

Source file src/go.etcd.io/etcd/server/v3/proxy/grpcproxy/lease.go

Documentation: go.etcd.io/etcd/server/v3/proxy/grpcproxy

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcproxy

import (
	"context"
	"io"
	"sync"
	"sync/atomic"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.etcd.io/etcd/client/v3"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

type leaseProxy struct {
	// leaseClient handles requests from LeaseGrant() that require a lease ID.
	leaseClient pb.LeaseClient

	lessor clientv3.Lease

	ctx context.Context

	leader *leader

	// mu protects registering outstanding leaseProxyStreams with wg.
	mu sync.RWMutex

	// wg waits until all outstanding leaseProxyStreams quit.
	wg sync.WaitGroup
}

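// NewLeaseProxy creates a Lease API (pb.LeaseServer) proxy that forwards
// lease RPCs through the given client. The returned channel is closed once
// the proxy has shut down and all outstanding keepalive streams have finished.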
func NewLeaseProxy(ctx context.Context, c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) {
	cctx, cancel := context.WithCancel(ctx)
	lp := &leaseProxy{
		leaseClient: pb.NewLeaseClient(c.ActiveConnection()),
		lessor:      c.Lease,
		ctx:         cctx,
		leader:      newLeader(cctx, c.Watcher),
	}
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		<-lp.leader.stopNotify()
		lp.mu.Lock()
		select {
		case <-lp.ctx.Done():
		case <-lp.leader.disconnectNotify():
			cancel()
		}
		<-lp.ctx.Done()
		lp.mu.Unlock()
		lp.wg.Wait()
	}()
	return lp, ch
}

func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	rp, err := lp.leaseClient.LeaseGrant(ctx, cr, grpc.WaitForReady(true))
	if err != nil {
		return nil, err
	}
	lp.leader.gotLeader()
	return rp, nil
}

func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	r, err := lp.lessor.Revoke(ctx, clientv3.LeaseID(rr.ID))
	if err != nil {
		return nil, err
	}
	lp.leader.gotLeader()
	return (*pb.LeaseRevokeResponse)(r), nil
}

func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
	var (
		r   *clientv3.LeaseTimeToLiveResponse
		err error
	)
	if rr.Keys {
		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID), clientv3.WithAttachedKeys())
	} else {
		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID))
	}
	if err != nil {
		return nil, err
	}
	rp := &pb.LeaseTimeToLiveResponse{
		Header:     r.ResponseHeader,
		ID:         int64(r.ID),
		TTL:        r.TTL,
		GrantedTTL: r.GrantedTTL,
		Keys:       r.Keys,
	}
	return rp, err
}

func (lp *leaseProxy) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
	r, err := lp.lessor.Leases(ctx)
	if err != nil {
		return nil, err
	}
	leases := make([]*pb.LeaseStatus, len(r.Leases))
	for i := range r.Leases {
		leases[i] = &pb.LeaseStatus{ID: int64(r.Leases[i].ID)}
	}
	rp := &pb.LeaseLeasesResponse{
		Header: r.ResponseHeader,
		Leases: leases,
	}
	return rp, err
}

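// LeaseKeepAlive proxies a bidirectional keepalive stream: recvLoop fans
// client requests out to one keepAliveLoop per lease ID, and sendLoop
// forwards the aggregated responses back to the client.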
func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
	lp.mu.Lock()
	select {
	case <-lp.ctx.Done():
		lp.mu.Unlock()
		return lp.ctx.Err()
	default:
		lp.wg.Add(1)
	}
	lp.mu.Unlock()

	ctx, cancel := context.WithCancel(stream.Context())
	lps := leaseProxyStream{
		stream:          stream,
		lessor:          lp.lessor,
		keepAliveLeases: make(map[int64]*atomicCounter),
		respc:           make(chan *pb.LeaseKeepAliveResponse),
		ctx:             ctx,
		cancel:          cancel,
	}

	errc := make(chan error, 2)

	var lostLeaderC <-chan struct{}
	if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
		v := md[rpctypes.MetadataRequireLeaderKey]
		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
			lostLeaderC = lp.leader.lostNotify()
			// if the leader is known to be lost at creation time, avoid
			// letting events through at all
			select {
			case <-lostLeaderC:
				lp.wg.Done()
				return rpctypes.ErrNoLeader
			default:
			}
		}
	}
	stopc := make(chan struct{}, 3)
	go func() {
		defer func() { stopc <- struct{}{} }()
		if err := lps.recvLoop(); err != nil {
			errc <- err
		}
	}()

	go func() {
		defer func() { stopc <- struct{}{} }()
		if err := lps.sendLoop(); err != nil {
			errc <- err
		}
	}()

	// tears down the LeaseKeepAlive stream if the leader goes down or the entire leaseProxy is terminated.
	go func() {
		defer func() { stopc <- struct{}{} }()
		select {
		case <-lostLeaderC:
		case <-ctx.Done():
		case <-lp.ctx.Done():
		}
	}()

	var err error
	select {
	case <-stopc:
		stopc <- struct{}{}
	case err = <-errc:
	}
	cancel()

	// recv/send may only shut down after this function exits;
	// this goroutine notifies the lease proxy that the stream is through
	go func() {
		<-stopc
		<-stopc
		<-stopc
		lps.close()
		close(errc)
		lp.wg.Done()
	}()

	select {
	case <-lostLeaderC:
		return rpctypes.ErrNoLeader
	case <-lp.leader.disconnectNotify():
		return status.Error(codes.Canceled, "the client connection is closing")
	default:
		if err != nil {
			return err
		}
		return ctx.Err()
	}
}

type leaseProxyStream struct {
	stream pb.Lease_LeaseKeepAliveServer

	lessor clientv3.Lease
	// wg tracks keepAliveLoop goroutines
	wg sync.WaitGroup
	// mu protects keepAliveLeases
	mu sync.RWMutex
	// keepAliveLeases tracks, for each lease, how many outstanding keepalive requests still need responses.
	keepAliveLeases map[int64]*atomicCounter
	// respc receives lease keepalive responses from the etcd backend
	respc chan *pb.LeaseKeepAliveResponse

	ctx    context.Context
	cancel context.CancelFunc
}

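// recvLoop reads keepalive requests from the client stream, starts a
// keepAliveLoop for each lease ID seen for the first time, and counts the
// outstanding requests per lease.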
func (lps *leaseProxyStream) recvLoop() error {
	for {
		rr, err := lps.stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		lps.mu.Lock()
		neededResps, ok := lps.keepAliveLeases[rr.ID]
		if !ok {
			neededResps = &atomicCounter{}
			lps.keepAliveLeases[rr.ID] = neededResps
			lps.wg.Add(1)
			go func() {
				defer lps.wg.Done()
				if err := lps.keepAliveLoop(rr.ID, neededResps); err != nil {
					lps.cancel()
				}
			}()
		}
		neededResps.add(1)
		lps.mu.Unlock()
	}
}

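// keepAliveLoop opens a backend keepalive stream for a single lease and
// relays its responses to the client; it exits once the backend channel
// closes or a TTL passes with no outstanding requests.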
func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
	cctx, ccancel := context.WithCancel(lps.ctx)
	defer ccancel()
	respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
	if err != nil {
		return err
	}
	// ticker expires when the loop hasn't received a keepalive within the TTL
	var ticker <-chan time.Time
	for {
		select {
		case <-ticker:
			lps.mu.Lock()
			// if there are outstanding keepAlive reqs at the moment the ticker fires,
			// don't stop keepAliveLoop(); let it continue processing the KeepAlive reqs.
			if neededResps.get() > 0 {
				lps.mu.Unlock()
				ticker = nil
				continue
			}
			delete(lps.keepAliveLeases, leaseID)
			lps.mu.Unlock()
			return nil
		case rp, ok := <-respc:
			if !ok {
				lps.mu.Lock()
				delete(lps.keepAliveLeases, leaseID)
				lps.mu.Unlock()
				if neededResps.get() == 0 {
					return nil
				}
				ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID))
				if err != nil {
					return err
				}
				r := &pb.LeaseKeepAliveResponse{
					Header: ttlResp.ResponseHeader,
					ID:     int64(ttlResp.ID),
					TTL:    ttlResp.TTL,
				}
				for neededResps.get() > 0 {
					select {
					case lps.respc <- r:
						neededResps.add(-1)
					case <-lps.ctx.Done():
						return nil
					}
				}
				return nil
			}
			if neededResps.get() == 0 {
				continue
			}
			ticker = time.After(time.Duration(rp.TTL) * time.Second)
			r := &pb.LeaseKeepAliveResponse{
				Header: rp.ResponseHeader,
				ID:     int64(rp.ID),
				TTL:    rp.TTL,
			}
			lps.replyToClient(r, neededResps)
		}
	}
}

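// replyToClient sends r once for each outstanding request, giving up after
// 500ms or when the stream context is done.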
func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter) {
	timer := time.After(500 * time.Millisecond)
	for neededResps.get() > 0 {
		select {
		case lps.respc <- r:
			neededResps.add(-1)
		case <-timer:
			return
		case <-lps.ctx.Done():
			return
		}
	}
}

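// sendLoop forwards keepalive responses from respc to the client stream.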
func (lps *leaseProxyStream) sendLoop() error {
	for {
		select {
		case lrp, ok := <-lps.respc:
			if !ok {
				return nil
			}
			if err := lps.stream.Send(lrp); err != nil {
				return err
			}
		case <-lps.ctx.Done():
			return lps.ctx.Err()
		}
	}
}

func (lps *leaseProxyStream) close() {
	lps.cancel()
	lps.wg.Wait()
	// only close the respc channel once all keepAliveLoop() goroutines have finished;
	// this ensures those goroutines don't send a response on a closed channel
	close(lps.respc)
}

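// atomicCounter is a simple goroutine-safe counter backed by sync/atomic.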
type atomicCounter struct {
	counter int64
}

func (ac *atomicCounter) add(delta int64) {
	atomic.AddInt64(&ac.counter, delta)
}

func (ac *atomicCounter) get() int64 {
	return atomic.LoadInt64(&ac.counter)
}

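Below is a minimal, illustrative sketch (not part of lease.go) of how this lease proxy could be wired into a gRPC server. The client endpoint, listen address, and standalone main wiring are assumptions for illustration only; the etcd gRPC proxy performs the equivalent registration in its own startup code.

package main

import (
	"context"
	"log"
	"net"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
	"google.golang.org/grpc"
)

func main() {
	// Client for the backing etcd cluster; the endpoint is an assumption.
	c, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// NewLeaseProxy returns the pb.LeaseServer implementation above and a
	// channel that is closed once the proxy has shut down.
	leaseServer, leaseDonec := grpcproxy.NewLeaseProxy(context.Background(), c)

	s := grpc.NewServer()
	pb.RegisterLeaseServer(s, leaseServer)

	// Listen address is illustrative.
	l, err := net.Listen("tcp", "127.0.0.1:23790")
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		if err := s.Serve(l); err != nil {
			log.Printf("grpc serve: %v", err)
		}
	}()

	<-leaseDonec // proxy shut down (e.g. backend disconnect)
	s.Stop()
}

A client pointed at 127.0.0.1:23790 would then have its Lease RPCs forwarded through the proxy to the backing cluster.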
