...

Source file src/k8s.io/kubernetes/pkg/registry/apps/statefulset/storage/storage.go

Documentation: k8s.io/kubernetes/pkg/registry/apps/statefulset/storage

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	"k8s.io/apimachinery/pkg/api/errors"
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	"k8s.io/apimachinery/pkg/runtime/schema"
    27  	"k8s.io/apimachinery/pkg/util/managedfields"
    28  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    29  	"k8s.io/apiserver/pkg/registry/generic"
    30  	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
    31  	"k8s.io/apiserver/pkg/registry/rest"
    32  	"k8s.io/klog/v2"
    33  	"k8s.io/kubernetes/pkg/apis/apps"
    34  	appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
    35  	appsv1beta2 "k8s.io/kubernetes/pkg/apis/apps/v1beta2"
    36  	"k8s.io/kubernetes/pkg/apis/autoscaling"
    37  	autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
    38  	autoscalingvalidation "k8s.io/kubernetes/pkg/apis/autoscaling/validation"
    39  	"k8s.io/kubernetes/pkg/printers"
    40  	printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
    41  	printerstorage "k8s.io/kubernetes/pkg/printers/storage"
    42  	"k8s.io/kubernetes/pkg/registry/apps/statefulset"
    43  	"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
    44  )
    45  
// StatefulSetStorage groups the REST storage for StatefulSets together with
// the storage for their Status and Scale subresources.
type StatefulSetStorage struct {
	StatefulSet *REST
	Status      *StatusREST
	Scale       *ScaleREST
}
    52  
// ReplicasPathMappings returns the mappings between each group version and a replicas path.
// The returned value is the package-level replicasPathInStatefulSet map itself, not a copy.
func ReplicasPathMappings() managedfields.ResourcePathMappings {
	return replicasPathInStatefulSet
}
    57  
// replicasPathInStatefulSet maps the string form of each apps group version
// (v1beta1, v1beta2, v1) to the replicas field path ("spec", "replicas") in a
// statefulset object. scaleUpdatedObjectInfo.UpdatedObject also uses the keys
// of this map to decide whether the group/version of a request is recognized.
var replicasPathInStatefulSet = managedfields.ResourcePathMappings{
	schema.GroupVersion{Group: "apps", Version: "v1beta1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
	schema.GroupVersion{Group: "apps", Version: "v1beta2"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
	schema.GroupVersion{Group: "apps", Version: "v1"}.String():      fieldpath.MakePathOrDie("spec", "replicas"),
}
    64  
    65  // NewStorage returns new instance of StatefulSetStorage.
    66  func NewStorage(optsGetter generic.RESTOptionsGetter) (StatefulSetStorage, error) {
    67  	statefulSetRest, statefulSetStatusRest, err := NewREST(optsGetter)
    68  	if err != nil {
    69  		return StatefulSetStorage{}, err
    70  	}
    71  
    72  	return StatefulSetStorage{
    73  		StatefulSet: statefulSetRest,
    74  		Status:      statefulSetStatusRest,
    75  		Scale:       &ScaleREST{store: statefulSetRest.Store},
    76  	}, nil
    77  }
    78  
// REST implements a RESTStorage for statefulsets against etcd.
// It embeds the generic registry Store, so the Store's methods are promoted
// onto REST.
type REST struct {
	*genericregistry.Store
}
    83  
    84  // NewREST returns a RESTStorage object that will work against statefulsets.
    85  func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
    86  	store := &genericregistry.Store{
    87  		NewFunc:                   func() runtime.Object { return &apps.StatefulSet{} },
    88  		NewListFunc:               func() runtime.Object { return &apps.StatefulSetList{} },
    89  		DefaultQualifiedResource:  apps.Resource("statefulsets"),
    90  		SingularQualifiedResource: apps.Resource("statefulset"),
    91  
    92  		CreateStrategy:      statefulset.Strategy,
    93  		UpdateStrategy:      statefulset.Strategy,
    94  		DeleteStrategy:      statefulset.Strategy,
    95  		ResetFieldsStrategy: statefulset.Strategy,
    96  
    97  		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    98  	}
    99  	options := &generic.StoreOptions{RESTOptions: optsGetter}
   100  	if err := store.CompleteWithOptions(options); err != nil {
   101  		return nil, nil, err
   102  	}
   103  
   104  	statusStore := *store
   105  	statusStore.UpdateStrategy = statefulset.StatusStrategy
   106  	statusStore.ResetFieldsStrategy = statefulset.StatusStrategy
   107  	return &REST{store}, &StatusREST{store: &statusStore}, nil
   108  }
   109  
   110  // Implement CategoriesProvider
   111  var _ rest.CategoriesProvider = &REST{}
   112  
   113  // Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of.
   114  func (r *REST) Categories() []string {
   115  	return []string{"all"}
   116  }
   117  
// StatusREST implements the REST endpoint for changing the status of a StatefulSet.
type StatusREST struct {
	// store is the registry store used for status operations. NewREST sets it
	// to a copy of the main store that uses statefulset.StatusStrategy for
	// updates and reset fields.
	store *genericregistry.Store
}
   122  
// New returns an empty StatefulSet object. The status subresource operates on
// the StatefulSet type itself rather than on a separate status type.
func (r *StatusREST) New() runtime.Object {
	return &apps.StatefulSet{}
}
   127  
// Destroy cleans up resources on shutdown. It is intentionally a no-op.
func (r *StatusREST) Destroy() {
	// Given that underlying store is shared with REST,
	// we don't destroy it here explicitly.
}
   133  
// Get retrieves the object from the storage by delegating to the underlying
// store. It is required to support Patch.
func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	return r.store.Get(ctx, name, options)
}
   138  
