package daemon

import (
	"bytes"
	"context"
	"fmt"
	"reflect"
	"sort"

	"k8s.io/klog/v2"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/daemon/util"
	labelsutil "k8s.io/kubernetes/pkg/util/labels"
)

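// rollingUpdate identifies the set of old pods to delete on nodes, or additional nodes that
// need new pods created, while staying within the constraints imposed by the update strategy.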
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
	logger := klog.FromContext(ctx)
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}
	maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
	if err != nil {
		return fmt.Errorf("couldn't get unavailable numbers: %v", err)
	}

	now := dsc.failedPodsBackoff.Clock.Now()
	// When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
	// are necessary, and let the core loop create new instances on those nodes.
	//
	// Assumptions:
	// * The manage loop allows no more than one pod per node.
	// * The manage loop will create new pods.
	// * The manage loop will handle failed pods.
	// * Deleted pods do not belong to any node.
	// Invariants:
	// * The number of new pods that are unavailable must be less than maxUnavailable.
	// * A node with an available old pod is a candidate for deletion if it does not violate other invariants.
	if maxSurge == 0 {
		var numUnavailable int
		var allowedReplacementPods []string
		var candidatePodsToDelete []string
		for nodeName, pods := range nodeToDaemonPods {
			newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
			if !ok {
				// let the manage loop clean up this node, and treat it as unavailable
				logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
				numUnavailable++
				continue
			}
			switch {
			case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
				// the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
				numUnavailable++
			case newPod != nil:
				// this pod is up to date, check its availability
				if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
					// an unavailable new pod is counted against maxUnavailable
					numUnavailable++
				}
			default:
				// this pod is old, it is an update candidate
				switch {
				case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
					// the old pod isn't available, so it needs to be replaced
					logger.V(5).Info("DaemonSet pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
					// record the replacement
					if allowedReplacementPods == nil {
						allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
					}
					allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
				case numUnavailable >= maxUnavailable:
					// no point considering any other pods, we have reached the unavailability limit
					continue
				default:
					logger.V(5).Info("DaemonSet pod on node is out of date, this is a candidate to replace", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
					// record the candidate
					if candidatePodsToDelete == nil {
						candidatePodsToDelete = make([]string, 0, maxUnavailable)
					}
					candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
				}
			}
		}

		// use any of the candidates we can, including the allowedReplacementPods
		logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedReplacementPods), "maxUnavailable", maxUnavailable, "numUnavailable", numUnavailable, "candidates", len(candidatePodsToDelete))
		remainingUnavailable := maxUnavailable - numUnavailable
		if remainingUnavailable < 0 {
			remainingUnavailable = 0
		}
		if max := len(candidatePodsToDelete); remainingUnavailable > max {
			remainingUnavailable = max
		}
		oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)

		return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
	}

	// When surging, we create new pods whenever an old pod is unavailable, and we can create up
	// to maxSurge extra pods.
	//
	// Assumptions:
	// * The manage loop allows no more than two pods per node, one old and one new.
	// * The manage loop will create new pods if there are no pods on a node.
	// * The manage loop will handle failed pods.
	// * Deleted pods do not belong to any node.
	// Invariants:
	// * A node with an unavailable old pod is a candidate for immediate new pod creation.
	// * An old available pod is deleted once a new pod is available.
	// * No more than maxSurge new pods are created for old available pods at any one time.
	var oldPodsToDelete []string
	var shouldNotRunPodsToDelete []string
	var candidateNewNodes []string
	var allowedNewNodes []string
	var numSurge int
	var numAvailable int

	for nodeName, pods := range nodeToDaemonPods {
		newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
		if !ok {
			// let the manage loop clean up this node, and treat it as a surge node
			logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
			numSurge++
			continue
		}

		// first count availability for all the nodes (even the ones that we are sure will get a new pod)
		if oldPod != nil {
			if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
				numAvailable++
			}
		} else if newPod != nil {
			if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
				numAvailable++
			}
		}

		switch {
		case oldPod == nil:
			// we don't need to do anything to this node, the manage loop will handle it
		case newPod == nil:
			// this is a surge candidate
			switch {
			case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
				node, err := dsc.nodeLister.Get(nodeName)
				if err != nil {
					return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
				}
				if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
					logger.V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
					oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
					continue
				}
				// the old pod isn't available, so a new pod may be created on this node immediately
				logger.V(5).Info("Pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
				// record the replacement
				if allowedNewNodes == nil {
					allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
				}
				allowedNewNodes = append(allowedNewNodes, nodeName)
			default:
				node, err := dsc.nodeLister.Get(nodeName)
				if err != nil {
					return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
				}
				if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
					shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
					continue
				}
				if numSurge >= maxSurge {
					// no point considering any other candidates, we are at the surge limit
					continue
				}
				logger.V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
				// record the candidate
				if candidateNewNodes == nil {
					candidateNewNodes = make([]string, 0, maxSurge)
				}
				candidateNewNodes = append(candidateNewNodes, nodeName)
			}
		default:
			// we have already surged onto this node, determine our state
			if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
				// the new pod isn't available yet, keep counting it as a surge pod
				numSurge++
				continue
			}
			// the new pod is available, delete the old pod
			logger.V(5).Info("DaemonSet pod on node is available, remove old pod", "daemonset", klog.KObj(ds), "newPod", klog.KObj(newPod), "node", nodeName, "oldPod", klog.KObj(oldPod))
			oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
		}
	}

	// use any of the candidates we can, including the allowedNewNodes
	logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedNewNodes), "maxSurge", maxSurge, "numSurge", numSurge, "candidates", len(candidateNewNodes))
	remainingSurge := maxSurge - numSurge

	// With maxSurge, the application owner expects 100% availability. When the scheduling constraint
	// changes from node A to node B, we do not want the application to be left without any available
	// pods, so only delete a pod on node A once a pod on node B has become available.
	if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
		if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
			deletablePodsNumber = shouldNotRunPodsToDeleteNumber
		}
		for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
			podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
			if err != nil {
				if errors.IsNotFound(err) {
					continue
				}
				return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
			}
			logger.V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
			oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
		}
	}

	if remainingSurge < 0 {
		remainingSurge = 0
	}
	if max := len(candidateNewNodes); remainingSurge > max {
		remainingSurge = max
	}
	newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

	return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}

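// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there
// is at most one of each old and new pod, or false if there are multiple. In the latter case
// the caller can skip the node and let the manage loop prune the excess pods on the next pass.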
func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
	for _, pod := range podsOnNode {
		if pod.DeletionTimestamp != nil {
			continue
		}
		generation, err := util.GetTemplateGeneration(ds)
		if err != nil {
			generation = nil
		}
		if util.IsPodUpdated(pod, hash, generation) {
			if newPod != nil {
				return nil, nil, false
			}
			newPod = pod
		} else {
			if oldPod != nil {
				return nil, nil, false
			}
			oldPod = pod
		}
	}
	return newPod, oldPod, true
}

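// constructHistory finds all histories controlled by the given DaemonSet, updates the revision
// number of the current history or creates a new one if none matches, deduplicates current
// histories, and adds the missing unique label to existing histories.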
func (dsc *DaemonSetsController) constructHistory(ctx context.Context, ds *apps.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) {
	var histories []*apps.ControllerRevision
	var currentHistories []*apps.ControllerRevision
	histories, err = dsc.controlledHistories(ctx, ds)
	if err != nil {
		return nil, nil, err
	}
	for _, history := range histories {
		// Add the unique label if it's not already present on the history. The history name is
		// used as the label value, so there is no hash collision to worry about here.
		if _, ok := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; !ok {
			toUpdate := history.DeepCopy()
			toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name
			history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{})
			if err != nil {
				return nil, nil, err
			}
		}
		// Compare the history with ds to separate current from old histories
		found := false
		found, err = Match(ds, history)
		if err != nil {
			return nil, nil, err
		}
		if found {
			currentHistories = append(currentHistories, history)
		} else {
			old = append(old, history)
		}
	}

	currRevision := maxRevision(old) + 1
	switch len(currentHistories) {
	case 0:
		// Create a new history if no current one is found
		cur, err = dsc.snapshot(ctx, ds, currRevision)
		if err != nil {
			return nil, nil, err
		}
	default:
		cur, err = dsc.dedupCurHistories(ctx, ds, currentHistories)
		if err != nil {
			return nil, nil, err
		}
		// Update the revision number if necessary
		if cur.Revision < currRevision {
			toUpdate := cur.DeepCopy()
			toUpdate.Revision = currRevision
			_, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{})
			if err != nil {
				return nil, nil, err
			}
		}
	}
	return cur, old, err
}

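// cleanupHistory prunes old ControllerRevisions that exceed the DaemonSet's revision history
// limit, skipping any revision whose hash is still referenced by a live daemon pod.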
func (dsc *DaemonSetsController) cleanupHistory(ctx context.Context, ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
	// Include deleted terminal pods when building the node to daemon pods mapping.
	nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, true)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	toKeep := int(*ds.Spec.RevisionHistoryLimit)
	toKill := len(old) - toKeep
	if toKill <= 0 {
		return nil
	}

	// Find all hashes of live pods
	liveHashes := make(map[string]bool)
	for _, pods := range nodesToDaemonPods {
		for _, pod := range pods {
			if hash := pod.Labels[apps.DefaultDaemonSetUniqueLabelKey]; len(hash) > 0 {
				liveHashes[hash] = true
			}
		}
	}

	// Clean up old histories from lowest to highest revision (from oldest to newest)
	sort.Sort(historiesByRevision(old))
	for _, history := range old {
		if toKill <= 0 {
			break
		}
		if hash := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; liveHashes[hash] {
			continue
		}
		// Clean up
		err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, history.Name, metav1.DeleteOptions{})
		if err != nil {
			return err
		}
		toKill--
	}
	return nil
}

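// maxRevision returns the max revision number of the given list of histories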
func maxRevision(histories []*apps.ControllerRevision) int64 {
	max := int64(0)
	for _, history := range histories {
		if history.Revision > max {
			max = history.Revision
		}
	}
	return max
}

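// dedupCurHistories keeps the current ControllerRevision with the highest revision number,
// relabels any daemon pods that still point at a duplicate, and deletes the duplicates.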
func (dsc *DaemonSetsController) dedupCurHistories(ctx context.Context, ds *apps.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) {
	if len(curHistories) == 1 {
		return curHistories[0], nil
	}
	var maxRevision int64
	var keepCur *apps.ControllerRevision
	for _, cur := range curHistories {
		if cur.Revision >= maxRevision {
			keepCur = cur
			maxRevision = cur.Revision
		}
	}

	// Clean up duplicates: relabel their pods to the kept revision, then delete them
	for _, cur := range curHistories {
		if cur.Name == keepCur.Name {
			continue
		}
		// Relabel any pod that does not already carry the kept revision's hash
		pods, err := dsc.getDaemonPods(ctx, ds)
		if err != nil {
			return nil, err
		}
		for _, pod := range pods {
			if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey] {
				patchRaw := map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							apps.DefaultDaemonSetUniqueLabelKey: keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey],
						},
					},
				}
				patchJson, err := json.Marshal(patchRaw)
				if err != nil {
					return nil, err
				}
				_, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Patch(ctx, pod.Name, types.MergePatchType, patchJson, metav1.PatchOptions{})
				if err != nil {
					return nil, err
				}
			}
		}

		// Remove the duplicate revision
		err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, cur.Name, metav1.DeleteOptions{})
		if err != nil {
			return nil, err
		}
	}
	return keepCur, nil
}

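// controlledHistories returns all ControllerRevisions controlled by the given DaemonSet,
// reconciling ControllerRefs by adopting or orphaning as needed. Note that the returned
// histories are pointers to objects in the cache; deep-copy one before modifying it.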
func (dsc *DaemonSetsController) controlledHistories(ctx context.Context, ds *apps.DaemonSet) ([]*apps.ControllerRevision, error) {
	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// List all histories to include those that don't match the selector anymore
	// but have a ControllerRef pointing to this controller.
	histories, err := dsc.historyLister.ControllerRevisions(ds.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	// If any adoptions are attempted, recheck for deletion with an uncached quorum read first.
	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != ds.UID {
			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
		}
		return fresh, nil
	})

	cm := controller.NewControllerRevisionControllerRefManager(dsc.crControl, ds, selector, controllerKind, canAdoptFunc)
	return cm.ClaimControllerRevisions(ctx, histories)
}

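// Match checks whether the given DaemonSet's pod template matches the template stored in the given history.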
func Match(ds *apps.DaemonSet, history *apps.ControllerRevision) (bool, error) {
	patch, err := getPatch(ds)
	if err != nil {
		return false, err
	}
	return bytes.Equal(patch, history.Data.Raw), nil
}

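// getPatch returns a strategic merge patch that can be applied to restore a DaemonSet to a
// previous version. If the returned error is nil the patch is valid. The state saved is only
// the pod template (spec.template), marked with "$patch": "replace".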
func getPatch(ds *apps.DaemonSet) ([]byte, error) {
	dsBytes, err := json.Marshal(ds)
	if err != nil {
		return nil, err
	}
	var raw map[string]interface{}
	err = json.Unmarshal(dsBytes, &raw)
	if err != nil {
		return nil, err
	}
	objCopy := make(map[string]interface{})
	specCopy := make(map[string]interface{})

	// Create a patch of the DaemonSet that replaces spec.template
	spec := raw["spec"].(map[string]interface{})
	template := spec["template"].(map[string]interface{})
	specCopy["template"] = template
	template["$patch"] = "replace"
	objCopy["spec"] = specCopy
	patch, err := json.Marshal(objCopy)
	return patch, err
}

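// snapshot saves the DaemonSet's current pod template as a new ControllerRevision. On a name
// collision it reuses a matching existing revision, or bumps the DaemonSet's collisionCount and
// returns an error so the sync is retried with a new hash.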
func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSet, revision int64) (*apps.ControllerRevision, error) {
	patch, err := getPatch(ds)
	if err != nil {
		return nil, err
	}
	hash := controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)
	name := ds.Name + "-" + hash
	history := &apps.ControllerRevision{
		ObjectMeta: metav1.ObjectMeta{
			Name:            name,
			Namespace:       ds.Namespace,
			Labels:          labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, hash),
			Annotations:     ds.Annotations,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)},
		},
		Data:     runtime.RawExtension{Raw: patch},
		Revision: revision,
	}

	history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(ctx, history, metav1.CreateOptions{})
	if outerErr := err; errors.IsAlreadyExists(outerErr) {
		logger := klog.FromContext(ctx)
		// Fetch the existing revision that owns the colliding name
		existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(ctx, name, metav1.GetOptions{})
		if getErr != nil {
			return nil, getErr
		}
		// Check if we already created it
		done, matchErr := Match(ds, existedHistory)
		if matchErr != nil {
			return nil, matchErr
		}
		if done {
			return existedHistory, nil
		}

		// Handle name collisions between different histories.
		// Get the latest DaemonSet from the API server to make sure the collision count is only increased when necessary.
		currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
		if getErr != nil {
			return nil, getErr
		}
		// If the collision count we used is stale, the hash will change on the next sync, so just retry.
		if !reflect.DeepEqual(currDS.Status.CollisionCount, ds.Status.CollisionCount) {
			return nil, fmt.Errorf("found a stale collision count (%d, expected %d) of DaemonSet %q while processing; will retry until it is updated", ds.Status.CollisionCount, currDS.Status.CollisionCount, ds.Name)
		}
		if currDS.Status.CollisionCount == nil {
			currDS.Status.CollisionCount = new(int32)
		}
		*currDS.Status.CollisionCount++
		_, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(ctx, currDS, metav1.UpdateOptions{})
		if updateErr != nil {
			return nil, updateErr
		}
		logger.V(2).Info("Found a hash collision for DaemonSet - bumping collisionCount to resolve it", "daemonset", klog.KObj(ds), "collisionCount", *currDS.Status.CollisionCount)
		return nil, outerErr
	}
	return history, err
}

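// updatedDesiredNodeCounts calculates the allowed surge count, the allowed unavailable count,
// and the desired number of scheduled pods, and records nodes that should run a daemon pod but
// currently have none in nodeToDaemonPods.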
func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) {
	var desiredNumberScheduled int
	logger := klog.FromContext(ctx)
	for i := range nodeList {
		node := nodeList[i]
		wantToRun, _ := NodeShouldRunDaemonPod(node, ds)
		if !wantToRun {
			continue
		}
		desiredNumberScheduled++

		// Track nodes that should run a daemon pod but currently have none.
		if _, exists := nodeToDaemonPods[node.Name]; !exists {
			nodeToDaemonPods[node.Name] = nil
		}
	}

	maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled)
	if err != nil {
		return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
	}

	maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled)
	if err != nil {
		return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
	}

	// If the DaemonSet resolved to an impossible configuration (both maxSurge and maxUnavailable
	// are zero), fall back to allowing one unavailable pod so the rollout can make progress.
	if desiredNumberScheduled > 0 && maxUnavailable == 0 && maxSurge == 0 {
		logger.Info("DaemonSet is not configured for surge or unavailability, defaulting to accepting unavailability", "daemonset", klog.KObj(ds))
		maxUnavailable = 1
	}
	logger.V(5).Info("DaemonSet with maxSurge and maxUnavailable", "daemonset", klog.KObj(ds), "maxSurge", maxSurge, "maxUnavailable", maxUnavailable)
	return maxSurge, maxUnavailable, desiredNumberScheduled, nil
}

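// historiesByRevision sorts ControllerRevisions by ascending revision number.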
type historiesByRevision []*apps.ControllerRevision

func (h historiesByRevision) Len() int      { return len(h) }
func (h historiesByRevision) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h historiesByRevision) Less(i, j int) bool {
	return h[i].Revision < h[j].Revision
}