...

Source file src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go

Documentation: k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package aggregator
    18  
    19  import (
    20  	"crypto/sha512"
    21  	"fmt"
    22  	"net/http"
    23  	"strings"
    24  	"sync/atomic"
    25  
    26  	"k8s.io/apiserver/pkg/authentication/user"
    27  	"k8s.io/apiserver/pkg/endpoints/request"
    28  	"k8s.io/kube-openapi/pkg/validation/spec"
    29  )
    30  
    31  type CacheableDownloader interface {
    32  	UpdateHandler(http.Handler)
    33  	Get() (*spec.Swagger, string, error)
    34  }
    35  
    36  // cacheableDownloader is a downloader that will always return the data
    37  // and the etag.
    38  type cacheableDownloader struct {
    39  	name       string
    40  	downloader *Downloader
    41  	// handler is the http Handler for the apiservice that can be replaced
    42  	handler atomic.Pointer[http.Handler]
    43  	etag    string
    44  	spec    *spec.Swagger
    45  }
    46  
    47  // NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency.
    48  func NewCacheableDownloader(apiServiceName string, downloader *Downloader, handler http.Handler) CacheableDownloader {
    49  	c := &cacheableDownloader{
    50  		name:       apiServiceName,
    51  		downloader: downloader,
    52  	}
    53  	c.handler.Store(&handler)
    54  	return c
    55  }
    56  func (d *cacheableDownloader) UpdateHandler(handler http.Handler) {
    57  	d.handler.Store(&handler)
    58  }
    59  
    60  func (d *cacheableDownloader) Get() (*spec.Swagger, string, error) {
    61  	spec, etag, err := d.get()
    62  	if err != nil {
    63  		return spec, etag, fmt.Errorf("failed to download %v: %v", d.name, err)
    64  	}
    65  	return spec, etag, err
    66  }
    67  
    68  func (d *cacheableDownloader) get() (*spec.Swagger, string, error) {
    69  	h := *d.handler.Load()
    70  	swagger, etag, status, err := d.downloader.Download(h, d.etag)
    71  	if err != nil {
    72  		return nil, "", err
    73  	}
    74  	switch status {
    75  	case http.StatusNotModified:
    76  		// Nothing has changed, do nothing.
    77  	case http.StatusOK:
    78  		if swagger != nil {
    79  			d.etag = etag
    80  			d.spec = swagger
    81  			break
    82  		}
    83  		fallthrough
    84  	case http.StatusNotFound:
    85  		return nil, "", ErrAPIServiceNotFound
    86  	default:
    87  		return nil, "", fmt.Errorf("invalid status code: %v", status)
    88  	}
    89  	return d.spec, d.etag, nil
    90  }
    91  
    92  // Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint.
    93  type Downloader struct {
    94  }
    95  
    96  // NewDownloader creates a new OpenAPI Downloader.
    97  func NewDownloader() Downloader {
    98  	return Downloader{}
    99  }
   100  
   101  func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
   102  	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
   103  		req = req.WithContext(request.WithUser(req.Context(), info))
   104  		handler.ServeHTTP(w, req)
   105  	})
   106  }
   107  
   108  func etagFor(data []byte) string {
   109  	return fmt.Sprintf("%s%X\"", locallyGeneratedEtagPrefix, sha512.Sum512(data))
   110  }
   111  
   112  // Download downloads openAPI spec from /openapi/v2 endpoint of the given handler.
   113  // httpStatus is only valid if err == nil
   114  func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *spec.Swagger, newEtag string, httpStatus int, err error) {
   115  	handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
   116  	handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
   117  
   118  	req, err := http.NewRequest("GET", "/openapi/v2", nil)
   119  	if err != nil {
   120  		return nil, "", 0, err
   121  	}
   122  	req.Header.Add("Accept", "application/json")
   123  
   124  	// Only pass eTag if it is not generated locally
   125  	if len(etag) > 0 && !strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
   126  		req.Header.Add("If-None-Match", etag)
   127  	}
   128  
   129  	writer := newInMemoryResponseWriter()
   130  	handler.ServeHTTP(writer, req)
   131  
   132  	switch writer.respCode {
   133  	case http.StatusNotModified:
   134  		if len(etag) == 0 {
   135  			return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag")
   136  		}
   137  		return nil, etag, http.StatusNotModified, nil
   138  	case http.StatusNotFound:
   139  		// Gracefully skip 404, assuming the server won't provide any spec
   140  		return nil, "", http.StatusNotFound, nil
   141  	case http.StatusOK:
   142  		openAPISpec := &spec.Swagger{}
   143  		if err := openAPISpec.UnmarshalJSON(writer.data); err != nil {
   144  			return nil, "", 0, err
   145  		}
   146  		newEtag = writer.Header().Get("Etag")
   147  		if len(newEtag) == 0 {
   148  			newEtag = etagFor(writer.data)
   149  			if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
   150  				// The function call with an etag and server does not report an etag.
   151  				// That means this server does not support etag and the etag that passed
   152  				// to the function generated previously by us. Just compare etags and
   153  				// return StatusNotModified if they are the same.
   154  				if etag == newEtag {
   155  					return nil, etag, http.StatusNotModified, nil
   156  				}
   157  			}
   158  		}
   159  		return openAPISpec, newEtag, http.StatusOK, nil
   160  	default:
   161  		return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
   162  	}
   163  }
   164  
   165  // inMemoryResponseWriter is a http.Writer that keep the response in memory.
   166  type inMemoryResponseWriter struct {
   167  	writeHeaderCalled bool
   168  	header            http.Header
   169  	respCode          int
   170  	data              []byte
   171  }
   172  
   173  func newInMemoryResponseWriter() *inMemoryResponseWriter {
   174  	return &inMemoryResponseWriter{header: http.Header{}}
   175  }
   176  
   177  func (r *inMemoryResponseWriter) Header() http.Header {
   178  	return r.header
   179  }
   180  
   181  func (r *inMemoryResponseWriter) WriteHeader(code int) {
   182  	r.writeHeaderCalled = true
   183  	r.respCode = code
   184  }
   185  
   186  func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
   187  	if !r.writeHeaderCalled {
   188  		r.WriteHeader(http.StatusOK)
   189  	}
   190  	r.data = append(r.data, in...)
   191  	return len(in), nil
   192  }
   193  
   194  func (r *inMemoryResponseWriter) String() string {
   195  	s := fmt.Sprintf("ResponseCode: %d", r.respCode)
   196  	if r.data != nil {
   197  		s += fmt.Sprintf(", Body: %s", string(r.data))
   198  	}
   199  	if r.header != nil {
   200  		s += fmt.Sprintf(", Header: %s", r.header)
   201  	}
   202  	return s
   203  }
   204  

View as plain text