// Update alters the status subset of an object by delegating to the underlying
// store. The caller-supplied forceAllowCreate value is ignored; false is always
// passed to the store.
func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
	// subresources should never allow create on update.
	return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
}
   145  
// GetResetFields implements rest.ResetFieldsStrategy by returning the reset
// fields reported by the underlying status store.
func (r *StatusREST) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
	return r.store.GetResetFields()
}
   150  
// ConvertToTable converts the given object to a table representation by
// delegating to the underlying store.
func (r *StatusREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
	return r.store.ConvertToTable(ctx, object, tableOptions)
}
   154  
   155  // Implement ShortNamesProvider
   156  var _ rest.ShortNamesProvider = &REST{}
   157  
   158  // ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
   159  func (r *REST) ShortNames() []string {
   160  	return []string{"sts"}
   161  }
   162  
// ScaleREST implements the Scale subresource for StatefulSet.
type ScaleREST struct {
	// store is the StatefulSet registry store that scale reads and updates
	// are applied to; NewStorage passes in the main REST storage's Store.
	store *genericregistry.Store
}
   167  
   168  // ScaleREST implements Patcher
   169  var _ = rest.Patcher(&ScaleREST{})
   170  var _ = rest.GroupVersionKindProvider(&ScaleREST{})
   171  
   172  // GroupVersionKind returns GroupVersionKind for StatefulSet Scale object
   173  func (r *ScaleREST) GroupVersionKind(containingGV schema.GroupVersion) schema.GroupVersionKind {
   174  	switch containingGV {
   175  	case appsv1beta1.SchemeGroupVersion:
   176  		return appsv1beta1.SchemeGroupVersion.WithKind("Scale")
   177  	case appsv1beta2.SchemeGroupVersion:
   178  		return appsv1beta2.SchemeGroupVersion.WithKind("Scale")
   179  	default:
   180  		return autoscalingv1.SchemeGroupVersion.WithKind("Scale")
   181  	}
   182  }
   183  
// New creates a new, empty internal autoscaling Scale object.
func (r *ScaleREST) New() runtime.Object {
	return &autoscaling.Scale{}
}
   188  
// Destroy cleans up resources on shutdown. It is intentionally a no-op.
func (r *ScaleREST) Destroy() {
	// Given that underlying store is shared with REST,
	// we don't destroy it here explicitly.
}
   194  
   195  // Get retrieves object from Scale storage.
   196  func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
   197  	obj, err := r.store.Get(ctx, name, options)
   198  	if err != nil {
   199  		return nil, err
   200  	}
   201  	ss := obj.(*apps.StatefulSet)
   202  	scale, err := scaleFromStatefulSet(ss)
   203  	if err != nil {
   204  		return nil, errors.NewBadRequest(fmt.Sprintf("%v", err))
   205  	}
   206  	return scale, err
   207  }
   208  
   209  // Update alters scale subset of StatefulSet object.
   210  func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
   211  	obj, _, err := r.store.Update(
   212  		ctx,
   213  		name,
   214  		&scaleUpdatedObjectInfo{name, objInfo},
   215  		toScaleCreateValidation(createValidation),
   216  		toScaleUpdateValidation(updateValidation),
   217  		false,
   218  		options,
   219  	)
   220  	if err != nil {
   221  		return nil, false, err
   222  	}
   223  	ss := obj.(*apps.StatefulSet)
   224  	newScale, err := scaleFromStatefulSet(ss)
   225  	if err != nil {
   226  		return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))
   227  	}
   228  	return newScale, false, err
   229  }
   230  
