...

Source file src/k8s.io/kubernetes/pkg/kubelet/server/server.go

Documentation: k8s.io/kubernetes/pkg/kubelet/server

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package server
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"fmt"
    23  	"io"
    24  	"net"
    25  	"net/http"
    26  	"net/http/pprof"
    27  	"net/url"
    28  	"os"
    29  	"reflect"
    30  	goruntime "runtime"
    31  	"strconv"
    32  	"strings"
    33  	"time"
    34  
    35  	"github.com/emicklei/go-restful/v3"
    36  	cadvisormetrics "github.com/google/cadvisor/container"
    37  	cadvisorapi "github.com/google/cadvisor/info/v1"
    38  	cadvisorv2 "github.com/google/cadvisor/info/v2"
    39  	"github.com/google/cadvisor/metrics"
    40  	"go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful"
    41  	oteltrace "go.opentelemetry.io/otel/trace"
    42  	"google.golang.org/grpc"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
    45  	"k8s.io/utils/clock"
    46  	netutils "k8s.io/utils/net"
    47  
    48  	v1 "k8s.io/api/core/v1"
    49  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    50  	"k8s.io/apimachinery/pkg/runtime"
    51  	"k8s.io/apimachinery/pkg/runtime/schema"
    52  	"k8s.io/apimachinery/pkg/types"
    53  	"k8s.io/apimachinery/pkg/util/proxy"
    54  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    55  	"k8s.io/apimachinery/pkg/util/sets"
    56  	"k8s.io/apiserver/pkg/authentication/authenticator"
    57  	"k8s.io/apiserver/pkg/authorization/authorizer"
    58  	"k8s.io/apiserver/pkg/server/healthz"
    59  	"k8s.io/apiserver/pkg/server/httplog"
    60  	"k8s.io/apiserver/pkg/server/routes"
    61  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    62  	"k8s.io/apiserver/pkg/util/flushwriter"
    63  	"k8s.io/component-base/configz"
    64  	"k8s.io/component-base/logs"
    65  	compbasemetrics "k8s.io/component-base/metrics"
    66  	metricsfeatures "k8s.io/component-base/metrics/features"
    67  	"k8s.io/component-base/metrics/legacyregistry"
    68  	"k8s.io/component-base/metrics/prometheus/slis"
    69  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    70  	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
    71  	podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
    72  	"k8s.io/kubelet/pkg/cri/streaming"
    73  	"k8s.io/kubelet/pkg/cri/streaming/portforward"
    74  	remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
    75  	kubelettypes "k8s.io/kubelet/pkg/types"
    76  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    77  	api "k8s.io/kubernetes/pkg/apis/core"
    78  	"k8s.io/kubernetes/pkg/apis/core/v1/validation"
    79  	"k8s.io/kubernetes/pkg/features"
    80  	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
    81  	apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
    82  	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
    83  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    84  	"k8s.io/kubernetes/pkg/kubelet/prober"
    85  	servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
    86  	"k8s.io/kubernetes/pkg/kubelet/server/stats"
    87  	"k8s.io/kubernetes/pkg/kubelet/util"
    88  )
    89  
    90  func init() {
    91  	utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    92  }
    93  
// URL paths and path prefixes under which the kubelet server registers its
// handlers.
const (
	metricsPath         = "/metrics"
	cadvisorMetricsPath = "/metrics/cadvisor"
	resourceMetricsPath = "/metrics/resource"
	proberMetricsPath   = "/metrics/probes"
	statsPath           = "/stats/"
	logsPath            = "/logs/"
	checkpointPath      = "/checkpoint/"
	pprofBasePath       = "/debug/pprof/"
	debugFlagPath       = "/debug/flags/v"
)
   105  
// Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct {
	// auth authenticates and authorizes requests; it is only consulted by the
	// filter installed in InstallAuthFilter.
	auth                 AuthInterface
	// host is the kubelet-side implementation the handlers call into.
	host                 HostInterface
	// restfulCont is the root container all web services and handlers are
	// registered with.
	restfulCont          containerInterface
	// metricsBuckets is the set of known URL root paths; getMetricBucket maps
	// anything outside this set to "other".
	metricsBuckets       sets.String
	// metricsMethodBuckets is the set of known HTTP methods;
	// getMetricMethodBucket maps anything outside this set to "other".
	metricsMethodBuckets sets.String
	// resourceAnalyzer backs the stats handlers and the resource metrics
	// collector registered in InstallDefaultHandlers.
	resourceAnalyzer     stats.ResourceAnalyzer
}
   115  
// TLSOptions holds the TLS options.
type TLSOptions struct {
	// Config is assigned to the http.Server's TLSConfig.
	Config   *tls.Config
	// CertFile and KeyFile are passed to http.Server.ListenAndServeTLS. Empty
	// strings mean no files are specified and GetCertificate in Config is
	// expected to supply the certificate instead.
	CertFile string
	KeyFile  string
}
   122  
// containerInterface defines the restful.Container functions used on the root container
type containerInterface interface {
	// Add registers a web service with the container.
	Add(service *restful.WebService) *restful.Container
	// Handle registers a plain http.Handler (not a web service) at path.
	Handle(path string, handler http.Handler)
	// Filter installs a filter function on the container.
	Filter(filter restful.FilterFunction)
	// ServeHTTP lets the container be used as the server's request handler.
	ServeHTTP(w http.ResponseWriter, r *http.Request)
	// RegisteredWebServices returns the web services added to the container.
	RegisteredWebServices() []*restful.WebService

	// RegisteredHandlePaths returns the paths of handlers registered directly with the container (non-web-services)
	// Used to test filters are being applied on non-web-service handlers
	RegisteredHandlePaths() []string
}
   135  
// filteringContainer delegates all Handle(...) calls to Container.HandleWithFilter(...),
// so we can ensure restful.FilterFunctions are used for all handlers
type filteringContainer struct {
	*restful.Container

	// registeredHandlePaths records, in registration order, every path passed
	// to Handle.
	registeredHandlePaths []string
}
   143  
   144  func (a *filteringContainer) Handle(path string, handler http.Handler) {
   145  	a.HandleWithFilter(path, handler)
   146  	a.registeredHandlePaths = append(a.registeredHandlePaths, path)
   147  }
