1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package embed
16
17 import (
18 "context"
19 "fmt"
20 "io/ioutil"
21 defaultLog "log"
22 "net"
23 "net/http"
24 "strings"
25
26 etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
27 "go.etcd.io/etcd/client/pkg/v3/transport"
28 "go.etcd.io/etcd/pkg/v3/debugutil"
29 "go.etcd.io/etcd/pkg/v3/httputil"
30 "go.etcd.io/etcd/server/v3/config"
31 "go.etcd.io/etcd/server/v3/etcdserver"
32 "go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
33 "go.etcd.io/etcd/server/v3/etcdserver/api/v3election"
34 "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
35 v3electiongw "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb/gw"
36 "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock"
37 "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
38 v3lockgw "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb/gw"
39 "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
40
41 gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
42 "github.com/soheilhy/cmux"
43 "github.com/tmc/grpc-websocket-proxy/wsproxy"
44 "go.uber.org/zap"
45 "golang.org/x/net/http2"
46 "golang.org/x/net/trace"
47 "google.golang.org/grpc"
48 )
49
50 type serveCtx struct {
51 lg *zap.Logger
52 l net.Listener
53
54 scheme string
55 addr string
56 network string
57 secure bool
58 insecure bool
59 httpOnly bool
60
61 ctx context.Context
62 cancel context.CancelFunc
63
64 userHandlers map[string]http.Handler
65 serviceRegister func(*grpc.Server)
66 serversC chan *servers
67 }
68
69 type servers struct {
70 secure bool
71 grpc *grpc.Server
72 http *http.Server
73 }
74
75 func newServeCtx(lg *zap.Logger) *serveCtx {
76 ctx, cancel := context.WithCancel(context.Background())
77 if lg == nil {
78 lg = zap.NewNop()
79 }
80 return &serveCtx{
81 lg: lg,
82 ctx: ctx,
83 cancel: cancel,
84 userHandlers: make(map[string]http.Handler),
85 serversC: make(chan *servers, 2),
86 }
87 }
88
89
90
91
92 func (sctx *serveCtx) serve(
93 s *etcdserver.EtcdServer,
94 tlsinfo *transport.TLSInfo,
95 handler http.Handler,
96 errHandler func(error),
97 grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
98 splitHttp bool,
99 gopts ...grpc.ServerOption) (err error) {
100 logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
101 <-s.ReadyNotify()
102
103 sctx.lg.Info("ready to serve client requests")
104
105 m := cmux.New(sctx.l)
106 var server func() error
107 onlyGRPC := splitHttp && !sctx.httpOnly
108 onlyHttp := splitHttp && sctx.httpOnly
109 grpcEnabled := !onlyHttp
110 httpEnabled := !onlyGRPC
111
112 v3c := v3client.New(s)
113 servElection := v3election.NewElectionServer(v3c)
114 servLock := v3lock.NewLockServer(v3c)
115
116
117 defer close(sctx.serversC)
118 var gwmux *gw.ServeMux
119 if s.Cfg.EnableGRPCGateway {
120
121 gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends)
122 if err != nil {
123 sctx.lg.Error("registerGateway failed", zap.Error(err))
124 return err
125 }
126 }
127 var traffic string
128 switch {
129 case onlyGRPC:
130 traffic = "grpc"
131 case onlyHttp:
132 traffic = "http"
133 default:
134 traffic = "grpc+http"
135 }
136
137 if sctx.insecure {
138 var gs *grpc.Server
139 var srv *http.Server
140 if httpEnabled {
141 httpmux := sctx.createMux(gwmux, handler)
142 srv = &http.Server{
143 Handler: createAccessController(sctx.lg, s, httpmux),
144 ErrorLog: logger,
145 }
146 if err := configureHttpServer(srv, s.Cfg); err != nil {
147 sctx.lg.Error("Configure http server failed", zap.Error(err))
148 return err
149 }
150 }
151 if grpcEnabled {
152 gs = v3rpc.Server(s, nil, nil, gopts...)
153 v3electionpb.RegisterElectionServer(gs, servElection)
154 v3lockpb.RegisterLockServer(gs, servLock)
155 if sctx.serviceRegister != nil {
156 sctx.serviceRegister(gs)
157 }
158 defer func(gs *grpc.Server) {
159 if err != nil {
160 sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
161 gs.Stop()
162 sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
163 }
164 }(gs)
165 }
166 if onlyGRPC {
167 server = func() error {
168 return gs.Serve(sctx.l)
169 }
170 } else {
171 server = m.Serve
172
173 httpl := m.Match(cmux.HTTP1())
174 go func(srvhttp *http.Server, tlsLis net.Listener) {
175 errHandler(srvhttp.Serve(tlsLis))
176 }(srv, httpl)
177
178 if grpcEnabled {
179 grpcl := m.Match(cmux.HTTP2())
180 go func(gs *grpc.Server, l net.Listener) {
181 errHandler(gs.Serve(l))
182 }(gs, grpcl)
183 }
184 }
185
186 sctx.serversC <- &servers{grpc: gs, http: srv}
187 sctx.lg.Info(
188 "serving client traffic insecurely; this is strongly discouraged!",
189 zap.String("traffic", traffic),
190 zap.String("address", sctx.l.Addr().String()),
191 )
192 }
193
194 if sctx.secure {
195 var gs *grpc.Server
196 var srv *http.Server
197
198 tlscfg, tlsErr := tlsinfo.ServerConfig()
199 if tlsErr != nil {
200 return tlsErr
201 }
202
203 if grpcEnabled {
204 gs = v3rpc.Server(s, tlscfg, nil, gopts...)
205 v3electionpb.RegisterElectionServer(gs, servElection)
206 v3lockpb.RegisterLockServer(gs, servLock)
207 if sctx.serviceRegister != nil {
208 sctx.serviceRegister(gs)
209 }
210 defer func(gs *grpc.Server) {
211 if err != nil {
212 sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
213 gs.Stop()
214 sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
215 }
216 }(gs)
217 }
218 if httpEnabled {
219 if grpcEnabled {
220 handler = grpcHandlerFunc(gs, handler)
221 }
222 httpmux := sctx.createMux(gwmux, handler)
223
224 srv = &http.Server{
225 Handler: createAccessController(sctx.lg, s, httpmux),
226 TLSConfig: tlscfg,
227 ErrorLog: logger,
228 }
229 if err := configureHttpServer(srv, s.Cfg); err != nil {
230 sctx.lg.Error("Configure https server failed", zap.Error(err))
231 return err
232 }
233 }
234
235 if onlyGRPC {
236 server = func() error { return gs.Serve(sctx.l) }
237 } else {
238 server = m.Serve
239
240 tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
241 if err != nil {
242 return err
243 }
244 go func(srvhttp *http.Server, tlsl net.Listener) {
245 errHandler(srvhttp.Serve(tlsl))
246 }(srv, tlsl)
247 }
248
249 sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
250 sctx.lg.Info(
251 "serving client traffic securely",
252 zap.String("traffic", traffic),
253 zap.String("address", sctx.l.Addr().String()),
254 )
255 }
256
257 return server()
258 }
259
260 func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error {
261
262 return http2.ConfigureServer(srv, &http2.Server{
263 MaxConcurrentStreams: cfg.MaxConcurrentStreams,
264 })
265 }
266
267
268
269 func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
270 if otherHandler == nil {
271 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
272 grpcServer.ServeHTTP(w, r)
273 })
274 }
275 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
276 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
277 grpcServer.ServeHTTP(w, r)
278 } else {
279 otherHandler.ServeHTTP(w, r)
280 }
281 })
282 }
283
284 type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
285
286 func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) {
287 ctx := sctx.ctx
288
289 conn, err := dial(ctx)
290 if err != nil {
291 return nil, err
292 }
293 gwmux := gw.NewServeMux()
294
295 handlers := []registerHandlerFunc{
296 etcdservergw.RegisterKVHandler,
297 etcdservergw.RegisterWatchHandler,
298 etcdservergw.RegisterLeaseHandler,
299 etcdservergw.RegisterClusterHandler,
300 etcdservergw.RegisterMaintenanceHandler,
301 etcdservergw.RegisterAuthHandler,
302 v3lockgw.RegisterLockHandler,
303 v3electiongw.RegisterElectionHandler,
304 }
305 for _, h := range handlers {
306 if err := h(ctx, gwmux, conn); err != nil {
307 return nil, err
308 }
309 }
310 go func() {
311 <-ctx.Done()
312 if cerr := conn.Close(); cerr != nil {
313 sctx.lg.Warn(
314 "failed to close connection",
315 zap.String("address", sctx.l.Addr().String()),
316 zap.Error(cerr),
317 )
318 }
319 }()
320
321 return gwmux, nil
322 }
323
324 type wsProxyZapLogger struct {
325 *zap.Logger
326 }
327
328 func (w wsProxyZapLogger) Warnln(i ...interface{}) {
329 w.Warn(fmt.Sprint(i...))
330 }
331
332 func (w wsProxyZapLogger) Debugln(i ...interface{}) {
333 w.Debug(fmt.Sprint(i...))
334 }
335
336 func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
337 httpmux := http.NewServeMux()
338 for path, h := range sctx.userHandlers {
339 httpmux.Handle(path, h)
340 }
341
342 if gwmux != nil {
343 httpmux.Handle(
344 "/v3/",
345 wsproxy.WebsocketProxy(
346 gwmux,
347 wsproxy.WithRequestMutator(
348
349 func(_ *http.Request, outgoing *http.Request) *http.Request {
350 outgoing.Method = "POST"
351 return outgoing
352 },
353 ),
354 wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
355 wsproxy.WithLogger(wsProxyZapLogger{sctx.lg}),
356 ),
357 )
358 }
359 if handler != nil {
360 httpmux.Handle("/", handler)
361 }
362 return httpmux
363 }
364
365
366
367
368
369 func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler {
370 if lg == nil {
371 lg = zap.NewNop()
372 }
373 return &accessController{lg: lg, s: s, mux: mux}
374 }
375
376 type accessController struct {
377 lg *zap.Logger
378 s *etcdserver.EtcdServer
379 mux *http.ServeMux
380 }
381
382 func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
383 if req == nil {
384 http.Error(rw, "Request is nil", http.StatusBadRequest)
385 return
386 }
387
388 if req.URL != nil && strings.HasPrefix(req.URL.Path, "/v3beta/") {
389 req.URL.Path = strings.Replace(req.URL.Path, "/v3beta/", "/v3/", 1)
390 }
391
392 if req.TLS == nil {
393 host := httputil.GetHostname(req)
394 if !ac.s.AccessController.IsHostWhitelisted(host) {
395 ac.lg.Warn(
396 "rejecting HTTP request to prevent DNS rebinding attacks",
397 zap.String("host", host),
398 )
399 http.Error(rw, errCVE20185702(host), http.StatusMisdirectedRequest)
400 return
401 }
402 } else if ac.s.Cfg.ClientCertAuthEnabled && ac.s.Cfg.EnableGRPCGateway &&
403 ac.s.AuthStore().IsAuthEnabled() && strings.HasPrefix(req.URL.Path, "/v3/") {
404 for _, chains := range req.TLS.VerifiedChains {
405 if len(chains) < 1 {
406 continue
407 }
408 if len(chains[0].Subject.CommonName) != 0 {
409 http.Error(rw, "CommonName of client sending a request against gateway will be ignored and not used as expected", http.StatusBadRequest)
410 return
411 }
412 }
413 }
414
415
416 if ac.s.AccessController.OriginAllowed("*") {
417 addCORSHeader(rw, "*")
418 } else if origin := req.Header.Get("Origin"); ac.s.OriginAllowed(origin) {
419 addCORSHeader(rw, origin)
420 }
421
422 if req.Method == "OPTIONS" {
423 rw.WriteHeader(http.StatusOK)
424 return
425 }
426
427 ac.mux.ServeHTTP(rw, req)
428 }
429
430
431 func addCORSHeader(w http.ResponseWriter, origin string) {
432 w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
433 w.Header().Add("Access-Control-Allow-Origin", origin)
434 w.Header().Add("Access-Control-Allow-Headers", "accept, content-type, authorization")
435 }
436
437
438 func errCVE20185702(host string) string {
439 return fmt.Sprintf(`
440 etcd received your request, but the Host header was unrecognized.
441
442 To fix this, choose one of the following options:
443 - Enable TLS, then any HTTPS request will be allowed.
444 - Add the hostname you want to use to the whitelist in settings.
445 - e.g. etcd --host-whitelist %q
446
447 This requirement has been added to help prevent "DNS Rebinding" attacks (CVE-2018-5702).
448 `, host)
449 }
450
451
452
453 func WrapCORS(cors map[string]struct{}, h http.Handler) http.Handler {
454 return &corsHandler{
455 ac: &etcdserver.AccessController{CORS: cors},
456 h: h,
457 }
458 }
459
460 type corsHandler struct {
461 ac *etcdserver.AccessController
462 h http.Handler
463 }
464
465 func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
466 if ch.ac.OriginAllowed("*") {
467 addCORSHeader(rw, "*")
468 } else if origin := req.Header.Get("Origin"); ch.ac.OriginAllowed(origin) {
469 addCORSHeader(rw, origin)
470 }
471
472 if req.Method == "OPTIONS" {
473 rw.WriteHeader(http.StatusOK)
474 return
475 }
476
477 ch.h.ServeHTTP(rw, req)
478 }
479
480 func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
481 if sctx.userHandlers[s] != nil {
482 sctx.lg.Warn("path is already registered by user handler", zap.String("path", s))
483 return
484 }
485 sctx.userHandlers[s] = h
486 }
487
488 func (sctx *serveCtx) registerPprof() {
489 for p, h := range debugutil.PProfHandlers() {
490 sctx.registerUserHandler(p, h)
491 }
492 }
493
494 func (sctx *serveCtx) registerTrace() {
495 reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
496 sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
497 evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
498 sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
499 }
500
View as plain text