1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package leasehttp
16
17 import (
18 "bytes"
19 "context"
20 "errors"
21 "fmt"
22 "io/ioutil"
23 "net/http"
24 "time"
25
26 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
27 "go.etcd.io/etcd/pkg/v3/httputil"
28 "go.etcd.io/etcd/server/v3/lease"
29 "go.etcd.io/etcd/server/v3/lease/leasepb"
30 )
31
32 var (
33 LeasePrefix = "/leases"
34 LeaseInternalPrefix = "/leases/internal"
35 applyTimeout = time.Second
36 ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
37 )
38
39
40 func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
41 return &leaseHandler{l, waitch}
42 }
43
44 type leaseHandler struct {
45 l lease.Lessor
46 waitch func() <-chan struct{}
47 }
48
49 func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
50 if r.Method != "POST" {
51 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
52 return
53 }
54
55 defer r.Body.Close()
56 b, err := ioutil.ReadAll(r.Body)
57 if err != nil {
58 http.Error(w, "error reading body", http.StatusBadRequest)
59 return
60 }
61
62 var v []byte
63 switch r.URL.Path {
64 case LeasePrefix:
65 lreq := pb.LeaseKeepAliveRequest{}
66 if uerr := lreq.Unmarshal(b); uerr != nil {
67 http.Error(w, "error unmarshalling request", http.StatusBadRequest)
68 return
69 }
70 select {
71 case <-h.waitch():
72 case <-time.After(applyTimeout):
73 http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
74 return
75 }
76 ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
77 if rerr != nil {
78 if rerr == lease.ErrLeaseNotFound {
79 http.Error(w, rerr.Error(), http.StatusNotFound)
80 return
81 }
82
83 http.Error(w, rerr.Error(), http.StatusBadRequest)
84 return
85 }
86
87 resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
88 v, err = resp.Marshal()
89 if err != nil {
90 http.Error(w, err.Error(), http.StatusInternalServerError)
91 return
92 }
93
94 case LeaseInternalPrefix:
95 lreq := leasepb.LeaseInternalRequest{}
96 if lerr := lreq.Unmarshal(b); lerr != nil {
97 http.Error(w, "error unmarshalling request", http.StatusBadRequest)
98 return
99 }
100 select {
101 case <-h.waitch():
102 case <-time.After(applyTimeout):
103 http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
104 return
105 }
106 l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
107 if l == nil {
108 http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
109 return
110 }
111
112 resp := &leasepb.LeaseInternalResponse{
113 LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
114 Header: &pb.ResponseHeader{},
115 ID: lreq.LeaseTimeToLiveRequest.ID,
116 TTL: int64(l.Remaining().Seconds()),
117 GrantedTTL: l.TTL(),
118 },
119 }
120 if lreq.LeaseTimeToLiveRequest.Keys {
121 ks := l.Keys()
122 kbs := make([][]byte, len(ks))
123 for i := range ks {
124 kbs[i] = []byte(ks[i])
125 }
126 resp.LeaseTimeToLiveResponse.Keys = kbs
127 }
128
129 v, err = resp.Marshal()
130 if err != nil {
131 http.Error(w, err.Error(), http.StatusInternalServerError)
132 return
133 }
134
135 default:
136 http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
137 return
138 }
139
140 w.Header().Set("Content-Type", "application/protobuf")
141 w.Write(v)
142 }
143
144
145
146 func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
147
148 lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
149 if err != nil {
150 return -1, err
151 }
152
153 cc := &http.Client{
154 Transport: rt,
155 CheckRedirect: func(req *http.Request, via []*http.Request) error {
156 return http.ErrUseLastResponse
157 },
158 }
159 req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
160 if err != nil {
161 return -1, err
162 }
163 req.Header.Set("Content-Type", "application/protobuf")
164 req.Cancel = ctx.Done()
165
166 resp, err := cc.Do(req)
167 if err != nil {
168 return -1, err
169 }
170 b, err := readResponse(resp)
171 if err != nil {
172 return -1, err
173 }
174
175 if resp.StatusCode == http.StatusRequestTimeout {
176 return -1, ErrLeaseHTTPTimeout
177 }
178
179 if resp.StatusCode == http.StatusNotFound {
180 return -1, lease.ErrLeaseNotFound
181 }
182
183 if resp.StatusCode != http.StatusOK {
184 return -1, fmt.Errorf("lease: unknown error(%s)", string(b))
185 }
186
187 lresp := &pb.LeaseKeepAliveResponse{}
188 if err := lresp.Unmarshal(b); err != nil {
189 return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
190 }
191 if lresp.ID != int64(id) {
192 return -1, fmt.Errorf("lease: renew id mismatch")
193 }
194 return lresp.TTL, nil
195 }
196
197
198 func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
199
200 lreq, err := (&leasepb.LeaseInternalRequest{
201 LeaseTimeToLiveRequest: &pb.LeaseTimeToLiveRequest{
202 ID: int64(id),
203 Keys: keys,
204 },
205 }).Marshal()
206 if err != nil {
207 return nil, err
208 }
209
210 req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
211 if err != nil {
212 return nil, err
213 }
214 req.Header.Set("Content-Type", "application/protobuf")
215
216 req = req.WithContext(ctx)
217
218 cc := &http.Client{
219 Transport: rt,
220 CheckRedirect: func(req *http.Request, via []*http.Request) error {
221 return http.ErrUseLastResponse
222 },
223 }
224 var b []byte
225
226 resp, err := cc.Do(req)
227 if err != nil {
228 return nil, err
229 }
230 b, err = readResponse(resp)
231 if err != nil {
232 return nil, err
233 }
234 if resp.StatusCode == http.StatusRequestTimeout {
235 return nil, ErrLeaseHTTPTimeout
236 }
237 if resp.StatusCode == http.StatusNotFound {
238 return nil, lease.ErrLeaseNotFound
239 }
240 if resp.StatusCode != http.StatusOK {
241 return nil, fmt.Errorf("lease: unknown error(%s)", string(b))
242 }
243
244 lresp := &leasepb.LeaseInternalResponse{}
245 if err := lresp.Unmarshal(b); err != nil {
246 return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
247 }
248 if lresp.LeaseTimeToLiveResponse.ID != int64(id) {
249 return nil, fmt.Errorf("lease: renew id mismatch")
250 }
251 return lresp, nil
252 }
253
254 func readResponse(resp *http.Response) (b []byte, err error) {
255 b, err = ioutil.ReadAll(resp.Body)
256 httputil.GracefulClose(resp)
257 return
258 }
259
View as plain text