1
16
17 package aggregator
18
19 import (
20 "crypto/sha256"
21 "errors"
22 "fmt"
23 "net/http"
24 "sync"
25 "time"
26
27 restful "github.com/emicklei/go-restful/v3"
28
29 "k8s.io/apiserver/pkg/server"
30 "k8s.io/klog/v2"
31 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
32 "k8s.io/kube-openapi/pkg/aggregator"
33 "k8s.io/kube-openapi/pkg/builder"
34 "k8s.io/kube-openapi/pkg/cached"
35 "k8s.io/kube-openapi/pkg/common"
36 "k8s.io/kube-openapi/pkg/common/restfuladapter"
37 "k8s.io/kube-openapi/pkg/handler"
38 "k8s.io/kube-openapi/pkg/validation/spec"
39 )
40
// ErrAPIServiceNotFound is returned when an operation names an APIService
// that the spec aggregator is not tracking.
var ErrAPIServiceNotFound = errors.New("resource not found")
42
43
44
// SpecAggregator tracks APIServices and the OpenAPI specs they contribute.
// The behavior described on the methods below is that of specAggregator, the
// implementation in this file.
type SpecAggregator interface {
	// AddUpdateAPIService starts tracking apiService with the given handler,
	// or, if an APIService with the same name is already tracked, replaces
	// its stored APIService and handler.
	AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error

	// UpdateAPIServiceSpec refreshes the spec of the named APIService.
	// It returns ErrAPIServiceNotFound if the name is not tracked.
	UpdateAPIServiceSpec(apiServiceName string) error

	// RemoveAPIService stops tracking the named APIService. Names that are
	// not tracked are ignored.
	RemoveAPIService(apiServiceName string)
}
51
const (
	// aggregatorUser is not referenced in the visible part of this file.
	// NOTE(review): presumably the user name the aggregator acts as when it
	// requests specs — confirm at the use site.
	aggregatorUser = "system:aggregator"

	// specDownloadTimeout is one minute. It is not referenced in the visible
	// part of this file.
	// NOTE(review): presumably bounds a single spec download — confirm at the
	// use site.
	specDownloadTimeout = time.Minute

	// localDelegateChainNamePattern is the fmt pattern used to name the
	// entries for locally served specs. Its single argument is the entry's
	// index (0 for the aggregator's own spec, i+1 for the i-th delegation
	// handler), formatted as a zero-padded 10-digit number.
	localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"

	// locallyGeneratedEtagPrefix starts with an opening double quote followed
	// by a fixed hexadecimal string and a dash. It is not referenced in the
	// visible part of this file.
	// NOTE(review): presumably marks etags produced locally rather than
	// received from a remote server — confirm at the use site.
	locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
)
60
61
62
// openAPISpecInfo is the per-APIService record kept by specAggregator.
type openAPISpecInfo struct {
	// apiService is a copy of the APIService this record belongs to. For
	// locally served specs only its Name is populated (see addLocalSpec).
	apiService v1.APIService

	// spec is the cached OpenAPI spec for this service. It is written through
	// Store by addLocalSpec, AddUpdateAPIService and updateServiceLocked.
	spec cached.LastSuccess[*spec.Swagger]

	// downloader is used by updateServiceLocked to fetch the spec. It is set
	// for services added through AddUpdateAPIService and left at its zero
	// value for entries created by addLocalSpec.
	downloader CacheableDownloader
}
74
// specAggregator is the SpecAggregator implementation in this file. It keeps
// one openAPISpecInfo per tracked service and serves their merged spec.
type specAggregator struct {
	// mutex is held by UpdateAPIServiceSpec, AddUpdateAPIService and
	// RemoveAPIService while they read or modify specsByAPIServiceName.
	mutex sync.Mutex

	// specsByAPIServiceName maps an APIService name (or a generated local
	// delegation-chain name) to its record.
	specsByAPIServiceName map[string]*openAPISpecInfo

	// openAPIVersionedService serves the merged spec; it is registered under
	// /openapi/v2 and is handed a new merged spec whenever a service is added
	// or removed.
	openAPIVersionedService *handler.OpenAPIService

	// downloader is passed to NewCacheableDownloader for every APIService
	// added through AddUpdateAPIService.
	downloader *Downloader
}
87
88 func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggregatorSpec *spec.Swagger, delegationHandlers []http.Handler, pathHandler common.PathHandler) *specAggregator {
89 s := &specAggregator{
90 downloader: downloader,
91 specsByAPIServiceName: map[string]*openAPISpecInfo{},
92 }
93 cachedAggregatorSpec := cached.Static(aggregatorSpec, "never-changes")
94 s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec)
95 for i, handler := range delegationHandlers {
96 name := fmt.Sprintf(localDelegateChainNamePattern, i+1)
97
98 spec := NewCacheableDownloader(name, downloader, handler)
99 s.addLocalSpec(name, spec)
100 }
101
102 s.openAPIVersionedService = handler.NewOpenAPIServiceLazy(s.buildMergeSpecLocked())
103 s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler)
104 return s
105 }
106
107
108 func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
109 config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
110
111 aggregatorOpenAPISpec, err := builder.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(webServices), config)
112 if err != nil {
113 return nil, err
114 }
115 aggregatorOpenAPISpec.Definitions = handler.PruneDefaults(aggregatorOpenAPISpec.Definitions)
116
117 var delegationHandlers []http.Handler
118
119 for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
120 handler := delegate.UnprotectedHandler()
121 if handler == nil {
122 continue
123 }
124
125
126
127 if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
128 continue
129 }
130 delegationHandlers = append(delegationHandlers, handler)
131 }
132 s := buildAndRegisterSpecAggregatorForLocalServices(downloader, aggregatorOpenAPISpec, delegationHandlers, pathHandler)
133 return s, nil
134 }
135
136 func (s *specAggregator) addLocalSpec(name string, cachedSpec cached.Value[*spec.Swagger]) {
137 service := v1.APIService{}
138 service.Name = name
139 info := &openAPISpecInfo{
140 apiService: service,
141 }
142 info.spec.Store(cachedSpec)
143 s.specsByAPIServiceName[name] = info
144 }
145
146
147 func (s *specAggregator) buildMergeSpecLocked() cached.Value[*spec.Swagger] {
148 apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName))
149 for k := range s.specsByAPIServiceName {
150 apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService)
151 }
152 sortByPriority(apiServices)
153 caches := make([]cached.Value[*spec.Swagger], len(apiServices))
154 for i, apiService := range apiServices {
155 caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec)
156 }
157
158 return cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) {
159 var merged *spec.Swagger
160 etags := make([]string, 0, len(results))
161 for _, specInfo := range results {
162 result, etag, err := specInfo.Get()
163 if err != nil {
164
165
166 klog.Warning(err)
167 continue
168 }
169 if merged == nil {
170 merged = &spec.Swagger{}
171 *merged = *result
172
173
174 merged.Paths = nil
175 merged.Definitions = nil
176 merged.Parameters = nil
177 }
178 etags = append(etags, etag)
179 if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result); err != nil {
180 return nil, "", fmt.Errorf("failed to build merge specs: %v", err)
181 }
182 }
183
184 return merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags)))), nil
185 }, caches)
186 }
187
188
189
190 func (s *specAggregator) updateServiceLocked(name string) error {
191 specInfo, exists := s.specsByAPIServiceName[name]
192 if !exists {
193 return ErrAPIServiceNotFound
194 }
195 result, etag, err := specInfo.downloader.Get()
196 filteredResult := cached.Transform[*spec.Swagger](func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) {
197 if err != nil {
198 return nil, "", err
199 }
200 group := specInfo.apiService.Spec.Group
201 version := specInfo.apiService.Spec.Version
202 return aggregator.FilterSpecByPathsWithoutSideEffects(result, []string{"/apis/" + group + "/" + version + "/"}), etag, nil
203 }, cached.Result[*spec.Swagger]{Value: result, Etag: etag, Err: err})
204 specInfo.spec.Store(filteredResult)
205 return err
206 }
207
208
// UpdateAPIServiceSpec refreshes the cached spec of the named APIService
// while holding s.mutex. It returns whatever updateServiceLocked returns:
// ErrAPIServiceNotFound if the name is not tracked, otherwise the error from
// fetching the spec, if any.
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.updateServiceLocked(apiServiceName)
}
214
215
216
217 func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error {
218 if apiService.Spec.Service == nil {
219 return nil
220 }
221 s.mutex.Lock()
222 defer s.mutex.Unlock()
223
224 existingSpec, exists := s.specsByAPIServiceName[apiService.Name]
225 if !exists {
226 specInfo := &openAPISpecInfo{
227 apiService: *apiService,
228 downloader: NewCacheableDownloader(apiService.Name, s.downloader, handler),
229 }
230 specInfo.spec.Store(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)})
231 s.specsByAPIServiceName[apiService.Name] = specInfo
232 s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
233 } else {
234 existingSpec.apiService = *apiService
235 existingSpec.downloader.UpdateHandler(handler)
236 }
237
238 return nil
239 }
240
241
242
243 func (s *specAggregator) RemoveAPIService(apiServiceName string) {
244 s.mutex.Lock()
245 defer s.mutex.Unlock()
246
247 if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists {
248 return
249 }
250 delete(s.specsByAPIServiceName, apiServiceName)
251
252 s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
253 }
254
View as plain text