1
16
17 package storage
18
19 import (
20 "fmt"
21 "net"
22
23 "k8s.io/apimachinery/pkg/api/errors"
24 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25 "k8s.io/apimachinery/pkg/util/validation/field"
26 utilfeature "k8s.io/apiserver/pkg/util/feature"
27 "k8s.io/klog/v2"
28 apiservice "k8s.io/kubernetes/pkg/api/service"
29 api "k8s.io/kubernetes/pkg/apis/core"
30 "k8s.io/kubernetes/pkg/apis/core/validation"
31 "k8s.io/kubernetes/pkg/features"
32 "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
33 "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
34 netutils "k8s.io/utils/net"
35 )
36
37
38
// Allocators bundles the allocators this file uses to assign cluster IPs and
// node ports to Services.
type Allocators struct {
	// serviceIPAllocatorsByFamily maps each configured IP family to the
	// allocator that hands out cluster IPs of that family.
	serviceIPAllocatorsByFamily map[api.IPFamily]ipallocator.Interface
	// defaultServiceIPFamily is the family given to a Service that does not
	// name any IP family itself.
	defaultServiceIPFamily api.IPFamily
	// serviceNodePorts allocates node ports, including health check node ports.
	serviceNodePorts portallocator.Interface
}
44
45
// ServiceNodePort pairs a node port number with the protocol of the service
// port it was taken from.
type ServiceNodePort struct {
	// Protocol is the protocol of the service port.
	Protocol api.Protocol

	// NodePort is the node port number of the service port.
	NodePort int32
}
54
55
56 func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) Allocators {
57 return Allocators{
58 defaultServiceIPFamily: defaultFamily,
59 serviceIPAllocatorsByFamily: ipAllocs,
60 serviceNodePorts: portAlloc,
61 }
62 }
63
64 func (al *Allocators) allocateCreate(service *api.Service, dryRun bool) (transaction, error) {
65 result := metaTransaction{}
66 success := false
67
68 defer func() {
69 if !success {
70 result.Revert()
71 }
72 }()
73
74
75
76 if err := al.initIPFamilyFields(After{service}, Before{nil}); err != nil {
77 return nil, err
78 }
79
80
81
82
83
84 if txn, err := al.txnAllocClusterIPs(service, dryRun); err != nil {
85 return nil, err
86 } else {
87 result = append(result, txn)
88 }
89
90
91 if txn, err := al.txnAllocNodePorts(service, dryRun); err != nil {
92 return nil, err
93 } else {
94 result = append(result, txn)
95 }
96
97 success = true
98 return result, nil
99 }
100
101
102
103 func (al *Allocators) initIPFamilyFields(after After, before Before) error {
104 oldService, service := before.Service, after.Service
105
106
107 if service.Spec.Type == api.ServiceTypeExternalName {
108 return nil
109 }
110
111
112
113
114
115
116
117
118
119 if isMatchingPreferDualStackClusterIPFields(after, before) {
120 return nil
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135 if service.Spec.IPFamilyPolicy == nil {
136 if oldService != nil && oldService.Spec.IPFamilyPolicy != nil {
137
138 service.Spec.IPFamilyPolicy = oldService.Spec.IPFamilyPolicy
139 } else if service.Spec.ClusterIP == api.ClusterIPNone && len(service.Spec.Selector) == 0 {
140
141 requireDualStack := api.IPFamilyPolicyRequireDualStack
142 service.Spec.IPFamilyPolicy = &requireDualStack
143 } else {
144
145
146 singleStack := api.IPFamilyPolicySingleStack
147 service.Spec.IPFamilyPolicy = &singleStack
148 }
149 }
150
151
152
153
154
155
156 if el := validation.ValidateServiceClusterIPsRelatedFields(service); len(el) != 0 {
157 return errors.NewInvalid(api.Kind("Service"), service.Name, el)
158 }
159
160
161 el := make(field.ErrorList, 0)
162
163
164 if oldService != nil {
165 if getIPFamilyPolicy(service) == api.IPFamilyPolicySingleStack {
166
167
168
169 if sameClusterIPs(oldService, service) && len(service.Spec.ClusterIPs) > 1 {
170 service.Spec.ClusterIPs = service.Spec.ClusterIPs[0:1]
171 }
172 if sameIPFamilies(oldService, service) && len(service.Spec.IPFamilies) > 1 {
173 service.Spec.IPFamilies = service.Spec.IPFamilies[0:1]
174 }
175 } else {
176
177
178 if reducedClusterIPs(After{service}, Before{oldService}) {
179 el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
180 "must be 'SingleStack' to release the secondary cluster IP"))
181 }
182 if reducedIPFamilies(After{service}, Before{oldService}) {
183 el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
184 "must be 'SingleStack' to release the secondary IP family"))
185 }
186 }
187 }
188
189
190
191 if getIPFamilyPolicy(service) == api.IPFamilyPolicySingleStack {
192 if len(service.Spec.ClusterIPs) == 2 {
193 el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
194 "must be 'RequireDualStack' or 'PreferDualStack' when multiple cluster IPs are specified"))
195 }
196 if len(service.Spec.IPFamilies) == 2 {
197 el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
198 "must be 'RequireDualStack' or 'PreferDualStack' when multiple IP families are specified"))
199 }
200 }
201
202
203
204 for i, ip := range service.Spec.ClusterIPs {
205 if ip == api.ClusterIPNone {
206 break
207 }
208
209
210
211 fam := familyOf(ip)
212
213
214 if i >= len(service.Spec.IPFamilies) {
215
216
217
218 if _, found := al.serviceIPAllocatorsByFamily[fam]; !found {
219 el = append(el, field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs,
220 fmt.Sprintf("%s is not configured on this cluster", fam)))
221 } else {
222
223 service.Spec.IPFamilies = append(service.Spec.IPFamilies, fam)
224 }
225 }
226 }
227
228
229 if len(el) > 0 {
230 return errors.NewInvalid(api.Kind("Service"), service.Name, el)
231 }
232
233
234
235
236 if service.Spec.ClusterIP == api.ClusterIPNone && len(service.Spec.Selector) == 0 {
237
238
239 if len(service.Spec.IPFamilies) == 0 {
240 service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily}
241 }
242
243
244
245
246 if len(service.Spec.IPFamilies) < 2 {
247 if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack {
248
249 if service.Spec.IPFamilies[0] == api.IPv4Protocol {
250 service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
251 } else {
252 service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol)
253 }
254 }
255 }
256
257
258 return nil
259 }
260
261
262
263
264
265
266 if getIPFamilyPolicy(service) == api.IPFamilyPolicyRequireDualStack {
267 if len(al.serviceIPAllocatorsByFamily) < 2 {
268 el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
269 "this cluster is not configured for dual-stack services"))
270 }
271 }
272
273
274 for i, ipFamily := range service.Spec.IPFamilies {
275 if _, found := al.serviceIPAllocatorsByFamily[ipFamily]; !found {
276 el = append(el, field.Invalid(field.NewPath("spec", "ipFamilies").Index(i), ipFamily, "not configured on this cluster"))
277 }
278 }
279
280
281 if len(el) > 0 {
282 return errors.NewInvalid(api.Kind("Service"), service.Name, el)
283 }
284
285
286 if len(service.Spec.IPFamilies) == 0 {
287 service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily}
288 }
289
290
291
292 if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack &&
293 len(service.Spec.IPFamilies) == 1 &&
294 len(al.serviceIPAllocatorsByFamily) == 2 {
295
296 if service.Spec.IPFamilies[0] == api.IPv4Protocol {
297 service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
298 } else if service.Spec.IPFamilies[0] == api.IPv6Protocol {
299 service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol)
300 }
301 }
302
303 return nil
304 }
305
306 func (al *Allocators) txnAllocClusterIPs(service *api.Service, dryRun bool) (transaction, error) {
307
308
309 allocated, err := al.allocClusterIPs(service, dryRun)
310 if err != nil {
311 return nil, err
312 }
313
314 txn := callbackTransaction{
315 revert: func() {
316 if dryRun {
317 return
318 }
319 actuallyReleased, err := al.releaseIPs(allocated)
320 if err != nil {
321 klog.ErrorS(err, "failed to clean up after failed service create",
322 "service", klog.KObj(service),
323 "shouldRelease", allocated,
324 "released", actuallyReleased)
325 }
326 },
327 commit: func() {
328 if !dryRun {
329 if len(allocated) > 0 {
330 klog.InfoS("allocated clusterIPs",
331 "service", klog.KObj(service),
332 "clusterIPs", allocated)
333 }
334 }
335 },
336 }
337 return txn, nil
338 }
339
340
341 func (al *Allocators) allocClusterIPs(service *api.Service, dryRun bool) (map[api.IPFamily]string, error) {
342
343 if service.Spec.Type == api.ServiceTypeExternalName {
344 return nil, nil
345 }
346
347
348 if len(service.Spec.ClusterIPs) > 0 && service.Spec.ClusterIPs[0] == api.ClusterIPNone {
349 return nil, nil
350 }
351
352 toAlloc := make(map[api.IPFamily]string)
353
354
355
356
357
358
359 if service.Spec.ClusterIPs == nil {
360 service.Spec.ClusterIPs = make([]string, 0, len(service.Spec.IPFamilies))
361 }
362
363 for i, ipFamily := range service.Spec.IPFamilies {
364 if i > (len(service.Spec.ClusterIPs) - 1) {
365 service.Spec.ClusterIPs = append(service.Spec.ClusterIPs, "" )
366 }
367
368 toAlloc[ipFamily] = service.Spec.ClusterIPs[i]
369 }
370
371
372 allocated, err := al.allocIPs(service, toAlloc, dryRun)
373
374
375 if err == nil {
376 for family, ip := range allocated {
377 for i, check := range service.Spec.IPFamilies {
378 if family == check {
379 service.Spec.ClusterIPs[i] = ip
380
381
382
383
384
385 if i == 0 {
386 service.Spec.ClusterIP = ip
387 }
388 }
389 }
390 }
391 }
392
393 return allocated, err
394 }
395
396 func (al *Allocators) allocIPs(service *api.Service, toAlloc map[api.IPFamily]string, dryRun bool) (map[api.IPFamily]string, error) {
397 allocated := make(map[api.IPFamily]string)
398
399 for family, ip := range toAlloc {
400 allocator := al.serviceIPAllocatorsByFamily[family]
401 if dryRun {
402 allocator = allocator.DryRun()
403 }
404 if ip == "" {
405 var allocatedIP net.IP
406 var err error
407 if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
408
409 svcAllocator, ok := allocator.(*ipallocator.MetaAllocator)
410 if ok {
411 allocatedIP, err = svcAllocator.AllocateNextService(service)
412 } else {
413 allocatedIP, err = allocator.AllocateNext()
414 }
415 } else {
416 allocatedIP, err = allocator.AllocateNext()
417 }
418 if err != nil {
419 return allocated, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err))
420 }
421 allocated[family] = allocatedIP.String()
422 } else {
423 parsedIP := netutils.ParseIPSloppy(ip)
424 if parsedIP == nil {
425 return allocated, errors.NewInternalError(fmt.Errorf("failed to parse service IP %q", ip))
426 }
427 var err error
428 if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
429
430 svcAllocator, ok := allocator.(*ipallocator.MetaAllocator)
431 if ok {
432 err = svcAllocator.AllocateService(service, parsedIP)
433 } else {
434 err = allocator.Allocate(parsedIP)
435 }
436 } else {
437 err = allocator.Allocate(parsedIP)
438 }
439 if err != nil {
440 el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs"), service.Spec.ClusterIPs, fmt.Sprintf("failed to allocate IP %v: %v", ip, err))}
441 return allocated, errors.NewInvalid(api.Kind("Service"), service.Name, el)
442 }
443 allocated[family] = ip
444 }
445 }
446 return allocated, nil
447 }
448
449
450 func (al *Allocators) releaseIPs(toRelease map[api.IPFamily]string) (map[api.IPFamily]string, error) {
451 if toRelease == nil {
452 return nil, nil
453 }
454
455 released := make(map[api.IPFamily]string)
456 for family, ip := range toRelease {
457 allocator, ok := al.serviceIPAllocatorsByFamily[family]
458 if !ok {
459
460
461 klog.InfoS("Not releasing ClusterIP because related family is not enabled", "clusterIP", ip, "family", family)
462 continue
463 }
464
465 parsedIP := netutils.ParseIPSloppy(ip)
466 if parsedIP == nil {
467 return released, errors.NewInternalError(fmt.Errorf("failed to parse service IP %q", ip))
468 }
469 if err := allocator.Release(parsedIP); err != nil {
470 return released, err
471 }
472 released[family] = ip
473 }
474
475 return released, nil
476 }
477
478 func (al *Allocators) txnAllocNodePorts(service *api.Service, dryRun bool) (transaction, error) {
479
480 nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
481
482 txn := callbackTransaction{
483 commit: func() {
484 nodePortOp.Commit()
485
486
487 nodePortOp.Finish()
488 },
489 revert: func() {
490
491 nodePortOp.Finish()
492 },
493 }
494
495
496 if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
497 if err := initNodePorts(service, nodePortOp); err != nil {
498 txn.Revert()
499 return nil, err
500 }
501 }
502
503
504 if apiservice.NeedsHealthCheck(service) {
505 if err := al.allocHealthCheckNodePort(service, nodePortOp); err != nil {
506 txn.Revert()
507 return nil, errors.NewInternalError(err)
508 }
509 }
510
511 return txn, nil
512 }
513
514 func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
515 svcPortToNodePort := map[int]int{}
516 for i := range service.Spec.Ports {
517 servicePort := &service.Spec.Ports[i]
518 if servicePort.NodePort == 0 && !shouldAllocateNodePorts(service) {
519
520 continue
521 }
522 allocatedNodePort := svcPortToNodePort[int(servicePort.Port)]
523 if allocatedNodePort == 0 {
524
525
526
527 np := findRequestedNodePort(int(servicePort.Port), service.Spec.Ports)
528 if np != 0 {
529 err := nodePortOp.Allocate(np)
530 if err != nil {
531
532 el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), np, err.Error())}
533 return errors.NewInvalid(api.Kind("Service"), service.Name, el)
534 }
535 servicePort.NodePort = int32(np)
536 svcPortToNodePort[int(servicePort.Port)] = np
537 } else {
538 nodePort, err := nodePortOp.AllocateNext()
539 if err != nil {
540
541
542
543 return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
544 }
545 servicePort.NodePort = int32(nodePort)
546 svcPortToNodePort[int(servicePort.Port)] = nodePort
547 }
548 } else if int(servicePort.NodePort) != allocatedNodePort {
549
550
551 if servicePort.NodePort == 0 {
552 servicePort.NodePort = int32(allocatedNodePort)
553 } else {
554 err := nodePortOp.Allocate(int(servicePort.NodePort))
555 if err != nil {
556
557 el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())}
558 return errors.NewInvalid(api.Kind("Service"), service.Name, el)
559 }
560 }
561 }
562 }
563
564 return nil
565 }
566
567
568 func (al *Allocators) allocHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
569 healthCheckNodePort := service.Spec.HealthCheckNodePort
570 if healthCheckNodePort != 0 {
571
572 err := nodePortOp.Allocate(int(healthCheckNodePort))
573 if err != nil {
574 return fmt.Errorf("failed to allocate requested HealthCheck NodePort %v: %v",
575 healthCheckNodePort, err)
576 }
577 } else {
578
579 healthCheckNodePort, err := nodePortOp.AllocateNext()
580 if err != nil {
581 return fmt.Errorf("failed to allocate a HealthCheck NodePort %v: %v", healthCheckNodePort, err)
582 }
583 service.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
584 }
585 return nil
586 }
587
588 func (al *Allocators) allocateUpdate(after After, before Before, dryRun bool) (transaction, error) {
589 result := metaTransaction{}
590 success := false
591
592 defer func() {
593 if !success {
594 result.Revert()
595 }
596 }()
597
598
599
600 if err := al.initIPFamilyFields(after, before); err != nil {
601 return nil, err
602 }
603
604
605
606
607
608 if txn, err := al.txnUpdateClusterIPs(after, before, dryRun); err != nil {
609 return nil, err
610 } else {
611 result = append(result, txn)
612 }
613
614
615 if txn, err := al.txnUpdateNodePorts(after, before, dryRun); err != nil {
616 return nil, err
617 } else {
618 result = append(result, txn)
619 }
620
621 success = true
622 return result, nil
623 }
624
625 func (al *Allocators) txnUpdateClusterIPs(after After, before Before, dryRun bool) (transaction, error) {
626 service := after.Service
627
628 allocated, released, err := al.updateClusterIPs(after, before, dryRun)
629 if err != nil {
630 return nil, err
631 }
632
633
634
635
636
637
638 txn := callbackTransaction{
639 commit: func() {
640 if dryRun {
641 return
642 }
643 if len(allocated) > 0 {
644 klog.InfoS("allocated clusterIPs",
645 "service", klog.KObj(service),
646 "clusterIPs", allocated)
647 }
648 if actuallyReleased, err := al.releaseIPs(released); err != nil {
649 klog.ErrorS(err, "failed to clean up after successful service update",
650 "service", klog.KObj(service),
651 "shouldRelease", released,
652 "released", actuallyReleased)
653 }
654 },
655 revert: func() {
656 if dryRun {
657 return
658 }
659 if actuallyReleased, err := al.releaseIPs(allocated); err != nil {
660 klog.ErrorS(err, "failed to clean up after failed service update",
661 "service", klog.KObj(service),
662 "shouldRelease", allocated,
663 "released", actuallyReleased)
664 }
665 },
666 }
667 return txn, nil
668 }
669
670
671
672
673
674 func (al *Allocators) updateClusterIPs(after After, before Before, dryRun bool) (allocated map[api.IPFamily]string, toRelease map[api.IPFamily]string, err error) {
675 oldService, service := before.Service, after.Service
676
677
678
679
680
681
682
683
684
685 if isMatchingPreferDualStackClusterIPFields(after, before) {
686 return allocated, toRelease, nil
687 }
688
689
690
691
692
693
694
695
696
697 if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
698 allocated, err := al.allocClusterIPs(service, dryRun)
699 return allocated, nil, err
700 }
701
702
703 if len(oldService.Spec.ClusterIPs) > 0 && oldService.Spec.ClusterIPs[0] == api.ClusterIPNone {
704 return nil, nil, nil
705 }
706
707
708
709 if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
710 toRelease = make(map[api.IPFamily]string)
711 for i, family := range oldService.Spec.IPFamilies {
712 toRelease[family] = oldService.Spec.ClusterIPs[i]
713 }
714 return nil, toRelease, nil
715 }
716
717 upgraded := len(oldService.Spec.IPFamilies) == 1 && len(service.Spec.IPFamilies) == 2
718 downgraded := len(oldService.Spec.IPFamilies) == 2 && len(service.Spec.IPFamilies) == 1
719
720
721 if upgraded {
722 toAllocate := make(map[api.IPFamily]string)
723
724 if len(service.Spec.ClusterIPs) < 2 {
725 service.Spec.ClusterIPs = append(service.Spec.ClusterIPs, "" )
726 }
727
728 toAllocate[service.Spec.IPFamilies[1]] = service.Spec.ClusterIPs[1]
729
730
731 allocated, err := al.allocIPs(service, toAllocate, dryRun)
732
733 if err == nil {
734 service.Spec.ClusterIPs[1] = allocated[service.Spec.IPFamilies[1]]
735 }
736
737 return allocated, nil, err
738 }
739
740
741 if downgraded {
742 toRelease = make(map[api.IPFamily]string)
743 toRelease[oldService.Spec.IPFamilies[1]] = oldService.Spec.ClusterIPs[1]
744
745 return nil, toRelease, err
746 }
747
748 return nil, nil, nil
749 }
750
751 func (al *Allocators) txnUpdateNodePorts(after After, before Before, dryRun bool) (transaction, error) {
752 oldService, service := before.Service, after.Service
753
754
755 nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
756
757 txn := callbackTransaction{
758 commit: func() {
759 nodePortOp.Commit()
760
761
762 nodePortOp.Finish()
763 },
764 revert: func() {
765
766 nodePortOp.Finish()
767 },
768 }
769
770
771 if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
772 (service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
773 al.releaseNodePorts(oldService, nodePortOp)
774 }
775
776
777 if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
778 if err := al.updateNodePorts(After{service}, Before{oldService}, nodePortOp); err != nil {
779 txn.Revert()
780 return nil, err
781 }
782 }
783
784
785 success, err := al.updateHealthCheckNodePort(After{service}, Before{oldService}, nodePortOp)
786 if !success || err != nil {
787 txn.Revert()
788 return nil, err
789 }
790
791 return txn, nil
792 }
793
794 func (al *Allocators) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
795 nodePorts := collectServiceNodePorts(service)
796
797 for _, nodePort := range nodePorts {
798 nodePortOp.ReleaseDeferred(nodePort)
799 }
800 }
801
802 func (al *Allocators) updateNodePorts(after After, before Before, nodePortOp *portallocator.PortAllocationOperation) error {
803 oldService, newService := before.Service, after.Service
804
805 oldNodePortsNumbers := collectServiceNodePorts(oldService)
806 newNodePorts := []ServiceNodePort{}
807 portAllocated := map[int]bool{}
808
809 for i := range newService.Spec.Ports {
810 servicePort := &newService.Spec.Ports[i]
811 if servicePort.NodePort == 0 && !shouldAllocateNodePorts(newService) {
812
813 continue
814 }
815 nodePort := ServiceNodePort{Protocol: servicePort.Protocol, NodePort: servicePort.NodePort}
816 if nodePort.NodePort != 0 {
817 if !containsNumber(oldNodePortsNumbers, int(nodePort.NodePort)) && !portAllocated[int(nodePort.NodePort)] {
818 err := nodePortOp.Allocate(int(nodePort.NodePort))
819 if err != nil {
820 el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort.NodePort, err.Error())}
821 return errors.NewInvalid(api.Kind("Service"), newService.Name, el)
822 }
823 portAllocated[int(nodePort.NodePort)] = true
824 }
825 } else {
826 nodePortNumber, err := nodePortOp.AllocateNext()
827 if err != nil {
828
829
830
831 return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
832 }
833 servicePort.NodePort = int32(nodePortNumber)
834 nodePort.NodePort = servicePort.NodePort
835 }
836 if containsNodePort(newNodePorts, nodePort) {
837 return fmt.Errorf("duplicate nodePort: %v", nodePort)
838 }
839 newNodePorts = append(newNodePorts, nodePort)
840 }
841
842 newNodePortsNumbers := collectServiceNodePorts(newService)
843
844
845
846 for _, oldNodePortNumber := range oldNodePortsNumbers {
847 if containsNumber(newNodePortsNumbers, oldNodePortNumber) {
848 continue
849 }
850 nodePortOp.ReleaseDeferred(int(oldNodePortNumber))
851 }
852
853 return nil
854 }
855
856
857
858 func (al *Allocators) updateHealthCheckNodePort(after After, before Before, nodePortOp *portallocator.PortAllocationOperation) (bool, error) {
859 oldService, service := before.Service, after.Service
860
861 neededHealthCheckNodePort := apiservice.NeedsHealthCheck(oldService)
862 oldHealthCheckNodePort := oldService.Spec.HealthCheckNodePort
863
864 needsHealthCheckNodePort := apiservice.NeedsHealthCheck(service)
865
866 switch {
867
868
869
870 case !neededHealthCheckNodePort && needsHealthCheckNodePort:
871 if err := al.allocHealthCheckNodePort(service, nodePortOp); err != nil {
872 return false, errors.NewInternalError(err)
873 }
874
875
876
877 case neededHealthCheckNodePort && !needsHealthCheckNodePort:
878 nodePortOp.ReleaseDeferred(int(oldHealthCheckNodePort))
879 }
880 return true, nil
881 }
882
883 func (al *Allocators) releaseAllocatedResources(svc *api.Service) {
884 al.releaseClusterIPs(svc)
885
886 for _, nodePort := range collectServiceNodePorts(svc) {
887 err := al.serviceNodePorts.Release(nodePort)
888 if err != nil {
889
890 utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err))
891 }
892 }
893
894 if apiservice.NeedsHealthCheck(svc) {
895 nodePort := svc.Spec.HealthCheckNodePort
896 if nodePort > 0 {
897 err := al.serviceNodePorts.Release(int(nodePort))
898 if err != nil {
899
900 utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err))
901 }
902 }
903 }
904 }
905
906
907 func (al *Allocators) releaseClusterIPs(service *api.Service) (released map[api.IPFamily]string, err error) {
908
909 if service.Spec.Type == api.ServiceTypeExternalName {
910 return nil, nil
911 }
912
913
914 if len(service.Spec.ClusterIPs) > 0 && service.Spec.ClusterIPs[0] == api.ClusterIPNone {
915 return nil, nil
916 }
917
918 toRelease := make(map[api.IPFamily]string)
919 for _, ip := range service.Spec.ClusterIPs {
920 if netutils.IsIPv6String(ip) {
921 toRelease[api.IPv6Protocol] = ip
922 } else {
923 toRelease[api.IPv4Protocol] = ip
924 }
925 }
926 return al.releaseIPs(toRelease)
927 }
928
929 func (al *Allocators) Destroy() {
930 al.serviceNodePorts.Destroy()
931 for _, a := range al.serviceIPAllocatorsByFamily {
932 a.Destroy()
933 }
934 }
935
936
937
// containsNumber reports whether needle occurs in haystack.
func containsNumber(haystack []int, needle int) bool {
	for i := 0; i < len(haystack); i++ {
		if haystack[i] == needle {
			return true
		}
	}
	return false
}
946
947
948
949 func containsNodePort(serviceNodePorts []ServiceNodePort, serviceNodePort ServiceNodePort) bool {
950 for _, snp := range serviceNodePorts {
951 if snp == serviceNodePort {
952 return true
953 }
954 }
955 return false
956 }
957
958
959
960 func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
961 for i := range servicePorts {
962 servicePort := servicePorts[i]
963 if port == int(servicePort.Port) && servicePort.NodePort != 0 {
964 return int(servicePort.NodePort)
965 }
966 }
967 return 0
968 }
969
970 func shouldAllocateNodePorts(service *api.Service) bool {
971 if service.Spec.Type == api.ServiceTypeNodePort {
972 return true
973 }
974 if service.Spec.Type == api.ServiceTypeLoadBalancer {
975 return *service.Spec.AllocateLoadBalancerNodePorts
976 }
977 return false
978 }
979
980 func collectServiceNodePorts(service *api.Service) []int {
981 servicePorts := []int{}
982 for i := range service.Spec.Ports {
983 servicePort := &service.Spec.Ports[i]
984 if servicePort.NodePort != 0 {
985 servicePorts = append(servicePorts, int(servicePort.NodePort))
986 }
987 }
988 return servicePorts
989 }
990
991
992
993 func isMatchingPreferDualStackClusterIPFields(after After, before Before) bool {
994 oldService, service := before.Service, after.Service
995
996 if oldService == nil {
997 return false
998 }
999
1000 if service.Spec.IPFamilyPolicy == nil {
1001 return false
1002 }
1003
1004
1005
1006 if oldService.Spec.Type != service.Spec.Type {
1007 return false
1008 }
1009
1010 if service.Spec.Type != api.ServiceTypeClusterIP &&
1011 service.Spec.Type != api.ServiceTypeNodePort &&
1012 service.Spec.Type != api.ServiceTypeLoadBalancer {
1013 return false
1014 }
1015
1016
1017 if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicyPreferDualStack {
1018 return false
1019 }
1020
1021 if oldService.Spec.IPFamilyPolicy != nil && *(oldService.Spec.IPFamilyPolicy) != api.IPFamilyPolicyPreferDualStack {
1022 return false
1023 }
1024
1025 if !sameClusterIPs(oldService, service) {
1026 return false
1027 }
1028
1029 if !sameIPFamilies(oldService, service) {
1030 return false
1031 }
1032
1033
1034
1035
1036
1037 return true
1038 }
1039
1040
1041
1042 func getIPFamilyPolicy(svc *api.Service) api.IPFamilyPolicy {
1043 if svc.Spec.IPFamilyPolicy == nil {
1044 return ""
1045 }
1046 return *svc.Spec.IPFamilyPolicy
1047 }
1048
1049 func sameClusterIPs(lhs, rhs *api.Service) bool {
1050 if len(rhs.Spec.ClusterIPs) != len(lhs.Spec.ClusterIPs) {
1051 return false
1052 }
1053
1054 for i, ip := range rhs.Spec.ClusterIPs {
1055 if lhs.Spec.ClusterIPs[i] != ip {
1056 return false
1057 }
1058 }
1059
1060 return true
1061 }
1062
1063 func reducedClusterIPs(after After, before Before) bool {
1064 oldSvc, newSvc := before.Service, after.Service
1065
1066 if len(newSvc.Spec.ClusterIPs) == 0 {
1067 return false
1068 }
1069 return len(newSvc.Spec.ClusterIPs) < len(oldSvc.Spec.ClusterIPs)
1070 }
1071
1072 func sameIPFamilies(lhs, rhs *api.Service) bool {
1073 if len(rhs.Spec.IPFamilies) != len(lhs.Spec.IPFamilies) {
1074 return false
1075 }
1076
1077 for i, family := range rhs.Spec.IPFamilies {
1078 if lhs.Spec.IPFamilies[i] != family {
1079 return false
1080 }
1081 }
1082
1083 return true
1084 }
1085
1086 func reducedIPFamilies(after After, before Before) bool {
1087 oldSvc, newSvc := before.Service, after.Service
1088
1089 if len(newSvc.Spec.IPFamilies) == 0 {
1090 return false
1091 }
1092 return len(newSvc.Spec.IPFamilies) < len(oldSvc.Spec.IPFamilies)
1093 }
1094
1095
1096 func familyOf(ip string) api.IPFamily {
1097 if netutils.IsIPv4String(ip) {
1098 return api.IPv4Protocol
1099 }
1100 if netutils.IsIPv6String(ip) {
1101 return api.IPv6Protocol
1102 }
1103 return api.IPFamily("unknown")
1104 }
1105
View as plain text