...

Source file src/go.etcd.io/etcd/server/v3/embed/serve.go

Documentation: go.etcd.io/etcd/server/v3/embed

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package embed
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io/ioutil"
    21  	defaultLog "log"
    22  	"net"
    23  	"net/http"
    24  	"strings"
    25  
    26  	etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
    27  	"go.etcd.io/etcd/client/pkg/v3/transport"
    28  	"go.etcd.io/etcd/pkg/v3/debugutil"
    29  	"go.etcd.io/etcd/pkg/v3/httputil"
    30  	"go.etcd.io/etcd/server/v3/config"
    31  	"go.etcd.io/etcd/server/v3/etcdserver"
    32  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
    33  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3election"
    34  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
    35  	v3electiongw "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb/gw"
    36  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock"
    37  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
    38  	v3lockgw "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb/gw"
    39  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
    40  
    41  	gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
    42  	"github.com/soheilhy/cmux"
    43  	"github.com/tmc/grpc-websocket-proxy/wsproxy"
    44  	"go.uber.org/zap"
    45  	"golang.org/x/net/http2"
    46  	"golang.org/x/net/trace"
    47  	"google.golang.org/grpc"
    48  )
    49  
    50  type serveCtx struct {
    51  	lg *zap.Logger
    52  	l  net.Listener
    53  
    54  	scheme   string
    55  	addr     string
    56  	network  string
    57  	secure   bool
    58  	insecure bool
    59  	httpOnly bool
    60  
    61  	ctx    context.Context
    62  	cancel context.CancelFunc
    63  
    64  	userHandlers    map[string]http.Handler
    65  	serviceRegister func(*grpc.Server)
    66  	serversC        chan *servers
    67  }
    68  
