
Source file src/k8s.io/kubernetes/pkg/controller/daemon/update.go

Documentation: k8s.io/kubernetes/pkg/controller/daemon

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package daemon
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"reflect"
    24  	"sort"
    25  
    26  	"k8s.io/klog/v2"
    27  
    28  	apps "k8s.io/api/apps/v1"
    29  	v1 "k8s.io/api/core/v1"
    30  	"k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/labels"
    33  	"k8s.io/apimachinery/pkg/runtime"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/apimachinery/pkg/util/json"
    36  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    37  	"k8s.io/kubernetes/pkg/controller"
    38  	"k8s.io/kubernetes/pkg/controller/daemon/util"
    39  	labelsutil "k8s.io/kubernetes/pkg/util/labels"
    40  )
    41  
    42  // rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
     43  // while remaining within the constraints imposed by the update strategy.
    44  func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    45  	logger := klog.FromContext(ctx)
    46  	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
    47  	if err != nil {
    48  		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    49  	}
    50  	maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
    51  	if err != nil {
    52  		return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    53  	}
    54  
    55  	now := dsc.failedPodsBackoff.Clock.Now()
    56  
    57  	// When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
    58  	// are necessary, and let the core loop create new instances on those nodes.
    59  	//
    60  	// Assumptions:
    61  	// * Expect manage loop to allow no more than one pod per node
    62  	// * Expect manage loop will create new pods
    63  	// * Expect manage loop will handle failed pods
    64  	// * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    65  	// Invariants:
    66  	// * The number of new pods that are unavailable must be less than maxUnavailable
    67  	// * A node with an available old pod is a candidate for deletion if it does not violate other invariants
    68  	//
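         	// Illustrative example (hypothetical numbers, not taken from this DaemonSet): with
         	// desiredNumberScheduled=5 and maxUnavailable=1, a single updated pod that is not yet
         	// available already consumes the whole unavailability budget, so no old-but-available
         	// pod is deleted this sync; old pods that are themselves unavailable are still replaced
         	// immediately, because deleting them cannot reduce availability any further.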
    69  	if maxSurge == 0 {
    70  		var numUnavailable int
    71  		var allowedReplacementPods []string
    72  		var candidatePodsToDelete []string
    73  		for nodeName, pods := range nodeToDaemonPods {
    74  			newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
    75  			if !ok {
    76  				// let the manage loop clean up this node, and treat it as an unavailable node
    77  				logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
    78  				numUnavailable++
    79  				continue
    80  			}
    81  			switch {
    82  			case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
    83  				// the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
    84  				numUnavailable++
    85  			case newPod != nil:
    86  				// this pod is up to date, check its availability
    87  				if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
    88  					// an unavailable new pod is counted against maxUnavailable
    89  					numUnavailable++
    90  				}
    91  			default:
    92  				// this pod is old, it is an update candidate
    93  				switch {
    94  				case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
    95  					// the old pod isn't available, so it needs to be replaced
    96  					logger.V(5).Info("DaemonSet pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
    97  					// record the replacement
    98  					if allowedReplacementPods == nil {
    99  						allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
   100  					}
   101  					allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
   102  				case numUnavailable >= maxUnavailable:
   103  					// no point considering any other candidates
   104  					continue
   105  				default:
   106  					logger.V(5).Info("DaemonSet pod on node is out of date, this is a candidate to replace", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
   107  					// record the candidate
   108  					if candidatePodsToDelete == nil {
   109  						candidatePodsToDelete = make([]string, 0, maxUnavailable)
   110  					}
   111  					candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
   112  				}
   113  			}
   114  		}
   115  
    116  		// use any of the candidates we can, including the allowedReplacementPods
   117  		logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedReplacementPods), "maxUnavailable", maxUnavailable, "numUnavailable", numUnavailable, "candidates", len(candidatePodsToDelete))
   118  		remainingUnavailable := maxUnavailable - numUnavailable
   119  		if remainingUnavailable < 0 {
   120  			remainingUnavailable = 0
   121  		}
   122  		if max := len(candidatePodsToDelete); remainingUnavailable > max {
   123  			remainingUnavailable = max
   124  		}
   125  		oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)
   126  
   127  		return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
   128  	}
   129  
   130  	// When surging, we create new pods whenever an old pod is unavailable, and we can create up
   131  	// to maxSurge extra pods
   132  	//
   133  	// Assumptions:
   134  	// * Expect manage loop to allow no more than two pods per node, one old, one new
   135  	// * Expect manage loop will create new pods if there are no pods on node
   136  	// * Expect manage loop will handle failed pods
   137  	// * Deleted pods do not count as unavailable so that updates make progress when nodes are down
   138  	// Invariants:
   139  	// * A node with an unavailable old pod is a candidate for immediate new pod creation
   140  	// * An old available pod is deleted if a new pod is available
   141  	// * No more than maxSurge new pods are created for old available pods at any one time
   142  	//
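         	// Illustrative example (hypothetical numbers): with maxSurge=1 and an available old pod
         	// on every node, at most one node at a time receives a second, updated pod; once that new
         	// pod has been available for minReadySeconds, the old pod on that node is deleted and the
         	// next surge candidate can proceed. Nodes whose old pod is already unavailable are surged
         	// onto immediately (provided they should still run the daemon pod), since creating the
         	// replacement there cannot reduce availability.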
    143  	var oldPodsToDelete []string          // these pods are already updated or unavailable on sunsetted nodes
   144  	var shouldNotRunPodsToDelete []string // candidate pods to be deleted on sunsetted nodes
   145  	var candidateNewNodes []string
   146  	var allowedNewNodes []string
   147  	var numSurge int
   148  	var numAvailable int
   149  
   150  	for nodeName, pods := range nodeToDaemonPods {
   151  		newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
   152  		if !ok {
   153  			// let the manage loop clean up this node, and treat it as a surge node
   154  			logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
   155  			numSurge++
   156  			continue
   157  		}
   158  
   159  		// first count availability for all the nodes (even the ones that we are sunsetting due to scheduling constraints)
   160  		if oldPod != nil {
   161  			if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
   162  				numAvailable++
   163  			}
   164  		} else if newPod != nil {
   165  			if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
   166  				numAvailable++
   167  			}
   168  		}
   169  
   170  		switch {
   171  		case oldPod == nil:
   172  			// we don't need to do anything to this node, the manage loop will handle it
   173  		case newPod == nil:
   174  			// this is a surge candidate
   175  			switch {
   176  			case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
   177  				node, err := dsc.nodeLister.Get(nodeName)
   178  				if err != nil {
   179  					return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
   180  				}
   181  				if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
   182  					logger.V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
   183  					oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
   184  					continue
   185  				}
   186  				// the old pod isn't available, allow it to become a replacement
   187  				logger.V(5).Info("Pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
   188  				// record the replacement
   189  				if allowedNewNodes == nil {
   190  					allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
   191  				}
   192  				allowedNewNodes = append(allowedNewNodes, nodeName)
   193  			default:
   194  				node, err := dsc.nodeLister.Get(nodeName)
   195  				if err != nil {
   196  					return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
   197  				}
   198  				if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
   199  					shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
   200  					continue
   201  				}
   202  				if numSurge >= maxSurge {
   203  					// no point considering any other candidates
   204  					continue
   205  				}
   206  				logger.V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
   207  				// record the candidate
   208  				if candidateNewNodes == nil {
   209  					candidateNewNodes = make([]string, 0, maxSurge)
   210  				}
   211  				candidateNewNodes = append(candidateNewNodes, nodeName)
   212  			}
   213  		default:
   214  			// we have already surged onto this node, determine our state
   215  			if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
   216  				// we're waiting to go available here
   217  				numSurge++
   218  				continue
   219  			}
   220  			// we're available, delete the old pod
   221  			logger.V(5).Info("DaemonSet pod on node is available, remove old pod", "daemonset", klog.KObj(ds), "newPod", klog.KObj(newPod), "node", nodeName, "oldPod", klog.KObj(oldPod))
   222  			oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
   223  		}
   224  	}
   225  
   226  	// use any of the candidates we can, including the allowedNewNodes
   227  	logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedNewNodes), "maxSurge", maxSurge, "numSurge", numSurge, "candidates", len(candidateNewNodes))
   228  	remainingSurge := maxSurge - numSurge
   229  
   230  	// With maxSurge, the application owner expects 100% availability.
    231  	// When the scheduling constraints change from node A to node B, we do not want the application to be left
   232  	// without any available pods. Only delete a pod on node A when a pod on node B becomes available.
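         	// Illustrative example (hypothetical numbers): if a scheduling change means only 5 nodes
         	// should still run the daemon pod (desiredNumberScheduled=5) while 6 pods are currently
         	// available (numAvailable=6), then at most 6-5=1 of the available pods on now-ineligible
         	// nodes is deleted here; the rest wait until replacement pods elsewhere become available.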
   233  	if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
   234  		if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
   235  			deletablePodsNumber = shouldNotRunPodsToDeleteNumber
   236  		}
   237  		for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
   238  			podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
   239  			if err != nil {
   240  				if errors.IsNotFound(err) {
   241  					continue
   242  				}
   243  				return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
   244  			}
   245  			logger.V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
   246  			oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
   247  		}
   248  	}
   249  
   250  	if remainingSurge < 0 {
   251  		remainingSurge = 0
   252  	}
   253  	if max := len(candidateNewNodes); remainingSurge > max {
   254  		remainingSurge = max
   255  	}
   256  	newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)
   257  
   258  	return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
   259  }
   260  
   261  // findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there
    262  // is at most one old pod and one new pod, or false if there are multiples. We can skip
   263  // processing the particular node in those scenarios and let the manage loop prune the
   264  // excess pods for our next time around.
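         // For example, a node carrying one pod with the current hash and one pod from an older
         // template returns (newPod, oldPod, true), while a node carrying two pods with outdated
         // hashes returns (nil, nil, false) so the manage loop can prune the duplicates first.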
   265  func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
   266  	for _, pod := range podsOnNode {
   267  		if pod.DeletionTimestamp != nil {
   268  			continue
   269  		}
   270  		generation, err := util.GetTemplateGeneration(ds)
   271  		if err != nil {
   272  			generation = nil
   273  		}
   274  		if util.IsPodUpdated(pod, hash, generation) {
   275  			if newPod != nil {
   276  				return nil, nil, false
   277  			}
   278  			newPod = pod
   279  		} else {
   280  			if oldPod != nil {
   281  				return nil, nil, false
   282  			}
   283  			oldPod = pod
   284  		}
   285  	}
   286  	return newPod, oldPod, true
   287  }
   288  
   289  // constructHistory finds all histories controlled by the given DaemonSet, and
    290  // updates the current history's revision number, or creates the current history if needed.
    291  // It also deduplicates the current history and adds missing unique labels to existing histories.
   292  func (dsc *DaemonSetsController) constructHistory(ctx context.Context, ds *apps.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) {
   293  	var histories []*apps.ControllerRevision
   294  	var currentHistories []*apps.ControllerRevision
   295  	histories, err = dsc.controlledHistories(ctx, ds)
   296  	if err != nil {
   297  		return nil, nil, err
   298  	}
   299  	for _, history := range histories {
   300  		// Add the unique label if it's not already added to the history
   301  		// We use history name instead of computing hash, so that we don't need to worry about hash collision
   302  		if _, ok := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; !ok {
   303  			toUpdate := history.DeepCopy()
   304  			toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name
   305  			history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{})
   306  			if err != nil {
   307  				return nil, nil, err
   308  			}
   309  		}
   310  		// Compare histories with ds to separate cur and old history
   311  		found := false
   312  		found, err = Match(ds, history)
   313  		if err != nil {
   314  			return nil, nil, err
   315  		}
   316  		if found {
   317  			currentHistories = append(currentHistories, history)
   318  		} else {
   319  			old = append(old, history)
   320  		}
   321  	}
   322  
   323  	currRevision := maxRevision(old) + 1
   324  	switch len(currentHistories) {
   325  	case 0:
   326  		// Create a new history if the current one isn't found
   327  		cur, err = dsc.snapshot(ctx, ds, currRevision)
   328  		if err != nil {
   329  			return nil, nil, err
   330  		}
   331  	default:
   332  		cur, err = dsc.dedupCurHistories(ctx, ds, currentHistories)
   333  		if err != nil {
   334  			return nil, nil, err
   335  		}
   336  		// Update revision number if necessary
   337  		if cur.Revision < currRevision {
   338  			toUpdate := cur.DeepCopy()
   339  			toUpdate.Revision = currRevision
   340  			_, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{})
   341  			if err != nil {
   342  				return nil, nil, err
   343  			}
   344  		}
   345  	}
   346  	return cur, old, err
   347  }
   348  
   349  func (dsc *DaemonSetsController) cleanupHistory(ctx context.Context, ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
   350  	// Include deleted terminal pods when maintaining history.
   351  	nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, true)
   352  	if err != nil {
   353  		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
   354  	}
   355  
   356  	toKeep := int(*ds.Spec.RevisionHistoryLimit)
   357  	toKill := len(old) - toKeep
   358  	if toKill <= 0 {
   359  		return nil
   360  	}
   361  
   362  	// Find all hashes of live pods
   363  	liveHashes := make(map[string]bool)
   364  	for _, pods := range nodesToDaemonPods {
   365  		for _, pod := range pods {
   366  			if hash := pod.Labels[apps.DefaultDaemonSetUniqueLabelKey]; len(hash) > 0 {
   367  				liveHashes[hash] = true
   368  			}
   369  		}
   370  	}
   371  
   372  	// Clean up old history from smallest to highest revision (from oldest to newest)
   373  	sort.Sort(historiesByRevision(old))
   374  	for _, history := range old {
   375  		if toKill <= 0 {
   376  			break
   377  		}
   378  		if hash := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; liveHashes[hash] {
   379  			continue
   380  		}
   381  		// Clean up
   382  		err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, history.Name, metav1.DeleteOptions{})
   383  		if err != nil {
   384  			return err
   385  		}
   386  		toKill--
   387  	}
   388  	return nil
   389  }
   390  
   391  // maxRevision returns the max revision number of the given list of histories
   392  func maxRevision(histories []*apps.ControllerRevision) int64 {
   393  	max := int64(0)
   394  	for _, history := range histories {
   395  		if history.Revision > max {
   396  			max = history.Revision
   397  		}
   398  	}
   399  	return max
   400  }
   401  
   402  func (dsc *DaemonSetsController) dedupCurHistories(ctx context.Context, ds *apps.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) {
   403  	if len(curHistories) == 1 {
   404  		return curHistories[0], nil
   405  	}
   406  	var maxRevision int64
   407  	var keepCur *apps.ControllerRevision
   408  	for _, cur := range curHistories {
   409  		if cur.Revision >= maxRevision {
   410  			keepCur = cur
   411  			maxRevision = cur.Revision
   412  		}
   413  	}
   414  	// Clean up duplicates and relabel pods
   415  	for _, cur := range curHistories {
   416  		if cur.Name == keepCur.Name {
   417  			continue
   418  		}
   419  		// Relabel pods before dedup
   420  		pods, err := dsc.getDaemonPods(ctx, ds)
   421  		if err != nil {
   422  			return nil, err
   423  		}
   424  		for _, pod := range pods {
   425  			if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey] {
   426  				patchRaw := map[string]interface{}{
   427  					"metadata": map[string]interface{}{
   428  						"labels": map[string]interface{}{
   429  							apps.DefaultDaemonSetUniqueLabelKey: keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey],
   430  						},
   431  					},
   432  				}
   433  				patchJson, err := json.Marshal(patchRaw)
   434  				if err != nil {
   435  					return nil, err
   436  				}
   437  				_, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Patch(ctx, pod.Name, types.MergePatchType, patchJson, metav1.PatchOptions{})
   438  				if err != nil {
   439  					return nil, err
   440  				}
   441  			}
   442  		}
   443  		// Remove duplicates
   444  		err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, cur.Name, metav1.DeleteOptions{})
   445  		if err != nil {
   446  			return nil, err
   447  		}
   448  	}
   449  	return keepCur, nil
   450  }
   451  
   452  // controlledHistories returns all ControllerRevisions controlled by the given DaemonSet.
   453  // This also reconciles ControllerRef by adopting/orphaning.
   454  // Note that returned histories are pointers to objects in the cache.
   455  // If you want to modify one, you need to deep-copy it first.
   456  func (dsc *DaemonSetsController) controlledHistories(ctx context.Context, ds *apps.DaemonSet) ([]*apps.ControllerRevision, error) {
   457  	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
   458  	if err != nil {
   459  		return nil, err
   460  	}
   461  
   462  	// List all histories to include those that don't match the selector anymore
   463  	// but have a ControllerRef pointing to the controller.
   464  	histories, err := dsc.historyLister.ControllerRevisions(ds.Namespace).List(labels.Everything())
   465  	if err != nil {
   466  		return nil, err
   467  	}
   468  	// If any adoptions are attempted, we should first recheck for deletion with
    469  	// an uncached quorum read sometime after listing histories (see #42639).
   470  	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
   471  		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
   472  		if err != nil {
   473  			return nil, err
   474  		}
   475  		if fresh.UID != ds.UID {
   476  			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
   477  		}
   478  		return fresh, nil
   479  	})
   480  	// Use ControllerRefManager to adopt/orphan as needed.
   481  	cm := controller.NewControllerRevisionControllerRefManager(dsc.crControl, ds, selector, controllerKind, canAdoptFunc)
   482  	return cm.ClaimControllerRevisions(ctx, histories)
   483  }
   484  
    485  // Match checks whether the given DaemonSet's template matches the template stored in the given history.
   486  func Match(ds *apps.DaemonSet, history *apps.ControllerRevision) (bool, error) {
   487  	patch, err := getPatch(ds)
   488  	if err != nil {
   489  		return false, err
   490  	}
   491  	return bytes.Equal(patch, history.Data.Raw), nil
   492  }
   493  
    494  // getPatch returns a strategic merge patch that can be applied to restore a DaemonSet to a
    495  // previous version. If the returned error is nil the patch is valid. The current state that we save is just the
    496  // PodTemplateSpec (spec.template). We can modify this later to encompass more state (or less) and remain compatible with previously
   497  // recorded patches.
   498  func getPatch(ds *apps.DaemonSet) ([]byte, error) {
   499  	dsBytes, err := json.Marshal(ds)
   500  	if err != nil {
   501  		return nil, err
   502  	}
   503  	var raw map[string]interface{}
   504  	err = json.Unmarshal(dsBytes, &raw)
   505  	if err != nil {
   506  		return nil, err
   507  	}
   508  	objCopy := make(map[string]interface{})
   509  	specCopy := make(map[string]interface{})
   510  
   511  	// Create a patch of the DaemonSet that replaces spec.template
   512  	spec := raw["spec"].(map[string]interface{})
   513  	template := spec["template"].(map[string]interface{})
   514  	specCopy["template"] = template
   515  	template["$patch"] = "replace"
   516  	objCopy["spec"] = specCopy
   517  	patch, err := json.Marshal(objCopy)
   518  	return patch, err
   519  }
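
         // For illustration only (hypothetical output): for a DaemonSet whose pod template has a
         // single container, getPatch produces JSON of the shape
         //
         //	{"spec":{"template":{"$patch":"replace","metadata":{...},"spec":{"containers":[...]}}}}
         //
         // i.e. only spec.template is kept, marked with the "$patch":"replace" directive so that
         // applying it as a strategic merge patch replaces the stored template wholesale instead of
         // merging into it.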
   520  
   521  func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSet, revision int64) (*apps.ControllerRevision, error) {
   522  	patch, err := getPatch(ds)
   523  	if err != nil {
   524  		return nil, err
   525  	}
   526  	hash := controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)
   527  	name := ds.Name + "-" + hash
   528  	history := &apps.ControllerRevision{
   529  		ObjectMeta: metav1.ObjectMeta{
   530  			Name:            name,
   531  			Namespace:       ds.Namespace,
   532  			Labels:          labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, hash),
   533  			Annotations:     ds.Annotations,
   534  			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)},
   535  		},
   536  		Data:     runtime.RawExtension{Raw: patch},
   537  		Revision: revision,
   538  	}
   539  
   540  	history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(ctx, history, metav1.CreateOptions{})
   541  	if outerErr := err; errors.IsAlreadyExists(outerErr) {
   542  		logger := klog.FromContext(ctx)
   543  		// TODO: Is it okay to get from historyLister?
   544  		existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(ctx, name, metav1.GetOptions{})
   545  		if getErr != nil {
   546  			return nil, getErr
   547  		}
   548  		// Check if we already created it
   549  		done, matchErr := Match(ds, existedHistory)
   550  		if matchErr != nil {
   551  			return nil, matchErr
   552  		}
   553  		if done {
   554  			return existedHistory, nil
   555  		}
   556  
    557  		// Handle name collisions between different histories
   558  		// Get the latest DaemonSet from the API server to make sure collision count is only increased when necessary
   559  		currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
   560  		if getErr != nil {
   561  			return nil, getErr
   562  		}
   563  		// If the collision count used to compute hash was in fact stale, there's no need to bump collision count; retry again
   564  		if !reflect.DeepEqual(currDS.Status.CollisionCount, ds.Status.CollisionCount) {
   565  			return nil, fmt.Errorf("found a stale collision count (%d, expected %d) of DaemonSet %q while processing; will retry until it is updated", ds.Status.CollisionCount, currDS.Status.CollisionCount, ds.Name)
   566  		}
   567  		if currDS.Status.CollisionCount == nil {
   568  			currDS.Status.CollisionCount = new(int32)
   569  		}
   570  		*currDS.Status.CollisionCount++
   571  		_, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(ctx, currDS, metav1.UpdateOptions{})
   572  		if updateErr != nil {
   573  			return nil, updateErr
   574  		}
   575  		logger.V(2).Info("Found a hash collision for DaemonSet - bumping collisionCount to resolve it", "daemonset", klog.KObj(ds), "collisionCount", *currDS.Status.CollisionCount)
   576  		return nil, outerErr
   577  	}
   578  	return history, err
   579  }
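
         // Note on the collision path above: when a ControllerRevision with the computed name
         // already exists but stores different data, snapshot bumps the DaemonSet's
         // status.collisionCount and returns the AlreadyExists error; the next sync then computes a
         // different hash, and therefore a different revision name, so the collision resolves
         // itself on retry.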
   580  
    581  // updatedDesiredNodeCounts calculates the true number of allowed surge pods, allowed unavailable pods, and desired scheduled pods, and
    582  // updates the nodeToDaemonPods map to include an empty entry for every node that should run a daemon pod but has none scheduled.
   583  func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) {
   584  	var desiredNumberScheduled int
   585  	logger := klog.FromContext(ctx)
   586  	for i := range nodeList {
   587  		node := nodeList[i]
   588  		wantToRun, _ := NodeShouldRunDaemonPod(node, ds)
   589  		if !wantToRun {
   590  			continue
   591  		}
   592  		desiredNumberScheduled++
   593  
   594  		if _, exists := nodeToDaemonPods[node.Name]; !exists {
   595  			nodeToDaemonPods[node.Name] = nil
   596  		}
   597  	}
   598  
   599  	maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled)
   600  	if err != nil {
   601  		return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
   602  	}
   603  
   604  	maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled)
   605  	if err != nil {
   606  		return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
   607  	}
   608  
    609  	// if the DaemonSet resolved to an impossible configuration (the apiserver returned 0 for both
    610  	// surge and unavailability), obey the default of maxUnavailable=1
   611  	if desiredNumberScheduled > 0 && maxUnavailable == 0 && maxSurge == 0 {
   612  		logger.Info("DaemonSet is not configured for surge or unavailability, defaulting to accepting unavailability", "daemonset", klog.KObj(ds))
   613  		maxUnavailable = 1
   614  	}
   615  	logger.V(5).Info("DaemonSet with maxSurge and maxUnavailable", "daemonset", klog.KObj(ds), "maxSurge", maxSurge, "maxUnavailable", maxUnavailable)
   616  	return maxSurge, maxUnavailable, desiredNumberScheduled, nil
   617  }
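
         // Illustrative example (hypothetical spec values): with 10 nodes that should run the
         // daemon pod, a rolling update strategy of maxUnavailable: 20% and maxSurge: 0 resolves to
         // maxUnavailable=2 and maxSurge=0; if both values were to resolve to 0 while pods are
         // still desired, the controller falls back to maxUnavailable=1, as guarded above.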
   618  
   619  type historiesByRevision []*apps.ControllerRevision
   620  
   621  func (h historiesByRevision) Len() int      { return len(h) }
   622  func (h historiesByRevision) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
   623  func (h historiesByRevision) Less(i, j int) bool {
   624  	return h[i].Revision < h[j].Revision
   625  }
   626  
