...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v2http/client.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v2http

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v2http
    16  
    17  import (
    18  	"context"
    19  	"encoding/json"
    20  	"errors"
    21  	"fmt"
    22  	"io/ioutil"
    23  	"net/http"
    24  	"net/url"
    25  	"path"
    26  	"strconv"
    27  	"strings"
    28  	"time"
    29  
    30  	"go.etcd.io/etcd/api/v3/etcdserverpb"
    31  	"go.etcd.io/etcd/client/pkg/v3/types"
    32  	"go.etcd.io/etcd/server/v3/etcdserver"
    33  	"go.etcd.io/etcd/server/v3/etcdserver/api"
    34  	"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
    35  	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
    36  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2auth"
    37  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
    38  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2http/httptypes"
    39  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    40  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
    41  
    42  	"github.com/jonboulle/clockwork"
    43  	"go.uber.org/zap"
    44  )
    45  
// URL path prefixes of the v2 client API endpoints served by this package.
const (
	authPrefix     = "/v2/auth"     // presumably used by handleAuth, which is defined outside this view
	keysPrefix     = "/v2/keys"     // key requests, served by keysHandler
	machinesPrefix = "/v2/machines" // client URL listing, served by machinesHandler
	membersPrefix  = "/v2/members"  // member management, served by membersHandler
	statsPrefix    = "/v2/stats"    // "/store", "/self" and "/leader" statistics, served by statsHandler
)
    53  
    54  // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
    55  func NewClientHandler(lg *zap.Logger, server etcdserver.ServerPeer, timeout time.Duration) http.Handler {
    56  	if lg == nil {
    57  		lg = zap.NewNop()
    58  	}
    59  	mux := http.NewServeMux()
    60  	etcdhttp.HandleBasic(lg, mux, server)
    61  	etcdhttp.HandleMetrics(mux)
    62  	etcdhttp.HandleHealthForV2(lg, mux, server)
    63  	handleV2(lg, mux, server, timeout)
    64  	return requestLogger(lg, mux)
    65  }
    66  
    67  func handleV2(lg *zap.Logger, mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) {
    68  	sec := v2auth.NewStore(lg, server, timeout)
    69  	kh := &keysHandler{
    70  		lg:                    lg,
    71  		sec:                   sec,
    72  		server:                server,
    73  		cluster:               server.Cluster(),
    74  		timeout:               timeout,
    75  		clientCertAuthEnabled: server.ClientCertAuthEnabled(),
    76  	}
    77  
    78  	sh := &statsHandler{
    79  		lg:    lg,
    80  		stats: server,
    81  	}
    82  
    83  	mh := &membersHandler{
    84  		lg:                    lg,
    85  		sec:                   sec,
    86  		server:                server,
    87  		cluster:               server.Cluster(),
    88  		timeout:               timeout,
    89  		clock:                 clockwork.NewRealClock(),
    90  		clientCertAuthEnabled: server.ClientCertAuthEnabled(),
    91  	}
    92  
    93  	mah := &machinesHandler{cluster: server.Cluster()}
    94  
    95  	sech := &authHandler{
    96  		lg:                    lg,
    97  		sec:                   sec,
    98  		cluster:               server.Cluster(),
    99  		clientCertAuthEnabled: server.ClientCertAuthEnabled(),
   100  	}
   101  	mux.HandleFunc("/", http.NotFound)
   102  	mux.Handle(keysPrefix, kh)
   103  	mux.Handle(keysPrefix+"/", kh)
   104  	mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
   105  	mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
   106  	mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
   107  	mux.Handle(membersPrefix, mh)
   108  	mux.Handle(membersPrefix+"/", mh)
   109  	mux.Handle(machinesPrefix, mah)
   110  	handleAuth(mux, sech)
   111  }
   112  
