1
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "net"
24 "net/http"
25 "net/url"
26 "strconv"
27
28 "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/runtime"
31 utilnet "k8s.io/apimachinery/pkg/util/net"
32 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33 "k8s.io/apimachinery/pkg/util/sets"
34 genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
35 "k8s.io/apiserver/pkg/registry/generic"
36 genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
37 "k8s.io/apiserver/pkg/registry/rest"
38 "k8s.io/apiserver/pkg/util/dryrun"
39 "k8s.io/klog/v2"
40 api "k8s.io/kubernetes/pkg/apis/core"
41 "k8s.io/kubernetes/pkg/printers"
42 printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
43 printerstorage "k8s.io/kubernetes/pkg/printers/storage"
44 svcreg "k8s.io/kubernetes/pkg/registry/core/service"
45 "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
46 "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
47 netutil "k8s.io/utils/net"
48 "sigs.k8s.io/structured-merge-diff/v4/fieldpath"
49 )
50
51 type EndpointsStorage interface {
52 rest.Getter
53 rest.GracefulDeleter
54 }
55
// PodStorage is the read-only view of Pod storage that Service storage needs.
// ResourceLocation uses it (through isValidAddress) to fetch the Pod referenced
// by an endpoint address.
type PodStorage interface {
	rest.Getter
}
59
// REST is the storage for the Service resource. It embeds the generic registry
// Store and adds the Service-specific hooks that NewREST wires into it:
// read-time defaulting, allocation on create/update and release on delete.
type REST struct {
	*genericregistry.Store
	// primaryIPFamily is the serviceIPFamily passed to NewREST.
	primaryIPFamily api.IPFamily
	// secondaryIPFamily is the opposite family when NewREST received more
	// than one IP allocator, and empty otherwise.
	secondaryIPFamily api.IPFamily
	// alloc is built by makeAlloc from the IP and node-port allocators. It is
	// used to allocate and release resources and is destroyed in Destroy.
	alloc Allocators
	// endpoints is used to look up (ResourceLocation) and delete
	// (afterDelete) the Service's Endpoints object.
	endpoints EndpointsStorage
	// pods is used by ResourceLocation to validate endpoint addresses.
	pods PodStorage
	// proxyTransport is returned by ResourceLocation alongside the target URL.
	proxyTransport http.RoundTripper
}
69
70 var (
71 _ rest.CategoriesProvider = &REST{}
72 _ rest.ShortNamesProvider = &REST{}
73 _ rest.StorageVersionProvider = &REST{}
74 _ rest.ResetFieldsStrategy = &REST{}
75 _ rest.Redirector = &REST{}
76 )
77
78
79 func NewREST(
80 optsGetter generic.RESTOptionsGetter,
81 serviceIPFamily api.IPFamily,
82 ipAllocs map[api.IPFamily]ipallocator.Interface,
83 portAlloc portallocator.Interface,
84 endpoints EndpointsStorage,
85 pods PodStorage,
86 proxyTransport http.RoundTripper) (*REST, *StatusREST, *svcreg.ProxyREST, error) {
87
88 store := &genericregistry.Store{
89 NewFunc: func() runtime.Object { return &api.Service{} },
90 NewListFunc: func() runtime.Object { return &api.ServiceList{} },
91 DefaultQualifiedResource: api.Resource("services"),
92 SingularQualifiedResource: api.Resource("service"),
93 ReturnDeletedObject: true,
94
95 CreateStrategy: svcreg.Strategy,
96 UpdateStrategy: svcreg.Strategy,
97 DeleteStrategy: svcreg.Strategy,
98 ResetFieldsStrategy: svcreg.Strategy,
99
100 TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
101 }
102 options := &generic.StoreOptions{RESTOptions: optsGetter}
103 if err := store.CompleteWithOptions(options); err != nil {
104 return nil, nil, nil, err
105 }
106
107 statusStore := *store
108 statusStore.UpdateStrategy = svcreg.StatusStrategy
109 statusStore.ResetFieldsStrategy = svcreg.StatusStrategy
110
111 var primaryIPFamily api.IPFamily = serviceIPFamily
112 var secondaryIPFamily api.IPFamily = ""
113 if len(ipAllocs) > 1 {
114 secondaryIPFamily = otherFamily(serviceIPFamily)
115 }
116 genericStore := &REST{
117 Store: store,
118 primaryIPFamily: primaryIPFamily,
119 secondaryIPFamily: secondaryIPFamily,
120 alloc: makeAlloc(serviceIPFamily, ipAllocs, portAlloc),
121 endpoints: endpoints,
122 pods: pods,
123 proxyTransport: proxyTransport,
124 }
125 store.Decorator = genericStore.defaultOnRead
126 store.AfterDelete = genericStore.afterDelete
127 store.BeginCreate = genericStore.beginCreate
128 store.BeginUpdate = genericStore.beginUpdate
129
130
131
132
133 statusStore.AfterDelete = genericStore.afterDelete
134
135 return genericStore, &StatusREST{store: &statusStore}, &svcreg.ProxyREST{Redirector: genericStore, ProxyTransport: proxyTransport}, nil
136 }
137
138
139
140 func otherFamily(fam api.IPFamily) api.IPFamily {
141 if fam == api.IPv4Protocol {
142 return api.IPv6Protocol
143 }
144 return api.IPv4Protocol
145 }
146
// The rest.ShortNamesProvider and rest.CategoriesProvider assertions for *REST
// are declared once, in the interface-assertion var block after the REST type.
151
152
153 func (r *REST) ShortNames() []string {
154 return []string{"svc"}
155 }
156
157
158 func (r *REST) Categories() []string {
159 return []string{"all"}
160 }
161
162
// Destroy tears down the Service storage. It destroys the embedded generic
// store first and then the allocators held in r.alloc.
func (r *REST) Destroy() {
	r.Store.Destroy()
	r.alloc.Destroy()
}
167
168
// StatusREST serves the status subresource of Services. NewREST gives it a
// shallow copy of the main Service store whose UpdateStrategy and
// ResetFieldsStrategy are svcreg.StatusStrategy.
type StatusREST struct {
	store *genericregistry.Store
}
172
173 func (r *StatusREST) New() runtime.Object {
174 return &api.Service{}
175 }
176
177
// Destroy intentionally does nothing. The status store is a shallow copy of
// the main Service store, so it presumably shares that store's underlying
// resources, which REST.Destroy tears down. Destroying them here as well could
// double-free shared state.
func (r *StatusREST) Destroy() {
	// Nothing to release here; see the doc comment.
}
182
183
184 func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
185 return r.store.Get(ctx, name, options)
186 }
187
188
189 func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
190
191
192 return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
193 }
194
195 func (r *StatusREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
196 return r.store.ConvertToTable(ctx, object, tableOptions)
197 }
198
199
200 func (r *StatusREST) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
201 return r.store.GetResetFields()
202 }
203
204
205
206
207
208
209
210
211
212
213
214
215
// Before wraps the previous (stored) Service. Its Service may be nil, and
// normalizeClusterIPs treats a nil Service as a create.
type Before struct {
	*api.Service
}