// RegisteredHandlePaths returns the paths recorded by Handle, in registration
// order. The internal slice is returned directly, not a copy.
func (a *filteringContainer) RegisteredHandlePaths() []string {
	return a.registeredHandlePaths
}
   151  
   152  // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
   153  func ListenAndServeKubeletServer(
   154  	host HostInterface,
   155  	resourceAnalyzer stats.ResourceAnalyzer,
   156  	kubeCfg *kubeletconfiginternal.KubeletConfiguration,
   157  	tlsOptions *TLSOptions,
   158  	auth AuthInterface,
   159  	tp oteltrace.TracerProvider) {
   160  
   161  	address := netutils.ParseIPSloppy(kubeCfg.Address)
   162  	port := uint(kubeCfg.Port)
   163  	klog.InfoS("Starting to listen", "address", address, "port", port)
   164  	handler := NewServer(host, resourceAnalyzer, auth, tp, kubeCfg)
   165  	s := &http.Server{
   166  		Addr:           net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
   167  		Handler:        &handler,
   168  		IdleTimeout:    90 * time.Second, // matches http.DefaultTransport keep-alive timeout
   169  		ReadTimeout:    4 * 60 * time.Minute,
   170  		WriteTimeout:   4 * 60 * time.Minute,
   171  		MaxHeaderBytes: 1 << 20,
   172  	}
   173  
   174  	if tlsOptions != nil {
   175  		s.TLSConfig = tlsOptions.Config
   176  		// Passing empty strings as the cert and key files means no
   177  		// cert/keys are specified and GetCertificate in the TLSConfig
   178  		// should be called instead.
   179  		if err := s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile); err != nil {
   180  			klog.ErrorS(err, "Failed to listen and serve")
   181  			os.Exit(1)
   182  		}
   183  	} else if err := s.ListenAndServe(); err != nil {
   184  		klog.ErrorS(err, "Failed to listen and serve")
   185  		os.Exit(1)
   186  	}
   187  }
   188  
   189  // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
   190  func ListenAndServeKubeletReadOnlyServer(
   191  	host HostInterface,
   192  	resourceAnalyzer stats.ResourceAnalyzer,
   193  	address net.IP,
   194  	port uint) {
   195  	klog.InfoS("Starting to listen read-only", "address", address, "port", port)
   196  	// TODO: https://github.com/kubernetes/kubernetes/issues/109829 tracer should use WithPublicEndpoint
   197  	s := NewServer(host, resourceAnalyzer, nil, oteltrace.NewNoopTracerProvider(), nil)
   198  
   199  	server := &http.Server{
   200  		Addr:           net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
   201  		Handler:        &s,
   202  		IdleTimeout:    90 * time.Second, // matches http.DefaultTransport keep-alive timeout
   203  		ReadTimeout:    4 * 60 * time.Minute,
   204  		WriteTimeout:   4 * 60 * time.Minute,
   205  		MaxHeaderBytes: 1 << 20,
   206  	}
   207  
   208  	if err := server.ListenAndServe(); err != nil {
   209  		klog.ErrorS(err, "Failed to listen and serve")
   210  		os.Exit(1)
   211  	}
   212  }
   213  
   214  // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
   215  func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
   216  	server := grpc.NewServer(apisgrpc.WithRateLimiter("podresources", podresources.DefaultQPS, podresources.DefaultBurstTokens))
   217  
   218  	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
   219  	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
   220  
   221  	l, err := util.CreateListener(endpoint)
   222  	if err != nil {
   223  		klog.ErrorS(err, "Failed to create listener for podResources endpoint")
   224  		os.Exit(1)
   225  	}
   226  
   227  	klog.InfoS("Starting to serve the podresources API", "endpoint", endpoint)
   228  	if err := server.Serve(l); err != nil {
   229  		klog.ErrorS(err, "Failed to serve")
   230  		os.Exit(1)
   231  	}
   232  }
   233  
// AuthInterface contains all methods required by the auth filters
type AuthInterface interface {
	// authenticator.Request provides AuthenticateRequest, used to identify
	// the caller.
	authenticator.Request
	// authorizer.RequestAttributesGetter provides GetRequestAttributes, used
	// to derive the authorization attributes for a user and request.
	authorizer.RequestAttributesGetter
	// authorizer.Authorizer provides Authorize, used to decide whether the
	// request is allowed.
	authorizer.Authorizer
}
   240  
