1
16
17 package rest
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 flowcontrolv1 "k8s.io/api/flowcontrol/v1"
25 "k8s.io/apimachinery/pkg/runtime/schema"
26 "k8s.io/apimachinery/pkg/util/wait"
27 flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
28 "k8s.io/apiserver/pkg/registry/generic"
29 "k8s.io/apiserver/pkg/registry/rest"
30 genericapiserver "k8s.io/apiserver/pkg/server"
31 serverstorage "k8s.io/apiserver/pkg/server/storage"
32 "k8s.io/client-go/informers"
33 flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1"
34 flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1"
35 "k8s.io/client-go/tools/cache"
36 "k8s.io/klog/v2"
37 "k8s.io/kubernetes/pkg/api/legacyscheme"
38 "k8s.io/kubernetes/pkg/apis/flowcontrol"
39 flowcontrolapisv1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1"
40 flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
41 flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
42 flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
43 "k8s.io/kubernetes/pkg/registry/flowcontrol/ensurer"
44 flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
45 prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
46 )
47
48 var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
49
50
51 type RESTStorageProvider struct {
52 InformerFactory informers.SharedInformerFactory
53 }
54
55
// PostStartHookName is the name returned by RESTStorageProvider.PostStartHook
// for the hook that ensures the APF bootstrap configuration.
const PostStartHookName = "priority-and-fairness-config-producer"
57
58
59 func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
60 apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(flowcontrol.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
61
62 if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1beta1.SchemeGroupVersion); err != nil {
63 return genericapiserver.APIGroupInfo{}, err
64 } else if len(storageMap) > 0 {
65 apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1beta1.SchemeGroupVersion.Version] = storageMap
66 }
67
68 if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1beta2.SchemeGroupVersion); err != nil {
69 return genericapiserver.APIGroupInfo{}, err
70 } else if len(storageMap) > 0 {
71 apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1beta2.SchemeGroupVersion.Version] = storageMap
72 }
73
74 if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1beta3.SchemeGroupVersion); err != nil {
75 return genericapiserver.APIGroupInfo{}, err
76 } else if len(storageMap) > 0 {
77 apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1beta3.SchemeGroupVersion.Version] = storageMap
78 }
79
80 if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1.SchemeGroupVersion); err != nil {
81 return genericapiserver.APIGroupInfo{}, err
82 } else if len(storageMap) > 0 {
83 apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1.SchemeGroupVersion.Version] = storageMap
84 }
85
86 return apiGroupInfo, nil
87 }
88
89 func (p RESTStorageProvider) storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, groupVersion schema.GroupVersion) (map[string]rest.Storage, error) {
90 storage := map[string]rest.Storage{}
91
92
93 if resource := "flowschemas"; apiResourceConfigSource.ResourceEnabled(groupVersion.WithResource(resource)) {
94 flowSchemaStorage, flowSchemaStatusStorage, err := flowschemastore.NewREST(restOptionsGetter)
95 if err != nil {
96 return nil, err
97 }
98 storage[resource] = flowSchemaStorage
99 storage[resource+"/status"] = flowSchemaStatusStorage
100 }
101
102
103 if resource := "prioritylevelconfigurations"; apiResourceConfigSource.ResourceEnabled(groupVersion.WithResource(resource)) {
104 priorityLevelConfigurationStorage, priorityLevelConfigurationStatusStorage, err := prioritylevelconfigurationstore.NewREST(restOptionsGetter)
105 if err != nil {
106 return nil, err
107 }
108 storage[resource] = priorityLevelConfigurationStorage
109 storage[resource+"/status"] = priorityLevelConfigurationStatusStorage
110 }
111
112 return storage, nil
113 }
114
115
116 func (p RESTStorageProvider) GroupName() string {
117 return flowcontrol.GroupName
118 }
119
120
121 func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
122 bce := &bootstrapConfigurationEnsurer{
123 informersSynced: []cache.InformerSynced{
124 p.InformerFactory.Flowcontrol().V1().PriorityLevelConfigurations().Informer().HasSynced,
125 p.InformerFactory.Flowcontrol().V1().FlowSchemas().Informer().HasSynced,
126 },
127 fsLister: p.InformerFactory.Flowcontrol().V1().FlowSchemas().Lister(),
128 plcLister: p.InformerFactory.Flowcontrol().V1().PriorityLevelConfigurations().Lister(),
129 }
130 return PostStartHookName, bce.ensureAPFBootstrapConfiguration, nil
131 }
132
133 type bootstrapConfigurationEnsurer struct {
134 informersSynced []cache.InformerSynced
135 fsLister flowcontrollisters.FlowSchemaLister
136 plcLister flowcontrollisters.PriorityLevelConfigurationLister
137 }
138
139 func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error {
140 clientset, err := flowcontrolclient.NewForConfig(hookContext.LoopbackClientConfig)
141 if err != nil {
142 return fmt.Errorf("failed to initialize clientset for APF - %w", err)
143 }
144
145 err = func() error {
146
147
148 ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
149 defer cancel()
150
151 if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) {
152 return fmt.Errorf("APF bootstrap ensurer timed out waiting for cache sync")
153 }
154
155 err = wait.PollImmediateUntilWithContext(
156 ctx,
157 time.Second,
158 func(context.Context) (bool, error) {
159 if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
160 klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
161 return false, nil
162 }
163 return true, nil
164 })
165 if err != nil {
166 return fmt.Errorf("unable to initialize APF bootstrap configuration: %w", err)
167 }
168 return nil
169 }()
170 if err != nil {
171 return err
172 }
173
174
175
176 go func() {
177 ctx := wait.ContextForChannel(hookContext.StopCh)
178 wait.PollImmediateUntil(
179 time.Minute,
180 func() (bool, error) {
181 if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
182 klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
183 }
184
185 return false, nil
186 }, hookContext.StopCh)
187 klog.Info("APF bootstrap ensurer is exiting")
188 }()
189
190 return nil
191 }
192
193 func ensure(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
194
195 if err := ensureSuggestedConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
196
197
198
199 return fmt.Errorf("failed ensuring suggested settings - %w", err)
200 }
201
202 if err := ensureMandatoryConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
203 return fmt.Errorf("failed ensuring mandatory settings - %w", err)
204 }
205
206 if err := removeDanglingBootstrapConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
207 return fmt.Errorf("failed to delete removed settings - %w", err)
208 }
209
210 return nil
211 }
212
213 func ensureSuggestedConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
214 plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
215 if err := ensurer.EnsureConfigurations(ctx, plcOps, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations, ensurer.NewSuggestedEnsureStrategy[*flowcontrolv1.PriorityLevelConfiguration]()); err != nil {
216 return err
217 }
218
219 fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
220 return ensurer.EnsureConfigurations(ctx, fsOps, flowcontrolbootstrap.SuggestedFlowSchemas, ensurer.NewSuggestedEnsureStrategy[*flowcontrolv1.FlowSchema]())
221 }
222
223 func ensureMandatoryConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
224 plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
225 if err := ensurer.EnsureConfigurations(ctx, plcOps, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, ensurer.NewMandatoryEnsureStrategy[*flowcontrolv1.PriorityLevelConfiguration]()); err != nil {
226 return err
227 }
228
229 fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
230 return ensurer.EnsureConfigurations(ctx, fsOps, flowcontrolbootstrap.MandatoryFlowSchemas, ensurer.NewMandatoryEnsureStrategy[*flowcontrolv1.FlowSchema]())
231 }
232
233 func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
234 if err := removeDanglingBootstrapFlowSchema(ctx, clientset, fsLister); err != nil {
235 return err
236 }
237
238 return removeDanglingBootstrapPriorityLevel(ctx, clientset, plcLister)
239 }
240
241 func removeDanglingBootstrapFlowSchema(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister) error {
242 bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
243 fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
244 return ensurer.RemoveUnwantedObjects(ctx, fsOps, bootstrap)
245 }
246
247 func removeDanglingBootstrapPriorityLevel(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
248 bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
249 plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
250 return ensurer.RemoveUnwantedObjects(ctx, plcOps, bootstrap)
251 }
252
253
254
255
256
257
258
// contextFromChannelAndMaxWaitDuration returns a context, derived from
// context.Background, that is cancelled as soon as one of the following
// happens:
//   - stopCh is closed (or receives a value),
//   - maxWait has elapsed since the call,
//   - the returned cancel function is invoked.
//
// In every case ctx.Err() reports context.Canceled. Callers should always call
// the returned cancel function once they are done with the context: doing so
// lets the watcher goroutine started here exit and releases its timer.
func contextFromChannelAndMaxWaitDuration(stopCh <-chan struct{}, maxWait time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		defer cancel()

		// Use an explicit timer rather than time.After so that it is stopped
		// as soon as this goroutine exits, instead of staying pending for the
		// remainder of maxWait.
		timer := time.NewTimer(maxWait)
		defer timer.Stop()

		select {
		case <-stopCh:
			// The stop channel fired before the deadline.
		case <-timer.C:
			// The maximum wait elapsed.
		case <-ctx.Done():
			// The caller cancelled the context; nothing left to watch.
		}
	}()
	return ctx, cancel
}
278
View as plain text