...

Source file src/k8s.io/kubernetes/pkg/registry/flowcontrol/rest/storage_flowcontrol.go

Documentation: k8s.io/kubernetes/pkg/registry/flowcontrol/rest

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package rest
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	flowcontrolv1 "k8s.io/api/flowcontrol/v1"
    25  	"k8s.io/apimachinery/pkg/runtime/schema"
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
    28  	"k8s.io/apiserver/pkg/registry/generic"
    29  	"k8s.io/apiserver/pkg/registry/rest"
    30  	genericapiserver "k8s.io/apiserver/pkg/server"
    31  	serverstorage "k8s.io/apiserver/pkg/server/storage"
    32  	"k8s.io/client-go/informers"
    33  	flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1"
    34  	flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1"
    35  	"k8s.io/client-go/tools/cache"
    36  	"k8s.io/klog/v2"
    37  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    38  	"k8s.io/kubernetes/pkg/apis/flowcontrol"
    39  	flowcontrolapisv1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1"
    40  	flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
    41  	flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
    42  	flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
    43  	"k8s.io/kubernetes/pkg/registry/flowcontrol/ensurer"
    44  	flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
    45  	prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
    46  )
    47  
    48  var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
    49  
    50  // RESTStorageProvider is a provider of REST storage
    51  type RESTStorageProvider struct {
    52  	InformerFactory informers.SharedInformerFactory
    53  }
    54  
    55  // PostStartHookName is the name of the post-start-hook provided by flow-control storage
    56  const PostStartHookName = "priority-and-fairness-config-producer"
    57  
    58  // NewRESTStorage creates a new rest storage for flow-control api models.
    59  func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
    60  	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(flowcontrol.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
    61  
    62  	if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1beta1.SchemeGroupVersion); err != nil {
    63  		return genericapiserver.APIGroupInfo{}, err
    64  	} else if len(storageMap) > 0 {
    65  		apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1beta1.SchemeGroupVersion.Version] = storageMap
    66  	}
    67  
    68  	if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1beta2.SchemeGroupVersion); err != nil {
    69  		return genericapiserver.APIGroupInfo{}, err
    70  	} else if len(storageMap) > 0 {
    71  		apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1beta2.SchemeGroupVersion.Version] = storageMap
    72  	}
    73  
    74  	if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1beta3.SchemeGroupVersion); err != nil {
    75  		return genericapiserver.APIGroupInfo{}, err
    76  	} else if len(storageMap) > 0 {
    77  		apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1beta3.SchemeGroupVersion.Version] = storageMap
    78  	}
    79  
    80  	if storageMap, err := p.storage(apiResourceConfigSource, restOptionsGetter, flowcontrolapisv1.SchemeGroupVersion); err != nil {
    81  		return genericapiserver.APIGroupInfo{}, err
    82  	} else if len(storageMap) > 0 {
    83  		apiGroupInfo.VersionedResourcesStorageMap[flowcontrolapisv1.SchemeGroupVersion.Version] = storageMap
    84  	}
    85  
    86  	return apiGroupInfo, nil
    87  }
    88  
    89  func (p RESTStorageProvider) storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, groupVersion schema.GroupVersion) (map[string]rest.Storage, error) {
    90  	storage := map[string]rest.Storage{}
    91  
    92  	// flow-schema
    93  	if resource := "flowschemas"; apiResourceConfigSource.ResourceEnabled(groupVersion.WithResource(resource)) {
    94  		flowSchemaStorage, flowSchemaStatusStorage, err := flowschemastore.NewREST(restOptionsGetter)
    95  		if err != nil {
    96  			return nil, err
    97  		}
    98  		storage[resource] = flowSchemaStorage
    99  		storage[resource+"/status"] = flowSchemaStatusStorage
   100  	}
   101  
   102  	// priority-level-configuration
   103  	if resource := "prioritylevelconfigurations"; apiResourceConfigSource.ResourceEnabled(groupVersion.WithResource(resource)) {
   104  		priorityLevelConfigurationStorage, priorityLevelConfigurationStatusStorage, err := prioritylevelconfigurationstore.NewREST(restOptionsGetter)
   105  		if err != nil {
   106  			return nil, err
   107  		}
   108  		storage[resource] = priorityLevelConfigurationStorage
   109  		storage[resource+"/status"] = priorityLevelConfigurationStatusStorage
   110  	}
   111  
   112  	return storage, nil
   113  }
   114  
   115  // GroupName returns group name of the storage
   116  func (p RESTStorageProvider) GroupName() string {
   117  	return flowcontrol.GroupName
   118  }
   119  
   120  // PostStartHook returns the hook func that launches the config provider
   121  func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
   122  	bce := &bootstrapConfigurationEnsurer{
   123  		informersSynced: []cache.InformerSynced{
   124  			p.InformerFactory.Flowcontrol().V1().PriorityLevelConfigurations().Informer().HasSynced,
   125  			p.InformerFactory.Flowcontrol().V1().FlowSchemas().Informer().HasSynced,
   126  		},
   127  		fsLister:  p.InformerFactory.Flowcontrol().V1().FlowSchemas().Lister(),
   128  		plcLister: p.InformerFactory.Flowcontrol().V1().PriorityLevelConfigurations().Lister(),
   129  	}
   130  	return PostStartHookName, bce.ensureAPFBootstrapConfiguration, nil
   131  }
   132  
   133  type bootstrapConfigurationEnsurer struct {
   134  	informersSynced []cache.InformerSynced
   135  	fsLister        flowcontrollisters.FlowSchemaLister
   136  	plcLister       flowcontrollisters.PriorityLevelConfigurationLister
   137  }
   138  
   139  func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error {
   140  	clientset, err := flowcontrolclient.NewForConfig(hookContext.LoopbackClientConfig)
   141  	if err != nil {
   142  		return fmt.Errorf("failed to initialize clientset for APF - %w", err)
   143  	}
   144  
   145  	err = func() error {
   146  		// get a derived context that gets cancelled after 5m or
   147  		// when the StopCh gets closed, whichever happens first.
   148  		ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
   149  		defer cancel()
   150  
   151  		if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) {
   152  			return fmt.Errorf("APF bootstrap ensurer timed out waiting for cache sync")
   153  		}
   154  
   155  		err = wait.PollImmediateUntilWithContext(
   156  			ctx,
   157  			time.Second,
   158  			func(context.Context) (bool, error) {
   159  				if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
   160  					klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
   161  					return false, nil
   162  				}
   163  				return true, nil
   164  			})
   165  		if err != nil {
   166  			return fmt.Errorf("unable to initialize APF bootstrap configuration: %w", err)
   167  		}
   168  		return nil
   169  	}()
   170  	if err != nil {
   171  		return err
   172  	}
   173  
   174  	// we have successfully initialized the bootstrap configuration, now we
   175  	// spin up a goroutine which reconciles the bootstrap configuration periodically.
   176  	go func() {
   177  		ctx := wait.ContextForChannel(hookContext.StopCh)
   178  		wait.PollImmediateUntil(
   179  			time.Minute,
   180  			func() (bool, error) {
   181  				if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
   182  					klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
   183  				}
   184  				// always auto update both suggested and mandatory configuration
   185  				return false, nil
   186  			}, hookContext.StopCh)
   187  		klog.Info("APF bootstrap ensurer is exiting")
   188  	}()
   189  
   190  	return nil
   191  }
   192  
   193  func ensure(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
   194  
   195  	if err := ensureSuggestedConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
   196  		// We should not attempt creation of mandatory objects if ensuring the suggested
   197  		// configuration resulted in an error.
   198  		// This only happens when the stop channel is closed.
   199  		return fmt.Errorf("failed ensuring suggested settings - %w", err)
   200  	}
   201  
   202  	if err := ensureMandatoryConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
   203  		return fmt.Errorf("failed ensuring mandatory settings - %w", err)
   204  	}
   205  
   206  	if err := removeDanglingBootstrapConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
   207  		return fmt.Errorf("failed to delete removed settings - %w", err)
   208  	}
   209  
   210  	return nil
   211  }
   212  
   213  func ensureSuggestedConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
   214  	plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
   215  	if err := ensurer.EnsureConfigurations(ctx, plcOps, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations, ensurer.NewSuggestedEnsureStrategy[*flowcontrolv1.PriorityLevelConfiguration]()); err != nil {
   216  		return err
   217  	}
   218  
   219  	fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
   220  	return ensurer.EnsureConfigurations(ctx, fsOps, flowcontrolbootstrap.SuggestedFlowSchemas, ensurer.NewSuggestedEnsureStrategy[*flowcontrolv1.FlowSchema]())
   221  }
   222  
   223  func ensureMandatoryConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
   224  	plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
   225  	if err := ensurer.EnsureConfigurations(ctx, plcOps, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, ensurer.NewMandatoryEnsureStrategy[*flowcontrolv1.PriorityLevelConfiguration]()); err != nil {
   226  		return err
   227  	}
   228  
   229  	fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
   230  	return ensurer.EnsureConfigurations(ctx, fsOps, flowcontrolbootstrap.MandatoryFlowSchemas, ensurer.NewMandatoryEnsureStrategy[*flowcontrolv1.FlowSchema]())
   231  }
   232  
   233  func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
   234  	if err := removeDanglingBootstrapFlowSchema(ctx, clientset, fsLister); err != nil {
   235  		return err
   236  	}
   237  
   238  	return removeDanglingBootstrapPriorityLevel(ctx, clientset, plcLister)
   239  }
   240  
   241  func removeDanglingBootstrapFlowSchema(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, fsLister flowcontrollisters.FlowSchemaLister) error {
   242  	bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
   243  	fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
   244  	return ensurer.RemoveUnwantedObjects(ctx, fsOps, bootstrap)
   245  }
   246  
   247  func removeDanglingBootstrapPriorityLevel(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1Interface, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
   248  	bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
   249  	plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
   250  	return ensurer.RemoveUnwantedObjects(ctx, plcOps, bootstrap)
   251  }
   252  
   253  // contextFromChannelAndMaxWaitDuration returns a Context that is bound to the
   254  // specified channel and the wait duration. The derived context will be
   255  // cancelled when the specified channel stopCh is closed or the maximum wait
   256  // duration specified in maxWait elapses, whichever happens first.
   257  //
   258  // Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
   259  func contextFromChannelAndMaxWaitDuration(stopCh <-chan struct{}, maxWait time.Duration) (context.Context, context.CancelFunc) {
   260  	ctx, cancel := context.WithCancel(context.Background())
   261  
   262  	go func() {
   263  		defer cancel()
   264  
   265  		select {
   266  		case <-stopCh:
   267  		case <-time.After(maxWait):
   268  
   269  		// the caller can explicitly cancel the context which is an
   270  		// indication to us to exit the goroutine immediately.
   271  		// Note that we are calling cancel more than once when we are here,
   272  		// CancelFunc is idempotent and we expect no ripple effects here.
   273  		case <-ctx.Done():
   274  		}
   275  	}()
   276  	return ctx, cancel
   277  }
   278  

View as plain text