...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3rpc
16
import (
	"context"
	"errors"
	"io"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.etcd.io/etcd/server/v3/etcdserver"
	"go.etcd.io/etcd/server/v3/lease"

	"go.uber.org/zap"
)
28
29 type LeaseServer struct {
30 lg *zap.Logger
31 hdr header
32 le etcdserver.Lessor
33 }
34
35 func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
36 srv := &LeaseServer{lg: s.Cfg.Logger, le: s, hdr: newHeader(s)}
37 if srv.lg == nil {
38 srv.lg = zap.NewNop()
39 }
40 return srv
41 }
42
43 func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
44 resp, err := ls.le.LeaseGrant(ctx, cr)
45
46 if err != nil {
47 return nil, togRPCError(err)
48 }
49 ls.hdr.fill(resp.Header)
50 return resp, nil
51 }
52
53 func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
54 resp, err := ls.le.LeaseRevoke(ctx, rr)
55 if err != nil {
56 return nil, togRPCError(err)
57 }
58 ls.hdr.fill(resp.Header)
59 return resp, nil
60 }
61
62 func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
63 resp, err := ls.le.LeaseTimeToLive(ctx, rr)
64 if err != nil && err != lease.ErrLeaseNotFound {
65 return nil, togRPCError(err)
66 }
67 if err == lease.ErrLeaseNotFound {
68 resp = &pb.LeaseTimeToLiveResponse{
69 Header: &pb.ResponseHeader{},
70 ID: rr.ID,
71 TTL: -1,
72 }
73 }
74 ls.hdr.fill(resp.Header)
75 return resp, nil
76 }
77
78 func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
79 resp, err := ls.le.LeaseLeases(ctx, rr)
80 if err != nil && err != lease.ErrLeaseNotFound {
81 return nil, togRPCError(err)
82 }
83 if err == lease.ErrLeaseNotFound {
84 resp = &pb.LeaseLeasesResponse{
85 Header: &pb.ResponseHeader{},
86 Leases: []*pb.LeaseStatus{},
87 }
88 }
89 ls.hdr.fill(resp.Header)
90 return resp, nil
91 }
92
93 func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
94 errc := make(chan error, 1)
95 go func() {
96 errc <- ls.leaseKeepAlive(stream)
97 }()
98 select {
99 case err = <-errc:
100 case <-stream.Context().Done():
101
102 err = stream.Context().Err()
103 if err == context.Canceled {
104 err = rpctypes.ErrGRPCNoLeader
105 }
106 }
107 return err
108 }
109
110 func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
111 for {
112 req, err := stream.Recv()
113 if err == io.EOF {
114 return nil
115 }
116 if err != nil {
117 if isClientCtxErr(stream.Context().Err(), err) {
118 ls.lg.Debug("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
119 } else {
120 ls.lg.Warn("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
121 streamFailures.WithLabelValues("receive", "lease-keepalive").Inc()
122 }
123 return err
124 }
125
126
127
128
129
130
131
132 resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
133 ls.hdr.fill(resp.Header)
134
135 ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
136 if err == lease.ErrLeaseNotFound {
137 err = nil
138 ttl = 0
139 }
140
141 if err != nil {
142 return togRPCError(err)
143 }
144
145 resp.TTL = ttl
146 err = stream.Send(resp)
147 if err != nil {
148 if isClientCtxErr(stream.Context().Err(), err) {
149 ls.lg.Debug("failed to send lease keepalive response to gRPC stream", zap.Error(err))
150 } else {
151 ls.lg.Warn("failed to send lease keepalive response to gRPC stream", zap.Error(err))
152 streamFailures.WithLabelValues("send", "lease-keepalive").Inc()
153 }
154 return err
155 }
156 }
157 }
158
View as plain text