...

Source file src/k8s.io/kubernetes/pkg/kubelet/config/config.go

Documentation: k8s.io/kubernetes/pkg/kubelet/config

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package config
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"sync"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/client-go/tools/record"
    30  	"k8s.io/klog/v2"
    31  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    32  	"k8s.io/kubernetes/pkg/kubelet/events"
    33  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    34  	"k8s.io/kubernetes/pkg/kubelet/util/format"
    35  )
    36  
// PodConfigNotificationMode describes how changes are sent to the update channel.
// The mode selects which branch podStorage.Merge takes when it turns a merged
// change into messages for listeners.
type PodConfigNotificationMode int

const (
	// PodConfigNotificationUnknown is the default value for
	// PodConfigNotificationMode when uninitialized. podStorage.Merge panics
	// when it is asked to deliver a change in this mode.
	PodConfigNotificationUnknown PodConfigNotificationMode = iota
	// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
	// any change occurs, and also when a source delivers its first SET.
	PodConfigNotificationSnapshot
	// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
	// changed, and a SET message if there are any additions or removals, or when a
	// source delivers its first SET.
	PodConfigNotificationSnapshotAndUpdates
	// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
	// Messages that would carry no pods are skipped, except for one empty ADD that
	// is sent when a source's first SET produced no ADD, UPDATE or DELETE.
	PodConfigNotificationIncremental
)
    53  
// podStartupSLIObserver is notified about pods seen in incoming changes.
// podStorage calls ObservedPodOnWatch with the current time for every
// non-static pod in an ADD, UPDATE, DELETE or SET change that survives
// duplicate-name filtering.
type podStartupSLIObserver interface {
	ObservedPodOnWatch(pod *v1.Pod, when time.Time)
}
    57  
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
	// pods is the merged pod state; it is handed to mux at construction time and
	// writes its notifications to updates.
	// mux supplies the per-source input channels returned by Channel.
	pods *podStorage
	mux  *mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources; sourcesLock guards sources
	sourcesLock sync.Mutex
	sources     sets.String
}
    72  
    73  // NewPodConfig creates an object that can merge many configuration sources into a stream
    74  // of normalized updates to a pod configuration.
    75  func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig {
    76  	updates := make(chan kubetypes.PodUpdate, 50)
    77  	storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
    78  	podConfig := &PodConfig{
    79  		pods:    storage,
    80  		mux:     newMux(storage),
    81  		updates: updates,
    82  		sources: sets.String{},
    83  	}
    84  	return podConfig
    85  }
    86  
    87  // Channel creates or returns a config source channel.  The channel
    88  // only accepts PodUpdates
    89  func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
    90  	c.sourcesLock.Lock()
    91  	defer c.sourcesLock.Unlock()
    92  	c.sources.Insert(source)
    93  	return c.mux.ChannelWithContext(ctx, source)
    94  }
    95  
    96  // SeenAllSources returns true if seenSources contains all sources in the
    97  // config, and also this config has received a SET message from each source.
    98  func (c *PodConfig) SeenAllSources(seenSources sets.String) bool {
    99  	if c.pods == nil {
   100  		return false
   101  	}
   102  	c.sourcesLock.Lock()
   103  	defer c.sourcesLock.Unlock()
   104  	klog.V(5).InfoS("Looking for sources, have seen", "sources", c.sources.List(), "seenSources", seenSources)
   105  	return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...)
   106  }
   107  
// Updates returns a channel of updates to the configuration, properly denormalized.
// It is the receive-only view of the same channel the pod storage writes to,
// so every call returns the same channel.
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}
   112  
