...

Source file src/go.etcd.io/etcd/server/v3/lease/leasehttp/http.go

Documentation: go.etcd.io/etcd/server/v3/lease/leasehttp

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package leasehttp
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"errors"
    21  	"fmt"
    22  	"io/ioutil"
    23  	"net/http"
    24  	"time"
    25  
    26  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    27  	"go.etcd.io/etcd/pkg/v3/httputil"
    28  	"go.etcd.io/etcd/server/v3/lease"
    29  	"go.etcd.io/etcd/server/v3/lease/leasepb"
    30  )
    31  
    32  var (
    33  	LeasePrefix         = "/leases"
    34  	LeaseInternalPrefix = "/leases/internal"
    35  	applyTimeout        = time.Second
    36  	ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
    37  )
    38  
    39  // NewHandler returns an http Handler for lease renewals
    40  func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
    41  	return &leaseHandler{l, waitch}
    42  }
    43  
    44  type leaseHandler struct {
    45  	l      lease.Lessor
    46  	waitch func() <-chan struct{}
    47  }
    48  
    49  func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    50  	if r.Method != "POST" {
    51  		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    52  		return
    53  	}
    54  
    55  	defer r.Body.Close()
    56  	b, err := ioutil.ReadAll(r.Body)
    57  	if err != nil {
    58  		http.Error(w, "error reading body", http.StatusBadRequest)
    59  		return
    60  	}
    61  
    62  	var v []byte
    63  	switch r.URL.Path {
    64  	case LeasePrefix:
    65  		lreq := pb.LeaseKeepAliveRequest{}
    66  		if uerr := lreq.Unmarshal(b); uerr != nil {
    67  			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
    68  			return
    69  		}
    70  		select {
    71  		case <-h.waitch():
    72  		case <-time.After(applyTimeout):
    73  			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
    74  			return
    75  		}
    76  		ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
    77  		if rerr != nil {
    78  			if rerr == lease.ErrLeaseNotFound {
    79  				http.Error(w, rerr.Error(), http.StatusNotFound)
    80  				return
    81  			}
    82  
    83  			http.Error(w, rerr.Error(), http.StatusBadRequest)
    84  			return
    85  		}
    86  		// TODO: fill out ResponseHeader
    87  		resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
    88  		v, err = resp.Marshal()
    89  		if err != nil {
    90  			http.Error(w, err.Error(), http.StatusInternalServerError)
    91  			return
    92  		}
    93  
    94  	case LeaseInternalPrefix:
    95  		lreq := leasepb.LeaseInternalRequest{}
    96  		if lerr := lreq.Unmarshal(b); lerr != nil {
    97  			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
    98  			return
    99  		}
   100  		select {
   101  		case <-h.waitch():
   102  		case <-time.After(applyTimeout):
   103  			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
   104  			return
   105  		}
   106  		l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
   107  		if l == nil {
   108  			http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
   109  			return
   110  		}
   111  		// TODO: fill out ResponseHeader
   112  		resp := &leasepb.LeaseInternalResponse{
   113  			LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
   114  				Header:     &pb.ResponseHeader{},
   115  				ID:         lreq.LeaseTimeToLiveRequest.ID,
   116  				TTL:        int64(l.Remaining().Seconds()),
   117  				GrantedTTL: l.TTL(),
   118  			},
   119  		}
   120  		if lreq.LeaseTimeToLiveRequest.Keys {
   121  			ks := l.Keys()
   122  			kbs := make([][]byte, len(ks))
   123  			for i := range ks {
   124  				kbs[i] = []byte(ks[i])
   125  			}
   126  			resp.LeaseTimeToLiveResponse.Keys = kbs
   127  		}
   128  
   129  		v, err = resp.Marshal()
   130  		if err != nil {
   131  			http.Error(w, err.Error(), http.StatusInternalServerError)
   132  			return
   133  		}
   134  
   135  	default:
   136  		http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
   137  		return
   138  	}
   139  
   140  	w.Header().Set("Content-Type", "application/protobuf")
   141  	w.Write(v)
   142  }
   143  
   144  // RenewHTTP renews a lease at a given primary server.
   145  // TODO: Batch request in future?
   146  func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
   147  	// will post lreq protobuf to leader
   148  	lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
   149  	if err != nil {
   150  		return -1, err
   151  	}
   152  
   153  	cc := &http.Client{
   154  		Transport: rt,
   155  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
   156  			return http.ErrUseLastResponse
   157  		},
   158  	}
   159  	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
   160  	if err != nil {
   161  		return -1, err
   162  	}
   163  	req.Header.Set("Content-Type", "application/protobuf")
   164  	req.Cancel = ctx.Done()
   165  
   166  	resp, err := cc.Do(req)
   167  	if err != nil {
   168  		return -1, err
   169  	}
   170  	b, err := readResponse(resp)
   171  	if err != nil {
   172  		return -1, err
   173  	}
   174  
   175  	if resp.StatusCode == http.StatusRequestTimeout {
   176  		return -1, ErrLeaseHTTPTimeout
   177  	}
   178  
   179  	if resp.StatusCode == http.StatusNotFound {
   180  		return -1, lease.ErrLeaseNotFound
   181  	}
   182  
   183  	if resp.StatusCode != http.StatusOK {
   184  		return -1, fmt.Errorf("lease: unknown error(%s)", string(b))
   185  	}
   186  
   187  	lresp := &pb.LeaseKeepAliveResponse{}
   188  	if err := lresp.Unmarshal(b); err != nil {
   189  		return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
   190  	}
   191  	if lresp.ID != int64(id) {
   192  		return -1, fmt.Errorf("lease: renew id mismatch")
   193  	}
   194  	return lresp.TTL, nil
   195  }
   196  
   197  // TimeToLiveHTTP retrieves lease information of the given lease ID.
   198  func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
   199  	// will post lreq protobuf to leader
   200  	lreq, err := (&leasepb.LeaseInternalRequest{
   201  		LeaseTimeToLiveRequest: &pb.LeaseTimeToLiveRequest{
   202  			ID:   int64(id),
   203  			Keys: keys,
   204  		},
   205  	}).Marshal()
   206  	if err != nil {
   207  		return nil, err
   208  	}
   209  
   210  	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
   211  	if err != nil {
   212  		return nil, err
   213  	}
   214  	req.Header.Set("Content-Type", "application/protobuf")
   215  
   216  	req = req.WithContext(ctx)
   217  
   218  	cc := &http.Client{
   219  		Transport: rt,
   220  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
   221  			return http.ErrUseLastResponse
   222  		},
   223  	}
   224  	var b []byte
   225  	// buffer errc channel so that errc don't block inside the go routinue
   226  	resp, err := cc.Do(req)
   227  	if err != nil {
   228  		return nil, err
   229  	}
   230  	b, err = readResponse(resp)
   231  	if err != nil {
   232  		return nil, err
   233  	}
   234  	if resp.StatusCode == http.StatusRequestTimeout {
   235  		return nil, ErrLeaseHTTPTimeout
   236  	}
   237  	if resp.StatusCode == http.StatusNotFound {
   238  		return nil, lease.ErrLeaseNotFound
   239  	}
   240  	if resp.StatusCode != http.StatusOK {
   241  		return nil, fmt.Errorf("lease: unknown error(%s)", string(b))
   242  	}
   243  
   244  	lresp := &leasepb.LeaseInternalResponse{}
   245  	if err := lresp.Unmarshal(b); err != nil {
   246  		return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
   247  	}
   248  	if lresp.LeaseTimeToLiveResponse.ID != int64(id) {
   249  		return nil, fmt.Errorf("lease: renew id mismatch")
   250  	}
   251  	return lresp, nil
   252  }
   253  
   254  func readResponse(resp *http.Response) (b []byte, err error) {
   255  	b, err = ioutil.ReadAll(resp.Body)
   256  	httputil.GracefulClose(resp)
   257  	return
   258  }
   259  

View as plain text