package ipam

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"sync"
	"time"

	"github.com/google/go-cmp/cmp"

	"k8s.io/klog/v2"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	informers "k8s.io/client-go/informers/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"

	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	cloudprovider "k8s.io/cloud-provider"
	nodeutil "k8s.io/component-helpers/node/util"
	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
	utiltaints "k8s.io/kubernetes/pkg/util/taints"
	"k8s.io/legacy-cloud-providers/gce"
	netutils "k8s.io/utils/net"
)

// nodeProcessingInfo tracks how many times a CIDR update has been retried for
// a node that is currently being processed.
type nodeProcessingInfo struct {
	retries int
}
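
// cloudCIDRAllocator allocates node CIDRs according to the IP address aliases
// assigned by the cloud provider (GCE). Allocation and release of the ranges
// themselves are handled by the cloud; this controller only copies the
// assignment into the node spec and updates the node's network condition.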
type cloudCIDRAllocator struct {
	client clientset.Interface
	cloud  *gce.Cloud

	// nodeLister is able to list/get nodes and is populated by the shared
	// informer passed to NewCloudCIDRAllocator.
	nodeLister corelisters.NodeLister
	// nodesSynced returns true if the node shared informer has been synced at least once.
	nodesSynced cache.InformerSynced

	// nodeUpdateChannel passes node names to the background workers. This
	// parallelizes CIDR assignment and keeps long-running operations out of
	// the informer event handlers.
	nodeUpdateChannel chan string
	broadcaster       record.EventBroadcaster
	recorder          record.EventRecorder

	// Keeps track of nodes that are currently being processed to avoid racing
	// CIDR assignments for the same node.
	lock              sync.Mutex
	nodesInProcessing map[string]*nodeProcessingInfo
}

var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
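
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.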
func NewCloudCIDRAllocator(ctx context.Context, client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
	logger := klog.FromContext(ctx)
	if client == nil {
		logger.Error(nil, "kubeClient is nil when starting cloud CIDR allocator")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})

	gceCloud, ok := cloud.(*gce.Cloud)
	if !ok {
		err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
		return nil, err
	}

	ca := &cloudCIDRAllocator{
		client:            client,
		cloud:             gceCloud,
		nodeLister:        nodeInformer.Lister(),
		nodesSynced:       nodeInformer.Informer().HasSynced,
		nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
		broadcaster:       eventBroadcaster,
		recorder:          recorder,
		nodesInProcessing: map[string]*nodeProcessingInfo{},
	}

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controllerutil.CreateAddNodeHandler(
			func(node *v1.Node) error {
				return ca.AllocateOrOccupyCIDR(logger, node)
			}),
		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
			if newNode.Spec.PodCIDR == "" {
				return ca.AllocateOrOccupyCIDR(logger, newNode)
			}
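
			// Even when a PodCIDR is already assigned, keep processing the node
			// as long as the NetworkUnavailable condition is not False or the
			// network-unavailable taint is still present, so the route status
			// condition can be refreshed by updateCIDRAllocation.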
			networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule}
			_, cond := controllerutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
			if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) {
				return ca.AllocateOrOccupyCIDR(logger, newNode)
			}
			return nil
		}),
		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
			return ca.ReleaseCIDR(logger, node)
		}),
	})
	logger.Info("Using cloud CIDR allocator", "provider", cloud.ProviderName())
	return ca, nil
}

func (ca *cloudCIDRAllocator) Run(ctx context.Context) {
	defer utilruntime.HandleCrash()
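
	// Start the event-processing pipeline for the recorder.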
	ca.broadcaster.StartStructuredLogging(3)
	logger := klog.FromContext(ctx)
	logger.Info("Sending events to api server")
	ca.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ca.client.CoreV1().Events("")})
	defer ca.broadcaster.Shutdown()

	logger.Info("Starting cloud CIDR allocator")
	defer logger.Info("Shutting down cloud CIDR allocator")

	if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), ca.nodesSynced) {
		return
	}

	for i := 0; i < cidrUpdateWorkers; i++ {
		go ca.worker(ctx)
	}

	<-ctx.Done()
}

func (ca *cloudCIDRAllocator) worker(ctx context.Context) {
	logger := klog.FromContext(ctx)
	for {
		select {
		case workItem, ok := <-ca.nodeUpdateChannel:
			if !ok {
				logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
				return
			}
			if err := ca.updateCIDRAllocation(logger, workItem); err == nil {
				logger.V(3).Info("Updated CIDR", "workItem", workItem)
			} else {
				logger.Error(err, "Error updating CIDR", "workItem", workItem)
				if canRetry, timeout := ca.retryParams(logger, workItem); canRetry {
					logger.V(2).Info("Retrying update on next period", "workItem", workItem, "timeout", timeout)
					time.AfterFunc(timeout, func() {
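						// Requeue the failed node for another update attempt.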
						ca.nodeUpdateChannel <- workItem
					})
					continue
				}
				logger.Error(nil, "Exceeded retry count, dropping from queue", "workItem", workItem)
			}
			ca.removeNodeFromProcessing(workItem)
		case <-ctx.Done():
			return
		}
	}
}

// insertNodeToProcessing marks the node as having a CIDR update in flight.
// It returns false if the node is already being processed.
func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
	ca.lock.Lock()
	defer ca.lock.Unlock()
	if _, found := ca.nodesInProcessing[nodeName]; found {
		return false
	}
	ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
	return true
}

// retryParams reports whether the node update should be retried and, if so,
// how long to wait before requeueing it.
func (ca *cloudCIDRAllocator) retryParams(logger klog.Logger, nodeName string) (bool, time.Duration) {
	ca.lock.Lock()
	defer ca.lock.Unlock()

	entry, ok := ca.nodesInProcessing[nodeName]
	if !ok {
		logger.Error(nil, "Cannot get retryParams for node as entry does not exist", "node", klog.KRef("", nodeName))
		return false, 0
	}

	count := entry.retries + 1
	if count > updateMaxRetries {
		return false, 0
	}
	ca.nodesInProcessing[nodeName].retries = count

	return true, nodeUpdateRetryTimeout(count)
}

// nodeUpdateRetryTimeout returns an exponentially growing backoff for the
// given retry count, capped at maxUpdateRetryTimeout and jittered to between
// 50% and 150% of its value.
func nodeUpdateRetryTimeout(count int) time.Duration {
	timeout := updateRetryTimeout
	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
		timeout *= 2
	}
	if timeout > maxUpdateRetryTimeout {
		timeout = maxUpdateRetryTimeout
	}
	return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
}

// removeNodeFromProcessing clears the in-flight marker for the node.
func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
	ca.lock.Lock()
	defer ca.lock.Unlock()
	delete(ca.nodesInProcessing, nodeName)
}
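
// AllocateOrOccupyCIDR marks the node as being processed and enqueues it for
// CIDR assignment. Whoever finishes the work (updateCIDRAllocation, called
// from the worker) is responsible for removing the node from
// nodesInProcessing again.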
func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
	if node == nil {
		return nil
	}
	if !ca.insertNodeToProcessing(node.Name) {
		logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
		return nil
	}

	logger.V(4).Info("Putting node into the work queue", "node", klog.KObj(node))
	ca.nodeUpdateChannel <- node.Name
	return nil
}
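
// updateCIDRAllocation assigns the cloud-reported CIDR(s) to the named node
// and updates the node object in the API server.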
func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName string) error {
	node, err := ca.nodeLister.Get(nodeName)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		logger.Error(err, "Failed while getting the node for updating Node.Spec.PodCIDR", "node", klog.KRef("", nodeName))
		return err
	}
	if node.Spec.ProviderID == "" {
		return fmt.Errorf("node %s doesn't have providerID", nodeName)
	}

	cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
	if err != nil {
		controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to get cidr(s) from provider: %v", err)
	}
	if len(cidrStrings) == 0 {
		controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
	}

	if len(cidrStrings) > 2 {
		logger.Info("Got more than 2 ips, truncating to 2", "cidrStrings", cidrStrings)
		cidrStrings = cidrStrings[:2]
	}

	cidrs, err := netutils.ParseCIDRs(cidrStrings)
	if err != nil {
		return fmt.Errorf("failed to parse strings %v as CIDRs: %v", cidrStrings, err)
	}

	needUpdate, err := needPodCIDRsUpdate(logger, node, cidrs)
	if err != nil {
		return fmt.Errorf("err: %v, CIDRS: %v", err, cidrStrings)
	}
	if needUpdate {
		if node.Spec.PodCIDR != "" {
			logger.Error(nil, "PodCIDR being reassigned", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs, "cidrStrings", cidrStrings)
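			// Fall through and overwrite the existing PodCIDR with the value
			// reported by the cloud provider, which is treated as the source
			// of truth for this allocator.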
		}
		for i := 0; i < cidrUpdateRetries; i++ {
			if err = nodeutil.PatchNodeCIDRs(ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
				logger.Info("Set the node PodCIDRs", "node", klog.KObj(node), "cidrStrings", cidrStrings)
				break
			}
		}
	}
	if err != nil {
		controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRAssignmentFailed")
		logger.Error(err, "Failed to update the node PodCIDR after multiple attempts", "node", klog.KObj(node), "cidrStrings", cidrStrings)
		return err
	}

	err = nodeutil.SetNodeCondition(ca.client, types.NodeName(node.Name), v1.NodeCondition{
		Type:               v1.NodeNetworkUnavailable,
		Status:             v1.ConditionFalse,
		Reason:             "RouteCreated",
		Message:            "NodeController create implicit route",
		LastTransitionTime: metav1.Now(),
	})
	if err != nil {
		logger.Error(err, "Error setting route status for the node", "node", klog.KObj(node))
	}
	return err
}

// needPodCIDRsUpdate reports whether node.Spec.PodCIDR(s) differs from the
// CIDRs reported by the cloud provider and therefore needs to be updated.
func needPodCIDRsUpdate(logger klog.Logger, node *v1.Node, podCIDRs []*net.IPNet) (bool, error) {
	if node.Spec.PodCIDR == "" {
		return true, nil
	}
	_, nodePodCIDR, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
	if err != nil {
		logger.Error(err, "Found invalid node.Spec.PodCIDR", "podCIDR", node.Spec.PodCIDR)
		// We will try to overwrite with new CIDR(s)
		return true, nil
	}
	nodePodCIDRs, err := netutils.ParseCIDRs(node.Spec.PodCIDRs)
	if err != nil {
		logger.Error(err, "Found invalid node.Spec.PodCIDRs", "podCIDRs", node.Spec.PodCIDRs)
		// We will try to overwrite with new CIDR(s)
		return true, nil
	}

	if len(podCIDRs) == 1 {
		if cmp.Equal(nodePodCIDR, podCIDRs[0]) {
			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "podCIDR", podCIDRs[0])
			return false, nil
		}
	} else if len(nodePodCIDRs) == len(podCIDRs) {
		if dualStack, _ := netutils.IsDualStackCIDRs(podCIDRs); !dualStack {
			return false, fmt.Errorf("IPs are not dual stack")
		}
		for idx, cidr := range podCIDRs {
			if !cmp.Equal(nodePodCIDRs[idx], cidr) {
				return true, nil
			}
		}
		logger.V(4).Info("Node already has allocated CIDRs. It matches the proposed one", "node", klog.KObj(node), "podCIDRs", podCIDRs)
		return false, nil
	}

	return true, nil
}

// ReleaseCIDR is a no-op for the cloud CIDR allocator; the range is released
// by the external cloud provider, not by this controller.
func (ca *cloudCIDRAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
	logger.V(2).Info("Node's PodCIDR will be released by external cloud provider (not managed by controller)",
		"node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
	return nil
}