1
16
17 package sync
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "time"
24
25 "k8s.io/klog/v2"
26 netutils "k8s.io/utils/net"
27
28 v1 "k8s.io/api/core/v1"
29 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
30 )
31
// Event reasons passed to kubeAPI.EmitNodeWarningEvent by the sync operations
// in this file.
const (
	// InvalidPodCIDR is the reason used when a deleted node carries a
	// PodCIDR that cannot be parsed.
	InvalidPodCIDR = "CloudCIDRAllocatorInvalidPodCIDR"

	// InvalidModeEvent is the reason used when the requested operation is
	// not permitted in the configured NodeSyncMode.
	InvalidModeEvent = "CloudCIDRAllocatorInvalidMode"

	// MismatchEvent is the reason used when the node's PodCIDR differs from
	// the alias range reported by the cloud.
	MismatchEvent = "CloudCIDRAllocatorMismatch"
)
43
44
45 type cloudAlias interface {
46
47 Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error)
48
49 AddAlias(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error
50 }
51
52
53 type kubeAPI interface {
54
55 Node(ctx context.Context, name string) (*v1.Node, error)
56
57 UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error
58
59 UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error
60
61 EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{})
62 }
63
64
// controller is the callback surface the sync loop uses to talk to the
// component that owns it.
type controller interface {
	// ReportResult is called with the error (possibly nil) returned by each
	// operation the sync loop runs.
	ReportResult(err error)

	// ResyncTimeout returns how long the sync loop should wait before it
	// runs its next periodic resync. The loop queries it again after every
	// operation.
	ResyncTimeout() time.Duration
}
73
74
// NodeSyncMode selects which side is treated as the source of truth when a
// node's PodCIDR and its cloud alias are reconciled.
type NodeSyncMode string

var (
	// SyncFromCloud is the mode in which a node's PodCIDR is populated from
	// the alias range already present in the cloud.
	SyncFromCloud NodeSyncMode = "SyncFromCloud"

	// SyncFromCluster is the mode in which ranges are allocated by the
	// cluster and pushed to the cloud as aliases.
	SyncFromCluster NodeSyncMode = "SyncFromCluster"
)

