1
16
17 package server
18
19 import (
20 "context"
21 "crypto/tls"
22 "fmt"
23 "io"
24 "net"
25 "net/http"
26 "net/http/pprof"
27 "net/url"
28 "os"
29 "reflect"
30 goruntime "runtime"
31 "strconv"
32 "strings"
33 "time"
34
35 "github.com/emicklei/go-restful/v3"
36 cadvisormetrics "github.com/google/cadvisor/container"
37 cadvisorapi "github.com/google/cadvisor/info/v1"
38 cadvisorv2 "github.com/google/cadvisor/info/v2"
39 "github.com/google/cadvisor/metrics"
40 "go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful"
41 oteltrace "go.opentelemetry.io/otel/trace"
42 "google.golang.org/grpc"
43 "k8s.io/klog/v2"
44 "k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
45 "k8s.io/utils/clock"
46 netutils "k8s.io/utils/net"
47
48 v1 "k8s.io/api/core/v1"
49 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
50 "k8s.io/apimachinery/pkg/runtime"
51 "k8s.io/apimachinery/pkg/runtime/schema"
52 "k8s.io/apimachinery/pkg/types"
53 "k8s.io/apimachinery/pkg/util/proxy"
54 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
55 "k8s.io/apimachinery/pkg/util/sets"
56 "k8s.io/apiserver/pkg/authentication/authenticator"
57 "k8s.io/apiserver/pkg/authorization/authorizer"
58 "k8s.io/apiserver/pkg/server/healthz"
59 "k8s.io/apiserver/pkg/server/httplog"
60 "k8s.io/apiserver/pkg/server/routes"
61 utilfeature "k8s.io/apiserver/pkg/util/feature"
62 "k8s.io/apiserver/pkg/util/flushwriter"
63 "k8s.io/component-base/configz"
64 "k8s.io/component-base/logs"
65 compbasemetrics "k8s.io/component-base/metrics"
66 metricsfeatures "k8s.io/component-base/metrics/features"
67 "k8s.io/component-base/metrics/legacyregistry"
68 "k8s.io/component-base/metrics/prometheus/slis"
69 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
70 podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
71 podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
72 "k8s.io/kubelet/pkg/cri/streaming"
73 "k8s.io/kubelet/pkg/cri/streaming/portforward"
74 remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
75 kubelettypes "k8s.io/kubelet/pkg/types"
76 "k8s.io/kubernetes/pkg/api/legacyscheme"
77 api "k8s.io/kubernetes/pkg/apis/core"
78 "k8s.io/kubernetes/pkg/apis/core/v1/validation"
79 "k8s.io/kubernetes/pkg/features"
80 kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
81 apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
82 "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
83 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
84 "k8s.io/kubernetes/pkg/kubelet/prober"
85 servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
86 "k8s.io/kubernetes/pkg/kubelet/server/stats"
87 "k8s.io/kubernetes/pkg/kubelet/util"
88 )
89
90 func init() {
91 utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
92 }
93
// URL paths under which the kubelet server registers its handlers.
const (
	// metricsPath serves the legacy registry metrics.
	metricsPath = "/metrics"
	// cadvisorMetricsPath serves the machine/container metrics registry.
	cadvisorMetricsPath = "/metrics/cadvisor"
	// resourceMetricsPath serves the resource metrics collector.
	resourceMetricsPath = "/metrics/resource"
	// proberMetricsPath serves the prober result and duration metrics.
	proberMetricsPath = "/metrics/probes"
	// statsPath is the root handed to stats.CreateHandlers.
	statsPath = "/stats/"
	// logsPath is the root of the system log endpoint.
	logsPath = "/logs/"
	// checkpointPath is the root of the container checkpoint endpoint.
	checkpointPath = "/checkpoint/"
	// pprofBasePath is the root of the profiling endpoints.
	pprofBasePath = "/debug/pprof/"
	// debugFlagPath is the endpoint used to change the log verbosity flag.
	debugFlagPath = "/debug/flags/v"
)
105
106
// Server bundles everything the kubelet HTTP endpoints need. A pointer to it
// is installed as the Handler of the http.Server instances created by the
// ListenAndServe* functions in this file.
type Server struct {
	// auth authenticates and authorizes requests; nil disables the auth filter.
	auth AuthInterface
	// host is the backend that answers pod, log, exec and metrics queries.
	host HostInterface
	// restfulCont is the container all web services and raw handlers are added to.
	restfulCont containerInterface
	// metricsBuckets is the set of URL root paths reported as their own bucket;
	// anything else is reported as "other" (see getMetricBucket).
	metricsBuckets sets.String
	// metricsMethodBuckets is the set of HTTP methods reported as their own
	// bucket; anything else is reported as "other" (see getMetricMethodBucket).
	metricsMethodBuckets sets.String
	// resourceAnalyzer backs the stats handlers and the resource metrics collector.
	resourceAnalyzer stats.ResourceAnalyzer
}
115
116
// TLSOptions holds the TLS settings used by ListenAndServeKubeletServer.
type TLSOptions struct {
	// Config is assigned to the http.Server's TLSConfig.
	Config *tls.Config
	// CertFile is passed to ListenAndServeTLS as the certificate file.
	CertFile string
	// KeyFile is passed to ListenAndServeTLS as the private key file.
	KeyFile string
}
122
123
// containerInterface is the subset of restful.Container behaviour the Server
// relies on, extended with a way to list the raw handler paths.
type containerInterface interface {
	Add(service *restful.WebService) *restful.Container
	Handle(path string, handler http.Handler)
	Filter(filter restful.FilterFunction)
	ServeHTTP(w http.ResponseWriter, r *http.Request)
	RegisteredWebServices() []*restful.WebService

	// RegisteredHandlePaths returns the paths of handlers registered directly
	// through Handle (as opposed to web services added through Add).
	RegisteredHandlePaths() []string
}
135
136
137
// filteringContainer wraps a restful.Container so that handlers registered
// through Handle go through HandleWithFilter, and records the paths of those
// handlers.
type filteringContainer struct {
	*restful.Container

	// registeredHandlePaths accumulates every path passed to Handle.
	registeredHandlePaths []string
}
143
144 func (a *filteringContainer) Handle(path string, handler http.Handler) {
145 a.HandleWithFilter(path, handler)
146 a.registeredHandlePaths = append(a.registeredHandlePaths, path)
147 }
148 func (a *filteringContainer) RegisteredHandlePaths() []string {
149 return a.registeredHandlePaths
150 }
151
152
153 func ListenAndServeKubeletServer(
154 host HostInterface,
155 resourceAnalyzer stats.ResourceAnalyzer,
156 kubeCfg *kubeletconfiginternal.KubeletConfiguration,
157 tlsOptions *TLSOptions,
158 auth AuthInterface,
159 tp oteltrace.TracerProvider) {
160
161 address := netutils.ParseIPSloppy(kubeCfg.Address)
162 port := uint(kubeCfg.Port)
163 klog.InfoS("Starting to listen", "address", address, "port", port)
164 handler := NewServer(host, resourceAnalyzer, auth, tp, kubeCfg)
165 s := &http.Server{
166 Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
167 Handler: &handler,
168 IdleTimeout: 90 * time.Second,
169 ReadTimeout: 4 * 60 * time.Minute,
170 WriteTimeout: 4 * 60 * time.Minute,
171 MaxHeaderBytes: 1 << 20,
172 }
173
174 if tlsOptions != nil {
175 s.TLSConfig = tlsOptions.Config
176
177
178
179 if err := s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile); err != nil {
180 klog.ErrorS(err, "Failed to listen and serve")
181 os.Exit(1)
182 }
183 } else if err := s.ListenAndServe(); err != nil {
184 klog.ErrorS(err, "Failed to listen and serve")
185 os.Exit(1)
186 }
187 }
188
189
190 func ListenAndServeKubeletReadOnlyServer(
191 host HostInterface,
192 resourceAnalyzer stats.ResourceAnalyzer,
193 address net.IP,
194 port uint) {
195 klog.InfoS("Starting to listen read-only", "address", address, "port", port)
196
197 s := NewServer(host, resourceAnalyzer, nil, oteltrace.NewNoopTracerProvider(), nil)
198
199 server := &http.Server{
200 Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
201 Handler: &s,
202 IdleTimeout: 90 * time.Second,
203 ReadTimeout: 4 * 60 * time.Minute,
204 WriteTimeout: 4 * 60 * time.Minute,
205 MaxHeaderBytes: 1 << 20,
206 }
207
208 if err := server.ListenAndServe(); err != nil {
209 klog.ErrorS(err, "Failed to listen and serve")
210 os.Exit(1)
211 }
212 }
213
214
215 func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
216 server := grpc.NewServer(apisgrpc.WithRateLimiter("podresources", podresources.DefaultQPS, podresources.DefaultBurstTokens))
217
218 podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
219 podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
220
221 l, err := util.CreateListener(endpoint)
222 if err != nil {
223 klog.ErrorS(err, "Failed to create listener for podResources endpoint")
224 os.Exit(1)
225 }
226
227 klog.InfoS("Starting to serve the podresources API", "endpoint", endpoint)
228 if err := server.Serve(l); err != nil {
229 klog.ErrorS(err, "Failed to serve")
230 os.Exit(1)
231 }
232 }
233
234
// AuthInterface combines everything the auth filter needs: authenticating a
// request, deriving authorization attributes from it, and authorizing those
// attributes.
type AuthInterface interface {
	authenticator.Request
	authorizer.RequestAttributesGetter
	authorizer.Authorizer
}
240
241
242
// HostInterface lists the operations the Server requires from its backend
// (the host) in order to answer HTTP requests. It embeds stats.Provider, which
// supplies the pod lookups (GetPods, GetPodByName) used by the handlers here.
type HostInterface interface {
	stats.Provider
	GetVersionInfo() (*cadvisorapi.VersionInfo, error)
	GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
	GetRunningPods(ctx context.Context) ([]*v1.Pod, error)
	RunInContainer(ctx context.Context, name string, uid types.UID, container string, cmd []string) ([]byte, error)
	CheckpointContainer(ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
	GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
	ServeLogs(w http.ResponseWriter, req *http.Request)
	// ResyncInterval and LatestLoopEntryTime feed the "syncloop" health check.
	ResyncInterval() time.Duration
	GetHostname() string
	LatestLoopEntryTime() time.Time
	// GetExec, GetAttach and GetPortForward return the URL the request is
	// then proxied to by proxyStream.
	GetExec(ctx context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
	GetAttach(ctx context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
	GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
	// ListMetricDescriptors and ListPodSandboxMetrics back the CRI metrics
	// collector registered under the cadvisor metrics path.
	ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error)
	ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error)
}
261
262
263 func NewServer(
264 host HostInterface,
265 resourceAnalyzer stats.ResourceAnalyzer,
266 auth AuthInterface,
267 tp oteltrace.TracerProvider,
268 kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
269
270 server := Server{
271 host: host,
272 resourceAnalyzer: resourceAnalyzer,
273 auth: auth,
274 restfulCont: &filteringContainer{Container: restful.NewContainer()},
275 metricsBuckets: sets.NewString(),
276 metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
277 }
278 if auth != nil {
279 server.InstallAuthFilter()
280 }
281 if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
282 server.InstallTracingFilter(tp)
283 }
284 server.InstallDefaultHandlers()
285 if kubeCfg != nil && kubeCfg.EnableDebuggingHandlers {
286 server.InstallDebuggingHandlers()
287
288
289 server.InstallSystemLogHandler(kubeCfg.EnableSystemLogHandler, kubeCfg.EnableSystemLogQuery)
290 server.InstallProfilingHandler(kubeCfg.EnableProfilingHandler, kubeCfg.EnableContentionProfiling)
291 server.InstallDebugFlagsHandler(kubeCfg.EnableDebugFlagsHandler)
292 } else {
293 server.InstallDebuggingDisabledHandlers()
294 }
295 return server
296 }
297
298
299 func (s *Server) InstallAuthFilter() {
300 s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
301
302 info, ok, err := s.auth.AuthenticateRequest(req.Request)
303 if err != nil {
304 klog.ErrorS(err, "Unable to authenticate the request due to an error")
305 resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
306 return
307 }
308 if !ok {
309 resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
310 return
311 }
312
313
314 attrs := s.auth.GetRequestAttributes(info.User, req.Request)
315
316
317 decision, _, err := s.auth.Authorize(req.Request.Context(), attrs)
318 if err != nil {
319 klog.ErrorS(err, "Authorization error", "user", attrs.GetUser().GetName(), "verb", attrs.GetVerb(), "resource", attrs.GetResource(), "subresource", attrs.GetSubresource())
320 msg := fmt.Sprintf("Authorization error (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
321 resp.WriteErrorString(http.StatusInternalServerError, msg)
322 return
323 }
324 if decision != authorizer.DecisionAllow {
325 klog.V(2).InfoS("Forbidden", "user", attrs.GetUser().GetName(), "verb", attrs.GetVerb(), "resource", attrs.GetResource(), "subresource", attrs.GetSubresource())
326 msg := fmt.Sprintf("Forbidden (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
327 resp.WriteErrorString(http.StatusForbidden, msg)
328 return
329 }
330
331
332 chain.ProcessFilter(req, resp)
333 })
334 }
335
336
337 func (s *Server) InstallTracingFilter(tp oteltrace.TracerProvider) {
338 s.restfulCont.Filter(otelrestful.OTelFilter("kubelet", otelrestful.WithTracerProvider(tp)))
339 }
340
341
342
343 func (s *Server) addMetricsBucketMatcher(bucket string) {
344 s.metricsBuckets.Insert(bucket)
345 }
346
347
348 func (s *Server) getMetricBucket(path string) string {
349 root := getURLRootPath(path)
350 if s.metricsBuckets.Has(root) {
351 return root
352 }
353 return "other"
354 }
355
356
357 func (s *Server) getMetricMethodBucket(method string) string {
358 if s.metricsMethodBuckets.Has(method) {
359 return method
360 }
361 return "other"
362 }
363
364
365
// InstallDefaultHandlers registers the handlers that are always served:
// healthz, the SLI metrics, /pods, the stats endpoints, the four metrics
// endpoints (/metrics, /metrics/cadvisor, /metrics/resource, /metrics/probes)
// and, when the ContainerCheckpoint feature gate is enabled, the checkpoint
// endpoint. Each endpoint's URL root is also registered as a metrics bucket.
func (s *Server) InstallDefaultHandlers() {
	// Health checks: ping, log and the kubelet sync loop check.
	s.addMetricsBucketMatcher("healthz")
	healthz.InstallHandler(s.restfulCont,
		healthz.PingHealthz,
		healthz.LogHealthz,
		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
	)

	slis.SLIMetricsWithReset{}.Install(s.restfulCont)

	// GET /pods returns the pods known to the host as JSON.
	s.addMetricsBucketMatcher("pods")
	ws := new(restful.WebService)
	ws.
		Path("/pods").
		Produces(restful.MIME_JSON)
	ws.Route(ws.GET("").
		To(s.getPods).
		Operation("getPods"))
	s.restfulCont.Add(ws)

	s.addMetricsBucketMatcher("stats")
	s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer))

	// All metrics buckets are registered up front; the resource and probes
	// buckets are inserted again further down, which is harmless because the
	// buckets live in a set.
	s.addMetricsBucketMatcher("metrics")
	s.addMetricsBucketMatcher("metrics/cadvisor")
	s.addMetricsBucketMatcher("metrics/probes")
	s.addMetricsBucketMatcher("metrics/resource")
	s.restfulCont.Handle(metricsPath, legacyregistry.Handler())

	// Metric families requested from the cadvisor-based collectors below.
	includedMetrics := cadvisormetrics.MetricSet{
		cadvisormetrics.CpuUsageMetrics:     struct{}{},
		cadvisormetrics.MemoryUsageMetrics:  struct{}{},
		cadvisormetrics.CpuLoadMetrics:      struct{}{},
		cadvisormetrics.DiskIOMetrics:       struct{}{},
		cadvisormetrics.DiskUsageMetrics:    struct{}{},
		cadvisormetrics.NetworkUsageMetrics: struct{}{},
		cadvisormetrics.AppMetrics:          struct{}{},
		cadvisormetrics.ProcessMetrics:      struct{}{},
		cadvisormetrics.OOMMetrics:          struct{}{},
	}

	// /metrics/cadvisor: the machine collector is always registered; container
	// metrics come either from the CRI collector or from the cadvisor
	// Prometheus collector, depending on the PodAndContainerStatsFromCRI gate.
	r := compbasemetrics.NewKubeRegistry()
	r.RawMustRegister(metrics.NewPrometheusMachineCollector(prometheusHostAdapter{s.host}, includedMetrics))
	if utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI) {
		r.CustomRegister(collectors.NewCRIMetricsCollector(context.TODO(), s.host.ListPodSandboxMetrics, s.host.ListMetricDescriptors))
	} else {
		cadvisorOpts := cadvisorv2.RequestOptions{
			IdType:    cadvisorv2.TypeName,
			Count:     1,
			Recursive: true,
		}
		r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics, clock.RealClock{}, cadvisorOpts))
	}
	s.restfulCont.Handle(cadvisorMetricsPath,
		compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
	)

	// /metrics/resource: served from its own registry.
	s.addMetricsBucketMatcher("metrics/resource")
	resourceRegistry := compbasemetrics.NewKubeRegistry()
	resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
	s.restfulCont.Handle(resourceMetricsPath,
		compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
	)

	// /metrics/probes: prober results and durations, served from their own
	// registry. The result of RegisterProcessStartTime is discarded here.
	s.addMetricsBucketMatcher("metrics/probes")
	p := compbasemetrics.NewKubeRegistry()
	_ = compbasemetrics.RegisterProcessStartTime(p.Register)
	p.MustRegister(prober.ProberResults)
	p.MustRegister(prober.ProberDuration)
	s.restfulCont.Handle(proberMetricsPath,
		compbasemetrics.HandlerFor(p, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
	)

	// The checkpoint endpoint is only exposed behind its feature gate.
	if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
		s.addMetricsBucketMatcher("checkpoint")
		ws = &restful.WebService{}
		ws.Path(checkpointPath).Produces(restful.MIME_JSON)
		ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
			To(s.checkpoint).
			Operation("checkpoint"))
		s.restfulCont.Add(ws)
	}
}
452
453
// InstallDebuggingHandlers registers the debugging endpoints: /run, /exec,
// /attach, /portForward, /containerLogs, configz and /runningpods/. Each
// endpoint's URL root is also registered as a metrics bucket. The exec,
// attach and run routes exist both with and without a {uid} path segment, and
// so do the portForward routes.
func (s *Server) InstallDebuggingHandlers() {
	klog.InfoS("Adding debug handlers to kubelet server")

	// /run: POST only.
	s.addMetricsBucketMatcher("run")
	ws := new(restful.WebService)
	ws.
		Path("/run")
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getRun).
		Operation("getRun"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getRun).
		Operation("getRun"))
	s.restfulCont.Add(ws)

	// /exec: GET and POST.
	s.addMetricsBucketMatcher("exec")
	ws = new(restful.WebService)
	ws.
		Path("/exec")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	s.restfulCont.Add(ws)

	// /attach: GET and POST.
	s.addMetricsBucketMatcher("attach")
	ws = new(restful.WebService)
	ws.
		Path("/attach")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getAttach).
		Operation("getAttach"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getAttach).
		Operation("getAttach"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getAttach).
		Operation("getAttach"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getAttach).
		Operation("getAttach"))
	s.restfulCont.Add(ws)

	// /portForward: GET and POST, addressed per pod rather than per container.
	s.addMetricsBucketMatcher("portForward")
	ws = new(restful.WebService)
	ws.
		Path("/portForward")
	ws.Route(ws.GET("/{podNamespace}/{podID}").
		To(s.getPortForward).
		Operation("getPortForward"))
	ws.Route(ws.POST("/{podNamespace}/{podID}").
		To(s.getPortForward).
		Operation("getPortForward"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
		To(s.getPortForward).
		Operation("getPortForward"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
		To(s.getPortForward).
		Operation("getPortForward"))
	s.restfulCont.Add(ws)

	// /containerLogs: GET only.
	s.addMetricsBucketMatcher("containerLogs")
	ws = new(restful.WebService)
	ws.
		Path("/containerLogs")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getContainerLogs).
		Operation("getContainerLogs"))
	s.restfulCont.Add(ws)

	s.addMetricsBucketMatcher("configz")
	configz.InstallHandler(s.restfulCont)

	// /runningpods/: GET returns the running pods as JSON.
	s.addMetricsBucketMatcher("runningpods")
	ws = new(restful.WebService)
	ws.
		Path("/runningpods/").
		Produces(restful.MIME_JSON)
	ws.Route(ws.GET("").
		To(s.getRunningPods).
		Operation("getRunningPods"))
	s.restfulCont.Add(ws)
}
546
547
548 func (s *Server) InstallDebuggingDisabledHandlers() {
549 h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
550 http.Error(w, "Debug endpoints are disabled.", http.StatusMethodNotAllowed)
551 })
552
553 s.addMetricsBucketMatcher("run")
554 s.addMetricsBucketMatcher("exec")
555 s.addMetricsBucketMatcher("attach")
556 s.addMetricsBucketMatcher("portForward")
557 s.addMetricsBucketMatcher("containerLogs")
558 s.addMetricsBucketMatcher("runningpods")
559 s.addMetricsBucketMatcher("pprof")
560 s.addMetricsBucketMatcher("logs")
561 paths := []string{
562 "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/",
563 "/runningpods/", pprofBasePath, logsPath}
564 for _, p := range paths {
565 s.restfulCont.Handle(p, h)
566 }
567 }
568
569
570 func (s *Server) InstallSystemLogHandler(enableSystemLogHandler bool, enableSystemLogQuery bool) {
571 s.addMetricsBucketMatcher("logs")
572 if enableSystemLogHandler {
573 ws := new(restful.WebService)
574 ws.Path(logsPath)
575 ws.Route(ws.GET("").
576 To(s.getLogs).
577 Operation("getLogs"))
578 if !enableSystemLogQuery {
579 ws.Route(ws.GET("/{logpath:*}").
580 To(s.getLogs).
581 Operation("getLogs").
582 Param(ws.PathParameter("logpath", "path to the log").DataType("string")))
583 } else {
584 ws.Route(ws.GET("/{logpath:*}").
585 To(s.getLogs).
586 Operation("getLogs").
587 Param(ws.PathParameter("logpath", "path to the log").DataType("string")).
588 Param(ws.QueryParameter("query", "query specifies services(s) or files from which to return logs").DataType("string")).
589 Param(ws.QueryParameter("sinceTime", "sinceTime is an RFC3339 timestamp from which to show logs").DataType("string")).
590 Param(ws.QueryParameter("untilTime", "untilTime is an RFC3339 timestamp until which to show logs").DataType("string")).
591 Param(ws.QueryParameter("tailLines", "tailLines is used to retrieve the specified number of lines from the end of the log").DataType("string")).
592 Param(ws.QueryParameter("pattern", "pattern filters log entries by the provided regex pattern").DataType("string")).
593 Param(ws.QueryParameter("boot", "boot show messages from a specific system boot").DataType("string")))
594 }
595 s.restfulCont.Add(ws)
596 } else {
597 s.restfulCont.Handle(logsPath, getHandlerForDisabledEndpoint("logs endpoint is disabled."))
598 }
599 }
600
// getHandlerForDisabledEndpoint returns a handler that ignores the request and
// always replies with 405 Method Not Allowed and errorMessage as the body.
func getHandlerForDisabledEndpoint(errorMessage string) http.HandlerFunc {
	disabled := func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, errorMessage, http.StatusMethodNotAllowed)
	}
	return disabled
}
606
607
608 func (s *Server) InstallDebugFlagsHandler(enableDebugFlagsHandler bool) {
609 if enableDebugFlagsHandler {
610
611
612 s.restfulCont.Handle(debugFlagPath, routes.StringFlagPutHandler(logs.GlogSetter))
613 } else {
614 s.restfulCont.Handle(debugFlagPath, getHandlerForDisabledEndpoint("flags endpoint is disabled."))
615 return
616 }
617 }
618
619
620 func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableContentionProfiling bool) {
621 s.addMetricsBucketMatcher("debug")
622 if !enableProfilingLogHandler {
623 s.restfulCont.Handle(pprofBasePath, getHandlerForDisabledEndpoint("profiling endpoint is disabled."))
624 return
625 }
626
627 handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
628 name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
629 switch name {
630 case "profile":
631 pprof.Profile(resp, req.Request)
632 case "symbol":
633 pprof.Symbol(resp, req.Request)
634 case "cmdline":
635 pprof.Cmdline(resp, req.Request)
636 case "trace":
637 pprof.Trace(resp, req.Request)
638 default:
639 pprof.Index(resp, req.Request)
640 }
641 }
642
643
644 ws := new(restful.WebService).Path(pprofBasePath)
645 ws.Route(ws.GET("/{subpath:*}").To(handlePprofEndpoint)).Doc("pprof endpoint")
646 s.restfulCont.Add(ws)
647
648 if enableContentionProfiling {
649 goruntime.SetBlockProfileRate(1)
650 }
651 }
652
653
654 func (s *Server) syncLoopHealthCheck(req *http.Request) error {
655 duration := s.host.ResyncInterval() * 2
656 minDuration := time.Minute * 5
657 if duration < minDuration {
658 duration = minDuration
659 }
660 enterLoopTime := s.host.LatestLoopEntryTime()
661 if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
662 return fmt.Errorf("sync Loop took longer than expected")
663 }
664 return nil
665 }
666
667
668 func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
669 podNamespace := request.PathParameter("podNamespace")
670 podID := request.PathParameter("podID")
671 containerName := request.PathParameter("containerName")
672 ctx := request.Request.Context()
673
674 if len(podID) == 0 {
675
676
677 response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
678 return
679 }
680 if len(containerName) == 0 {
681
682 response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
683 return
684 }
685 if len(podNamespace) == 0 {
686
687 response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
688 return
689 }
690
691 query := request.Request.URL.Query()
692
693 if tail := request.QueryParameter("tail"); len(tail) > 0 {
694 query["tailLines"] = []string{tail}
695
696 if tail == "all" {
697 delete(query, "tailLines")
698 }
699 }
700
701 logOptions := &v1.PodLogOptions{}
702 if err := legacyscheme.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
703 response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
704 return
705 }
706 logOptions.TypeMeta = metav1.TypeMeta{}
707 if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
708 response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
709 return
710 }
711
712 pod, ok := s.host.GetPodByName(podNamespace, podID)
713 if !ok {
714 response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
715 return
716 }
717
718 if kubecontainer.GetContainerSpec(pod, containerName) == nil {
719 response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
720 return
721 }
722
723 if _, ok := response.ResponseWriter.(http.Flusher); !ok {
724 response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
725 return
726 }
727 fw := flushwriter.Wrap(response.ResponseWriter)
728 response.Header().Set("Transfer-Encoding", "chunked")
729 if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
730 response.WriteError(http.StatusBadRequest, err)
731 return
732 }
733 }
734
735
736
737 func encodePods(pods []*v1.Pod) (data []byte, err error) {
738 podList := new(v1.PodList)
739 for _, pod := range pods {
740 podList.Items = append(podList.Items, *pod)
741 }
742
743
744
745 codec := legacyscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})
746 return runtime.Encode(codec, podList)
747 }
748
749
750 func (s *Server) getPods(request *restful.Request, response *restful.Response) {
751 pods := s.host.GetPods()
752 data, err := encodePods(pods)
753 if err != nil {
754 response.WriteError(http.StatusInternalServerError, err)
755 return
756 }
757 writeJSONResponse(response, data)
758 }
759
760
761
762
763 func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
764 ctx := request.Request.Context()
765 pods, err := s.host.GetRunningPods(ctx)
766 if err != nil {
767 response.WriteError(http.StatusInternalServerError, err)
768 return
769 }
770 data, err := encodePods(pods)
771 if err != nil {
772 response.WriteError(http.StatusInternalServerError, err)
773 return
774 }
775 writeJSONResponse(response, data)
776 }
777
778
779 func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
780 s.host.ServeLogs(response, request.Request)
781 }
782
// execRequestParams holds the values extracted from an exec, attach or run
// request by getExecRequestParams.
type execRequestParams struct {
	// podNamespace is the {podNamespace} path parameter.
	podNamespace string
	// podName is the {podID} path parameter.
	podName string
	// podUID is the {uid} path parameter.
	podUID types.UID
	// containerName is the {containerName} path parameter.
	containerName string
	// cmd holds the values of the api.ExecCommandParam query parameter.
	cmd []string
}
790
791 func getExecRequestParams(req *restful.Request) execRequestParams {
792 return execRequestParams{
793 podNamespace: req.PathParameter("podNamespace"),
794 podName: req.PathParameter("podID"),
795 podUID: types.UID(req.PathParameter("uid")),
796 containerName: req.PathParameter("containerName"),
797 cmd: req.Request.URL.Query()[api.ExecCommandParam],
798 }
799 }
800
// portForwardRequestParams holds the values extracted from a port-forward
// request by getPortForwardRequestParams.
type portForwardRequestParams struct {
	// podNamespace is the {podNamespace} path parameter.
	podNamespace string
	// podName is the {podID} path parameter.
	podName string
	// podUID is the {uid} path parameter.
	podUID types.UID
}
806
807 func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams {
808 return portForwardRequestParams{
809 podNamespace: req.PathParameter("podNamespace"),
810 podName: req.PathParameter("podID"),
811 podUID: types.UID(req.PathParameter("uid")),
812 }
813 }
814
// responder is the error responder handed to the upgrade-aware proxy handler
// created in proxyStream.
type responder struct{}
816
817 func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
818 klog.ErrorS(err, "Error while proxying request")
819 http.Error(w, err.Error(), http.StatusInternalServerError)
820 }
821
822
823 func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
824
825 handler := proxy.NewUpgradeAwareHandler(url, nil , false , true , &responder{})
826 handler.ServeHTTP(w, r)
827 }
828
829
830 func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
831 params := getExecRequestParams(request)
832 streamOpts, err := remotecommandserver.NewOptions(request.Request)
833 if err != nil {
834 utilruntime.HandleError(err)
835 response.WriteError(http.StatusBadRequest, err)
836 return
837 }
838 pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
839 if !ok {
840 response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
841 return
842 }
843
844 podFullName := kubecontainer.GetPodFullName(pod)
845 url, err := s.host.GetAttach(request.Request.Context(), podFullName, params.podUID, params.containerName, *streamOpts)
846 if err != nil {
847 streaming.WriteError(err, response.ResponseWriter)
848 return
849 }
850
851 proxyStream(response.ResponseWriter, request.Request, url)
852 }
853
854
855 func (s *Server) getExec(request *restful.Request, response *restful.Response) {
856 params := getExecRequestParams(request)
857 streamOpts, err := remotecommandserver.NewOptions(request.Request)
858 if err != nil {
859 utilruntime.HandleError(err)
860 response.WriteError(http.StatusBadRequest, err)
861 return
862 }
863 pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
864 if !ok {
865 response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
866 return
867 }
868
869 podFullName := kubecontainer.GetPodFullName(pod)
870 url, err := s.host.GetExec(request.Request.Context(), podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
871 if err != nil {
872 streaming.WriteError(err, response.ResponseWriter)
873 return
874 }
875 proxyStream(response.ResponseWriter, request.Request, url)
876 }
877
878
879 func (s *Server) getRun(request *restful.Request, response *restful.Response) {
880 params := getExecRequestParams(request)
881 pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
882 if !ok {
883 response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
884 return
885 }
886
887
888 params.cmd = strings.Split(request.QueryParameter("cmd"), " ")
889 data, err := s.host.RunInContainer(request.Request.Context(), kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd)
890 if err != nil {
891 response.WriteError(http.StatusInternalServerError, err)
892 return
893 }
894 writeJSONResponse(response, data)
895 }
896
897
898 func writeJSONResponse(response *restful.Response, data []byte) {
899 if data == nil {
900 response.WriteHeader(http.StatusOK)
901
902 return
903 }
904 response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
905 response.WriteHeader(http.StatusOK)
906 if _, err := response.Write(data); err != nil {
907 klog.ErrorS(err, "Error writing response")
908 }
909 }
910
911
912
913 func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
914 params := getPortForwardRequestParams(request)
915
916 portForwardOptions, err := portforward.NewV4Options(request.Request)
917 if err != nil {
918 utilruntime.HandleError(err)
919 response.WriteError(http.StatusBadRequest, err)
920 return
921 }
922 pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
923 if !ok {
924 response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
925 return
926 }
927 if len(params.podUID) > 0 && pod.UID != params.podUID {
928 response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
929 return
930 }
931
932 url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
933 if err != nil {
934 streaming.WriteError(err, response.ResponseWriter)
935 return
936 }
937 proxyStream(response.ResponseWriter, request.Request, url)
938 }
939
940
941
942
943 func (s *Server) checkpoint(request *restful.Request, response *restful.Response) {
944 ctx := request.Request.Context()
945 pod, ok := s.host.GetPodByName(request.PathParameter("podNamespace"), request.PathParameter("podID"))
946 if !ok {
947 response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
948 return
949 }
950
951 containerName := request.PathParameter("containerName")
952
953 found := false
954 for _, container := range pod.Spec.Containers {
955 if container.Name == containerName {
956 found = true
957 break
958 }
959 }
960 if !found {
961 for _, container := range pod.Spec.InitContainers {
962 if container.Name == containerName {
963 found = true
964 break
965 }
966 }
967 }
968 if !found {
969 for _, container := range pod.Spec.EphemeralContainers {
970 if container.Name == containerName {
971 found = true
972 break
973 }
974 }
975 }
976 if !found {
977 response.WriteError(
978 http.StatusNotFound,
979 fmt.Errorf("container %v does not exist", containerName),
980 )
981 return
982 }
983
984 options := &runtimeapi.CheckpointContainerRequest{}
985
986
987 timeouts := request.Request.URL.Query()["timeout"]
988 if len(timeouts) > 0 {
989
990
991 timeout, err := strconv.ParseInt(timeouts[len(timeouts)-1], 10, 64)
992 if err != nil {
993 response.WriteError(
994 http.StatusNotFound,
995 fmt.Errorf("cannot parse value of timeout parameter"),
996 )
997 return
998 }
999 options.Timeout = timeout
1000 }
1001
1002 if err := s.host.CheckpointContainer(ctx, pod.UID, kubecontainer.GetPodFullName(pod), containerName, options); err != nil {
1003 response.WriteError(
1004 http.StatusInternalServerError,
1005 fmt.Errorf(
1006 "checkpointing of %v/%v/%v failed (%v)",
1007 request.PathParameter("podNamespace"),
1008 request.PathParameter("podID"),
1009 containerName,
1010 err,
1011 ),
1012 )
1013 return
1014 }
1015 writeJSONResponse(
1016 response,
1017 []byte(fmt.Sprintf("{\"items\":[\"%s\"]}", options.Location)),
1018 )
1019 }
1020
1021
1022
1023
// getURLRootPath reduces a URL path to the root segment used for bucketing.
//
// A single leading "/" is dropped first. For paths of the form
// "metrics/<sub>[/...]" the result is "metrics/<sub>"; for every other path
// the result is the first path segment (which may be empty, e.g. for "" or
// "/").
func getURLRootPath(path string) string {
	// SplitN with a non-empty separator always returns at least one element,
	// so parts[0] can be read without a length check.
	parts := strings.SplitN(strings.TrimPrefix(path, "/"), "/", 3)

	if parts[0] == "metrics" && len(parts) > 1 {
		return fmt.Sprintf("%s/%s", parts[0], parts[1])
	}
	return parts[0]
}
1036
// longRunningRequestPathMap lists the request path roots that
// isLongRunningRequest reports as long-running. Only key presence is
// consulted by that function.
var longRunningRequestPathMap = map[string]bool{
	"exec":        true,
	"attach":      true,
	"portforward": true,
	"debug":       true,
}
1043
1044
1045 func isLongRunningRequest(path string) bool {
1046 _, ok := longRunningRequestPathMap[path]
1047 return ok
1048 }
1049
// statusesNoTracePred is the predicate handed to httplog.WithLogging in
// ServeHTTP. It is built by httplog.StatusIsNot from the status codes below.
// NOTE(review): presumably the predicate matches only responses whose status
// is none of these codes, so that they are excluded from stack-trace logging;
// confirm against the httplog package.
var statusesNoTracePred = httplog.StatusIsNot(
	http.StatusOK,
	http.StatusFound,
	http.StatusMovedPermanently,
	http.StatusTemporaryRedirect,
	http.StatusBadRequest,
	http.StatusNotFound,
	http.StatusSwitchingProtocols,
)
1059
1060
1061 func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
1062 handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred)
1063
1064
1065 var serverType string
1066 if s.auth == nil {
1067 serverType = "readonly"
1068 } else {
1069 serverType = "readwrite"
1070 }
1071
1072 method, path := s.getMetricMethodBucket(req.Method), s.getMetricBucket(req.URL.Path)
1073
1074 longRunning := strconv.FormatBool(isLongRunningRequest(path))
1075
1076 servermetrics.HTTPRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
1077
1078 servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
1079 defer servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Dec()
1080
1081 startTime := time.Now()
1082 defer servermetrics.HTTPRequestsDuration.WithLabelValues(method, path, serverType, longRunning).Observe(servermetrics.SinceInSeconds(startTime))
1083
1084 handler.ServeHTTP(w, req)
1085 }
1086
1087
1088
// prometheusHostAdapter wraps a HostInterface and exposes container, version
// and machine information by delegating to it.
// NOTE(review): presumably this exists to satisfy the info-provider interface
// expected by the cAdvisor Prometheus collector; confirm at the call site.
type prometheusHostAdapter struct {
	host HostInterface
}