// ConvertToTable converts the given object to a table representation by
// delegating to the underlying StatefulSet store.
func (r *ScaleREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
	return r.store.ConvertToTable(ctx, object, tableOptions)
}
   234  
   235  func toScaleCreateValidation(f rest.ValidateObjectFunc) rest.ValidateObjectFunc {
   236  	return func(ctx context.Context, obj runtime.Object) error {
   237  		scale, err := scaleFromStatefulSet(obj.(*apps.StatefulSet))
   238  		if err != nil {
   239  			return err
   240  		}
   241  		return f(ctx, scale)
   242  	}
   243  }
   244  
   245  func toScaleUpdateValidation(f rest.ValidateObjectUpdateFunc) rest.ValidateObjectUpdateFunc {
   246  	return func(ctx context.Context, obj, old runtime.Object) error {
   247  		newScale, err := scaleFromStatefulSet(obj.(*apps.StatefulSet))
   248  		if err != nil {
   249  			return err
   250  		}
   251  		oldScale, err := scaleFromStatefulSet(old.(*apps.StatefulSet))
   252  		if err != nil {
   253  			return err
   254  		}
   255  		return f(ctx, newScale, oldScale)
   256  	}
   257  }
   258  
   259  // scaleFromStatefulSet returns a scale subresource for a statefulset.
   260  func scaleFromStatefulSet(ss *apps.StatefulSet) (*autoscaling.Scale, error) {
   261  	selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
   262  	if err != nil {
   263  		return nil, err
   264  	}
   265  	return &autoscaling.Scale{
   266  		// TODO: Create a variant of ObjectMeta type that only contains the fields below.
   267  		ObjectMeta: metav1.ObjectMeta{
   268  			Name:              ss.Name,
   269  			Namespace:         ss.Namespace,
   270  			UID:               ss.UID,
   271  			ResourceVersion:   ss.ResourceVersion,
   272  			CreationTimestamp: ss.CreationTimestamp,
   273  		},
   274  		Spec: autoscaling.ScaleSpec{
   275  			Replicas: ss.Spec.Replicas,
   276  		},
   277  		Status: autoscaling.ScaleStatus{
   278  			Replicas: ss.Status.Replicas,
   279  			Selector: selector.String(),
   280  		},
   281  	}, nil
   282  }
   283  
// scaleUpdatedObjectInfo transforms existing statefulset -> existing scale -> new scale -> new statefulset
type scaleUpdatedObjectInfo struct {
	// name is the name of the statefulset being scaled; it is used when
	// reporting that the object was not found.
	name string
	// reqObjInfo is the original request's object info, which operates on
	// Scale objects.
	reqObjInfo rest.UpdatedObjectInfo
}
   289  
// Preconditions returns the preconditions of the wrapped request object info
// unchanged.
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
	return i.reqObjInfo.Preconditions()
}
   293  
   294  func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
   295  	statefulset, ok := oldObj.DeepCopyObject().(*apps.StatefulSet)
   296  	if !ok {
   297  		return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be StatefulSet, got %T", statefulset))
   298  	}
   299  	// if zero-value, the existing object does not exist
   300  	if len(statefulset.ResourceVersion) == 0 {
   301  		return nil, errors.NewNotFound(apps.Resource("statefulsets/scale"), i.name)
   302  	}
   303  
   304  	groupVersion := schema.GroupVersion{Group: "apps", Version: "v1"}
   305  	if requestInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
   306  		requestGroupVersion := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
   307  		if _, ok := replicasPathInStatefulSet[requestGroupVersion.String()]; ok {
   308  			groupVersion = requestGroupVersion
   309  		} else {
   310  			klog.Fatalf("Unrecognized group/version in request info %q", requestGroupVersion.String())
   311  		}
   312  	}
   313  
   314  	managedFieldsHandler := managedfields.NewScaleHandler(
   315  		statefulset.ManagedFields,
   316  		groupVersion,
   317  		replicasPathInStatefulSet,
   318  	)
   319  
   320  	// statefulset -> old scale
   321  	oldScale, err := scaleFromStatefulSet(statefulset)
   322  	if err != nil {
   323  		return nil, err
   324  	}
   325  	scaleManagedFields, err := managedFieldsHandler.ToSubresource()
   326  	if err != nil {
   327  		return nil, err
   328  	}
   329  	oldScale.ManagedFields = scaleManagedFields
   330  
   331  	// old scale -> new scale
   332  	newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
   333  	if err != nil {
   334  		return nil, err
   335  	}
   336  	if newScaleObj == nil {
   337  		return nil, errors.NewBadRequest("nil update passed to Scale")
   338  	}
   339  	scale, ok := newScaleObj.(*autoscaling.Scale)
   340  	if !ok {
   341  		return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
   342  	}
   343  
   344  	// validate
   345  	if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
   346  		return nil, errors.NewInvalid(autoscaling.Kind("Scale"), statefulset.Name, errs)
   347  	}
   348  
   349  	// validate precondition if specified (resourceVersion matching is handled by storage)
   350  	if len(scale.UID) > 0 && scale.UID != statefulset.UID {
   351  		return nil, errors.NewConflict(
   352  			apps.Resource("statefulsets/scale"),
   353  			statefulset.Name,
   354  			fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, statefulset.UID),
   355  		)
   356  	}
   357  
   358  	// move replicas/resourceVersion fields to object and return
   359  	statefulset.Spec.Replicas = scale.Spec.Replicas
   360  	statefulset.ResourceVersion = scale.ResourceVersion
   361  
   362  	updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
   363  	if err != nil {
   364  		return nil, err
   365  	}
   366  	statefulset.ManagedFields = updatedEntries
   367  
   368  	return statefulset, nil
   369  }
   370  

View as plain text