// Sync requests the full configuration be delivered to the update channel.
// It delegates to the pod storage, which sends one SET message holding copies
// of the pods from every source. Unlike SeenAllSources, it does not check
// c.pods for nil.
func (c *PodConfig) Sync() {
	c.pods.sync()
}
   117  
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order.  Note that this object is an in-memory source of
// "truth" and on creation contains zero entries.  Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
	// podLock guards pods: merge takes it for writing, mergedState for reading.
	podLock sync.RWMutex
	// map of source name to pod uid to pod reference
	pods map[string]map[types.UID]*v1.Pod
	// mode selects which messages Merge sends for a merged change
	mode PodConfigNotificationMode

	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	// the EventRecorder to use
	recorder record.EventRecorder

	// startupSLIObserver is called for each non-static pod seen while merging
	startupSLIObserver podStartupSLIObserver
}
   142  
   143  // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel
   144  // in the future, especially with multiple listeners.
   145  // TODO: allow initialization of the current state of the store with snapshotted version.
   146  func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage {
   147  	return &podStorage{
   148  		pods:               make(map[string]map[types.UID]*v1.Pod),
   149  		mode:               mode,
   150  		updates:            updates,
   151  		sourcesSeen:        sets.String{},
   152  		recorder:           recorder,
   153  		startupSLIObserver: startupSLIObserver,
   154  	}
   155  }
   156  
   157  // Merge normalizes a set of incoming changes from different sources into a map of all Pods
   158  // and ensures that redundant changes are filtered out, and then pushes zero or more minimal
   159  // updates onto the update channel.  Ensures that updates are delivered in order.
   160  func (s *podStorage) Merge(source string, change interface{}) error {
   161  	s.updateLock.Lock()
   162  	defer s.updateLock.Unlock()
   163  
   164  	seenBefore := s.sourcesSeen.Has(source)
   165  	adds, updates, deletes, removes, reconciles := s.merge(source, change)
   166  	firstSet := !seenBefore && s.sourcesSeen.Has(source)
   167  
   168  	// deliver update notifications
   169  	switch s.mode {
   170  	case PodConfigNotificationIncremental:
   171  		if len(removes.Pods) > 0 {
   172  			s.updates <- *removes
   173  		}
   174  		if len(adds.Pods) > 0 {
   175  			s.updates <- *adds
   176  		}
   177  		if len(updates.Pods) > 0 {
   178  			s.updates <- *updates
   179  		}
   180  		if len(deletes.Pods) > 0 {
   181  			s.updates <- *deletes
   182  		}
   183  		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
   184  			// Send an empty update when first seeing the source and there are
   185  			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
   186  			// the source is ready.
   187  			s.updates <- *adds
   188  		}
   189  		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
   190  		if len(reconciles.Pods) > 0 {
   191  			s.updates <- *reconciles
   192  		}
   193  
   194  	case PodConfigNotificationSnapshotAndUpdates:
   195  		if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
   196  			s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
   197  		}
   198  		if len(updates.Pods) > 0 {
   199  			s.updates <- *updates
   200  		}
   201  		if len(deletes.Pods) > 0 {
   202  			s.updates <- *deletes
   203  		}
   204  
   205  	case PodConfigNotificationSnapshot:
   206  		if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
   207  			s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
   208  		}
   209  
   210  	case PodConfigNotificationUnknown:
   211  		fallthrough
   212  	default:
   213  		panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
   214  	}
   215  
   216  	return nil
   217  }
   218  