// After wraps the incoming (new) Service. Using distinct wrapper types for old
// and new makes the compiler reject swapped arguments to helpers that take
// (After, Before).
type After struct {
	*api.Service
}
222
223
224
225
226
227
228 func (r *REST) defaultOnRead(obj runtime.Object) {
229 switch s := obj.(type) {
230 case *api.Service:
231 r.defaultOnReadService(s)
232 case *api.ServiceList:
233 r.defaultOnReadServiceList(s)
234 default:
235
236
237 }
238 }
239
240
241 func (r *REST) defaultOnReadServiceList(serviceList *api.ServiceList) {
242 if serviceList == nil {
243 return
244 }
245
246 for i := range serviceList.Items {
247 r.defaultOnReadService(&serviceList.Items[i])
248 }
249 }
250
251
252 func (r *REST) defaultOnReadService(service *api.Service) {
253 if service == nil {
254 return
255 }
256
257
258
259 normalizeClusterIPs(After{service}, Before{nil})
260
261
262 r.defaultOnReadIPFamilies(service)
263
264
265
266
267 defaultOnReadInternalTrafficPolicy(service)
268 }
269
270 func defaultOnReadInternalTrafficPolicy(service *api.Service) {
271 if service.Spec.Type == api.ServiceTypeExternalName {
272 service.Spec.InternalTrafficPolicy = nil
273 }
274 }
275
276 func (r *REST) defaultOnReadIPFamilies(service *api.Service) {
277
278 if !needsClusterIP(service) {
279 return
280 }
281
282
283
284
285
286 if len(service.Spec.IPFamilies) > 0 {
287 return
288 }
289
290 singleStack := api.IPFamilyPolicySingleStack
291 requireDualStack := api.IPFamilyPolicyRequireDualStack
292
293 if service.Spec.ClusterIP == api.ClusterIPNone {
294
295 if len(service.Spec.Selector) == 0 {
296
297
298
299
300
301
302
303 service.Spec.IPFamilyPolicy = &requireDualStack
304 service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily, otherFamily(r.primaryIPFamily)}
305 } else {
306
307 service.Spec.IPFamilyPolicy = &singleStack
308 service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily}
309 }
310 } else {
311
312 service.Spec.IPFamilies = make([]api.IPFamily, len(service.Spec.ClusterIPs))
313 for idx, ip := range service.Spec.ClusterIPs {
314 if netutil.IsIPv6String(ip) {
315 service.Spec.IPFamilies[idx] = api.IPv6Protocol
316 } else {
317 service.Spec.IPFamilies[idx] = api.IPv4Protocol
318 }
319 }
320 if len(service.Spec.IPFamilies) == 1 {
321 service.Spec.IPFamilyPolicy = &singleStack
322 } else if len(service.Spec.IPFamilies) == 2 {
323
324 service.Spec.IPFamilyPolicy = &requireDualStack
325 }
326 }
327 }
328
329 func (r *REST) afterDelete(obj runtime.Object, options *metav1.DeleteOptions) {
330 svc := obj.(*api.Service)
331
332
333
334
335 r.defaultOnReadService(svc)
336
337
338 if !dryrun.IsDryRun(options.DryRun) {
339
340
341 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), svc.Namespace)
342
343
344
345
346 _, _, err := r.endpoints.Delete(ctx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{})
347 if err != nil && !errors.IsNotFound(err) {
348 klog.Errorf("delete service endpoints %s/%s failed: %v", svc.Name, svc.Namespace, err)
349 }
350
351 r.alloc.releaseAllocatedResources(svc)
352 }
353 }
354
355 func (r *REST) beginCreate(ctx context.Context, obj runtime.Object, options *metav1.CreateOptions) (genericregistry.FinishFunc, error) {
356 svc := obj.(*api.Service)
357
358
359
360 normalizeClusterIPs(After{svc}, Before{nil})
361
362
363
364
365
366
367 txn, err := r.alloc.allocateCreate(svc, dryrun.IsDryRun(options.DryRun))
368 if err != nil {
369 return nil, err
370 }
371
372
373 finish := func(_ context.Context, success bool) {
374 if success {
375 txn.Commit()
376 } else {
377 txn.Revert()
378 }
379 }
380
381 return finish, nil
382 }
383
384 func (r *REST) beginUpdate(ctx context.Context, obj, oldObj runtime.Object, options *metav1.UpdateOptions) (genericregistry.FinishFunc, error) {
385 newSvc := obj.(*api.Service)
386 oldSvc := oldObj.(*api.Service)
387
388
389
390
391 r.defaultOnReadService(oldSvc)
392
393
394
395 patchAllocatedValues(After{newSvc}, Before{oldSvc})
396
397
398
399 normalizeClusterIPs(After{newSvc}, Before{oldSvc})
400
401
402 txn, err := r.alloc.allocateUpdate(After{newSvc}, Before{oldSvc}, dryrun.IsDryRun(options.DryRun))
403 if err != nil {
404 return nil, err
405 }
406
407
408 finish := func(_ context.Context, success bool) {
409 if success {
410 txn.Commit()
411 } else {
412 txn.Revert()
413 }
414 }
415
416 return finish, nil
417 }
418
419
// ResourceLocation implements rest.Redirector. It resolves id to the URL of one
// endpoint address of the named Service and returns it together with
// r.proxyTransport.
//
// id is split by utilnet.SplitSchemeNamePort into scheme, service name and
// port. If the split fails, BadRequest is returned.
//
// A numeric port is first translated to the name of the matching service port,
// because endpoint ports are matched by name. Any other port string, including
// an empty one, is used as a port name directly.
//
// Subsets, and the addresses within a matching subset, are tried starting from
// a random offset; presumably this spreads proxied traffic across backends.
// Addresses that fail isValidAddress are logged and skipped.
//
// Errors from the Service or Endpoints lookups are returned as-is.
// ServiceUnavailable is returned when no matching port or usable address is
// found.
func (r *REST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) {
	svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
	if !valid {
		return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
	}

	// A numeric port names a service port by number. Translate it to that
	// port's name so it can be matched against endpoint port names below.
	if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
		obj, err := r.Get(ctx, svcName, &metav1.GetOptions{})
		if err != nil {
			return nil, nil, err
		}
		svc := obj.(*api.Service)
		found := false
		for _, svcPort := range svc.Spec.Ports {
			if int64(svcPort.Port) == portNum {
				// Use the declared port's name from here on.
				portStr = svcPort.Name
				found = true
				break
			}
		}
		if !found {
			return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
		}
	}

	obj, err := r.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}
	eps := obj.(*api.Endpoints)
	if len(eps.Subsets) == 0 {
		return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
	}
	// Start at a random subset and visit each subset once, wrapping around.
	ssSeed := rand.Intn(len(eps.Subsets))
	for ssi := 0; ssi < len(eps.Subsets); ssi++ {
		ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
		if len(ss.Addresses) == 0 {
			continue
		}
		for i := range ss.Ports {
			if ss.Ports[i].Name == portStr {
				// Start at a random address in this subset and try each one in
				// turn until one passes isValidAddress. This costs one Pod
				// lookup per candidate.
				addrSeed := rand.Intn(len(ss.Addresses))
				for try := 0; try < len(ss.Addresses); try++ {
					addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
					if err := isValidAddress(ctx, &addr, r.pods); err != nil {
						utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
						continue
					}
					ip := addr.IP
					port := int(ss.Ports[i].Port)
					return &url.URL{
						Scheme: svcScheme,
						Host:   net.JoinHostPort(ip, strconv.Itoa(port)),
					}, r.proxyTransport, nil
				}
				// Every address in this subset failed validation for this port.
				utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
			}
		}
	}
	// NOTE(review): unlike the earlier messages this quotes the raw id (which
	// may include scheme and port) rather than svcName.
	return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
