1
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "net/url"
24
25 "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/api/meta"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/apiserver/pkg/registry/generic"
31 genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
32 "k8s.io/apiserver/pkg/registry/rest"
33 "k8s.io/apiserver/pkg/storage"
34 storeerr "k8s.io/apiserver/pkg/storage/errors"
35 "k8s.io/apiserver/pkg/util/dryrun"
36 policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
37 podutil "k8s.io/kubernetes/pkg/api/pod"
38 api "k8s.io/kubernetes/pkg/apis/core"
39 "k8s.io/kubernetes/pkg/apis/core/validation"
40 "k8s.io/kubernetes/pkg/kubelet/client"
41 "k8s.io/kubernetes/pkg/printers"
42 printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
43 printerstorage "k8s.io/kubernetes/pkg/printers/storage"
44 registrypod "k8s.io/kubernetes/pkg/registry/core/pod"
45 podrest "k8s.io/kubernetes/pkg/registry/core/pod/rest"
46 "sigs.k8s.io/structured-merge-diff/v4/fieldpath"
47 )
48
49
// PodStorage groups the storage handlers for pods and for the pod
// subresources. NewStorage fills in every field.
type PodStorage struct {
	// Pod, Binding and LegacyBinding are wired by NewStorage to the primary
	// pod store.
	Pod           *REST
	Binding       *BindingREST
	LegacyBinding *LegacyBindingREST
	// Eviction and Status are wired by NewStorage to a copy of the pod store
	// whose update strategy is registrypod.StatusStrategy.
	Eviction *EvictionREST
	Status   *StatusREST
	// EphemeralContainers is wired to a copy of the pod store whose update
	// strategy is registrypod.EphemeralContainersStrategy.
	EphemeralContainers *EphemeralContainersREST
	// The remaining handlers come from the podrest package and are wired to
	// the primary pod store.
	Log         *podrest.LogREST
	Proxy       *podrest.ProxyREST
	Exec        *podrest.ExecREST
	Attach      *podrest.AttachREST
	PortForward *podrest.PortForwardREST
}
63
64
// REST is the storage handler for pods. It embeds the generic registry Store
// and additionally carries the transport used when resolving pod locations.
//
// Field order matters: NewStorage constructs REST with an unkeyed literal.
type REST struct {
	*genericregistry.Store
	// proxyTransport is passed to registrypod.ResourceLocation by
	// ResourceLocation.
	proxyTransport http.RoundTripper
}
69
70
71 func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
72
73 store := &genericregistry.Store{
74 NewFunc: func() runtime.Object { return &api.Pod{} },
75 NewListFunc: func() runtime.Object { return &api.PodList{} },
76 PredicateFunc: registrypod.MatchPod,
77 DefaultQualifiedResource: api.Resource("pods"),
78 SingularQualifiedResource: api.Resource("pod"),
79
80 CreateStrategy: registrypod.Strategy,
81 UpdateStrategy: registrypod.Strategy,
82 DeleteStrategy: registrypod.Strategy,
83 ResetFieldsStrategy: registrypod.Strategy,
84 ReturnDeletedObject: true,
85
86 TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
87 }
88 options := &generic.StoreOptions{
89 RESTOptions: optsGetter,
90 AttrFunc: registrypod.GetAttrs,
91 TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
92 Indexers: registrypod.Indexers(),
93 }
94 if err := store.CompleteWithOptions(options); err != nil {
95 return PodStorage{}, err
96 }
97
98 statusStore := *store
99 statusStore.UpdateStrategy = registrypod.StatusStrategy
100 statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
101 ephemeralContainersStore := *store
102 ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy
103
104 bindingREST := &BindingREST{store: store}
105 return PodStorage{
106 Pod: &REST{store, proxyTransport},
107 Binding: &BindingREST{store: store},
108 LegacyBinding: &LegacyBindingREST{bindingREST},
109 Eviction: newEvictionStorage(&statusStore, podDisruptionBudgetClient),
110 Status: &StatusREST{store: &statusStore},
111 EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
112 Log: &podrest.LogREST{Store: store, KubeletConn: k},
113 Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
114 Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
115 Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
116 PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
117 }, nil
118 }
119
120
121 var _ = rest.Redirector(&REST{})
122
123
124 func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error) {
125 return registrypod.ResourceLocation(ctx, r, r.proxyTransport, name)
126 }
127
128
129 var _ rest.ShortNamesProvider = &REST{}
130
131
132 func (r *REST) ShortNames() []string {
133 return []string{"po"}
134 }
135
136
137 var _ rest.CategoriesProvider = &REST{}
138
139
140 func (r *REST) Categories() []string {
141 return []string{"all"}
142 }
143
144
// BindingREST handles creation of Binding objects, which assigns a pod to a
// node by updating the pod held in store.
type BindingREST struct {
	// store is the pod store the binding is applied to; NewStorage sets it to
	// the primary pod store.
	store *genericregistry.Store
}
148
149
150 func (r *BindingREST) NamespaceScoped() bool {
151 return r.store.NamespaceScoped()
152 }
153
154
155 func (r *BindingREST) New() runtime.Object {
156 return &api.Binding{}
157 }
158
159
160 func (r *BindingREST) Destroy() {
161
162
163 }
164
165 var _ = rest.NamedCreater(&BindingREST{})
166 var _ = rest.SubresourceObjectMetaPreserver(&BindingREST{})
167
168
169 func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {
170 binding, ok := obj.(*api.Binding)
171 if !ok {
172 return nil, errors.NewBadRequest(fmt.Sprintf("not a Binding object: %#v", obj))
173 }
174
175 if name != binding.Name {
176 return nil, errors.NewBadRequest("name in URL does not match name in Binding object")
177 }
178
179
180 if errs := validation.ValidatePodBinding(binding); len(errs) != 0 {
181 return nil, errs.ToAggregate()
182 }
183
184 if createValidation != nil {
185 if err := createValidation(ctx, binding.DeepCopyObject()); err != nil {
186 return nil, err
187 }
188 }
189
190 err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
191 out = &metav1.Status{Status: metav1.StatusSuccess}
192 return
193 }
194
195
196
197
198 func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() bool {
199 return true
200 }
201
202
203
204
205 func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
206 podKey, err := r.store.KeyFunc(ctx, podID)
207 if err != nil {
208 return nil, err
209 }
210
211 var preconditions *storage.Preconditions
212 if podUID != "" || podResourceVersion != "" {
213 preconditions = &storage.Preconditions{}
214 if podUID != "" {
215 preconditions.UID = &podUID
216 }
217 if podResourceVersion != "" {
218 preconditions.ResourceVersion = &podResourceVersion
219 }
220 }
221
222 err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, preconditions, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
223 pod, ok := obj.(*api.Pod)
224 if !ok {
225 return nil, fmt.Errorf("unexpected object: %#v", obj)
226 }
227 if pod.DeletionTimestamp != nil {
228 return nil, fmt.Errorf("pod %s is being deleted, cannot be assigned to a host", pod.Name)
229 }
230 if pod.Spec.NodeName != "" {
231 return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
232 }
233
234 if len(pod.Spec.SchedulingGates) != 0 {
235 return nil, fmt.Errorf("pod %v has non-empty .spec.schedulingGates", pod.Name)
236 }
237 pod.Spec.NodeName = machine
238 if pod.Annotations == nil {
239 pod.Annotations = make(map[string]string)
240 }
241 for k, v := range annotations {
242 pod.Annotations[k] = v
243 }
244 podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
245 Type: api.PodScheduled,
246 Status: api.ConditionTrue,
247 })
248 finalPod = pod
249 return pod, nil
250 }), dryRun, nil)
251 return finalPod, err
252 }
253
254
255 func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) {
256 if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil {
257 err = storeerr.InterpretGetError(err, api.Resource("pods"), podID)
258 err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID)
259 if _, ok := err.(*errors.StatusError); !ok {
260 err = errors.NewConflict(api.Resource("pods/binding"), podID, err)
261 }
262 }
263 return
264 }
265
266 var _ = rest.Creater(&LegacyBindingREST{})
267
268
269 type LegacyBindingREST struct {
270 bindingRest *BindingREST
271 }
272
273
274 func (r *LegacyBindingREST) NamespaceScoped() bool {
275 return r.bindingRest.NamespaceScoped()
276 }
277
278
279 func (r *LegacyBindingREST) New() runtime.Object {
280 return r.bindingRest.New()
281 }
282
283
284 func (r *LegacyBindingREST) Destroy() {
285
286
287 }
288
289
290 func (r *LegacyBindingREST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {
291 metadata, err := meta.Accessor(obj)
292 if err != nil {
293 return nil, errors.NewBadRequest(fmt.Sprintf("not a Binding object: %T", obj))
294 }
295 return r.bindingRest.Create(ctx, metadata.GetName(), obj, createValidation, options)
296 }
297
298 func (r *LegacyBindingREST) GetSingularName() string {
299 return "binding"
300 }
301
302
// StatusREST is the handler NewStorage installs as PodStorage.Status. Its
// Get, Update, GetResetFields and ConvertToTable methods all delegate to
// store.
type StatusREST struct {
	// store is set by NewStorage to a copy of the pod store whose update and
	// reset-fields strategies are registrypod.StatusStrategy.
	store *genericregistry.Store
}
306
307
308 func (r *StatusREST) New() runtime.Object {
309 return &api.Pod{}
310 }
311
312
313 func (r *StatusREST) Destroy() {
314
315
316 }
317
318
319 func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
320 return r.store.Get(ctx, name, options)
321 }
322
323
324 func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
325
326
327 return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
328 }
329
330
331 func (r *StatusREST) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
332 return r.store.GetResetFields()
333 }
334
335 func (r *StatusREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
336 return r.store.ConvertToTable(ctx, object, tableOptions)
337 }
338
339
340 type EphemeralContainersREST struct {
341 store *genericregistry.Store
342 }
343
344 var _ = rest.Patcher(&EphemeralContainersREST{})
345
346
347 func (r *EphemeralContainersREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
348 return r.store.Get(ctx, name, options)
349 }
350
351
352 func (r *EphemeralContainersREST) New() runtime.Object {
353 return &api.Pod{}
354 }
355
356
357 func (r *EphemeralContainersREST) Destroy() {
358
359
360 }
361
362
363 func (r *EphemeralContainersREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
364
365
366 return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
367 }
368
View as plain text