// HostInterface contains all the kubelet methods required by the server.
// For testability.
type HostInterface interface {
	stats.Provider
	GetVersionInfo() (*cadvisorapi.VersionInfo, error)
	GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
	GetRunningPods(ctx context.Context) ([]*v1.Pod, error)
	RunInContainer(ctx context.Context, name string, uid types.UID, container string, cmd []string) ([]byte, error)
	CheckpointContainer(ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
	// GetKubeletContainerLogs writes the container's logs to stdout/stderr;
	// the containerLogs handler passes the same flushing writer for both.
	GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
	ServeLogs(w http.ResponseWriter, req *http.Request)
	// ResyncInterval and LatestLoopEntryTime feed the "syncloop" health check.
	ResyncInterval() time.Duration
	GetHostname() string
	LatestLoopEntryTime() time.Time
	// GetExec, GetAttach and GetPortForward each return a URL and an error.
	GetExec(ctx context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
	GetAttach(ctx context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
	GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
	// ListMetricDescriptors and ListPodSandboxMetrics are handed to the CRI
	// metrics collector when the PodAndContainerStatsFromCRI feature is enabled.
	ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error)
	ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error)
}
   261  
   262  // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
   263  func NewServer(
   264  	host HostInterface,
   265  	resourceAnalyzer stats.ResourceAnalyzer,
   266  	auth AuthInterface,
   267  	tp oteltrace.TracerProvider,
   268  	kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
   269  
   270  	server := Server{
   271  		host:                 host,
   272  		resourceAnalyzer:     resourceAnalyzer,
   273  		auth:                 auth,
   274  		restfulCont:          &filteringContainer{Container: restful.NewContainer()},
   275  		metricsBuckets:       sets.NewString(),
   276  		metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
   277  	}
   278  	if auth != nil {
   279  		server.InstallAuthFilter()
   280  	}
   281  	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
   282  		server.InstallTracingFilter(tp)
   283  	}
   284  	server.InstallDefaultHandlers()
   285  	if kubeCfg != nil && kubeCfg.EnableDebuggingHandlers {
   286  		server.InstallDebuggingHandlers()
   287  		// To maintain backward compatibility serve logs and pprof only when enableDebuggingHandlers is also enabled
   288  		// see https://github.com/kubernetes/kubernetes/pull/87273
   289  		server.InstallSystemLogHandler(kubeCfg.EnableSystemLogHandler, kubeCfg.EnableSystemLogQuery)
   290  		server.InstallProfilingHandler(kubeCfg.EnableProfilingHandler, kubeCfg.EnableContentionProfiling)
   291  		server.InstallDebugFlagsHandler(kubeCfg.EnableDebugFlagsHandler)
   292  	} else {
   293  		server.InstallDebuggingDisabledHandlers()
   294  	}
   295  	return server
   296  }
   297  
   298  // InstallAuthFilter installs authentication filters with the restful Container.
   299  func (s *Server) InstallAuthFilter() {
   300  	s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
   301  		// Authenticate
   302  		info, ok, err := s.auth.AuthenticateRequest(req.Request)
   303  		if err != nil {
   304  			klog.ErrorS(err, "Unable to authenticate the request due to an error")
   305  			resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
   306  			return
   307  		}
   308  		if !ok {
   309  			resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
   310  			return
   311  		}
   312  
   313  		// Get authorization attributes
   314  		attrs := s.auth.GetRequestAttributes(info.User, req.Request)
   315  
   316  		// Authorize
   317  		decision, _, err := s.auth.Authorize(req.Request.Context(), attrs)
   318  		if err != nil {
   319  			klog.ErrorS(err, "Authorization error", "user", attrs.GetUser().GetName(), "verb", attrs.GetVerb(), "resource", attrs.GetResource(), "subresource", attrs.GetSubresource())
   320  			msg := fmt.Sprintf("Authorization error (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
   321  			resp.WriteErrorString(http.StatusInternalServerError, msg)
   322  			return
   323  		}
   324  		if decision != authorizer.DecisionAllow {
   325  			klog.V(2).InfoS("Forbidden", "user", attrs.GetUser().GetName(), "verb", attrs.GetVerb(), "resource", attrs.GetResource(), "subresource", attrs.GetSubresource())
   326  			msg := fmt.Sprintf("Forbidden (user=%s, verb=%s, resource=%s, subresource=%s)", attrs.GetUser().GetName(), attrs.GetVerb(), attrs.GetResource(), attrs.GetSubresource())
   327  			resp.WriteErrorString(http.StatusForbidden, msg)
   328  			return
   329  		}
   330  
   331  		// Continue
   332  		chain.ProcessFilter(req, resp)
   333  	})
   334  }
   335  
   336  // InstallTracingFilter installs OpenTelemetry tracing filter with the restful Container.
   337  func (s *Server) InstallTracingFilter(tp oteltrace.TracerProvider) {
   338  	s.restfulCont.Filter(otelrestful.OTelFilter("kubelet", otelrestful.WithTracerProvider(tp)))
   339  }
   340  
// addMetricsBucketMatcher adds bucket to the set of known metrics buckets that
// getMetricBucket consults. Despite the name, no regexp is involved: the
// bucket string is inserted into the set as-is. Please be aware this is not
// thread safe and should not be used dynamically
func (s *Server) addMetricsBucketMatcher(bucket string) {
	s.metricsBuckets.Insert(bucket)
}
   346  
   347  // getMetricBucket find the appropriate metrics reporting bucket for the given path
   348  func (s *Server) getMetricBucket(path string) string {
   349  	root := getURLRootPath(path)
   350  	if s.metricsBuckets.Has(root) {
   351  		return root
   352  	}
   353  	return "other"
   354  }
   355  
   356  // getMetricMethodBucket checks for unknown or invalid HTTP verbs
   357  func (s *Server) getMetricMethodBucket(method string) string {
   358  	if s.metricsMethodBuckets.Has(method) {
   359  		return method
   360  	}
   361  	return "other"
   362  }
   363  
