...
1
16
17 package ipam
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "time"
24
25 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
26
27 v1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/fields"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/util/wait"
32 informers "k8s.io/client-go/informers/core/v1"
33 clientset "k8s.io/client-go/kubernetes"
34 cloudprovider "k8s.io/cloud-provider"
35 "k8s.io/klog/v2"
36 )
37
38
39 type CIDRAllocatorType string
40
41 const (
42
43
44 RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
45
46
47 CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
48
49
50 IPAMFromClusterAllocatorType = "IPAMFromCluster"
51
52
53 IPAMFromCloudAllocatorType = "IPAMFromCloud"
54 )
55
56
57 const (
58
59 apiserverStartupGracePeriod = 10 * time.Minute
60
61
62 cidrUpdateWorkers = 30
63
64
65 cidrUpdateQueueSize = 5000
66
67
68 cidrUpdateRetries = 3
69
70
71 updateRetryTimeout = 250 * time.Millisecond
72
73
74 maxUpdateRetryTimeout = 5 * time.Second
75
76
77 updateMaxRetries = 10
78 )
79
80
81
82 var nodePollInterval = 10 * time.Second
83
84
85
86 type CIDRAllocator interface {
87
88
89
90 AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error
91
92 ReleaseCIDR(logger klog.Logger, node *v1.Node) error
93
94 Run(ctx context.Context)
95 }
96
97
98
99 type CIDRAllocatorParams struct {
100
101 ClusterCIDRs []*net.IPNet
102
103 ServiceCIDR *net.IPNet
104
105 SecondaryServiceCIDR *net.IPNet
106
107 NodeCIDRMaskSizes []int
108 }
109
110
111
112 type nodeReservedCIDRs struct {
113 allocatedCIDRs []*net.IPNet
114 nodeName string
115 }
116
117
118 func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
119 logger := klog.FromContext(ctx)
120 nodeList, err := listNodes(logger, kubeClient)
121 if err != nil {
122 return nil, err
123 }
124
125 switch allocatorType {
126 case RangeAllocatorType:
127 return NewCIDRRangeAllocator(ctx, kubeClient, nodeInformer, allocatorParams, nodeList)
128 case CloudAllocatorType:
129 return NewCloudCIDRAllocator(ctx, kubeClient, cloud, nodeInformer)
130 default:
131 return nil, fmt.Errorf("invalid CIDR allocator type: %v", allocatorType)
132 }
133 }
134
135 func listNodes(logger klog.Logger, kubeClient clientset.Interface) (*v1.NodeList, error) {
136 var nodeList *v1.NodeList
137
138
139 if pollErr := wait.Poll(nodePollInterval, apiserverStartupGracePeriod, func() (bool, error) {
140 var err error
141 nodeList, err = kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
142 FieldSelector: fields.Everything().String(),
143 LabelSelector: labels.Everything().String(),
144 })
145 if err != nil {
146 logger.Error(err, "Failed to list all nodes")
147 return false, nil
148 }
149 return true, nil
150 }); pollErr != nil {
151 return nil, fmt.Errorf("failed to list all nodes in %v, cannot proceed without updating CIDR map",
152 apiserverStartupGracePeriod)
153 }
154 return nodeList, nil
155 }
156
157
158 func ipnetToStringList(inCIDRs []*net.IPNet) []string {
159 outCIDRs := make([]string, len(inCIDRs))
160 for idx, inCIDR := range inCIDRs {
161 outCIDRs[idx] = inCIDR.String()
162 }
163 return outCIDRs
164 }
165
166
167
168 func occupyServiceCIDR(set *cidrset.CidrSet, clusterCIDR, serviceCIDR *net.IPNet) error {
169 if clusterCIDR.Contains(serviceCIDR.IP) || serviceCIDR.Contains(clusterCIDR.IP) {
170 if err := set.Occupy(serviceCIDR); err != nil {
171 return err
172 }
173 }
174 return nil
175 }
176
View as plain text