...
1
16
17 package job
18
19 import (
20 "fmt"
21 "sync"
22
23 batch "k8s.io/api/batch/v1"
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/util/sets"
26 "k8s.io/client-go/tools/cache"
27 "k8s.io/klog/v2"
28 "k8s.io/kubernetes/pkg/controller/job/metrics"
29 )
30
31
32 var uidSetKeyFunc = func(obj interface{}) (string, error) {
33 if u, ok := obj.(*uidSet); ok {
34 return u.key, nil
35 }
36 return "", fmt.Errorf("could not find key for obj %#v", obj)
37 }
38
39
40
41 type uidSet struct {
42 sync.RWMutex
43 set sets.Set[string]
44 key string
45 }
46
47
48
49 type uidTrackingExpectations struct {
50 store cache.Store
51 }
52
53
54
55
56 func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet {
57 if obj, exists, err := u.store.GetByKey(controllerKey); err == nil && exists {
58 return obj.(*uidSet)
59 }
60 return nil
61 }
62
63 func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[string] {
64 uids := u.getSet(controllerKey)
65 if uids == nil {
66 return nil
67 }
68 uids.RLock()
69 set := uids.set.Clone()
70 uids.RUnlock()
71 return set
72 }
73
74
75
76
77 func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []string) error {
78 logger.V(4).Info("Expecting tracking finalizers removed", "key", jobKey, "podUIDs", deletedKeys)
79
80 uids := u.getSet(jobKey)
81 if uids == nil {
82 uids = &uidSet{
83 key: jobKey,
84 set: sets.New[string](),
85 }
86 if err := u.store.Add(uids); err != nil {
87 return err
88 }
89 }
90 uids.Lock()
91 uids.set.Insert(deletedKeys...)
92 uids.Unlock()
93 return nil
94 }
95
96
97 func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey, deleteKey string) {
98 uids := u.getSet(jobKey)
99 if uids != nil {
100 uids.Lock()
101 if uids.set.Has(deleteKey) {
102 logger.V(4).Info("Observed tracking finalizer removed", "key", jobKey, "podUID", deleteKey)
103 uids.set.Delete(deleteKey)
104 }
105 uids.Unlock()
106 }
107 }
108
109
110 func (u *uidTrackingExpectations) deleteExpectations(logger klog.Logger, jobKey string) {
111 set := u.getSet(jobKey)
112 if set != nil {
113 if err := u.store.Delete(set); err != nil {
114 logger.Error(err, "Could not delete tracking annotation UID expectations", "key", jobKey)
115 }
116 }
117 }
118
119
120
121 func newUIDTrackingExpectations() *uidTrackingExpectations {
122 return &uidTrackingExpectations{store: cache.NewStore(uidSetKeyFunc)}
123 }
124
125 func hasJobTrackingFinalizer(pod *v1.Pod) bool {
126 for _, fin := range pod.Finalizers {
127 if fin == batch.JobTrackingFinalizer {
128 return true
129 }
130 }
131 return false
132 }
133
134 func recordFinishedPodWithTrackingFinalizer(oldPod, newPod *v1.Pod) {
135 was := isFinishedPodWithTrackingFinalizer(oldPod)
136 is := isFinishedPodWithTrackingFinalizer(newPod)
137 if was == is {
138 return
139 }
140 var event = metrics.Delete
141 if is {
142 event = metrics.Add
143 }
144 metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(event).Inc()
145 }
146
147 func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool {
148 if pod == nil {
149 return false
150 }
151 return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod)
152 }
153
View as plain text