// InstallDefaultHandlers registers the default set of supported HTTP request
// patterns with the restful Container: health checks, SLI metrics, /pods,
// the stats handlers, the /metrics, /metrics/cadvisor, /metrics/resource and
// /metrics/probes endpoints, and, when the ContainerCheckpoint feature gate is
// enabled, the checkpoint endpoint. Each endpoint's root path is also added to
// the known metrics buckets.
func (s *Server) InstallDefaultHandlers() {
	// Health checks: ping, log, and a "syncloop" check backed by
	// syncLoopHealthCheck.
	s.addMetricsBucketMatcher("healthz")
	healthz.InstallHandler(s.restfulCont,
		healthz.PingHealthz,
		healthz.LogHealthz,
		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
	)

	slis.SLIMetricsWithReset{}.Install(s.restfulCont)

	// GET /pods, served as JSON by getPods.
	s.addMetricsBucketMatcher("pods")
	ws := new(restful.WebService)
	ws.
		Path("/pods").
		Produces(restful.MIME_JSON)
	ws.Route(ws.GET("").
		To(s.getPods).
		Operation("getPods"))
	s.restfulCont.Add(ws)

	s.addMetricsBucketMatcher("stats")
	s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer))

	// Register every metrics bucket up front; /metrics itself is served from
	// the legacy registry.
	s.addMetricsBucketMatcher("metrics")
	s.addMetricsBucketMatcher("metrics/cadvisor")
	s.addMetricsBucketMatcher("metrics/probes")
	s.addMetricsBucketMatcher("metrics/resource")
	s.restfulCont.Handle(metricsPath, legacyregistry.Handler())

	// Set of cAdvisor metric kinds passed to the collectors below.
	includedMetrics := cadvisormetrics.MetricSet{
		cadvisormetrics.CpuUsageMetrics:     struct{}{},
		cadvisormetrics.MemoryUsageMetrics:  struct{}{},
		cadvisormetrics.CpuLoadMetrics:      struct{}{},
		cadvisormetrics.DiskIOMetrics:       struct{}{},
		cadvisormetrics.DiskUsageMetrics:    struct{}{},
		cadvisormetrics.NetworkUsageMetrics: struct{}{},
		cadvisormetrics.AppMetrics:          struct{}{},
		cadvisormetrics.ProcessMetrics:      struct{}{},
		cadvisormetrics.OOMMetrics:          struct{}{},
	}
	// cAdvisor metrics are exposed under the secured handler as well
	r := compbasemetrics.NewKubeRegistry()
	r.RawMustRegister(metrics.NewPrometheusMachineCollector(prometheusHostAdapter{s.host}, includedMetrics))
	// Container metrics come either from the CRI collector or from the
	// cAdvisor Prometheus collector, depending on the feature gate.
	if utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI) {
		r.CustomRegister(collectors.NewCRIMetricsCollector(context.TODO(), s.host.ListPodSandboxMetrics, s.host.ListMetricDescriptors))
	} else {
		cadvisorOpts := cadvisorv2.RequestOptions{
			IdType:    cadvisorv2.TypeName,
			Count:     1,
			Recursive: true,
		}
		r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics, clock.RealClock{}, cadvisorOpts))
	}
	s.restfulCont.Handle(cadvisorMetricsPath,
		compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
	)

	// Resource metrics get their own registry and endpoint.
	// NOTE(review): "metrics/resource" was already inserted above; this
	// repeated insert into the bucket set looks redundant.
	s.addMetricsBucketMatcher("metrics/resource")
	resourceRegistry := compbasemetrics.NewKubeRegistry()
	resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
	s.restfulCont.Handle(resourceMetricsPath,
		compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
	)

	// prober metrics are exposed under a different endpoint

	// NOTE(review): "metrics/probes" was already inserted above as well.
	s.addMetricsBucketMatcher("metrics/probes")
	p := compbasemetrics.NewKubeRegistry()
	// The error from registering the process start time is deliberately
	// discarded here.
	_ = compbasemetrics.RegisterProcessStartTime(p.Register)
	p.MustRegister(prober.ProberResults)
	p.MustRegister(prober.ProberDuration)
	s.restfulCont.Handle(proberMetricsPath,
		compbasemetrics.HandlerFor(p, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
	)

	// Only enable checkpoint API if the feature is enabled
	if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
		s.addMetricsBucketMatcher("checkpoint")
		ws = &restful.WebService{}
		ws.Path(checkpointPath).Produces(restful.MIME_JSON)
		ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
			To(s.checkpoint).
			Operation("checkpoint"))
		s.restfulCont.Add(ws)
	}
}
   452  
   453  // InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
   454  func (s *Server) InstallDebuggingHandlers() {
   455  	klog.InfoS("Adding debug handlers to kubelet server")
   456  
   457  	s.addMetricsBucketMatcher("run")
   458  	ws := new(restful.WebService)
   459  	ws.
   460  		Path("/run")
   461  	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
   462  		To(s.getRun).
   463  		Operation("getRun"))
   464  	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
   465  		To(s.getRun).
   466  		Operation("getRun"))
   467  	s.restfulCont.Add(ws)
   468  
   469  	s.addMetricsBucketMatcher("exec")
   470  	ws = new(restful.WebService)
   471  	ws.
   472  		Path("/exec")
   473  	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
   474  		To(s.getExec).
   475  		Operation("getExec"))
   476  	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
   477  		To(s.getExec).
   478  		Operation("getExec"))
   479  	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
   480  		To(s.getExec).
   481  		Operation("getExec"))
   482  	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
   483  		To(s.getExec).
   484  		Operation("getExec"))
   485  	s.restfulCont.Add(ws)
   486  
   487  	s.addMetricsBucketMatcher("attach")
   488  	ws = new(restful.WebService)
   489  	ws.
   490  		Path("/attach")
   491  	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
   492  		To(s.getAttach).
   493  		Operation("getAttach"))
   494  	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
   495  		To(s.getAttach).
   496  		Operation("getAttach"))
   497  	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
   498  		To(s.getAttach).
   499  		Operation("getAttach"))
   500  	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
   501  		To(s.getAttach).
   502  		Operation("getAttach"))
   503  	s.restfulCont.Add(ws)
   504  
   505  	s.addMetricsBucketMatcher("portForward")
   506  	ws = new(restful.WebService)
   507  	ws.
   508  		Path("/portForward")
   509  	ws.Route(ws.GET("/{podNamespace}/{podID}").
   510  		To(s.getPortForward).
   511  		Operation("getPortForward"))
   512  	ws.Route(ws.POST("/{podNamespace}/{podID}").
   513  		To(s.getPortForward).
   514  		Operation("getPortForward"))
   515  	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
   516  		To(s.getPortForward).
   517  		Operation("getPortForward"))
   518  	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
   519  		To(s.getPortForward).
   520  		Operation("getPortForward"))
   521  	s.restfulCont.Add(ws)
   522  
   523  	s.addMetricsBucketMatcher("containerLogs")
   524  	ws = new(restful.WebService)
   525  	ws.
   526  		Path("/containerLogs")
   527  	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
   528  		To(s.getContainerLogs).
   529  		Operation("getContainerLogs"))
   530  	s.restfulCont.Add(ws)
   531  
   532  	s.addMetricsBucketMatcher("configz")
   533  	configz.InstallHandler(s.restfulCont)
   534  
   535  	// The /runningpods endpoint is used for testing only.
   536  	s.addMetricsBucketMatcher("runningpods")
   537  	ws = new(restful.WebService)
   538  	ws.
   539  		Path("/runningpods/").
   540  		Produces(restful.MIME_JSON)
   541  	ws.Route(ws.GET("").
   542  		To(s.getRunningPods).
   543  		Operation("getRunningPods"))
   544  	s.restfulCont.Add(ws)
   545  }
   546  
   547  // InstallDebuggingDisabledHandlers registers the HTTP request patterns that provide better error message
   548  func (s *Server) InstallDebuggingDisabledHandlers() {
   549  	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   550  		http.Error(w, "Debug endpoints are disabled.", http.StatusMethodNotAllowed)
   551  	})
   552  
   553  	s.addMetricsBucketMatcher("run")
   554  	s.addMetricsBucketMatcher("exec")
   555  	s.addMetricsBucketMatcher("attach")
   556  	s.addMetricsBucketMatcher("portForward")
   557  	s.addMetricsBucketMatcher("containerLogs")
   558  	s.addMetricsBucketMatcher("runningpods")
   559  	s.addMetricsBucketMatcher("pprof")
   560  	s.addMetricsBucketMatcher("logs")
   561  	paths := []string{
   562  		"/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/",
   563  		"/runningpods/", pprofBasePath, logsPath}
   564  	for _, p := range paths {
   565  		s.restfulCont.Handle(p, h)
   566  	}
   567  }
   568  
   569  // InstallSystemLogHandler registers the HTTP request patterns for logs endpoint.
   570  func (s *Server) InstallSystemLogHandler(enableSystemLogHandler bool, enableSystemLogQuery bool) {
   571  	s.addMetricsBucketMatcher("logs")
   572  	if enableSystemLogHandler {
   573  		ws := new(restful.WebService)
   574  		ws.Path(logsPath)
   575  		ws.Route(ws.GET("").
   576  			To(s.getLogs).
   577  			Operation("getLogs"))
   578  		if !enableSystemLogQuery {
   579  			ws.Route(ws.GET("/{logpath:*}").
   580  				To(s.getLogs).
   581  				Operation("getLogs").
   582  				Param(ws.PathParameter("logpath", "path to the log").DataType("string")))
   583  		} else {
   584  			ws.Route(ws.GET("/{logpath:*}").
   585  				To(s.getLogs).
   586  				Operation("getLogs").
   587  				Param(ws.PathParameter("logpath", "path to the log").DataType("string")).
   588  				Param(ws.QueryParameter("query", "query specifies services(s) or files from which to return logs").DataType("string")).
   589  				Param(ws.QueryParameter("sinceTime", "sinceTime is an RFC3339 timestamp from which to show logs").DataType("string")).
   590  				Param(ws.QueryParameter("untilTime", "untilTime is an RFC3339 timestamp until which to show logs").DataType("string")).
   591  				Param(ws.QueryParameter("tailLines", "tailLines is used to retrieve the specified number of lines from the end of the log").DataType("string")).
   592  				Param(ws.QueryParameter("pattern", "pattern filters log entries by the provided regex pattern").DataType("string")).
   593  				Param(ws.QueryParameter("boot", "boot show messages from a specific system boot").DataType("string")))
   594  		}
   595  		s.restfulCont.Add(ws)
   596  	} else {
   597  		s.restfulCont.Handle(logsPath, getHandlerForDisabledEndpoint("logs endpoint is disabled."))
   598  	}
   599  }
   600  
   601  func getHandlerForDisabledEndpoint(errorMessage string) http.HandlerFunc {
   602  	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   603  		http.Error(w, errorMessage, http.StatusMethodNotAllowed)
   604  	})
   605  }
   606  
   607  // InstallDebugFlagsHandler registers the HTTP request patterns for /debug/flags/v endpoint.
   608  func (s *Server) InstallDebugFlagsHandler(enableDebugFlagsHandler bool) {
   609  	if enableDebugFlagsHandler {
   610  		// Setup flags handlers.
   611  		// so far, only logging related endpoints are considered valid to add for these debug flags.
   612  		s.restfulCont.Handle(debugFlagPath, routes.StringFlagPutHandler(logs.GlogSetter))
   613  	} else {
   614  		s.restfulCont.Handle(debugFlagPath, getHandlerForDisabledEndpoint("flags endpoint is disabled."))
   615  		return
   616  	}
   617  }
   618  
   619  // InstallProfilingHandler registers the HTTP request patterns for /debug/pprof endpoint.
   620  func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableContentionProfiling bool) {
   621  	s.addMetricsBucketMatcher("debug")
   622  	if !enableProfilingLogHandler {
   623  		s.restfulCont.Handle(pprofBasePath, getHandlerForDisabledEndpoint("profiling endpoint is disabled."))
   624  		return
   625  	}
   626  
   627  	handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
   628  		name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
   629  		switch name {
   630  		case "profile":
   631  			pprof.Profile(resp, req.Request)
   632  		case "symbol":
   633  			pprof.Symbol(resp, req.Request)
   634  		case "cmdline":
   635  			pprof.Cmdline(resp, req.Request)
   636  		case "trace":
   637  			pprof.Trace(resp, req.Request)
   638  		default:
   639  			pprof.Index(resp, req.Request)
   640  		}
   641  	}
   642  
   643  	// Setup pprof handlers.
   644  	ws := new(restful.WebService).Path(pprofBasePath)
   645  	ws.Route(ws.GET("/{subpath:*}").To(handlePprofEndpoint)).Doc("pprof endpoint")
   646  	s.restfulCont.Add(ws)
   647  
   648  	if enableContentionProfiling {
   649  		goruntime.SetBlockProfileRate(1)
   650  	}
   651  }
   652  
   653  // Checks if kubelet's sync loop  that updates containers is working.
   654  func (s *Server) syncLoopHealthCheck(req *http.Request) error {
   655  	duration := s.host.ResyncInterval() * 2
   656  	minDuration := time.Minute * 5
   657  	if duration < minDuration {
   658  		duration = minDuration
   659  	}
   660  	enterLoopTime := s.host.LatestLoopEntryTime()
   661  	if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
   662  		return fmt.Errorf("sync Loop took longer than expected")
   663  	}
   664  	return nil
   665  }
   666  
   667  // getContainerLogs handles containerLogs request against the Kubelet
   668  func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
   669  	podNamespace := request.PathParameter("podNamespace")
   670  	podID := request.PathParameter("podID")
   671  	containerName := request.PathParameter("containerName")
   672  	ctx := request.Request.Context()
   673  
   674  	if len(podID) == 0 {
   675  		// TODO: Why return JSON when the rest return plaintext errors?
   676  		// TODO: Why return plaintext errors?
   677  		response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
   678  		return
   679  	}
   680  	if len(containerName) == 0 {
   681  		// TODO: Why return JSON when the rest return plaintext errors?
   682  		response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
   683  		return
   684  	}
   685  	if len(podNamespace) == 0 {
   686  		// TODO: Why return JSON when the rest return plaintext errors?
   687  		response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
   688  		return
   689  	}
   690  
   691  	query := request.Request.URL.Query()
   692  	// backwards compatibility for the "tail" query parameter
   693  	if tail := request.QueryParameter("tail"); len(tail) > 0 {
   694  		query["tailLines"] = []string{tail}
   695  		// "all" is the same as omitting tail
   696  		if tail == "all" {
   697  			delete(query, "tailLines")
   698  		}
   699  	}
   700  	// container logs on the kubelet are locked to the v1 API version of PodLogOptions
   701  	logOptions := &v1.PodLogOptions{}
   702  	if err := legacyscheme.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
   703  		response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
   704  		return
   705  	}
   706  	logOptions.TypeMeta = metav1.TypeMeta{}
   707  	if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
   708  		response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
   709  		return
   710  	}
   711  
   712  	pod, ok := s.host.GetPodByName(podNamespace, podID)
   713  	if !ok {
   714  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
   715  		return
   716  	}
   717  	// Check if containerName is valid.
   718  	if kubecontainer.GetContainerSpec(pod, containerName) == nil {
   719  		response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
   720  		return
   721  	}
   722  
   723  	if _, ok := response.ResponseWriter.(http.Flusher); !ok {
   724  		response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
   725  		return
   726  	}
   727  	fw := flushwriter.Wrap(response.ResponseWriter)
   728  	response.Header().Set("Transfer-Encoding", "chunked")
   729  	if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
   730  		response.WriteError(http.StatusBadRequest, err)
   731  		return
   732  	}
   733  }
   734  
   735  // encodePods creates an v1.PodList object from pods and returns the encoded
   736  // PodList.
   737  func encodePods(pods []*v1.Pod) (data []byte, err error) {
   738  	podList := new(v1.PodList)
   739  	for _, pod := range pods {
   740  		podList.Items = append(podList.Items, *pod)
   741  	}
   742  	// TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
   743  	//   as API server refactor.
   744  	// TODO: Locked to v1, needs to be made generic
   745  	codec := legacyscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})
   746  	return runtime.Encode(codec, podList)
   747  }
   748  
   749  // getPods returns a list of pods bound to the Kubelet and their spec.
   750  func (s *Server) getPods(request *restful.Request, response *restful.Response) {
   751  	pods := s.host.GetPods()
   752  	data, err := encodePods(pods)
   753  	if err != nil {
   754  		response.WriteError(http.StatusInternalServerError, err)
   755  		return
   756  	}
   757  	writeJSONResponse(response, data)
   758  }
   759  
   760  // getRunningPods returns a list of pods running on Kubelet. The list is
   761  // provided by the container runtime, and is different from the list returned
   762  // by getPods, which is a set of desired pods to run.
   763  func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
   764  	ctx := request.Request.Context()
   765  	pods, err := s.host.GetRunningPods(ctx)
   766  	if err != nil {
   767  		response.WriteError(http.StatusInternalServerError, err)
   768  		return
   769  	}
   770  	data, err := encodePods(pods)
   771  	if err != nil {
   772  		response.WriteError(http.StatusInternalServerError, err)
   773  		return
   774  	}
   775  	writeJSONResponse(response, data)
   776  }
   777  
   778  // getLogs handles logs requests against the Kubelet.
   779  func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
   780  	s.host.ServeLogs(response, request.Request)
   781  }
   782  
