1
16
17 package handler3
18
19 import (
20 "bytes"
21 "crypto/sha512"
22 "encoding/json"
23 "fmt"
24 "net/http"
25 "net/url"
26 "path"
27 "strconv"
28 "strings"
29 "sync"
30 "time"
31
32 "github.com/golang/protobuf/proto"
33 openapi_v3 "github.com/google/gnostic-models/openapiv3"
34 "github.com/google/uuid"
35 "github.com/munnerz/goautoneg"
36
37 "k8s.io/klog/v2"
38 "k8s.io/kube-openapi/pkg/cached"
39 "k8s.io/kube-openapi/pkg/common"
40 "k8s.io/kube-openapi/pkg/spec3"
41 )
42
// Media-type subtypes recognised when choosing the representation of an
// OpenAPI v3 document for a single group version.
const (
	// subTypeJSON selects the JSON serialization.
	subTypeJSON = "json"
	// subTypeProtobuf selects the protobuf serialization.
	subTypeProtobuf = "com.github.proto-openapi.spec.v3.v1.0+protobuf"
	// subTypeProtobufDeprecated is an alternate spelling (with "@") that is
	// served exactly like subTypeProtobuf.
	subTypeProtobufDeprecated = "com.github.proto-openapi.spec.v3@v1.0+protobuf"
)
48
49
50
51 type OpenAPIV3Discovery struct {
52 Paths map[string]OpenAPIV3DiscoveryGroupVersion `json:"paths"`
53 }
54
55
56
// OpenAPIV3DiscoveryGroupVersion is the discovery entry for one group version.
type OpenAPIV3DiscoveryGroupVersion struct {
	// ServerRelativeURL is the server-relative location of the group
	// version's document, including a "hash" query parameter carrying the
	// document's etag.
	ServerRelativeURL string `json:"serverRelativeURL"`
}
61
62 func ToV3ProtoBinary(json []byte) ([]byte, error) {
63 document, err := openapi_v3.ParseDocument(json)
64 if err != nil {
65 return nil, err
66 }
67 return proto.Marshal(document)
68 }
69
// timedSpec pairs a serialized document with the time it was produced, so
// that the HTTP handlers can pass a modification time to http.ServeContent.
type timedSpec struct {
	// spec holds the serialized bytes (JSON or protobuf).
	spec []byte
	// lastModified is the time at which the bytes were generated.
	lastModified time.Time
}
74
75
76 type openAPIV3Group struct {
77 specCache cached.LastSuccess[*spec3.OpenAPI]
78 pbCache cached.Value[timedSpec]
79 jsonCache cached.Value[timedSpec]
80 }
81
82 func newOpenAPIV3Group() *openAPIV3Group {
83 o := &openAPIV3Group{}
84 o.jsonCache = cached.Transform[*spec3.OpenAPI](func(spec *spec3.OpenAPI, etag string, err error) (timedSpec, string, error) {
85 if err != nil {
86 return timedSpec{}, "", err
87 }
88 json, err := json.Marshal(spec)
89 if err != nil {
90 return timedSpec{}, "", err
91 }
92 return timedSpec{spec: json, lastModified: time.Now()}, computeETag(json), nil
93 }, &o.specCache)
94 o.pbCache = cached.Transform(func(ts timedSpec, etag string, err error) (timedSpec, string, error) {
95 if err != nil {
96 return timedSpec{}, "", err
97 }
98 proto, err := ToV3ProtoBinary(ts.spec)
99 if err != nil {
100 return timedSpec{}, "", err
101 }
102 return timedSpec{spec: proto, lastModified: ts.lastModified}, etag, nil
103 }, o.jsonCache)
104 return o
105 }
106
107 func (o *openAPIV3Group) UpdateSpec(openapi cached.Value[*spec3.OpenAPI]) {
108 o.specCache.Store(openapi)
109 }
110
111
112
113 type OpenAPIService struct {
114
115 mutex sync.Mutex
116 v3Schema map[string]*openAPIV3Group
117
118 discoveryCache cached.LastSuccess[timedSpec]
119 }
120
// computeETag returns the upper-case hexadecimal SHA-512 digest of data, or
// the empty string when data is nil.
func computeETag(data []byte) string {
	if data != nil {
		digest := sha512.Sum512(data)
		return fmt.Sprintf("%X", digest[:])
	}
	return ""
}
127
// constructServerRelativeURL returns the server-relative URL of a group
// version's document: gvString joined under /openapi/v3, with etag passed as
// the "hash" query parameter.
func constructServerRelativeURL(gvString, etag string) string {
	target := url.URL{
		Path:     path.Join("/openapi/v3", gvString),
		RawQuery: url.Values{"hash": []string{etag}}.Encode(),
	}
	return target.String()
}
135
136
137 func NewOpenAPIService() *OpenAPIService {
138 o := &OpenAPIService{}
139 o.v3Schema = make(map[string]*openAPIV3Group)
140
141 o.discoveryCache.Store(o.buildDiscoveryCacheLocked())
142 return o
143 }
144
145 func (o *OpenAPIService) buildDiscoveryCacheLocked() cached.Value[timedSpec] {
146 caches := make(map[string]cached.Value[timedSpec], len(o.v3Schema))
147 for gvName, group := range o.v3Schema {
148 caches[gvName] = group.jsonCache
149 }
150 return cached.Merge(func(results map[string]cached.Result[timedSpec]) (timedSpec, string, error) {
151 discovery := &OpenAPIV3Discovery{Paths: make(map[string]OpenAPIV3DiscoveryGroupVersion)}
152 for gvName, result := range results {
153 if result.Err != nil {
154 return timedSpec{}, "", result.Err
155 }
156 discovery.Paths[gvName] = OpenAPIV3DiscoveryGroupVersion{
157 ServerRelativeURL: constructServerRelativeURL(gvName, result.Etag),
158 }
159 }
160 j, err := json.Marshal(discovery)
161 if err != nil {
162 return timedSpec{}, "", err
163 }
164 return timedSpec{spec: j, lastModified: time.Now()}, computeETag(j), nil
165 }, caches)
166 }
167
168 func (o *OpenAPIService) getSingleGroupBytes(getType string, group string) ([]byte, string, time.Time, error) {
169 o.mutex.Lock()
170 defer o.mutex.Unlock()
171 v, ok := o.v3Schema[group]
172 if !ok {
173 return nil, "", time.Now(), fmt.Errorf("Cannot find CRD group %s", group)
174 }
175 switch getType {
176 case subTypeJSON:
177 ts, etag, err := v.jsonCache.Get()
178 return ts.spec, etag, ts.lastModified, err
179 case subTypeProtobuf, subTypeProtobufDeprecated:
180 ts, etag, err := v.pbCache.Get()
181 return ts.spec, etag, ts.lastModified, err
182 default:
183 return nil, "", time.Now(), fmt.Errorf("Invalid accept clause %s", getType)
184 }
185 }
186
187
188 func (o *OpenAPIService) UpdateGroupVersionLazy(group string, openapi cached.Value[*spec3.OpenAPI]) {
189 o.mutex.Lock()
190 defer o.mutex.Unlock()
191 if _, ok := o.v3Schema[group]; !ok {
192 o.v3Schema[group] = newOpenAPIV3Group()
193
194 o.discoveryCache.Store(o.buildDiscoveryCacheLocked())
195 }
196 o.v3Schema[group].UpdateSpec(openapi)
197 }
198
199 func (o *OpenAPIService) UpdateGroupVersion(group string, openapi *spec3.OpenAPI) {
200 o.UpdateGroupVersionLazy(group, cached.Static(openapi, uuid.New().String()))
201 }
202
203 func (o *OpenAPIService) DeleteGroupVersion(group string) {
204 o.mutex.Lock()
205 defer o.mutex.Unlock()
206 delete(o.v3Schema, group)
207
208 o.discoveryCache.Store(o.buildDiscoveryCacheLocked())
209 }
210
211 func (o *OpenAPIService) HandleDiscovery(w http.ResponseWriter, r *http.Request) {
212 ts, etag, err := o.discoveryCache.Get()
213 if err != nil {
214 klog.Errorf("Error serving discovery: %s", err)
215 w.WriteHeader(http.StatusInternalServerError)
216 return
217 }
218 w.Header().Set("Etag", strconv.Quote(etag))
219 w.Header().Set("Content-Type", "application/json")
220 http.ServeContent(w, r, "/openapi/v3", ts.lastModified, bytes.NewReader(ts.spec))
221 }
222
223 func (o *OpenAPIService) HandleGroupVersion(w http.ResponseWriter, r *http.Request) {
224 url := strings.SplitAfterN(r.URL.Path, "/", 4)
225 group := url[3]
226
227 decipherableFormats := r.Header.Get("Accept")
228 if decipherableFormats == "" {
229 decipherableFormats = "*/*"
230 }
231 clauses := goautoneg.ParseAccept(decipherableFormats)
232 w.Header().Add("Vary", "Accept")
233
234 if len(clauses) == 0 {
235 return
236 }
237
238 accepted := []struct {
239 Type string
240 SubType string
241 ReturnedContentType string
242 }{
243 {"application", subTypeJSON, "application/" + subTypeJSON},
244 {"application", subTypeProtobuf, "application/" + subTypeProtobuf},
245 {"application", subTypeProtobufDeprecated, "application/" + subTypeProtobuf},
246 }
247
248 for _, clause := range clauses {
249 for _, accepts := range accepted {
250 if clause.Type != accepts.Type && clause.Type != "*" {
251 continue
252 }
253 if clause.SubType != accepts.SubType && clause.SubType != "*" {
254 continue
255 }
256 data, etag, lastModified, err := o.getSingleGroupBytes(accepts.SubType, group)
257 if err != nil {
258 return
259 }
260
261 w.Header().Set("Content-Type", accepts.ReturnedContentType)
262
263
264 w.Header().Set("Etag", strconv.Quote(etag))
265
266 if hash := r.URL.Query().Get("hash"); hash != "" {
267 if hash != etag {
268 u := constructServerRelativeURL(group, etag)
269 http.Redirect(w, r, u, 301)
270 return
271 }
272
273
274
275 w.Header().Set("Vary", "Accept")
276
277
278 w.Header().Set("Cache-Control", "public, immutable")
279
280
281 w.Header().Set("Expires", time.Now().AddDate(1, 0, 0).Format(time.RFC1123))
282 }
283 http.ServeContent(w, r, "", lastModified, bytes.NewReader(data))
284 return
285 }
286 }
287 w.WriteHeader(406)
288 return
289 }
290
291 func (o *OpenAPIService) RegisterOpenAPIV3VersionedService(servePath string, handler common.PathHandlerByGroupVersion) error {
292 handler.Handle(servePath, http.HandlerFunc(o.HandleDiscovery))
293 handler.HandlePrefix(servePath+"/", http.HandlerFunc(o.HandleGroupVersion))
294 return nil
295 }
296
View as plain text