// keysHandler serves requests under keysPrefix.
type keysHandler struct {
	lg                    *zap.Logger         // logger for response and encoding failures
	sec                   v2auth.Store        // auth store passed to hasKeyPrefixAccess
	server                etcdserver.ServerV2 // executes the parsed request via Do
	cluster               api.Cluster         // source of the X-Etcd-Cluster-ID header value
	timeout               time.Duration       // deadline applied to each server.Do call
	clientCertAuthEnabled bool                // forwarded to hasKeyPrefixAccess
}
   121  
   122  func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   123  	if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") {
   124  		return
   125  	}
   126  
   127  	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
   128  
   129  	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
   130  	defer cancel()
   131  	clock := clockwork.NewRealClock()
   132  	startTime := clock.Now()
   133  	rr, noValueOnSuccess, err := parseKeyRequest(r, clock)
   134  	if err != nil {
   135  		writeKeyError(h.lg, w, err)
   136  		return
   137  	}
   138  	// The path must be valid at this point (we've parsed the request successfully).
   139  	if !hasKeyPrefixAccess(h.lg, h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive, h.clientCertAuthEnabled) {
   140  		writeKeyNoAuth(w)
   141  		return
   142  	}
   143  	if !rr.Wait {
   144  		reportRequestReceived(rr)
   145  	}
   146  	resp, err := h.server.Do(ctx, rr)
   147  	if err != nil {
   148  		err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
   149  		writeKeyError(h.lg, w, err)
   150  		reportRequestFailed(rr, err)
   151  		return
   152  	}
   153  	switch {
   154  	case resp.Event != nil:
   155  		if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil {
   156  			// Should never be reached
   157  			h.lg.Warn("failed to write key event", zap.Error(err))
   158  		}
   159  		reportRequestCompleted(rr, startTime)
   160  	case resp.Watcher != nil:
   161  		ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
   162  		defer cancel()
   163  		handleKeyWatch(ctx, h.lg, w, resp, rr.Stream)
   164  	default:
   165  		writeKeyError(h.lg, w, errors.New("received response with no Event/Watcher"))
   166  	}
   167  }
   168  
// machinesHandler serves the list of cluster client URLs on machinesPrefix.
type machinesHandler struct {
	cluster api.Cluster // queried for ClientURLs on every request
}
   172  
   173  func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   174  	if !allowMethod(w, r.Method, "GET", "HEAD") {
   175  		return
   176  	}
   177  	endpoints := h.cluster.ClientURLs()
   178  	w.Write([]byte(strings.Join(endpoints, ", ")))
   179  }
   180  