// merge applies a single change from source to the stored pod map and reports
// what changed as five PodUpdate messages (ADD, UPDATE, DELETE, REMOVE,
// RECONCILE), each tagged with source and carrying deep copies of the affected
// pods. A message with no affected pods has an empty, non-nil Pods slice.
//
// change must be a kubetypes.PodUpdate; the type assertion below is unchecked
// and panics for any other dynamic type. The pods inside change are modified:
// each one that passes duplicate-name filtering gets the config-source
// annotation set. podLock is held for writing for the whole call.
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
	s.podLock.Lock()
	defer s.podLock.Unlock()

	// Accumulators filled by updatePodsFunc and the REMOVE/SET branches below.
	addPods := []*v1.Pod{}
	updatePods := []*v1.Pod{}
	deletePods := []*v1.Pod{}
	removePods := []*v1.Pod{}
	reconcilePods := []*v1.Pod{}

	// Start from this source's current pods, or an empty map for a new source.
	pods := s.pods[source]
	if pods == nil {
		pods = make(map[types.UID]*v1.Pod)
	}

	// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
	// After updated, new pod will be stored in the pod cache *pods*.
	// Notice that *pods* and *oldPods* could be the same cache.
	updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
		filtered := filterInvalidPods(newPods, source, s.recorder)
		for _, ref := range filtered {
			// Annotate the pod with the source before any comparison.
			if ref.Annotations == nil {
				ref.Annotations = make(map[string]string)
			}
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
			// ignore static pods
			if !kubetypes.IsStaticPod(ref) {
				s.startupSLIObserver.ObservedPodOnWatch(ref, time.Now())
			}
			if existing, found := oldPods[ref.UID]; found {
				// Known UID: keep the stored object and let checkAndUpdatePod
				// copy the changes into it and classify the change. At most
				// one of the three result slices receives the pod.
				pods[ref.UID] = existing
				needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
				if needUpdate {
					updatePods = append(updatePods, existing)
				} else if needReconcile {
					reconcilePods = append(reconcilePods, existing)
				} else if needGracefulDelete {
					deletePods = append(deletePods, existing)
				}
				continue
			}
			// Unknown UID: store the incoming object itself and report an add.
			recordFirstSeenTime(ref)
			pods[ref.UID] = ref
			addPods = append(addPods, ref)
		}
	}

	update := change.(kubetypes.PodUpdate)
	switch update.Op {
	case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
		// The three ops differ only in the log line; the classification into
		// add/update/delete/reconcile is made per pod by updatePodsFunc.
		if update.Op == kubetypes.ADD {
			klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		} else if update.Op == kubetypes.DELETE {
			klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		} else {
			klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		}
		updatePodsFunc(update.Pods, pods, pods)

	case kubetypes.REMOVE:
		klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		for _, value := range update.Pods {
			if existing, found := pods[value.UID]; found {
				// this is a delete
				delete(pods, value.UID)
				removePods = append(removePods, existing)
				continue
			}
			// this is a no-op
		}

	case kubetypes.SET:
		klog.V(4).InfoS("Setting pods for source", "source", source)
		s.markSourceSet(source)
		// Clear the old map entries by just creating a new map
		oldPods := pods
		pods = make(map[types.UID]*v1.Pod)
		updatePodsFunc(update.Pods, oldPods, pods)
		// Anything that was stored before but is absent from the new set is removed.
		for uid, existing := range oldPods {
			if _, found := pods[uid]; !found {
				// this is a delete
				removePods = append(removePods, existing)
			}
		}

	default:
		// Unrecognized op: only logged; all five result messages stay empty.
		klog.InfoS("Received invalid update type", "type", update)

	}

	// Store the (possibly new) map; this also creates the entry for a source
	// seen for the first time.
	s.pods[source] = pods

	// Hand out copies so receivers never share pod objects with the store.
	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

	return adds, updates, deletes, removes, reconciles
}
   320  
// markSourceSet records that source has delivered a SET. It takes
// sourcesSeenLock for writing while updating sourcesSeen.
func (s *podStorage) markSourceSet(source string) {
	s.sourcesSeenLock.Lock()
	defer s.sourcesSeenLock.Unlock()
	s.sourcesSeen.Insert(source)
}
   326  
// seenSources reports whether every one of the given sources has been marked
// as having delivered a SET. It reads sourcesSeen under the read lock.
func (s *podStorage) seenSources(sources ...string) bool {
	s.sourcesSeenLock.RLock()
	defer s.sourcesSeenLock.RUnlock()
	return s.sourcesSeen.HasAll(sources...)
}
   332  
   333  func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) {
   334  	names := sets.String{}
   335  	for i, pod := range pods {
   336  		// Pods from each source are assumed to have passed validation individually.
   337  		// This function only checks if there is any naming conflict.
   338  		name := kubecontainer.GetPodFullName(pod)
   339  		if names.Has(name) {
   340  			klog.InfoS("Pod failed validation due to duplicate pod name, ignoring", "index", i, "pod", klog.KObj(pod), "source", source)
   341  			recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s due to duplicate pod name %q, ignoring", format.Pod(pod), source, pod.Name)
   342  			continue
   343  		} else {
   344  			names.Insert(name)
   345  		}
   346  
   347  		filtered = append(filtered, pod)
   348  	}
   349  	return
   350  }
   351  