// servers groups the gRPC and HTTP servers started by one branch of
// serveCtx.serve. Either pointer may be nil when that kind of traffic is not
// served on the listener.
type servers struct {
	// secure is true for the pair created by the TLS branch of serve.
	secure bool
	grpc   *grpc.Server
	http   *http.Server
}
    74  
    75  func newServeCtx(lg *zap.Logger) *serveCtx {
    76  	ctx, cancel := context.WithCancel(context.Background())
    77  	if lg == nil {
    78  		lg = zap.NewNop()
    79  	}
    80  	return &serveCtx{
    81  		lg:           lg,
    82  		ctx:          ctx,
    83  		cancel:       cancel,
    84  		userHandlers: make(map[string]http.Handler),
    85  		serversC:     make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
    86  	}
    87  }
    88  
    89  // serve accepts incoming connections on the listener l,
    90  // creating a new service goroutine for each. The service goroutines
    91  // read requests and then call handler to reply to them.
    92  func (sctx *serveCtx) serve(
    93  	s *etcdserver.EtcdServer,
    94  	tlsinfo *transport.TLSInfo,
    95  	handler http.Handler,
    96  	errHandler func(error),
    97  	grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
    98  	splitHttp bool,
    99  	gopts ...grpc.ServerOption) (err error) {
   100  	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
   101  	<-s.ReadyNotify()
   102  
   103  	sctx.lg.Info("ready to serve client requests")
   104  
   105  	m := cmux.New(sctx.l)
   106  	var server func() error
   107  	onlyGRPC := splitHttp && !sctx.httpOnly
   108  	onlyHttp := splitHttp && sctx.httpOnly
   109  	grpcEnabled := !onlyHttp
   110  	httpEnabled := !onlyGRPC
   111  
   112  	v3c := v3client.New(s)
   113  	servElection := v3election.NewElectionServer(v3c)
   114  	servLock := v3lock.NewLockServer(v3c)
   115  
   116  	// Make sure serversC is closed even if we prematurely exit the function.
   117  	defer close(sctx.serversC)
   118  	var gwmux *gw.ServeMux
   119  	if s.Cfg.EnableGRPCGateway {
   120  		// GRPC gateway connects to grpc server via connection provided by grpc dial.
   121  		gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends)
   122  		if err != nil {
   123  			sctx.lg.Error("registerGateway failed", zap.Error(err))
   124  			return err
   125  		}
   126  	}
   127  	var traffic string
   128  	switch {
   129  	case onlyGRPC:
   130  		traffic = "grpc"
   131  	case onlyHttp:
   132  		traffic = "http"
   133  	default:
   134  		traffic = "grpc+http"
   135  	}
   136  
   137  	if sctx.insecure {
   138  		var gs *grpc.Server
   139  		var srv *http.Server
   140  		if httpEnabled {
   141  			httpmux := sctx.createMux(gwmux, handler)
   142  			srv = &http.Server{
   143  				Handler:  createAccessController(sctx.lg, s, httpmux),
   144  				ErrorLog: logger, // do not log user error
   145  			}
   146  			if err := configureHttpServer(srv, s.Cfg); err != nil {
   147  				sctx.lg.Error("Configure http server failed", zap.Error(err))
   148  				return err
   149  			}
   150  		}
   151  		if grpcEnabled {
   152  			gs = v3rpc.Server(s, nil, nil, gopts...)
   153  			v3electionpb.RegisterElectionServer(gs, servElection)
   154  			v3lockpb.RegisterLockServer(gs, servLock)
   155  			if sctx.serviceRegister != nil {
   156  				sctx.serviceRegister(gs)
   157  			}
   158  			defer func(gs *grpc.Server) {
   159  				if err != nil {
   160  					sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
   161  					gs.Stop()
   162  					sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
   163  				}
   164  			}(gs)
   165  		}
   166  		if onlyGRPC {
   167  			server = func() error {
   168  				return gs.Serve(sctx.l)
   169  			}
   170  		} else {
   171  			server = m.Serve
   172  
   173  			httpl := m.Match(cmux.HTTP1())
   174  			go func(srvhttp *http.Server, tlsLis net.Listener) {
   175  				errHandler(srvhttp.Serve(tlsLis))
   176  			}(srv, httpl)
   177  
   178  			if grpcEnabled {
   179  				grpcl := m.Match(cmux.HTTP2())
   180  				go func(gs *grpc.Server, l net.Listener) {
   181  					errHandler(gs.Serve(l))
   182  				}(gs, grpcl)
   183  			}
   184  		}
   185  
   186  		sctx.serversC <- &servers{grpc: gs, http: srv}
   187  		sctx.lg.Info(
   188  			"serving client traffic insecurely; this is strongly discouraged!",
   189  			zap.String("traffic", traffic),
   190  			zap.String("address", sctx.l.Addr().String()),
   191  		)
   192  	}
   193  
   194  	if sctx.secure {
   195  		var gs *grpc.Server
   196  		var srv *http.Server
   197  
   198  		tlscfg, tlsErr := tlsinfo.ServerConfig()
   199  		if tlsErr != nil {
   200  			return tlsErr
   201  		}
   202  
   203  		if grpcEnabled {
   204  			gs = v3rpc.Server(s, tlscfg, nil, gopts...)
   205  			v3electionpb.RegisterElectionServer(gs, servElection)
   206  			v3lockpb.RegisterLockServer(gs, servLock)
   207  			if sctx.serviceRegister != nil {
   208  				sctx.serviceRegister(gs)
   209  			}
   210  			defer func(gs *grpc.Server) {
   211  				if err != nil {
   212  					sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
   213  					gs.Stop()
   214  					sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
   215  				}
   216  			}(gs)
   217  		}
   218  		if httpEnabled {
   219  			if grpcEnabled {
   220  				handler = grpcHandlerFunc(gs, handler)
   221  			}
   222  			httpmux := sctx.createMux(gwmux, handler)
   223  
   224  			srv = &http.Server{
   225  				Handler:   createAccessController(sctx.lg, s, httpmux),
   226  				TLSConfig: tlscfg,
   227  				ErrorLog:  logger, // do not log user error
   228  			}
   229  			if err := configureHttpServer(srv, s.Cfg); err != nil {
   230  				sctx.lg.Error("Configure https server failed", zap.Error(err))
   231  				return err
   232  			}
   233  		}
   234  
   235  		if onlyGRPC {
   236  			server = func() error { return gs.Serve(sctx.l) }
   237  		} else {
   238  			server = m.Serve
   239  
   240  			tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
   241  			if err != nil {
   242  				return err
   243  			}
   244  			go func(srvhttp *http.Server, tlsl net.Listener) {
   245  				errHandler(srvhttp.Serve(tlsl))
   246  			}(srv, tlsl)
   247  		}
   248  
   249  		sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
   250  		sctx.lg.Info(
   251  			"serving client traffic securely",
   252  			zap.String("traffic", traffic),
   253  			zap.String("address", sctx.l.Addr().String()),
   254  		)
   255  	}
   256  
   257  	return server()
   258  }
   259  
   260  func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
   261  	// todo (ahrtr): should we support configuring other parameters in the future as well?
   262  	return http2.ConfigureServer(srv, &http2.Server{
   263  		MaxConcurrentStreams: cfg.MaxConcurrentStreams,
   264  	})
   265  }
   266  
   267  // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
   268  // connections or otherHandler otherwise. Given in gRPC docs.
   269  func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
   270  	if otherHandler == nil {
   271  		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   272  			grpcServer.ServeHTTP(w, r)
   273  		})
   274  	}
   275  	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   276  		if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
   277  			grpcServer.ServeHTTP(w, r)
   278  		} else {
   279  			otherHandler.ServeHTTP(w, r)
   280  		}
   281  	})
   282  }
   283  
   284  type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
   285  
   286  func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) {
   287  	ctx := sctx.ctx
   288  
   289  	conn, err := dial(ctx)
   290  	if err != nil {
   291  		return nil, err
   292  	}
   293  	gwmux := gw.NewServeMux()
   294  
   295  	handlers := []registerHandlerFunc{
   296  		etcdservergw.RegisterKVHandler,
   297  		etcdservergw.RegisterWatchHandler,
   298  		etcdservergw.RegisterLeaseHandler,
   299  		etcdservergw.RegisterClusterHandler,
   300  		etcdservergw.RegisterMaintenanceHandler,
   301  		etcdservergw.RegisterAuthHandler,
   302  		v3lockgw.RegisterLockHandler,
   303  		v3electiongw.RegisterElectionHandler,
   304  	}
   305  	for _, h := range handlers {
   306  		if err := h(ctx, gwmux, conn); err != nil {
   307  			return nil, err
   308  		}
   309  	}
   310  	go func() {
   311  		<-ctx.Done()
   312  		if cerr := conn.Close(); cerr != nil {
   313  			sctx.lg.Warn(
   314  				"failed to close connection",
   315  				zap.String("address", sctx.l.Addr().String()),
   316  				zap.Error(cerr),
   317  			)
   318  		}
   319  	}()
   320  
   321  	return gwmux, nil
   322  }
   323  
   324  type wsProxyZapLogger struct {
   325  	*zap.Logger
   326  }
   327  
   328  func (w wsProxyZapLogger) Warnln(i ...interface{}) {
   329  	w.Warn(fmt.Sprint(i...))
   330  }
   331  
   332  func (w wsProxyZapLogger) Debugln(i ...interface{}) {
   333  	w.Debug(fmt.Sprint(i...))
   334  }
   335  
   336  func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
   337  	httpmux := http.NewServeMux()
   338  	for path, h := range sctx.userHandlers {
   339  		httpmux.Handle(path, h)
   340  	}
   341  
   342  	if gwmux != nil {
   343  		httpmux.Handle(
   344  			"/v3/",
   345  			wsproxy.WebsocketProxy(
   346  				gwmux,
   347  				wsproxy.WithRequestMutator(
   348  					// Default to the POST method for streams
   349  					func(_ *http.Request, outgoing *http.Request) *http.Request {
   350  						outgoing.Method = "POST"
   351  						return outgoing
   352  					},
   353  				),
   354  				wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
   355  				wsproxy.WithLogger(wsProxyZapLogger{sctx.lg}),
   356  			),
   357  		)
   358  	}
   359  	if handler != nil {
   360  		httpmux.Handle("/", handler)
   361  	}
   362  	return httpmux
   363  }
   364  
   365  // createAccessController wraps HTTP multiplexer:
   366  // - mutate gRPC gateway request paths
   367  // - check hostname whitelist
   368  // client HTTP requests goes here first
   369  func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler {
   370  	if lg == nil {
   371  		lg = zap.NewNop()
   372  	}
   373  	return &accessController{lg: lg, s: s, mux: mux}
   374  }
   375  
   376  type accessController struct {
   377  	lg  *zap.Logger
   378  	s   *etcdserver.EtcdServer
   379  	mux *http.ServeMux
   380  }
   381  
   382  func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
   383  	if req == nil {
   384  		http.Error(rw, "Request is nil", http.StatusBadRequest)
   385  		return
   386  	}
   387  	// redirect for backward compatibilities
   388  	if req.URL != nil && strings.HasPrefix(req.URL.Path, "/v3beta/") {
   389  		req.URL.Path = strings.Replace(req.URL.Path, "/v3beta/", "/v3/", 1)
   390  	}
   391  
   392  	if req.TLS == nil { // check origin if client connection is not secure
   393  		host := httputil.GetHostname(req)
   394  		if !ac.s.AccessController.IsHostWhitelisted(host) {
   395  			ac.lg.Warn(
   396  				"rejecting HTTP request to prevent DNS rebinding attacks",
   397  				zap.String("host", host),
   398  			)
   399  			http.Error(rw, errCVE20185702(host), http.StatusMisdirectedRequest)
   400  			return
   401  		}
   402  	} else if ac.s.Cfg.ClientCertAuthEnabled && ac.s.Cfg.EnableGRPCGateway &&
   403  		ac.s.AuthStore().IsAuthEnabled() && strings.HasPrefix(req.URL.Path, "/v3/") {
   404  		for _, chains := range req.TLS.VerifiedChains {
   405  			if len(chains) < 1 {
   406  				continue
   407  			}
   408  			if len(chains[0].Subject.CommonName) != 0 {
   409  				http.Error(rw, "CommonName of client sending a request against gateway will be ignored and not used as expected", http.StatusBadRequest)
   410  				return
   411  			}
   412  		}
   413  	}
   414  
   415  	// Write CORS header.
   416  	if ac.s.AccessController.OriginAllowed("*") {
   417  		addCORSHeader(rw, "*")
   418  	} else if origin := req.Header.Get("Origin"); ac.s.OriginAllowed(origin) {
   419  		addCORSHeader(rw, origin)
   420  	}
   421  
   422  	if req.Method == "OPTIONS" {
   423  		rw.WriteHeader(http.StatusOK)
   424  		return
   425  	}
   426  
   427  	ac.mux.ServeHTTP(rw, req)
   428  }
   429  
   430  // addCORSHeader adds the correct cors headers given an origin
   431  func addCORSHeader(w http.ResponseWriter, origin string) {
   432  	w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
   433  	w.Header().Add("Access-Control-Allow-Origin", origin)
   434  	w.Header().Add("Access-Control-Allow-Headers", "accept, content-type, authorization")
   435  }
   436  
   437  // https://github.com/transmission/transmission/pull/468
   438  func errCVE20185702(host string) string {
   439  	return fmt.Sprintf(`
   440  etcd received your request, but the Host header was unrecognized.
   441  
   442  To fix this, choose one of the following options:
   443  - Enable TLS, then any HTTPS request will be allowed.
   444  - Add the hostname you want to use to the whitelist in settings.
   445    - e.g. etcd --host-whitelist %q
   446  
   447  This requirement has been added to help prevent "DNS Rebinding" attacks (CVE-2018-5702).
   448  `, host)
   449  }
   450  
   451  // WrapCORS wraps existing handler with CORS.
   452  // TODO: deprecate this after v2 proxy deprecate
   453  func WrapCORS(cors map[string]struct{}, h http.Handler) http.Handler {
   454  	return &corsHandler{
   455  		ac: &etcdserver.AccessController{CORS: cors},
   456  		h:  h,
   457  	}
   458  }
   459  
   460  type corsHandler struct {
   461  	ac *etcdserver.AccessController
   462  	h  http.Handler
   463  }
   464  
   465  func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
   466  	if ch.ac.OriginAllowed("*") {
   467  		addCORSHeader(rw, "*")
   468  	} else if origin := req.Header.Get("Origin"); ch.ac.OriginAllowed(origin) {
   469  		addCORSHeader(rw, origin)
   470  	}
   471  
   472  	if req.Method == "OPTIONS" {
   473  		rw.WriteHeader(http.StatusOK)
   474  		return
   475  	}
   476  
   477  	ch.h.ServeHTTP(rw, req)
   478  }
   479  
   480  func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
   481  	if sctx.userHandlers[s] != nil {
   482  		sctx.lg.Warn("path is already registered by user handler", zap.String("path", s))
   483  		return
   484  	}
   485  	sctx.userHandlers[s] = h
   486  }
   487  
   488  func (sctx *serveCtx) registerPprof() {
   489  	for p, h := range debugutil.PProfHandlers() {
   490  		sctx.registerUserHandler(p, h)
   491  	}
   492  }
   493  
   494  func (sctx *serveCtx) registerTrace() {
   495  	reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
   496  	sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
   497  	evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
   498  	sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
   499  }
   500  

View as plain text