491
492 func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Getter) error {
493 if addr.TargetRef == nil {
494 return fmt.Errorf("Address has no target ref, skipping: %v", addr)
495 }
496 if genericapirequest.NamespaceValue(ctx) != addr.TargetRef.Namespace {
497 return fmt.Errorf("Address namespace doesn't match context namespace")
498 }
499 obj, err := pods.Get(ctx, addr.TargetRef.Name, &metav1.GetOptions{})
500 if err != nil {
501 return err
502 }
503 pod, ok := obj.(*api.Pod)
504 if !ok {
505 return fmt.Errorf("failed to cast to pod: %v", obj)
506 }
507 if pod == nil {
508 return fmt.Errorf("pod is missing, skipping (%s/%s)", addr.TargetRef.Namespace, addr.TargetRef.Name)
509 }
510 for _, podIP := range pod.Status.PodIPs {
511 if podIP.IP == addr.IP {
512 return nil
513 }
514 }
515 return fmt.Errorf("pod ip(s) doesn't match endpoint ip, skipping: %v vs %s (%s/%s)", pod.Status.PodIPs, addr.IP, addr.TargetRef.Namespace, addr.TargetRef.Name)
516 }
517
518
519
// normalizeClusterIPs reconciles the singular Spec.ClusterIP with the plural
// Spec.ClusterIPs on the new Service. It looks only at these two fields on the
// new Service and the optional old Service. It does no validation; it only
// infers what the client meant when it set one field and not the other.
//
// Create (before.Service == nil): if only ClusterIP is set, ClusterIPs is
// initialized to [ClusterIP]. ClusterIP is never derived from ClusterIPs.
//
// Update:
//   - ClusterIPs was dropped but ClusterIP is unchanged: the old ClusterIPs are
//     restored. Presumably this comes from a client unaware of the plural field.
//   - ClusterIP was cleared while ClusterIPs still matches the old value:
//     ClusterIPs is cleared as well.
//   - ClusterIP was changed (not cleared) while ClusterIPs still matches the old
//     value: ClusterIPs is reset to [new ClusterIP].
//
// NOTE(review): "matches" relies on sameClusterIPs, which is defined elsewhere
// and presumably compares the old and new ClusterIPs; confirm.
func normalizeClusterIPs(after After, before Before) {
	oldSvc, newSvc := before.Service, after.Service

	if oldSvc == nil {
		// Create: only the singular field was given, so seed the plural from it.
		if len(newSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
			newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
			return
		}

		// Otherwise both fields are empty or both are set; there is nothing
		// to reconcile here.
		return
	}

	// Update: ClusterIPs was dropped from the new object.
	if len(oldSvc.Spec.ClusterIPs) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
		// ClusterIP is unchanged, so carry the old plural value forward.
		if oldSvc.Spec.ClusterIP == newSvc.Spec.ClusterIP {
			newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
		}
	}

	// Update: the singular field changed.
	if oldSvc.Spec.ClusterIP != newSvc.Spec.ClusterIP {
		if len(oldSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIP) == 0 {
			// ClusterIP was cleared. If ClusterIPs was left as it was, clear
			// it too on the client's behalf.
			if sameClusterIPs(oldSvc, newSvc) {
				newSvc.Spec.ClusterIPs = nil
			}

			// If ClusterIPs was changed as well, it is left for later
			// processing (presumably validation) to judge.
		} else {
			// ClusterIP was changed to a new value while ClusterIPs was left
			// as it was: rebuild ClusterIPs from the new ClusterIP.
			if sameClusterIPs(oldSvc, newSvc) {
				newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
			}
		}
	}
}
582
583
584
585
586
587 func patchAllocatedValues(after After, before Before) {
588 oldSvc, newSvc := before.Service, after.Service
589
590 if needsClusterIP(oldSvc) && needsClusterIP(newSvc) {
591 if newSvc.Spec.ClusterIP == "" {
592 newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP
593 }
594 if len(newSvc.Spec.ClusterIPs) == 0 && len(oldSvc.Spec.ClusterIPs) > 0 {
595 newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
596 }
597 }
598
599 if needsNodePort(oldSvc) && needsNodePort(newSvc) {
600 nodePortsUsed := func(svc *api.Service) sets.Int32 {
601 used := sets.NewInt32()
602 for _, p := range svc.Spec.Ports {
603 if p.NodePort != 0 {
604 used.Insert(p.NodePort)
605 }
606 }
607 return used
608 }
609
610
611
612 used := nodePortsUsed(oldSvc).Intersection(nodePortsUsed(newSvc))
613
614
615
616 np := map[string]int32{}
617 for i := range oldSvc.Spec.Ports {
618 p := &oldSvc.Spec.Ports[i]
619 np[p.Name] = p.NodePort
620 }
621
622
623
624
625 for i := range newSvc.Spec.Ports {
626 p := &newSvc.Spec.Ports[i]
627 if p.NodePort == 0 {
628 oldVal := np[p.Name]
629 if !used.Has(oldVal) {
630 p.NodePort = oldVal
631 }
632 }
633 }
634 }
635
636 if needsHCNodePort(oldSvc) && needsHCNodePort(newSvc) {
637 if newSvc.Spec.HealthCheckNodePort == 0 {
638 newSvc.Spec.HealthCheckNodePort = oldSvc.Spec.HealthCheckNodePort
639 }
640 }
641 }
642
643 func needsClusterIP(svc *api.Service) bool {
644 if svc.Spec.Type == api.ServiceTypeExternalName {
645 return false
646 }
647 return true
648 }
649
650 func needsNodePort(svc *api.Service) bool {
651 if svc.Spec.Type == api.ServiceTypeNodePort {
652 return true
653 }
654 if svc.Spec.Type == api.ServiceTypeLoadBalancer &&
655 (svc.Spec.AllocateLoadBalancerNodePorts == nil || *svc.Spec.AllocateLoadBalancerNodePorts) {
656 return true
657 }
658 return false
659 }
660
661 func needsHCNodePort(svc *api.Service) bool {
662 if svc.Spec.Type != api.ServiceTypeLoadBalancer {
663 return false
664 }
665 if svc.Spec.ExternalTrafficPolicy != api.ServiceExternalTrafficPolicyLocal {
666 return false
667 }
668 return true
669 }
670
View as plain text