// Annotations that the kubelet adds to the pod.
// These keys are skipped when two annotation maps are compared, and their
// values on the stored pod are carried over when its annotations are replaced.
var localAnnotations = []string{
	kubetypes.ConfigSourceAnnotationKey,
	kubetypes.ConfigMirrorAnnotationKey,
	kubetypes.ConfigFirstSeenAnnotationKey,
}
   358  
   359  func isLocalAnnotationKey(key string) bool {
   360  	for _, localKey := range localAnnotations {
   361  		if key == localKey {
   362  			return true
   363  		}
   364  	}
   365  	return false
   366  }
   367  
   368  // isAnnotationMapEqual returns true if the existing annotation Map is equal to candidate except
   369  // for local annotations.
   370  func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
   371  	if candidateMap == nil {
   372  		candidateMap = make(map[string]string)
   373  	}
   374  	for k, v := range candidateMap {
   375  		if isLocalAnnotationKey(k) {
   376  			continue
   377  		}
   378  		if existingValue, ok := existingMap[k]; ok && existingValue == v {
   379  			continue
   380  		}
   381  		return false
   382  	}
   383  	for k := range existingMap {
   384  		if isLocalAnnotationKey(k) {
   385  			continue
   386  		}
   387  		// stale entry in existing map.
   388  		if _, exists := candidateMap[k]; !exists {
   389  			return false
   390  		}
   391  	}
   392  	return true
   393  }
   394  
   395  // recordFirstSeenTime records the first seen time of this pod.
   396  func recordFirstSeenTime(pod *v1.Pod) {
   397  	klog.V(4).InfoS("Receiving a new pod", "pod", klog.KObj(pod))
   398  	pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = kubetypes.NewTimestamp().GetString()
   399  }
   400  
   401  // updateAnnotations returns an Annotation map containing the api annotation map plus
   402  // locally managed annotations
   403  func updateAnnotations(existing, ref *v1.Pod) {
   404  	annotations := make(map[string]string, len(ref.Annotations)+len(localAnnotations))
   405  	for k, v := range ref.Annotations {
   406  		annotations[k] = v
   407  	}
   408  	for _, k := range localAnnotations {
   409  		if v, ok := existing.Annotations[k]; ok {
   410  			annotations[k] = v
   411  		}
   412  	}
   413  	existing.Annotations = annotations
   414  }
   415  
   416  func podsDifferSemantically(existing, ref *v1.Pod) bool {
   417  	if reflect.DeepEqual(existing.Spec, ref.Spec) &&
   418  		reflect.DeepEqual(existing.Labels, ref.Labels) &&
   419  		reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
   420  		reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
   421  		isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
   422  		return false
   423  	}
   424  	return true
   425  }
   426  
   427  // checkAndUpdatePod updates existing, and:
   428  //   - if ref makes a meaningful change, returns needUpdate=true
   429  //   - if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
   430  //   - if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
   431  //   - else return all false
   432  //     Now, needUpdate, needGracefulDelete and needReconcile should never be both true
   433  func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
   434  
   435  	// 1. this is a reconcile
   436  	// TODO: it would be better to update the whole object and only preserve certain things
   437  	//       like the source annotation or the UID (to ensure safety)
   438  	if !podsDifferSemantically(existing, ref) {
   439  		// this is not an update
   440  		// Only check reconcile when it is not an update, because if the pod is going to
   441  		// be updated, an extra reconcile is unnecessary
   442  		if !reflect.DeepEqual(existing.Status, ref.Status) {
   443  			// Pod with changed pod status needs reconcile, because kubelet should
   444  			// be the source of truth of pod status.
   445  			existing.Status = ref.Status
   446  			needReconcile = true
   447  		}
   448  		return
   449  	}
   450  
   451  	// Overwrite the first-seen time with the existing one. This is our own
   452  	// internal annotation, there is no need to update.
   453  	ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
   454  
   455  	existing.Spec = ref.Spec
   456  	existing.Labels = ref.Labels
   457  	existing.DeletionTimestamp = ref.DeletionTimestamp
   458  	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
   459  	existing.Status = ref.Status
   460  	updateAnnotations(existing, ref)
   461  
   462  	// 2. this is an graceful delete
   463  	if ref.DeletionTimestamp != nil {
   464  		needGracefulDelete = true
   465  	} else {
   466  		// 3. this is an update
   467  		needUpdate = true
   468  	}
   469  
   470  	return
   471  }
   472  
   473  // sync sends a copy of the current state through the update channel.
   474  func (s *podStorage) sync() {
   475  	s.updateLock.Lock()
   476  	defer s.updateLock.Unlock()
   477  	s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
   478  }
   479  
   480  func (s *podStorage) mergedState() interface{} {
   481  	s.podLock.RLock()
   482  	defer s.podLock.RUnlock()
   483  	pods := make([]*v1.Pod, 0)
   484  	for _, sourcePods := range s.pods {
   485  		for _, podRef := range sourcePods {
   486  			pods = append(pods, podRef.DeepCopy())
   487  		}
   488  	}
   489  	return pods
   490  }
   491  
   492  func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
   493  	pods := []*v1.Pod{}
   494  	for _, source := range sourcePods {
   495  		// Use a deep copy here just in case
   496  		pods = append(pods, source.DeepCopy())
   497  	}
   498  	return pods
   499  }
   500  

View as plain text