...

Source file src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

Documentation: k8s.io/kube-aggregator/pkg/apiserver

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"net/http"
    21  	"net/url"
    22  	"sync/atomic"
    23  
    24  	"k8s.io/apimachinery/pkg/runtime"
    25  	"k8s.io/apimachinery/pkg/util/httpstream"
    26  	"k8s.io/apimachinery/pkg/util/proxy"
    27  	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
    28  	endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
    29  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    30  	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
    31  	apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
    32  	"k8s.io/apiserver/pkg/util/x509metrics"
    33  	"k8s.io/client-go/transport"
    34  	"k8s.io/klog/v2"
    35  	apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    36  	apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
    37  )
    38  
    39  const (
    40  	aggregatorComponent string = "aggregator"
    41  )
    42  
    43  type certKeyFunc func() ([]byte, []byte)
    44  
    45  // proxyHandler provides a http.Handler which will proxy traffic to locations
    46  // specified by items implementing Redirector.
    47  type proxyHandler struct {
    48  	// localDelegate is used to satisfy local APIServices
    49  	localDelegate http.Handler
    50  
    51  	// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
    52  	proxyCurrentCertKeyContent certKeyFunc
    53  	proxyTransportDial         *transport.DialHolder
    54  
    55  	// Endpoints based routing to map from cluster IP to routable IP
    56  	serviceResolver ServiceResolver
    57  
    58  	handlingInfo atomic.Value
    59  
    60  	// reject to forward redirect response
    61  	rejectForwardingRedirects bool
    62  }
    63  
    64  type proxyHandlingInfo struct {
    65  	// local indicates that this APIService is locally satisfied
    66  	local bool
    67  
    68  	// name is the name of the APIService
    69  	name string
    70  	// transportConfig holds the information for building a roundtripper
    71  	transportConfig *transport.Config
    72  	// transportBuildingError is an error produced while building the transport.  If this
    73  	// is non-nil, it will be reported to clients.
    74  	transportBuildingError error
    75  	// proxyRoundTripper is the re-useable portion of the transport.  It does not vary with any request.
    76  	proxyRoundTripper http.RoundTripper
    77  	// serviceName is the name of the service this handler proxies to
    78  	serviceName string
    79  	// namespace is the namespace the service lives in
    80  	serviceNamespace string
    81  	// serviceAvailable indicates this APIService is available or not
    82  	serviceAvailable bool
    83  	// servicePort is the port of the service this handler proxies to
    84  	servicePort int32
    85  }
    86  
    87  func proxyError(w http.ResponseWriter, req *http.Request, error string, code int) {
    88  	http.Error(w, error, code)
    89  
    90  	ctx := req.Context()
    91  	info, ok := genericapirequest.RequestInfoFrom(ctx)
    92  	if !ok {
    93  		klog.Warning("no RequestInfo found in the context")
    94  		return
    95  	}
    96  	// TODO: record long-running request differently? The long-running check func does not necessarily match the one of the aggregated apiserver
    97  	endpointmetrics.RecordRequestTermination(req, info, aggregatorComponent, code)
    98  }
    99  
   100  func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   101  	value := r.handlingInfo.Load()
   102  	if value == nil {
   103  		r.localDelegate.ServeHTTP(w, req)
   104  		return
   105  	}
   106  	handlingInfo := value.(proxyHandlingInfo)
   107  	if handlingInfo.local {
   108  		if r.localDelegate == nil {
   109  			http.Error(w, "", http.StatusNotFound)
   110  			return
   111  		}
   112  		r.localDelegate.ServeHTTP(w, req)
   113  		return
   114  	}
   115  
   116  	if !handlingInfo.serviceAvailable {
   117  		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
   118  		return
   119  	}
   120  
   121  	if handlingInfo.transportBuildingError != nil {
   122  		proxyError(w, req, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
   123  		return
   124  	}
   125  
   126  	user, ok := genericapirequest.UserFrom(req.Context())
   127  	if !ok {
   128  		proxyError(w, req, "missing user", http.StatusInternalServerError)
   129  		return
   130  	}
   131  
   132  	// write a new location based on the existing request pointed at the target service
   133  	location := &url.URL{}
   134  	location.Scheme = "https"
   135  	rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
   136  	if err != nil {
   137  		klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
   138  		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
   139  		return
   140  	}
   141  	location.Host = rloc.Host
   142  	location.Path = req.URL.Path
   143  	location.RawQuery = req.URL.Query().Encode()
   144  
   145  	newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req)
   146  	defer cancelFn()
   147  
   148  	if handlingInfo.proxyRoundTripper == nil {
   149  		proxyError(w, req, "", http.StatusNotFound)
   150  		return
   151  	}
   152  
   153  	proxyRoundTripper := handlingInfo.proxyRoundTripper
   154  	upgrade := httpstream.IsUpgradeRequest(req)
   155  
   156  	proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
   157  
   158  	// If we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
   159  	// NOT use the proxyRoundTripper.  It's a direct dial that bypasses the proxyRoundTripper.  This means that we have to
   160  	// attach the "correct" user headers to the request ahead of time.
   161  	if upgrade {
   162  		transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra())
   163  	}
   164  
   165  	handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
   166  	if r.rejectForwardingRedirects {
   167  		handler.RejectForwardingRedirects = true
   168  	}
   169  	utilflowcontrol.RequestDelegated(req.Context())
   170  	handler.ServeHTTP(w, newReq)
   171  }
   172  
   173  // responder implements rest.Responder for assisting a connector in writing objects or errors.
   174  type responder struct {
   175  	w http.ResponseWriter
   176  }
   177  
   178  // TODO this should properly handle content type negotiation
   179  // if the caller asked for protobuf and you write JSON bad things happen.
   180  func (r *responder) Object(statusCode int, obj runtime.Object) {
   181  	responsewriters.WriteRawJSON(statusCode, obj, r.w)
   182  }
   183  
   184  func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {
   185  	http.Error(r.w, err.Error(), http.StatusServiceUnavailable)
   186  }
   187  
   188  // these methods provide locked access to fields
   189  
   190  // Sets serviceAvailable value on proxyHandler
   191  // not thread safe
   192  func (r *proxyHandler) setServiceAvailable() {
   193  	info := r.handlingInfo.Load().(proxyHandlingInfo)
   194  	info.serviceAvailable = true
   195  	r.handlingInfo.Store(info)
   196  }
   197  
   198  func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) {
   199  	if apiService.Spec.Service == nil {
   200  		r.handlingInfo.Store(proxyHandlingInfo{local: true})
   201  		return
   202  	}
   203  
   204  	proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()
   205  
   206  	transportConfig := &transport.Config{
   207  		TLS: transport.TLSConfig{
   208  			Insecure:   apiService.Spec.InsecureSkipTLSVerify,
   209  			ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
   210  			CertData:   proxyClientCert,
   211  			KeyData:    proxyClientKey,
   212  			CAData:     apiService.Spec.CABundle,
   213  		},
   214  		DialHolder: r.proxyTransportDial,
   215  	}
   216  	transportConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
   217  		x509MissingSANCounter,
   218  		x509InsecureSHA1Counter,
   219  	))
   220  
   221  	newInfo := proxyHandlingInfo{
   222  		name:             apiService.Name,
   223  		transportConfig:  transportConfig,
   224  		serviceName:      apiService.Spec.Service.Name,
   225  		serviceNamespace: apiService.Spec.Service.Namespace,
   226  		servicePort:      *apiService.Spec.Service.Port,
   227  		serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
   228  	}
   229  	newInfo.proxyRoundTripper, newInfo.transportBuildingError = transport.New(newInfo.transportConfig)
   230  	if newInfo.transportBuildingError != nil {
   231  		klog.Warning(newInfo.transportBuildingError.Error())
   232  	}
   233  	r.handlingInfo.Store(newInfo)
   234  }
   235  

View as plain text