// IsValidMode reports whether m is one of the supported sync modes.
func IsValidMode(m NodeSyncMode) bool {
	return m == SyncFromCloud || m == SyncFromCluster
}
96
97
98 type NodeSync struct {
99 c controller
100 cloudAlias cloudAlias
101 kubeAPI kubeAPI
102 mode NodeSyncMode
103 nodeName string
104 opChan chan syncOp
105 set *cidrset.CidrSet
106 }
107
108
109 func New(c controller, cloudAlias cloudAlias, kubeAPI kubeAPI, mode NodeSyncMode, nodeName string, set *cidrset.CidrSet) *NodeSync {
110 return &NodeSync{
111 c: c,
112 cloudAlias: cloudAlias,
113 kubeAPI: kubeAPI,
114 mode: mode,
115 nodeName: nodeName,
116 opChan: make(chan syncOp, 1),
117 set: set,
118 }
119 }
120
121
122
123 func (sync *NodeSync) Loop(logger klog.Logger, done chan struct{}) {
124 logger.V(2).Info("Starting sync loop", "node", klog.KRef("", sync.nodeName))
125
126 defer func() {
127 if done != nil {
128 close(done)
129 }
130 }()
131
132 timeout := sync.c.ResyncTimeout()
133 delayTimer := time.NewTimer(timeout)
134 logger.V(4).Info("Try to resync node later", "node", klog.KRef("", sync.nodeName), "resyncTime", timeout)
135
136 for {
137 select {
138 case op, more := <-sync.opChan:
139 if !more {
140 logger.V(2).Info("Stopping sync loop")
141 return
142 }
143 sync.c.ReportResult(op.run(logger, sync))
144 if !delayTimer.Stop() {
145 <-delayTimer.C
146 }
147 case <-delayTimer.C:
148 logger.V(4).Info("Running resync", "node", klog.KRef("", sync.nodeName))
149 sync.c.ReportResult((&updateOp{}).run(logger, sync))
150 }
151
152 timeout := sync.c.ResyncTimeout()
153 delayTimer.Reset(timeout)
154 logger.V(4).Info("Try to resync node later", "node", klog.KRef("", sync.nodeName), "resyncTime", timeout)
155 }
156 }
157
158
159
160
161
162 func (sync *NodeSync) Update(node *v1.Node) {
163 sync.opChan <- &updateOp{node}
164 }
165
166
167
168
169
170 func (sync *NodeSync) Delete(node *v1.Node) {
171 sync.opChan <- &deleteOp{node}
172 close(sync.opChan)
173 }
174
175
176 type syncOp interface {
177
178 run(logger klog.Logger, sync *NodeSync) error
179 }
180
181
182 type updateOp struct {
183 node *v1.Node
184 }
185
186 func (op *updateOp) String() string {
187 if op.node == nil {
188 return fmt.Sprintf("updateOp(nil)")
189 }
190 return fmt.Sprintf("updateOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR)
191 }
192
193 func (op *updateOp) run(logger klog.Logger, sync *NodeSync) error {
194 logger.V(3).Info("Running updateOp", "updateOp", op)
195
196 ctx := context.Background()
197
198 if op.node == nil {
199 logger.V(3).Info("Getting node spec", "node", klog.KRef("", sync.nodeName))
200 node, err := sync.kubeAPI.Node(ctx, sync.nodeName)
201 if err != nil {
202 logger.Error(err, "Error getting node pec", "node", klog.KRef("", sync.nodeName))
203 return err
204 }
205 op.node = node
206 }
207
208 aliasRange, err := sync.cloudAlias.Alias(ctx, op.node)
209 if err != nil {
210 logger.Error(err, "Error getting cloud alias for node", "node", klog.KRef("", sync.nodeName))
211 return err
212 }
213
214 switch {
215 case op.node.Spec.PodCIDR == "" && aliasRange == nil:
216 err = op.allocateRange(ctx, sync, op.node)
217 case op.node.Spec.PodCIDR == "" && aliasRange != nil:
218 err = op.updateNodeFromAlias(ctx, sync, op.node, aliasRange)
219 case op.node.Spec.PodCIDR != "" && aliasRange == nil:
220 err = op.updateAliasFromNode(ctx, sync, op.node)
221 case op.node.Spec.PodCIDR != "" && aliasRange != nil:
222 err = op.validateRange(ctx, sync, op.node, aliasRange)
223 }
224
225 return err
226 }
227
228
229
230 func (op *updateOp) validateRange(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error {
231 if node.Spec.PodCIDR != aliasRange.String() {
232 klog.FromContext(ctx).Error(nil, "Inconsistency detected between node PodCIDR and node alias", "podCIDR", node.Spec.PodCIDR, "alias", aliasRange)
233 sync.kubeAPI.EmitNodeWarningEvent(node.Name, MismatchEvent,
234 "Node.Spec.PodCIDR != cloud alias (%v != %v)", node.Spec.PodCIDR, aliasRange)
235
236
237 } else {
238 klog.FromContext(ctx).V(4).Info("Node CIDR range is matches cloud assignment", "node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
239 }
240 return nil
241 }
242
243
244
245 func (op *updateOp) updateNodeFromAlias(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error {
246 if sync.mode != SyncFromCloud {
247 sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent,
248 "Cannot sync from cloud in mode %q", sync.mode)
249 return fmt.Errorf("cannot sync from cloud in mode %q", sync.mode)
250 }
251 logger := klog.FromContext(ctx)
252 logger.V(2).Info("Updating node spec with alias range", "podCIDR", aliasRange)
253
254 if err := sync.set.Occupy(aliasRange); err != nil {
255 logger.Error(nil, "Error occupying range for node", "node", klog.KRef("", sync.nodeName), "alias", aliasRange)
256 return err
257 }
258
259 if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, aliasRange); err != nil {
260 logger.Error(err, "Could not update node PodCIDR", "node", klog.KObj(node), "podCIDR", aliasRange)
261 return err
262 }
263
264 logger.V(2).Info("Node PodCIDR updated", "node", klog.KObj(node), "podCIDR", aliasRange)
265
266 if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
267 logger.Error(err, "Could not update node NetworkUnavailable status to false")
268 return err
269 }
270
271 logger.V(2).Info("Updated node PodCIDR from cloud alias", "node", klog.KObj(node), "alias", aliasRange)
272
273 return nil
274 }
275
276
277 func (op *updateOp) updateAliasFromNode(ctx context.Context, sync *NodeSync, node *v1.Node) error {
278 if sync.mode != SyncFromCluster {
279 sync.kubeAPI.EmitNodeWarningEvent(
280 node.Name, InvalidModeEvent, "Cannot sync to cloud in mode %q", sync.mode)
281 return fmt.Errorf("cannot sync to cloud in mode %q", sync.mode)
282 }
283
284 _, aliasRange, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
285
286 logger := klog.FromContext(ctx)
287 if err != nil {
288 logger.Error(err, "Could not parse PodCIDR for node", "node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
289 return err
290 }
291
292 if err := sync.set.Occupy(aliasRange); err != nil {
293 logger.Error(nil, "Error occupying range for node", "node", klog.KRef("", sync.nodeName), "alias", aliasRange)
294 return err
295 }
296
297 if err := sync.cloudAlias.AddAlias(ctx, node, aliasRange); err != nil {
298 logger.Error(err, "Could not add alias for node", "node", klog.KObj(node), "alias", aliasRange)
299 return err
300 }
301
302 if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
303 logger.Error(err, "Could not update node NetworkUnavailable status to false")
304 return err
305 }
306
307 logger.V(2).Info("Updated node cloud alias with node spec", "node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
308
309 return nil
310 }
311
312
313
314 func (op *updateOp) allocateRange(ctx context.Context, sync *NodeSync, node *v1.Node) error {
315 if sync.mode != SyncFromCluster {
316 sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent,
317 "Cannot allocate CIDRs in mode %q", sync.mode)
318 return fmt.Errorf("controller cannot allocate CIDRS in mode %q", sync.mode)
319 }
320
321 cidrRange, err := sync.set.AllocateNext()
322 if err != nil {
323 return err
324 }
325
326
327
328 logger := klog.FromContext(ctx)
329 if err := sync.cloudAlias.AddAlias(ctx, node, cidrRange); err != nil {
330 logger.Error(err, "Could not add alias for node", "node", klog.KObj(node), "alias", cidrRange)
331 return err
332 }
333
334 if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, cidrRange); err != nil {
335 logger.Error(err, "Could not update node PodCIDR", "node", klog.KObj(node), "podCIDR", cidrRange)
336 return err
337 }
338
339 if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
340 logger.Error(err, "Could not update node NetworkUnavailable status to false")
341 return err
342 }
343
344 logger.V(2).Info("Allocated PodCIDR for node", "node", klog.KObj(node), "podCIDR", cidrRange)
345
346 return nil
347 }
348
349
350 type deleteOp struct {
351 node *v1.Node
352 }
353
354 func (op *deleteOp) String() string {
355 if op.node == nil {
356 return fmt.Sprintf("deleteOp(nil)")
357 }
358 return fmt.Sprintf("deleteOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR)
359 }
360
361 func (op *deleteOp) run(logger klog.Logger, sync *NodeSync) error {
362 logger.V(3).Info("Running deleteOp", "deleteOp", op)
363 if op.node.Spec.PodCIDR == "" {
364 logger.V(2).Info("Node was deleted, node had no PodCIDR range assigned", "node", klog.KObj(op.node))
365 return nil
366 }
367
368 _, cidrRange, err := netutils.ParseCIDRSloppy(op.node.Spec.PodCIDR)
369 if err != nil {
370 logger.Error(err, "Deleted node has an invalid podCIDR", "node", klog.KObj(op.node), "podCIDR", op.node.Spec.PodCIDR)
371 sync.kubeAPI.EmitNodeWarningEvent(op.node.Name, InvalidPodCIDR,
372 "Node %q has an invalid PodCIDR: %q", op.node.Name, op.node.Spec.PodCIDR)
373 return nil
374 }
375
376 sync.set.Release(cidrRange)
377 logger.V(2).Info("Node was deleted, releasing CIDR range", "node", klog.KObj(op.node), "podCIDR", op.node.Spec.PodCIDR)
378
379 return nil
380 }
381
View as plain text