// GetRequestedContainersInfo forwards the call unchanged to the wrapped host.
func (a prometheusHostAdapter) GetRequestedContainersInfo(containerName string, options cadvisorv2.RequestOptions) (map[string]*cadvisorapi.ContainerInfo, error) {
	return a.host.GetRequestedContainersInfo(containerName, options)
}

// GetVersionInfo forwards the call unchanged to the wrapped host.
func (a prometheusHostAdapter) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
	return a.host.GetVersionInfo()
}

// GetMachineInfo returns the result of the wrapped host's
// GetCachedMachineInfo; note the method name differs from the one it calls.
func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error) {
	return a.host.GetCachedMachineInfo()
}
1102
1103 func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
1104
1105 return func(c *cadvisorapi.ContainerInfo) map[string]string {
1106
1107
1108 var name, image, podName, namespace, containerName string
1109 if len(c.Aliases) > 0 {
1110 name = c.Aliases[0]
1111 }
1112 image = c.Spec.Image
1113 if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNameLabel]; ok {
1114 podName = v
1115 }
1116 if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNamespaceLabel]; ok {
1117 namespace = v
1118 }
1119 if v, ok := c.Spec.Labels[kubelettypes.KubernetesContainerNameLabel]; ok {
1120 containerName = v
1121 }
1122
1123 if podName == "" && namespace == "" {
1124 if pod, found := s.GetPodByCgroupfs(c.Name); found {
1125 podName = pod.Name
1126 namespace = pod.Namespace
1127 }
1128 }
1129 set := map[string]string{
1130 metrics.LabelID: c.Name,
1131 metrics.LabelName: name,
1132 metrics.LabelImage: image,
1133 "pod": podName,
1134 "namespace": namespace,
1135 "container": containerName,
1136 }
1137 return set
1138 }
1139 }
1140
View as plain text