...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc/lease.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v3rpc
    16  
import (
	"context"
	"errors"
	"io"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.etcd.io/etcd/server/v3/etcdserver"
	"go.etcd.io/etcd/server/v3/lease"

	"go.uber.org/zap"
)
    28  
// LeaseServer is the gRPC-facing lease service. NewLeaseServer returns it as
// a pb.LeaseServer; each handler delegates to the lessor and fills in the
// response header before replying.
type LeaseServer struct {
	lg  *zap.Logger       // logger for stream failures; NewLeaseServer substitutes a no-op logger when none is configured
	hdr header            // fills the ResponseHeader of outgoing responses
	le  etcdserver.Lessor // backend that performs the actual lease operations
}
    34  
    35  func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
    36  	srv := &LeaseServer{lg: s.Cfg.Logger, le: s, hdr: newHeader(s)}
    37  	if srv.lg == nil {
    38  		srv.lg = zap.NewNop()
    39  	}
    40  	return srv
    41  }
    42  
    43  func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
    44  	resp, err := ls.le.LeaseGrant(ctx, cr)
    45  
    46  	if err != nil {
    47  		return nil, togRPCError(err)
    48  	}
    49  	ls.hdr.fill(resp.Header)
    50  	return resp, nil
    51  }
    52  
    53  func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
    54  	resp, err := ls.le.LeaseRevoke(ctx, rr)
    55  	if err != nil {
    56  		return nil, togRPCError(err)
    57  	}
    58  	ls.hdr.fill(resp.Header)
    59  	return resp, nil
    60  }
    61  
    62  func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
    63  	resp, err := ls.le.LeaseTimeToLive(ctx, rr)
    64  	if err != nil && err != lease.ErrLeaseNotFound {
    65  		return nil, togRPCError(err)
    66  	}
    67  	if err == lease.ErrLeaseNotFound {
    68  		resp = &pb.LeaseTimeToLiveResponse{
    69  			Header: &pb.ResponseHeader{},
    70  			ID:     rr.ID,
    71  			TTL:    -1,
    72  		}
    73  	}
    74  	ls.hdr.fill(resp.Header)
    75  	return resp, nil
    76  }
    77  
    78  func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
    79  	resp, err := ls.le.LeaseLeases(ctx, rr)
    80  	if err != nil && err != lease.ErrLeaseNotFound {
    81  		return nil, togRPCError(err)
    82  	}
    83  	if err == lease.ErrLeaseNotFound {
    84  		resp = &pb.LeaseLeasesResponse{
    85  			Header: &pb.ResponseHeader{},
    86  			Leases: []*pb.LeaseStatus{},
    87  		}
    88  	}
    89  	ls.hdr.fill(resp.Header)
    90  	return resp, nil
    91  }
    92  
    93  func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
    94  	errc := make(chan error, 1)
    95  	go func() {
    96  		errc <- ls.leaseKeepAlive(stream)
    97  	}()
    98  	select {
    99  	case err = <-errc:
   100  	case <-stream.Context().Done():
   101  		// the only server-side cancellation is noleader for now.
   102  		err = stream.Context().Err()
   103  		if err == context.Canceled {
   104  			err = rpctypes.ErrGRPCNoLeader
   105  		}
   106  	}
   107  	return err
   108  }
   109  
   110  func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
   111  	for {
   112  		req, err := stream.Recv()
   113  		if err == io.EOF {
   114  			return nil
   115  		}
   116  		if err != nil {
   117  			if isClientCtxErr(stream.Context().Err(), err) {
   118  				ls.lg.Debug("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
   119  			} else {
   120  				ls.lg.Warn("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
   121  				streamFailures.WithLabelValues("receive", "lease-keepalive").Inc()
   122  			}
   123  			return err
   124  		}
   125  
   126  		// Create header before we sent out the renew request.
   127  		// This can make sure that the revision is strictly smaller or equal to
   128  		// when the keepalive happened at the local server (when the local server is the leader)
   129  		// or remote leader.
   130  		// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
   131  		// at rev 4.
   132  		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
   133  		ls.hdr.fill(resp.Header)
   134  
   135  		ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
   136  		if err == lease.ErrLeaseNotFound {
   137  			err = nil
   138  			ttl = 0
   139  		}
   140  
   141  		if err != nil {
   142  			return togRPCError(err)
   143  		}
   144  
   145  		resp.TTL = ttl
   146  		err = stream.Send(resp)
   147  		if err != nil {
   148  			if isClientCtxErr(stream.Context().Err(), err) {
   149  				ls.lg.Debug("failed to send lease keepalive response to gRPC stream", zap.Error(err))
   150  			} else {
   151  				ls.lg.Warn("failed to send lease keepalive response to gRPC stream", zap.Error(err))
   152  				streamFailures.WithLabelValues("send", "lease-keepalive").Inc()
   153  			}
   154  			return err
   155  		}
   156  	}
   157  }
   158  

View as plain text