1
16
17 package config
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/apimachinery/pkg/util/sets"
29 "k8s.io/client-go/tools/record"
30 "k8s.io/klog/v2"
31 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
32 "k8s.io/kubernetes/pkg/kubelet/events"
33 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
34 "k8s.io/kubernetes/pkg/kubelet/util/format"
35 )
36
37
38 type PodConfigNotificationMode int
39
40 const (
41
42
43 PodConfigNotificationUnknown PodConfigNotificationMode = iota
44
45
46 PodConfigNotificationSnapshot
47
48
49 PodConfigNotificationSnapshotAndUpdates
50
51 PodConfigNotificationIncremental
52 )
53
54 type podStartupSLIObserver interface {
55 ObservedPodOnWatch(pod *v1.Pod, when time.Time)
56 }
57
58
59
60
61 type PodConfig struct {
62 pods *podStorage
63 mux *mux
64
65
66 updates chan kubetypes.PodUpdate
67
68
69 sourcesLock sync.Mutex
70 sources sets.String
71 }
72
73
74
75 func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig {
76 updates := make(chan kubetypes.PodUpdate, 50)
77 storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
78 podConfig := &PodConfig{
79 pods: storage,
80 mux: newMux(storage),
81 updates: updates,
82 sources: sets.String{},
83 }
84 return podConfig
85 }
86
87
88
89 func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
90 c.sourcesLock.Lock()
91 defer c.sourcesLock.Unlock()
92 c.sources.Insert(source)
93 return c.mux.ChannelWithContext(ctx, source)
94 }
95
96
97
98 func (c *PodConfig) SeenAllSources(seenSources sets.String) bool {
99 if c.pods == nil {
100 return false
101 }
102 c.sourcesLock.Lock()
103 defer c.sourcesLock.Unlock()
104 klog.V(5).InfoS("Looking for sources, have seen", "sources", c.sources.List(), "seenSources", seenSources)
105 return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...)
106 }
107
108
109 func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
110 return c.updates
111 }
112
113
114 func (c *PodConfig) Sync() {
115 c.pods.sync()
116 }
117
118
119
120
121
122 type podStorage struct {
123 podLock sync.RWMutex
124
125 pods map[string]map[types.UID]*v1.Pod
126 mode PodConfigNotificationMode
127
128
129
130 updateLock sync.Mutex
131 updates chan<- kubetypes.PodUpdate
132
133
134 sourcesSeenLock sync.RWMutex
135 sourcesSeen sets.String
136
137
138 recorder record.EventRecorder
139
140 startupSLIObserver podStartupSLIObserver
141 }
142
143
144
145
146 func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage {
147 return &podStorage{
148 pods: make(map[string]map[types.UID]*v1.Pod),
149 mode: mode,
150 updates: updates,
151 sourcesSeen: sets.String{},
152 recorder: recorder,
153 startupSLIObserver: startupSLIObserver,
154 }
155 }
156
157
158
159
160 func (s *podStorage) Merge(source string, change interface{}) error {
161 s.updateLock.Lock()
162 defer s.updateLock.Unlock()
163
164 seenBefore := s.sourcesSeen.Has(source)
165 adds, updates, deletes, removes, reconciles := s.merge(source, change)
166 firstSet := !seenBefore && s.sourcesSeen.Has(source)
167
168
169 switch s.mode {
170 case PodConfigNotificationIncremental:
171 if len(removes.Pods) > 0 {
172 s.updates <- *removes
173 }
174 if len(adds.Pods) > 0 {
175 s.updates <- *adds
176 }
177 if len(updates.Pods) > 0 {
178 s.updates <- *updates
179 }
180 if len(deletes.Pods) > 0 {
181 s.updates <- *deletes
182 }
183 if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
184
185
186
187 s.updates <- *adds
188 }
189
190 if len(reconciles.Pods) > 0 {
191 s.updates <- *reconciles
192 }
193
194 case PodConfigNotificationSnapshotAndUpdates:
195 if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
196 s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
197 }
198 if len(updates.Pods) > 0 {
199 s.updates <- *updates
200 }
201 if len(deletes.Pods) > 0 {
202 s.updates <- *deletes
203 }
204
205 case PodConfigNotificationSnapshot:
206 if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
207 s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
208 }
209
210 case PodConfigNotificationUnknown:
211 fallthrough
212 default:
213 panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
214 }
215
216 return nil
217 }
218
219 func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
220 s.podLock.Lock()
221 defer s.podLock.Unlock()
222
223 addPods := []*v1.Pod{}
224 updatePods := []*v1.Pod{}
225 deletePods := []*v1.Pod{}
226 removePods := []*v1.Pod{}
227 reconcilePods := []*v1.Pod{}
228
229 pods := s.pods[source]
230 if pods == nil {
231 pods = make(map[types.UID]*v1.Pod)
232 }
233
234
235
236
237 updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
238 filtered := filterInvalidPods(newPods, source, s.recorder)
239 for _, ref := range filtered {
240
241 if ref.Annotations == nil {
242 ref.Annotations = make(map[string]string)
243 }
244 ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
245
246 if !kubetypes.IsStaticPod(ref) {
247 s.startupSLIObserver.ObservedPodOnWatch(ref, time.Now())
248 }
249 if existing, found := oldPods[ref.UID]; found {
250 pods[ref.UID] = existing
251 needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
252 if needUpdate {
253 updatePods = append(updatePods, existing)
254 } else if needReconcile {
255 reconcilePods = append(reconcilePods, existing)
256 } else if needGracefulDelete {
257 deletePods = append(deletePods, existing)
258 }
259 continue
260 }
261 recordFirstSeenTime(ref)
262 pods[ref.UID] = ref
263 addPods = append(addPods, ref)
264 }
265 }
266
267 update := change.(kubetypes.PodUpdate)
268 switch update.Op {
269 case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
270 if update.Op == kubetypes.ADD {
271 klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
272 } else if update.Op == kubetypes.DELETE {
273 klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
274 } else {
275 klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
276 }
277 updatePodsFunc(update.Pods, pods, pods)
278
279 case kubetypes.REMOVE:
280 klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
281 for _, value := range update.Pods {
282 if existing, found := pods[value.UID]; found {
283
284 delete(pods, value.UID)
285 removePods = append(removePods, existing)
286 continue
287 }
288
289 }
290
291 case kubetypes.SET:
292 klog.V(4).InfoS("Setting pods for source", "source", source)
293 s.markSourceSet(source)
294
295 oldPods := pods
296 pods = make(map[types.UID]*v1.Pod)
297 updatePodsFunc(update.Pods, oldPods, pods)
298 for uid, existing := range oldPods {
299 if _, found := pods[uid]; !found {
300
301 removePods = append(removePods, existing)
302 }
303 }
304
305 default:
306 klog.InfoS("Received invalid update type", "type", update)
307
308 }
309
310 s.pods[source] = pods
311
312 adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
313 updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
314 deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
315 removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
316 reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
317
318 return adds, updates, deletes, removes, reconciles
319 }
320
321 func (s *podStorage) markSourceSet(source string) {
322 s.sourcesSeenLock.Lock()
323 defer s.sourcesSeenLock.Unlock()
324 s.sourcesSeen.Insert(source)
325 }
326
327 func (s *podStorage) seenSources(sources ...string) bool {
328 s.sourcesSeenLock.RLock()
329 defer s.sourcesSeenLock.RUnlock()
330 return s.sourcesSeen.HasAll(sources...)
331 }
332
333 func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) {
334 names := sets.String{}
335 for i, pod := range pods {
336
337
338 name := kubecontainer.GetPodFullName(pod)
339 if names.Has(name) {
340 klog.InfoS("Pod failed validation due to duplicate pod name, ignoring", "index", i, "pod", klog.KObj(pod), "source", source)
341 recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s due to duplicate pod name %q, ignoring", format.Pod(pod), source, pod.Name)
342 continue
343 } else {
344 names.Insert(name)
345 }
346
347 filtered = append(filtered, pod)
348 }
349 return
350 }
351
352
353 var localAnnotations = []string{
354 kubetypes.ConfigSourceAnnotationKey,
355 kubetypes.ConfigMirrorAnnotationKey,
356 kubetypes.ConfigFirstSeenAnnotationKey,
357 }
358
359 func isLocalAnnotationKey(key string) bool {
360 for _, localKey := range localAnnotations {
361 if key == localKey {
362 return true
363 }
364 }
365 return false
366 }
367
368
369
370 func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
371 if candidateMap == nil {
372 candidateMap = make(map[string]string)
373 }
374 for k, v := range candidateMap {
375 if isLocalAnnotationKey(k) {
376 continue
377 }
378 if existingValue, ok := existingMap[k]; ok && existingValue == v {
379 continue
380 }
381 return false
382 }
383 for k := range existingMap {
384 if isLocalAnnotationKey(k) {
385 continue
386 }
387
388 if _, exists := candidateMap[k]; !exists {
389 return false
390 }
391 }
392 return true
393 }
394
395
396 func recordFirstSeenTime(pod *v1.Pod) {
397 klog.V(4).InfoS("Receiving a new pod", "pod", klog.KObj(pod))
398 pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = kubetypes.NewTimestamp().GetString()
399 }
400
401
402
403 func updateAnnotations(existing, ref *v1.Pod) {
404 annotations := make(map[string]string, len(ref.Annotations)+len(localAnnotations))
405 for k, v := range ref.Annotations {
406 annotations[k] = v
407 }
408 for _, k := range localAnnotations {
409 if v, ok := existing.Annotations[k]; ok {
410 annotations[k] = v
411 }
412 }
413 existing.Annotations = annotations
414 }
415
416 func podsDifferSemantically(existing, ref *v1.Pod) bool {
417 if reflect.DeepEqual(existing.Spec, ref.Spec) &&
418 reflect.DeepEqual(existing.Labels, ref.Labels) &&
419 reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
420 reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
421 isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
422 return false
423 }
424 return true
425 }
426
427
428
429
430
431
432
433 func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
434
435
436
437
438 if !podsDifferSemantically(existing, ref) {
439
440
441
442 if !reflect.DeepEqual(existing.Status, ref.Status) {
443
444
445 existing.Status = ref.Status
446 needReconcile = true
447 }
448 return
449 }
450
451
452
453 ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
454
455 existing.Spec = ref.Spec
456 existing.Labels = ref.Labels
457 existing.DeletionTimestamp = ref.DeletionTimestamp
458 existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
459 existing.Status = ref.Status
460 updateAnnotations(existing, ref)
461
462
463 if ref.DeletionTimestamp != nil {
464 needGracefulDelete = true
465 } else {
466
467 needUpdate = true
468 }
469
470 return
471 }
472
473
474 func (s *podStorage) sync() {
475 s.updateLock.Lock()
476 defer s.updateLock.Unlock()
477 s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
478 }
479
480 func (s *podStorage) mergedState() interface{} {
481 s.podLock.RLock()
482 defer s.podLock.RUnlock()
483 pods := make([]*v1.Pod, 0)
484 for _, sourcePods := range s.pods {
485 for _, podRef := range sourcePods {
486 pods = append(pods, podRef.DeepCopy())
487 }
488 }
489 return pods
490 }
491
492 func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
493 pods := []*v1.Pod{}
494 for _, source := range sourcePods {
495
496 pods = append(pods, source.DeepCopy())
497 }
498 return pods
499 }
500
View as plain text