// execRequestParams holds the values that getExecRequestParams reads from a
// request.
type execRequestParams struct {
	// podNamespace is the "podNamespace" path parameter.
	podNamespace string
	// podName is the "podID" path parameter.
	podName string
	// podUID is the "uid" path parameter.
	podUID types.UID
	// containerName is the "containerName" path parameter.
	containerName string
	// cmd holds the values of the api.ExecCommandParam query parameter.
	cmd []string
}
   790  
   791  func getExecRequestParams(req *restful.Request) execRequestParams {
   792  	return execRequestParams{
   793  		podNamespace:  req.PathParameter("podNamespace"),
   794  		podName:       req.PathParameter("podID"),
   795  		podUID:        types.UID(req.PathParameter("uid")),
   796  		containerName: req.PathParameter("containerName"),
   797  		cmd:           req.Request.URL.Query()[api.ExecCommandParam],
   798  	}
   799  }
   800  
// portForwardRequestParams holds the values that getPortForwardRequestParams
// reads from a request.
type portForwardRequestParams struct {
	// podNamespace is the "podNamespace" path parameter.
	podNamespace string
	// podName is the "podID" path parameter.
	podName string
	// podUID is the "uid" path parameter.
	podUID types.UID
}
   806  
   807  func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams {
   808  	return portForwardRequestParams{
   809  		podNamespace: req.PathParameter("podNamespace"),
   810  		podName:      req.PathParameter("podID"),
   811  		podUID:       types.UID(req.PathParameter("uid")),
   812  	}
   813  }
   814  
   815  type responder struct{}
   816  
   817  func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
   818  	klog.ErrorS(err, "Error while proxying request")
   819  	http.Error(w, err.Error(), http.StatusInternalServerError)
   820  }
   821  
   822  // proxyStream proxies stream to url.
   823  func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
   824  	// TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
   825  	handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
   826  	handler.ServeHTTP(w, r)
   827  }
   828  
   829  // getAttach handles requests to attach to a container.
   830  func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
   831  	params := getExecRequestParams(request)
   832  	streamOpts, err := remotecommandserver.NewOptions(request.Request)
   833  	if err != nil {
   834  		utilruntime.HandleError(err)
   835  		response.WriteError(http.StatusBadRequest, err)
   836  		return
   837  	}
   838  	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
   839  	if !ok {
   840  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
   841  		return
   842  	}
   843  
   844  	podFullName := kubecontainer.GetPodFullName(pod)
   845  	url, err := s.host.GetAttach(request.Request.Context(), podFullName, params.podUID, params.containerName, *streamOpts)
   846  	if err != nil {
   847  		streaming.WriteError(err, response.ResponseWriter)
   848  		return
   849  	}
   850  
   851  	proxyStream(response.ResponseWriter, request.Request, url)
   852  }
   853  
   854  // getExec handles requests to run a command inside a container.
   855  func (s *Server) getExec(request *restful.Request, response *restful.Response) {
   856  	params := getExecRequestParams(request)
   857  	streamOpts, err := remotecommandserver.NewOptions(request.Request)
   858  	if err != nil {
   859  		utilruntime.HandleError(err)
   860  		response.WriteError(http.StatusBadRequest, err)
   861  		return
   862  	}
   863  	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
   864  	if !ok {
   865  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
   866  		return
   867  	}
   868  
   869  	podFullName := kubecontainer.GetPodFullName(pod)
   870  	url, err := s.host.GetExec(request.Request.Context(), podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
   871  	if err != nil {
   872  		streaming.WriteError(err, response.ResponseWriter)
   873  		return
   874  	}
   875  	proxyStream(response.ResponseWriter, request.Request, url)
   876  }
   877  
   878  // getRun handles requests to run a command inside a container.
   879  func (s *Server) getRun(request *restful.Request, response *restful.Response) {
   880  	params := getExecRequestParams(request)
   881  	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
   882  	if !ok {
   883  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
   884  		return
   885  	}
   886  
   887  	// For legacy reasons, run uses different query param than exec.
   888  	params.cmd = strings.Split(request.QueryParameter("cmd"), " ")
   889  	data, err := s.host.RunInContainer(request.Request.Context(), kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd)
   890  	if err != nil {
   891  		response.WriteError(http.StatusInternalServerError, err)
   892  		return
   893  	}
   894  	writeJSONResponse(response, data)
   895  }
   896  
   897  // Derived from go-restful writeJSON.
   898  func writeJSONResponse(response *restful.Response, data []byte) {
   899  	if data == nil {
   900  		response.WriteHeader(http.StatusOK)
   901  		// do not write a nil representation
   902  		return
   903  	}
   904  	response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
   905  	response.WriteHeader(http.StatusOK)
   906  	if _, err := response.Write(data); err != nil {
   907  		klog.ErrorS(err, "Error writing response")
   908  	}
   909  }
   910  
   911  // getPortForward handles a new restful port forward request. It determines the
   912  // pod name and uid and then calls ServePortForward.
   913  func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
   914  	params := getPortForwardRequestParams(request)
   915  
   916  	portForwardOptions, err := portforward.NewV4Options(request.Request)
   917  	if err != nil {
   918  		utilruntime.HandleError(err)
   919  		response.WriteError(http.StatusBadRequest, err)
   920  		return
   921  	}
   922  	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
   923  	if !ok {
   924  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
   925  		return
   926  	}
   927  	if len(params.podUID) > 0 && pod.UID != params.podUID {
   928  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
   929  		return
   930  	}
   931  
   932  	url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
   933  	if err != nil {
   934  		streaming.WriteError(err, response.ResponseWriter)
   935  		return
   936  	}
   937  	proxyStream(response.ResponseWriter, request.Request, url)
   938  }
   939  
   940  // checkpoint handles the checkpoint API request. It checks if the requested
   941  // podNamespace, pod and container actually exist and only then calls out
   942  // to the runtime to actually checkpoint the container.
   943  func (s *Server) checkpoint(request *restful.Request, response *restful.Response) {
   944  	ctx := request.Request.Context()
   945  	pod, ok := s.host.GetPodByName(request.PathParameter("podNamespace"), request.PathParameter("podID"))
   946  	if !ok {
   947  		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
   948  		return
   949  	}
   950  
   951  	containerName := request.PathParameter("containerName")
   952  
   953  	found := false
   954  	for _, container := range pod.Spec.Containers {
   955  		if container.Name == containerName {
   956  			found = true
   957  			break
   958  		}
   959  	}
   960  	if !found {
   961  		for _, container := range pod.Spec.InitContainers {
   962  			if container.Name == containerName {
   963  				found = true
   964  				break
   965  			}
   966  		}
   967  	}
   968  	if !found {
   969  		for _, container := range pod.Spec.EphemeralContainers {
   970  			if container.Name == containerName {
   971  				found = true
   972  				break
   973  			}
   974  		}
   975  	}
   976  	if !found {
   977  		response.WriteError(
   978  			http.StatusNotFound,
   979  			fmt.Errorf("container %v does not exist", containerName),
   980  		)
   981  		return
   982  	}
   983  
   984  	options := &runtimeapi.CheckpointContainerRequest{}
   985  	// Query parameter to select an optional timeout. Without the timeout parameter
   986  	// the checkpoint command will use the default CRI timeout.
   987  	timeouts := request.Request.URL.Query()["timeout"]
   988  	if len(timeouts) > 0 {
   989  		// If the user specified one or multiple values for timeouts we
   990  		// are using the last available value.
   991  		timeout, err := strconv.ParseInt(timeouts[len(timeouts)-1], 10, 64)
   992  		if err != nil {
   993  			response.WriteError(
   994  				http.StatusNotFound,
   995  				fmt.Errorf("cannot parse value of timeout parameter"),
   996  			)
   997  			return
   998  		}
   999  		options.Timeout = timeout
  1000  	}
  1001  
  1002  	if err := s.host.CheckpointContainer(ctx, pod.UID, kubecontainer.GetPodFullName(pod), containerName, options); err != nil {
  1003  		response.WriteError(
  1004  			http.StatusInternalServerError,
  1005  			fmt.Errorf(
  1006  				"checkpointing of %v/%v/%v failed (%v)",
  1007  				request.PathParameter("podNamespace"),
  1008  				request.PathParameter("podID"),
  1009  				containerName,
  1010  				err,
  1011  			),
  1012  		)
  1013  		return
  1014  	}
  1015  	writeJSONResponse(
  1016  		response,
  1017  		[]byte(fmt.Sprintf("{\"items\":[\"%s\"]}", options.Location)),
  1018  	)
  1019  }
  1020  
  1021  // getURLRootPath trims a URL path.
  1022  // For paths in the format of "/metrics/xxx", "metrics/xxx" is returned;
  1023  // For all other paths, the first part of the path is returned.
  1024  func getURLRootPath(path string) string {
  1025  	parts := strings.SplitN(strings.TrimPrefix(path, "/"), "/", 3)
  1026  	if len(parts) == 0 {
  1027  		return path
  1028  	}
  1029  
  1030  	if parts[0] == "metrics" && len(parts) > 1 {
  1031  		return fmt.Sprintf("%s/%s", parts[0], parts[1])
  1032  
  1033  	}
  1034  	return parts[0]
  1035  }
  1036  
  1037  var longRunningRequestPathMap = map[string]bool{
  1038  	"exec":        true,
  1039  	"attach":      true,
  1040  	"portforward": true,
  1041  	"debug":       true,
  1042  }
  1043  
  1044  // isLongRunningRequest determines whether the request is long-running or not.
  1045  func isLongRunningRequest(path string) bool {
  1046  	_, ok := longRunningRequestPathMap[path]
  1047  	return ok
  1048  }
  1049  
