1
16
17 package aggregator
18
19 import (
20 "crypto/sha512"
21 "fmt"
22 "net/http"
23 "strings"
24 "sync/atomic"
25
26 "k8s.io/apiserver/pkg/authentication/user"
27 "k8s.io/apiserver/pkg/endpoints/request"
28 "k8s.io/kube-openapi/pkg/validation/spec"
29 )
30
31 type CacheableDownloader interface {
32 UpdateHandler(http.Handler)
33 Get() (*spec.Swagger, string, error)
34 }
35
36
37
38 type cacheableDownloader struct {
39 name string
40 downloader *Downloader
41
42 handler atomic.Pointer[http.Handler]
43 etag string
44 spec *spec.Swagger
45 }
46
47
48 func NewCacheableDownloader(apiServiceName string, downloader *Downloader, handler http.Handler) CacheableDownloader {
49 c := &cacheableDownloader{
50 name: apiServiceName,
51 downloader: downloader,
52 }
53 c.handler.Store(&handler)
54 return c
55 }
56 func (d *cacheableDownloader) UpdateHandler(handler http.Handler) {
57 d.handler.Store(&handler)
58 }
59
60 func (d *cacheableDownloader) Get() (*spec.Swagger, string, error) {
61 spec, etag, err := d.get()
62 if err != nil {
63 return spec, etag, fmt.Errorf("failed to download %v: %v", d.name, err)
64 }
65 return spec, etag, err
66 }
67
68 func (d *cacheableDownloader) get() (*spec.Swagger, string, error) {
69 h := *d.handler.Load()
70 swagger, etag, status, err := d.downloader.Download(h, d.etag)
71 if err != nil {
72 return nil, "", err
73 }
74 switch status {
75 case http.StatusNotModified:
76
77 case http.StatusOK:
78 if swagger != nil {
79 d.etag = etag
80 d.spec = swagger
81 break
82 }
83 fallthrough
84 case http.StatusNotFound:
85 return nil, "", ErrAPIServiceNotFound
86 default:
87 return nil, "", fmt.Errorf("invalid status code: %v", status)
88 }
89 return d.spec, d.etag, nil
90 }
91
92
// Downloader retrieves OpenAPI specs from http handlers. It carries no state.
type Downloader struct{}

// NewDownloader returns a ready-to-use Downloader value.
func NewDownloader() Downloader {
	var d Downloader
	return d
}
100
101 func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
102 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
103 req = req.WithContext(request.WithUser(req.Context(), info))
104 handler.ServeHTTP(w, req)
105 })
106 }
107
108 func etagFor(data []byte) string {
109 return fmt.Sprintf("%s%X\"", locallyGeneratedEtagPrefix, sha512.Sum512(data))
110 }
111
112
113
114 func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *spec.Swagger, newEtag string, httpStatus int, err error) {
115 handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
116 handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
117
118 req, err := http.NewRequest("GET", "/openapi/v2", nil)
119 if err != nil {
120 return nil, "", 0, err
121 }
122 req.Header.Add("Accept", "application/json")
123
124
125 if len(etag) > 0 && !strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
126 req.Header.Add("If-None-Match", etag)
127 }
128
129 writer := newInMemoryResponseWriter()
130 handler.ServeHTTP(writer, req)
131
132 switch writer.respCode {
133 case http.StatusNotModified:
134 if len(etag) == 0 {
135 return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag")
136 }
137 return nil, etag, http.StatusNotModified, nil
138 case http.StatusNotFound:
139
140 return nil, "", http.StatusNotFound, nil
141 case http.StatusOK:
142 openAPISpec := &spec.Swagger{}
143 if err := openAPISpec.UnmarshalJSON(writer.data); err != nil {
144 return nil, "", 0, err
145 }
146 newEtag = writer.Header().Get("Etag")
147 if len(newEtag) == 0 {
148 newEtag = etagFor(writer.data)
149 if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
150
151
152
153
154 if etag == newEtag {
155 return nil, etag, http.StatusNotModified, nil
156 }
157 }
158 }
159 return openAPISpec, newEtag, http.StatusOK, nil
160 default:
161 return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
162 }
163 }
164
165
// inMemoryResponseWriter is an http.ResponseWriter that records the status
// code, headers and body of a response in memory.
type inMemoryResponseWriter struct {
	// writeHeaderCalled reports whether a status code has been recorded.
	writeHeaderCalled bool

	// header is the header map handed out by Header.
	header http.Header

	// respCode is the recorded status code; it is 0 until one is recorded.
	respCode int

	// data accumulates every byte passed to Write.
	data []byte
}

// Compile-time check that the recorder satisfies http.ResponseWriter.
var _ http.ResponseWriter = (*inMemoryResponseWriter)(nil)

// newInMemoryResponseWriter returns a recorder with an empty, non-nil header
// map.
func newInMemoryResponseWriter() *inMemoryResponseWriter {
	return &inMemoryResponseWriter{header: http.Header{}}
}

// Header returns the recorder's header map.
func (r *inMemoryResponseWriter) Header() http.Header {
	return r.header
}

// WriteHeader records code as the response status. Only the first status
// takes effect; later calls are ignored, so a handler that writes a second
// status cannot overwrite the one already recorded.
func (r *inMemoryResponseWriter) WriteHeader(code int) {
	if r.writeHeaderCalled {
		return
	}
	r.writeHeaderCalled = true
	r.respCode = code
}

// Write appends in to the recorded body, first recording http.StatusOK if no
// status has been recorded yet. It always reports len(in) bytes written and a
// nil error.
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
	if !r.writeHeaderCalled {
		r.WriteHeader(http.StatusOK)
	}
	r.data = append(r.data, in...)
	return len(in), nil
}

// String renders the recorded response for use in error messages: the status
// code, then the body if any was written, then the headers if the header map
// is non-nil.
func (r *inMemoryResponseWriter) String() string {
	s := fmt.Sprintf("ResponseCode: %d", r.respCode)
	if r.data != nil {
		s += fmt.Sprintf(", Body: %s", string(r.data))
	}
	if r.header != nil {
		s += fmt.Sprintf(", Header: %s", r.header)
	}
	return s
}
204
View as plain text