// membersHandler serves cluster membership requests under membersPrefix.
type membersHandler struct {
	lg                    *zap.Logger         // logger for membership and encoding failures
	sec                   v2auth.Store        // auth store passed to hasWriteRootAccess
	server                etcdserver.ServerV2 // performs AddMember/RemoveMember/UpdateMember and reports the leader
	cluster               api.Cluster         // source of the cluster ID and member list
	timeout               time.Duration       // deadline applied to each membership change
	clock                 clockwork.Clock     // provides the timestamp given to membership.NewMember
	clientCertAuthEnabled bool                // forwarded to hasWriteRootAccess
}
   190  
   191  func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   192  	if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
   193  		return
   194  	}
   195  	if !hasWriteRootAccess(h.lg, h.sec, r, h.clientCertAuthEnabled) {
   196  		writeNoAuth(h.lg, w, r)
   197  		return
   198  	}
   199  	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
   200  
   201  	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
   202  	defer cancel()
   203  
   204  	switch r.Method {
   205  	case "GET":
   206  		switch trimPrefix(r.URL.Path, membersPrefix) {
   207  		case "":
   208  			mc := newMemberCollection(h.cluster.Members())
   209  			w.Header().Set("Content-Type", "application/json")
   210  			if err := json.NewEncoder(w).Encode(mc); err != nil {
   211  				h.lg.Warn("failed to encode members response", zap.Error(err))
   212  			}
   213  		case "leader":
   214  			id := h.server.Leader()
   215  			if id == 0 {
   216  				writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election"))
   217  				return
   218  			}
   219  			m := newMember(h.cluster.Member(id))
   220  			w.Header().Set("Content-Type", "application/json")
   221  			if err := json.NewEncoder(w).Encode(m); err != nil {
   222  				h.lg.Warn("failed to encode members response", zap.Error(err))
   223  			}
   224  		default:
   225  			writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusNotFound, "Not found"))
   226  		}
   227  
   228  	case "POST":
   229  		req := httptypes.MemberCreateRequest{}
   230  		if ok := unmarshalRequest(h.lg, r, &req, w); !ok {
   231  			return
   232  		}
   233  		now := h.clock.Now()
   234  		m := membership.NewMember("", req.PeerURLs, "", &now)
   235  		_, err := h.server.AddMember(ctx, *m)
   236  		switch {
   237  		case err == membership.ErrIDExists || err == membership.ErrPeerURLexists:
   238  			writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
   239  			return
   240  		case err != nil:
   241  			h.lg.Warn(
   242  				"failed to add a member",
   243  				zap.String("member-id", m.ID.String()),
   244  				zap.Error(err),
   245  			)
   246  			writeError(h.lg, w, r, err)
   247  			return
   248  		}
   249  		res := newMember(m)
   250  		w.Header().Set("Content-Type", "application/json")
   251  		w.WriteHeader(http.StatusCreated)
   252  		if err := json.NewEncoder(w).Encode(res); err != nil {
   253  			h.lg.Warn("failed to encode members response", zap.Error(err))
   254  		}
   255  
   256  	case "DELETE":
   257  		id, ok := getID(h.lg, r.URL.Path, w)
   258  		if !ok {
   259  			return
   260  		}
   261  		_, err := h.server.RemoveMember(ctx, uint64(id))
   262  		switch {
   263  		case err == membership.ErrIDRemoved:
   264  			writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
   265  		case err == membership.ErrIDNotFound:
   266  			writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
   267  		case err != nil:
   268  			h.lg.Warn(
   269  				"failed to remove a member",
   270  				zap.String("member-id", id.String()),
   271  				zap.Error(err),
   272  			)
   273  			writeError(h.lg, w, r, err)
   274  		default:
   275  			w.WriteHeader(http.StatusNoContent)
   276  		}
   277  
   278  	case "PUT":
   279  		id, ok := getID(h.lg, r.URL.Path, w)
   280  		if !ok {
   281  			return
   282  		}
   283  		req := httptypes.MemberUpdateRequest{}
   284  		if ok := unmarshalRequest(h.lg, r, &req, w); !ok {
   285  			return
   286  		}
   287  		m := membership.Member{
   288  			ID:             id,
   289  			RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
   290  		}
   291  		_, err := h.server.UpdateMember(ctx, m)
   292  		switch {
   293  		case err == membership.ErrPeerURLexists:
   294  			writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
   295  		case err == membership.ErrIDNotFound:
   296  			writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
   297  		case err != nil:
   298  			h.lg.Warn(
   299  				"failed to update a member",
   300  				zap.String("member-id", m.ID.String()),
   301  				zap.Error(err),
   302  			)
   303  			writeError(h.lg, w, r, err)
   304  		default:
   305  			w.WriteHeader(http.StatusNoContent)
   306  		}
   307  	}
   308  }
   309  