// statusesNoTracePred is the predicate handed to httplog.WithLogging in
// ServeHTTP. It is built by httplog.StatusIsNot from the status codes below.
// NOTE(review): presumably it matches responses whose status is NOT one of
// these codes, so that only unexpected statuses get traced — confirm against
// the httplog package.
var statusesNoTracePred = httplog.StatusIsNot(
	http.StatusOK,
	http.StatusFound,
	http.StatusMovedPermanently,
	http.StatusTemporaryRedirect,
	http.StatusBadRequest,
	http.StatusNotFound,
	http.StatusSwitchingProtocols,
)
  1059  
  1060  // ServeHTTP responds to HTTP requests on the Kubelet.
  1061  func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  1062  	handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred)
  1063  
  1064  	// monitor http requests
  1065  	var serverType string
  1066  	if s.auth == nil {
  1067  		serverType = "readonly"
  1068  	} else {
  1069  		serverType = "readwrite"
  1070  	}
  1071  
  1072  	method, path := s.getMetricMethodBucket(req.Method), s.getMetricBucket(req.URL.Path)
  1073  
  1074  	longRunning := strconv.FormatBool(isLongRunningRequest(path))
  1075  
  1076  	servermetrics.HTTPRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
  1077  
  1078  	servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Inc()
  1079  	defer servermetrics.HTTPInflightRequests.WithLabelValues(method, path, serverType, longRunning).Dec()
  1080  
  1081  	startTime := time.Now()
  1082  	defer servermetrics.HTTPRequestsDuration.WithLabelValues(method, path, serverType, longRunning).Observe(servermetrics.SinceInSeconds(startTime))
  1083  
  1084  	handler.ServeHTTP(w, req)
  1085  }
  1086  
