...

Source file src/k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go

Documentation: k8s.io/kubernetes/pkg/registry/core/pod/storage

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"net/url"
    24  
    25  	"k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/api/meta"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/apiserver/pkg/registry/generic"
    31  	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
    32  	"k8s.io/apiserver/pkg/registry/rest"
    33  	"k8s.io/apiserver/pkg/storage"
    34  	storeerr "k8s.io/apiserver/pkg/storage/errors"
    35  	"k8s.io/apiserver/pkg/util/dryrun"
    36  	policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
    37  	podutil "k8s.io/kubernetes/pkg/api/pod"
    38  	api "k8s.io/kubernetes/pkg/apis/core"
    39  	"k8s.io/kubernetes/pkg/apis/core/validation"
    40  	"k8s.io/kubernetes/pkg/kubelet/client"
    41  	"k8s.io/kubernetes/pkg/printers"
    42  	printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
    43  	printerstorage "k8s.io/kubernetes/pkg/printers/storage"
    44  	registrypod "k8s.io/kubernetes/pkg/registry/core/pod"
    45  	podrest "k8s.io/kubernetes/pkg/registry/core/pod/rest"
    46  	"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
    47  )
    48  
    49  // PodStorage includes storage for pods and all sub resources
    50  type PodStorage struct {
    51  	Pod                 *REST
    52  	Binding             *BindingREST
    53  	LegacyBinding       *LegacyBindingREST
    54  	Eviction            *EvictionREST
    55  	Status              *StatusREST
    56  	EphemeralContainers *EphemeralContainersREST
    57  	Log                 *podrest.LogREST
    58  	Proxy               *podrest.ProxyREST
    59  	Exec                *podrest.ExecREST
    60  	Attach              *podrest.AttachREST
    61  	PortForward         *podrest.PortForwardREST
    62  }
    63  
    64  // REST implements a RESTStorage for pods
    65  type REST struct {
    66  	*genericregistry.Store
    67  	proxyTransport http.RoundTripper
    68  }
    69  
    70  // NewStorage returns a RESTStorage object that will work against pods.
    71  func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
    72  
    73  	store := &genericregistry.Store{
    74  		NewFunc:                   func() runtime.Object { return &api.Pod{} },
    75  		NewListFunc:               func() runtime.Object { return &api.PodList{} },
    76  		PredicateFunc:             registrypod.MatchPod,
    77  		DefaultQualifiedResource:  api.Resource("pods"),
    78  		SingularQualifiedResource: api.Resource("pod"),
    79  
    80  		CreateStrategy:      registrypod.Strategy,
    81  		UpdateStrategy:      registrypod.Strategy,
    82  		DeleteStrategy:      registrypod.Strategy,
    83  		ResetFieldsStrategy: registrypod.Strategy,
    84  		ReturnDeletedObject: true,
    85  
    86  		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    87  	}
    88  	options := &generic.StoreOptions{
    89  		RESTOptions: optsGetter,
    90  		AttrFunc:    registrypod.GetAttrs,
    91  		TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
    92  		Indexers:    registrypod.Indexers(),
    93  	}
    94  	if err := store.CompleteWithOptions(options); err != nil {
    95  		return PodStorage{}, err
    96  	}
    97  
    98  	statusStore := *store
    99  	statusStore.UpdateStrategy = registrypod.StatusStrategy
   100  	statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
   101  	ephemeralContainersStore := *store
   102  	ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy
   103  
   104  	bindingREST := &BindingREST{store: store}
   105  	return PodStorage{
   106  		Pod:                 &REST{store, proxyTransport},
   107  		Binding:             &BindingREST{store: store},
   108  		LegacyBinding:       &LegacyBindingREST{bindingREST},
   109  		Eviction:            newEvictionStorage(&statusStore, podDisruptionBudgetClient),
   110  		Status:              &StatusREST{store: &statusStore},
   111  		EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
   112  		Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
   113  		Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
   114  		Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
   115  		Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
   116  		PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
   117  	}, nil
   118  }
   119  
   120  // Implement Redirector.
   121  var _ = rest.Redirector(&REST{})
   122  
   123  // ResourceLocation returns a pods location from its HostIP
   124  func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error) {
   125  	return registrypod.ResourceLocation(ctx, r, r.proxyTransport, name)
   126  }
   127  
   128  // Implement ShortNamesProvider
   129  var _ rest.ShortNamesProvider = &REST{}
   130  
   131  // ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
   132  func (r *REST) ShortNames() []string {
   133  	return []string{"po"}
   134  }
   135  
   136  // Implement CategoriesProvider
   137  var _ rest.CategoriesProvider = &REST{}
   138  
   139  // Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of.
   140  func (r *REST) Categories() []string {
   141  	return []string{"all"}
   142  }
   143  
   144  // BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
   145  type BindingREST struct {
   146  	store *genericregistry.Store
   147  }
   148  
   149  // NamespaceScoped fulfill rest.Scoper
   150  func (r *BindingREST) NamespaceScoped() bool {
   151  	return r.store.NamespaceScoped()
   152  }
   153  
   154  // New creates a new binding resource
   155  func (r *BindingREST) New() runtime.Object {
   156  	return &api.Binding{}
   157  }
   158  
   159  // Destroy cleans up resources on shutdown.
   160  func (r *BindingREST) Destroy() {
   161  	// Given that underlying store is shared with REST,
   162  	// we don't destroy it here explicitly.
   163  }
   164  
   165  var _ = rest.NamedCreater(&BindingREST{})
   166  var _ = rest.SubresourceObjectMetaPreserver(&BindingREST{})
   167  
   168  // Create ensures a pod is bound to a specific host.
   169  func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {
   170  	binding, ok := obj.(*api.Binding)
   171  	if !ok {
   172  		return nil, errors.NewBadRequest(fmt.Sprintf("not a Binding object: %#v", obj))
   173  	}
   174  
   175  	if name != binding.Name {
   176  		return nil, errors.NewBadRequest("name in URL does not match name in Binding object")
   177  	}
   178  
   179  	// TODO: move me to a binding strategy
   180  	if errs := validation.ValidatePodBinding(binding); len(errs) != 0 {
   181  		return nil, errs.ToAggregate()
   182  	}
   183  
   184  	if createValidation != nil {
   185  		if err := createValidation(ctx, binding.DeepCopyObject()); err != nil {
   186  			return nil, err
   187  		}
   188  	}
   189  
   190  	err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
   191  	out = &metav1.Status{Status: metav1.StatusSuccess}
   192  	return
   193  }
   194  
   195  // PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate indicates to a
   196  // handler that this endpoint requires the UID and ResourceVersion to use as
   197  // preconditions. Other fields, such as timestamp, are ignored.
   198  func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() bool {
   199  	return true
   200  }
   201  
   202  // setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if
   203  // the pod is unassigned and merges the provided annotations with those of the pod.
   204  // Returns the current state of the pod, or an error.
   205  func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
   206  	podKey, err := r.store.KeyFunc(ctx, podID)
   207  	if err != nil {
   208  		return nil, err
   209  	}
   210  
   211  	var preconditions *storage.Preconditions
   212  	if podUID != "" || podResourceVersion != "" {
   213  		preconditions = &storage.Preconditions{}
   214  		if podUID != "" {
   215  			preconditions.UID = &podUID
   216  		}
   217  		if podResourceVersion != "" {
   218  			preconditions.ResourceVersion = &podResourceVersion
   219  		}
   220  	}
   221  
   222  	err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, preconditions, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
   223  		pod, ok := obj.(*api.Pod)
   224  		if !ok {
   225  			return nil, fmt.Errorf("unexpected object: %#v", obj)
   226  		}
   227  		if pod.DeletionTimestamp != nil {
   228  			return nil, fmt.Errorf("pod %s is being deleted, cannot be assigned to a host", pod.Name)
   229  		}
   230  		if pod.Spec.NodeName != "" {
   231  			return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
   232  		}
   233  		// Reject binding to a scheduling un-ready Pod.
   234  		if len(pod.Spec.SchedulingGates) != 0 {
   235  			return nil, fmt.Errorf("pod %v has non-empty .spec.schedulingGates", pod.Name)
   236  		}
   237  		pod.Spec.NodeName = machine
   238  		if pod.Annotations == nil {
   239  			pod.Annotations = make(map[string]string)
   240  		}
   241  		for k, v := range annotations {
   242  			pod.Annotations[k] = v
   243  		}
   244  		podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
   245  			Type:   api.PodScheduled,
   246  			Status: api.ConditionTrue,
   247  		})
   248  		finalPod = pod
   249  		return pod, nil
   250  	}), dryRun, nil)
   251  	return finalPod, err
   252  }
   253  
   254  // assignPod assigns the given pod to the given machine.
   255  func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) {
   256  	if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil {
   257  		err = storeerr.InterpretGetError(err, api.Resource("pods"), podID)
   258  		err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID)
   259  		if _, ok := err.(*errors.StatusError); !ok {
   260  			err = errors.NewConflict(api.Resource("pods/binding"), podID, err)
   261  		}
   262  	}
   263  	return
   264  }
   265  
   266  var _ = rest.Creater(&LegacyBindingREST{})
   267  
   268  // LegacyBindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
   269  type LegacyBindingREST struct {
   270  	bindingRest *BindingREST
   271  }
   272  
   273  // NamespaceScoped fulfill rest.Scoper
   274  func (r *LegacyBindingREST) NamespaceScoped() bool {
   275  	return r.bindingRest.NamespaceScoped()
   276  }
   277  
   278  // New creates a new binding resource
   279  func (r *LegacyBindingREST) New() runtime.Object {
   280  	return r.bindingRest.New()
   281  }
   282  
   283  // Destroy cleans up resources on shutdown.
   284  func (r *LegacyBindingREST) Destroy() {
   285  	// Given that underlying store is shared with REST,
   286  	// we don't destroy it here explicitly.
   287  }
   288  
   289  // Create ensures a pod is bound to a specific host.
   290  func (r *LegacyBindingREST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {
   291  	metadata, err := meta.Accessor(obj)
   292  	if err != nil {
   293  		return nil, errors.NewBadRequest(fmt.Sprintf("not a Binding object: %T", obj))
   294  	}
   295  	return r.bindingRest.Create(ctx, metadata.GetName(), obj, createValidation, options)
   296  }
   297  
   298  func (r *LegacyBindingREST) GetSingularName() string {
   299  	return "binding"
   300  }
   301  
   302  // StatusREST implements the REST endpoint for changing the status of a pod.
   303  type StatusREST struct {
   304  	store *genericregistry.Store
   305  }
   306  
   307  // New creates a new pod resource
   308  func (r *StatusREST) New() runtime.Object {
   309  	return &api.Pod{}
   310  }
   311  
   312  // Destroy cleans up resources on shutdown.
   313  func (r *StatusREST) Destroy() {
   314  	// Given that underlying store is shared with REST,
   315  	// we don't destroy it here explicitly.
   316  }
   317  
   318  // Get retrieves the object from the storage. It is required to support Patch.
   319  func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
   320  	return r.store.Get(ctx, name, options)
   321  }
   322  
   323  // Update alters the status subset of an object.
   324  func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
   325  	// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
   326  	// subresources should never allow create on update.
   327  	return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
   328  }
   329  
   330  // GetResetFields implements rest.ResetFieldsStrategy
   331  func (r *StatusREST) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
   332  	return r.store.GetResetFields()
   333  }
   334  
   335  func (r *StatusREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
   336  	return r.store.ConvertToTable(ctx, object, tableOptions)
   337  }
   338  
   339  // EphemeralContainersREST implements the REST endpoint for adding EphemeralContainers
   340  type EphemeralContainersREST struct {
   341  	store *genericregistry.Store
   342  }
   343  
   344  var _ = rest.Patcher(&EphemeralContainersREST{})
   345  
   346  // Get retrieves the object from the storage. It is required to support Patch.
   347  func (r *EphemeralContainersREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
   348  	return r.store.Get(ctx, name, options)
   349  }
   350  
   351  // New creates a new pod resource
   352  func (r *EphemeralContainersREST) New() runtime.Object {
   353  	return &api.Pod{}
   354  }
   355  
   356  // Destroy cleans up resources on shutdown.
   357  func (r *EphemeralContainersREST) Destroy() {
   358  	// Given that underlying store is shared with REST,
   359  	// we don't destroy it here explicitly.
   360  }
   361  
   362  // Update alters the EphemeralContainers field in PodSpec
   363  func (r *EphemeralContainersREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
   364  	// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
   365  	// subresources should never allow create on update.
   366  	return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
   367  }
   368  

View as plain text