// statsHandler serves the statistics endpoints registered under statsPrefix.
type statsHandler struct {
	lg    *zap.Logger // logger passed to etcdhttp.WriteError
	stats stats.Stats // provider of the store, self and leader statistics payloads
}
   314  
   315  func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) {
   316  	if !allowMethod(w, r.Method, "GET") {
   317  		return
   318  	}
   319  	w.Header().Set("Content-Type", "application/json")
   320  	w.Write(h.stats.StoreStats())
   321  }
   322  
   323  func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) {
   324  	if !allowMethod(w, r.Method, "GET") {
   325  		return
   326  	}
   327  	w.Header().Set("Content-Type", "application/json")
   328  	w.Write(h.stats.SelfStats())
   329  }
   330  
   331  func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
   332  	if !allowMethod(w, r.Method, "GET") {
   333  		return
   334  	}
   335  	stats := h.stats.LeaderStats()
   336  	if stats == nil {
   337  		etcdhttp.WriteError(h.lg, w, r, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
   338  		return
   339  	}
   340  	w.Header().Set("Content-Type", "application/json")
   341  	w.Write(stats)
   342  }
   343  
   344  // parseKeyRequest converts a received http.Request on keysPrefix to
   345  // a server Request, performing validation of supplied fields as appropriate.
   346  // If any validation fails, an empty Request and non-nil error is returned.
   347  func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, bool, error) {
   348  	var noValueOnSuccess bool
   349  	emptyReq := etcdserverpb.Request{}
   350  
   351  	err := r.ParseForm()
   352  	if err != nil {
   353  		return emptyReq, false, v2error.NewRequestError(
   354  			v2error.EcodeInvalidForm,
   355  			err.Error(),
   356  		)
   357  	}
   358  
   359  	if !strings.HasPrefix(r.URL.Path, keysPrefix) {
   360  		return emptyReq, false, v2error.NewRequestError(
   361  			v2error.EcodeInvalidForm,
   362  			"incorrect key prefix",
   363  		)
   364  	}
   365  	p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
   366  
   367  	var pIdx, wIdx uint64
   368  	if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
   369  		return emptyReq, false, v2error.NewRequestError(
   370  			v2error.EcodeIndexNaN,
   371  			`invalid value for "prevIndex"`,
   372  		)
   373  	}
   374  	if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
   375  		return emptyReq, false, v2error.NewRequestError(
   376  			v2error.EcodeIndexNaN,
   377  			`invalid value for "waitIndex"`,
   378  		)
   379  	}
   380  
   381  	var rec, sort, wait, dir, quorum, stream bool
   382  	if rec, err = getBool(r.Form, "recursive"); err != nil {
   383  		return emptyReq, false, v2error.NewRequestError(
   384  			v2error.EcodeInvalidField,
   385  			`invalid value for "recursive"`,
   386  		)
   387  	}
   388  	if sort, err = getBool(r.Form, "sorted"); err != nil {
   389  		return emptyReq, false, v2error.NewRequestError(
   390  			v2error.EcodeInvalidField,
   391  			`invalid value for "sorted"`,
   392  		)
   393  	}
   394  	if wait, err = getBool(r.Form, "wait"); err != nil {
   395  		return emptyReq, false, v2error.NewRequestError(
   396  			v2error.EcodeInvalidField,
   397  			`invalid value for "wait"`,
   398  		)
   399  	}
   400  	// TODO(jonboulle): define what parameters dir is/isn't compatible with?
   401  	if dir, err = getBool(r.Form, "dir"); err != nil {
   402  		return emptyReq, false, v2error.NewRequestError(
   403  			v2error.EcodeInvalidField,
   404  			`invalid value for "dir"`,
   405  		)
   406  	}
   407  	if quorum, err = getBool(r.Form, "quorum"); err != nil {
   408  		return emptyReq, false, v2error.NewRequestError(
   409  			v2error.EcodeInvalidField,
   410  			`invalid value for "quorum"`,
   411  		)
   412  	}
   413  	if stream, err = getBool(r.Form, "stream"); err != nil {
   414  		return emptyReq, false, v2error.NewRequestError(
   415  			v2error.EcodeInvalidField,
   416  			`invalid value for "stream"`,
   417  		)
   418  	}
   419  
   420  	if wait && r.Method != "GET" {
   421  		return emptyReq, false, v2error.NewRequestError(
   422  			v2error.EcodeInvalidField,
   423  			`"wait" can only be used with GET requests`,
   424  		)
   425  	}
   426  
   427  	pV := r.FormValue("prevValue")
   428  	if _, ok := r.Form["prevValue"]; ok && pV == "" {
   429  		return emptyReq, false, v2error.NewRequestError(
   430  			v2error.EcodePrevValueRequired,
   431  			`"prevValue" cannot be empty`,
   432  		)
   433  	}
   434  
   435  	if noValueOnSuccess, err = getBool(r.Form, "noValueOnSuccess"); err != nil {
   436  		return emptyReq, false, v2error.NewRequestError(
   437  			v2error.EcodeInvalidField,
   438  			`invalid value for "noValueOnSuccess"`,
   439  		)
   440  	}
   441  
   442  	// TTL is nullable, so leave it null if not specified
   443  	// or an empty string
   444  	var ttl *uint64
   445  	if len(r.FormValue("ttl")) > 0 {
   446  		i, err := getUint64(r.Form, "ttl")
   447  		if err != nil {
   448  			return emptyReq, false, v2error.NewRequestError(
   449  				v2error.EcodeTTLNaN,
   450  				`invalid value for "ttl"`,
   451  			)
   452  		}
   453  		ttl = &i
   454  	}
   455  
   456  	// prevExist is nullable, so leave it null if not specified
   457  	var pe *bool
   458  	if _, ok := r.Form["prevExist"]; ok {
   459  		bv, err := getBool(r.Form, "prevExist")
   460  		if err != nil {
   461  			return emptyReq, false, v2error.NewRequestError(
   462  				v2error.EcodeInvalidField,
   463  				"invalid value for prevExist",
   464  			)
   465  		}
   466  		pe = &bv
   467  	}
   468  
   469  	// refresh is nullable, so leave it null if not specified
   470  	var refresh *bool
   471  	if _, ok := r.Form["refresh"]; ok {
   472  		bv, err := getBool(r.Form, "refresh")
   473  		if err != nil {
   474  			return emptyReq, false, v2error.NewRequestError(
   475  				v2error.EcodeInvalidField,
   476  				"invalid value for refresh",
   477  			)
   478  		}
   479  		refresh = &bv
   480  		if refresh != nil && *refresh {
   481  			val := r.FormValue("value")
   482  			if _, ok := r.Form["value"]; ok && val != "" {
   483  				return emptyReq, false, v2error.NewRequestError(
   484  					v2error.EcodeRefreshValue,
   485  					`A value was provided on a refresh`,
   486  				)
   487  			}
   488  			if ttl == nil {
   489  				return emptyReq, false, v2error.NewRequestError(
   490  					v2error.EcodeRefreshTTLRequired,
   491  					`No TTL value set`,
   492  				)
   493  			}
   494  		}
   495  	}
   496  
   497  	rr := etcdserverpb.Request{
   498  		Method:    r.Method,
   499  		Path:      p,
   500  		Val:       r.FormValue("value"),
   501  		Dir:       dir,
   502  		PrevValue: pV,
   503  		PrevIndex: pIdx,
   504  		PrevExist: pe,
   505  		Wait:      wait,
   506  		Since:     wIdx,
   507  		Recursive: rec,
   508  		Sorted:    sort,
   509  		Quorum:    quorum,
   510  		Stream:    stream,
   511  	}
   512  
   513  	if pe != nil {
   514  		rr.PrevExist = pe
   515  	}
   516  
   517  	if refresh != nil {
   518  		rr.Refresh = refresh
   519  	}
   520  
   521  	// Null TTL is equivalent to unset Expiration
   522  	if ttl != nil {
   523  		expr := time.Duration(*ttl) * time.Second
   524  		rr.Expiration = clock.Now().Add(expr).UnixNano()
   525  	}
   526  
   527  	return rr, noValueOnSuccess, nil
   528  }
   529  
   530  // writeKeyEvent trims the prefix of key path in a single Event under
   531  // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
   532  // ResponseWriter, along with the appropriate headers.
   533  func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error {
   534  	ev := resp.Event
   535  	if ev == nil {
   536  		return errors.New("cannot write empty Event")
   537  	}
   538  	w.Header().Set("Content-Type", "application/json")
   539  	w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
   540  	w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
   541  	w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
   542  
   543  	if ev.IsCreated() {
   544  		w.WriteHeader(http.StatusCreated)
   545  	}
   546  
   547  	ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
   548  	if noValueOnSuccess &&
   549  		(ev.Action == v2store.Set || ev.Action == v2store.CompareAndSwap ||
   550  			ev.Action == v2store.Create || ev.Action == v2store.Update) {
   551  		ev.Node = nil
   552  		ev.PrevNode = nil
   553  	}
   554  	return json.NewEncoder(w).Encode(ev)
   555  }
   556  
   557  func writeKeyNoAuth(w http.ResponseWriter) {
   558  	e := v2error.NewError(v2error.EcodeUnauthorized, "Insufficient credentials", 0)
   559  	e.WriteTo(w)
   560  }
   561  
   562  // writeKeyError logs and writes the given Error to the ResponseWriter.
   563  // If Error is not an etcdErr, the error will be converted to an etcd error.
   564  func writeKeyError(lg *zap.Logger, w http.ResponseWriter, err error) {
   565  	if err == nil {
   566  		return
   567  	}
   568  	switch e := err.(type) {
   569  	case *v2error.Error:
   570  		e.WriteTo(w)
   571  	default:
   572  		switch err {
   573  		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost:
   574  			if lg != nil {
   575  				lg.Warn(
   576  					"v2 response error",
   577  					zap.String("internal-server-error", err.Error()),
   578  				)
   579  			}
   580  		default:
   581  			if lg != nil {
   582  				lg.Warn(
   583  					"unexpected v2 response error",
   584  					zap.String("internal-server-error", err.Error()),
   585  				)
   586  			}
   587  		}
   588  		ee := v2error.NewError(v2error.EcodeRaftInternal, err.Error(), 0)
   589  		ee.WriteTo(w)
   590  	}
   591  }
   592  
   593  func handleKeyWatch(ctx context.Context, lg *zap.Logger, w http.ResponseWriter, resp etcdserver.Response, stream bool) {
   594  	wa := resp.Watcher
   595  	defer wa.Remove()
   596  	ech := wa.EventChan()
   597  	var nch <-chan bool
   598  	if x, ok := w.(http.CloseNotifier); ok {
   599  		nch = x.CloseNotify()
   600  	}
   601  
   602  	w.Header().Set("Content-Type", "application/json")
   603  	w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
   604  	w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
   605  	w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
   606  	w.WriteHeader(http.StatusOK)
   607  
   608  	// Ensure headers are flushed early, in case of long polling
   609  	w.(http.Flusher).Flush()
   610  
   611  	for {
   612  		select {
   613  		case <-nch:
   614  			// Client closed connection. Nothing to do.
   615  			return
   616  		case <-ctx.Done():
   617  			// Timed out. net/http will close the connection for us, so nothing to do.
   618  			return
   619  		case ev, ok := <-ech:
   620  			if !ok {
   621  				// If the channel is closed this may be an indication of
   622  				// that notifications are much more than we are able to
   623  				// send to the client in time. Then we simply end streaming.
   624  				return
   625  			}
   626  			ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
   627  			if err := json.NewEncoder(w).Encode(ev); err != nil {
   628  				// Should never be reached
   629  				lg.Warn("failed to encode event", zap.Error(err))
   630  				return
   631  			}
   632  			if !stream {
   633  				return
   634  			}
   635  			w.(http.Flusher).Flush()
   636  		}
   637  	}
   638  }
   639  
   640  func trimEventPrefix(ev *v2store.Event, prefix string) *v2store.Event {
   641  	if ev == nil {
   642  		return nil
   643  	}
   644  	// Since the *Event may reference one in the store history
   645  	// history, we must copy it before modifying
   646  	e := ev.Clone()
   647  	trimNodeExternPrefix(e.Node, prefix)
   648  	trimNodeExternPrefix(e.PrevNode, prefix)
   649  	return e
   650  }
   651  
   652  func trimNodeExternPrefix(n *v2store.NodeExtern, prefix string) {
   653  	if n == nil {
   654  		return
   655  	}
   656  	n.Key = strings.TrimPrefix(n.Key, prefix)
   657  	for _, nn := range n.Nodes {
   658  		trimNodeExternPrefix(nn, prefix)
   659  	}
   660  }
   661  
   662  func trimErrorPrefix(err error, prefix string) error {
   663  	if e, ok := err.(*v2error.Error); ok {
   664  		e.Cause = strings.TrimPrefix(e.Cause, prefix)
   665  	}
   666  	return err
   667  }
   668  
   669  func unmarshalRequest(lg *zap.Logger, r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool {
   670  	ctype := r.Header.Get("Content-Type")
   671  	semicolonPosition := strings.Index(ctype, ";")
   672  	if semicolonPosition != -1 {
   673  		ctype = strings.TrimSpace(strings.ToLower(ctype[0:semicolonPosition]))
   674  	}
   675  	if ctype != "application/json" {
   676  		writeError(lg, w, r, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
   677  		return false
   678  	}
   679  	b, err := ioutil.ReadAll(r.Body)
   680  	if err != nil {
   681  		writeError(lg, w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
   682  		return false
   683  	}
   684  	if err := req.UnmarshalJSON(b); err != nil {
   685  		writeError(lg, w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
   686  		return false
   687  	}
   688  	return true
   689  }
   690  
   691  func getID(lg *zap.Logger, p string, w http.ResponseWriter) (types.ID, bool) {
   692  	idStr := trimPrefix(p, membersPrefix)
   693  	if idStr == "" {
   694  		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
   695  		return 0, false
   696  	}
   697  	id, err := types.IDFromString(idStr)
   698  	if err != nil {
   699  		writeError(lg, w, nil, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
   700  		return 0, false
   701  	}
   702  	return id, true
   703  }
   704  
   705  // getUint64 extracts a uint64 by the given key from a Form. If the key does
   706  // not exist in the form, 0 is returned. If the key exists but the value is
   707  // badly formed, an error is returned. If multiple values are present only the
   708  // first is considered.
   709  func getUint64(form url.Values, key string) (i uint64, err error) {
   710  	if vals, ok := form[key]; ok {
   711  		i, err = strconv.ParseUint(vals[0], 10, 64)
   712  	}
   713  	return
   714  }
   715  
   716  // getBool extracts a bool by the given key from a Form. If the key does not
   717  // exist in the form, false is returned. If the key exists but the value is
   718  // badly formed, an error is returned. If multiple values are present only the
   719  // first is considered.
   720  func getBool(form url.Values, key string) (b bool, err error) {
   721  	if vals, ok := form[key]; ok {
   722  		b, err = strconv.ParseBool(vals[0])
   723  	}
   724  	return
   725  }
   726  
   727  // trimPrefix removes a given prefix and any slash following the prefix
   728  // e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == ""
   729  func trimPrefix(p, prefix string) (s string) {
   730  	s = strings.TrimPrefix(p, prefix)
   731  	s = strings.TrimPrefix(s, "/")
   732  	return
   733  }
   734  
   735  func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection {
   736  	c := httptypes.MemberCollection(make([]httptypes.Member, len(ms)))
   737  
   738  	for i, m := range ms {
   739  		c[i] = newMember(m)
   740  	}
   741  
   742  	return &c
   743  }
   744  
   745  func newMember(m *membership.Member) httptypes.Member {
   746  	tm := httptypes.Member{
   747  		ID:         m.ID.String(),
   748  		Name:       m.Name,
   749  		PeerURLs:   make([]string, len(m.PeerURLs)),
   750  		ClientURLs: make([]string, len(m.ClientURLs)),
   751  	}
   752  
   753  	copy(tm.PeerURLs, m.PeerURLs)
   754  	copy(tm.ClientURLs, m.ClientURLs)
   755  
   756  	return tm
   757  }
   758  

View as plain text