// prometheusHostAdapter adapts the HostInterface to the interface expected by the
// cAdvisor prometheus collector.
type prometheusHostAdapter struct {
	// host is the HostInterface every method below delegates to.
	host HostInterface
}

// GetRequestedContainersInfo forwards to the host's method of the same name.
func (a prometheusHostAdapter) GetRequestedContainersInfo(containerName string, options cadvisorv2.RequestOptions) (map[string]*cadvisorapi.ContainerInfo, error) {
	return a.host.GetRequestedContainersInfo(containerName, options)
}

// GetVersionInfo forwards to the host's method of the same name.
func (a prometheusHostAdapter) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
	return a.host.GetVersionInfo()
}

// GetMachineInfo returns the result of the host's GetCachedMachineInfo; note
// that the method name differs from the one it delegates to.
func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error) {
	return a.host.GetCachedMachineInfo()
}
  1102  
  1103  func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
  1104  	// containerPrometheusLabels maps cAdvisor labels to prometheus labels.
  1105  	return func(c *cadvisorapi.ContainerInfo) map[string]string {
  1106  		// Prometheus requires that all metrics in the same family have the same labels,
  1107  		// so we arrange to supply blank strings for missing labels
  1108  		var name, image, podName, namespace, containerName string
  1109  		if len(c.Aliases) > 0 {
  1110  			name = c.Aliases[0]
  1111  		}
  1112  		image = c.Spec.Image
  1113  		if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNameLabel]; ok {
  1114  			podName = v
  1115  		}
  1116  		if v, ok := c.Spec.Labels[kubelettypes.KubernetesPodNamespaceLabel]; ok {
  1117  			namespace = v
  1118  		}
  1119  		if v, ok := c.Spec.Labels[kubelettypes.KubernetesContainerNameLabel]; ok {
  1120  			containerName = v
  1121  		}
  1122  		// Associate pod cgroup with pod so we have an accurate accounting of sandbox
  1123  		if podName == "" && namespace == "" {
  1124  			if pod, found := s.GetPodByCgroupfs(c.Name); found {
  1125  				podName = pod.Name
  1126  				namespace = pod.Namespace
  1127  			}
  1128  		}
  1129  		set := map[string]string{
  1130  			metrics.LabelID:    c.Name,
  1131  			metrics.LabelName:  name,
  1132  			metrics.LabelImage: image,
  1133  			"pod":              podName,
  1134  			"namespace":        namespace,
  1135  			"container":        containerName,
  1136  		}
  1137  		return set
  1138  	}
  1139  }
  1140  

View as plain text