1
16
17 package node
18
19 import (
20 "time"
21
22 "k8s.io/klog/v2"
23
24 corev1 "k8s.io/api/core/v1"
25 resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
26 storagev1 "k8s.io/api/storage/v1"
27 "k8s.io/apimachinery/pkg/util/wait"
28 corev1informers "k8s.io/client-go/informers/core/v1"
29 resourcev1alpha2informers "k8s.io/client-go/informers/resource/v1alpha2"
30 storageinformers "k8s.io/client-go/informers/storage/v1"
31 "k8s.io/client-go/tools/cache"
32 )
33
34 type graphPopulator struct {
35 graph *Graph
36 }
37
38 func AddGraphEventHandlers(
39 graph *Graph,
40 nodes corev1informers.NodeInformer,
41 pods corev1informers.PodInformer,
42 pvs corev1informers.PersistentVolumeInformer,
43 attachments storageinformers.VolumeAttachmentInformer,
44 slices resourcev1alpha2informers.ResourceSliceInformer,
45 ) {
46 g := &graphPopulator{
47 graph: graph,
48 }
49
50 podHandler, _ := pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
51 AddFunc: g.addPod,
52 UpdateFunc: g.updatePod,
53 DeleteFunc: g.deletePod,
54 })
55
56 pvsHandler, _ := pvs.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
57 AddFunc: g.addPV,
58 UpdateFunc: g.updatePV,
59 DeleteFunc: g.deletePV,
60 })
61
62 attachHandler, _ := attachments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
63 AddFunc: g.addVolumeAttachment,
64 UpdateFunc: g.updateVolumeAttachment,
65 DeleteFunc: g.deleteVolumeAttachment,
66 })
67
68 synced := []cache.InformerSynced{
69 podHandler.HasSynced, pvsHandler.HasSynced, attachHandler.HasSynced,
70 }
71
72 if slices != nil {
73 sliceHandler, _ := slices.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
74 AddFunc: g.addResourceSlice,
75 UpdateFunc: nil,
76 DeleteFunc: g.deleteResourceSlice,
77 })
78 synced = append(synced, sliceHandler.HasSynced)
79 }
80
81 go cache.WaitForNamedCacheSync("node_authorizer", wait.NeverStop, synced...)
82 }
83
84 func (g *graphPopulator) addPod(obj interface{}) {
85 g.updatePod(nil, obj)
86 }
87
88 func (g *graphPopulator) updatePod(oldObj, obj interface{}) {
89 pod := obj.(*corev1.Pod)
90 if len(pod.Spec.NodeName) == 0 {
91
92 klog.V(5).Infof("updatePod %s/%s, no node", pod.Namespace, pod.Name)
93 return
94 }
95 if oldPod, ok := oldObj.(*corev1.Pod); ok && oldPod != nil {
96 if (pod.Spec.NodeName == oldPod.Spec.NodeName) && (pod.UID == oldPod.UID) &&
97 resourceClaimStatusesEqual(oldPod.Status.ResourceClaimStatuses, pod.Status.ResourceClaimStatuses) {
98
99 klog.V(5).Infof("updatePod %s/%s, node unchanged", pod.Namespace, pod.Name)
100 return
101 }
102 }
103
104 klog.V(4).Infof("updatePod %s/%s for node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
105 startTime := time.Now()
106 g.graph.AddPod(pod)
107 klog.V(5).Infof("updatePod %s/%s for node %s completed in %v", pod.Namespace, pod.Name, pod.Spec.NodeName, time.Since(startTime))
108 }
109
110 func resourceClaimStatusesEqual(statusA, statusB []corev1.PodResourceClaimStatus) bool {
111 if len(statusA) != len(statusB) {
112 return false
113 }
114
115
116
117 for i := range statusA {
118 if statusA[i].Name != statusB[i].Name {
119 return false
120 }
121 claimNameA := statusA[i].ResourceClaimName
122 claimNameB := statusB[i].ResourceClaimName
123 if (claimNameA == nil) != (claimNameB == nil) {
124 return false
125 }
126 if claimNameA != nil && *claimNameA != *claimNameB {
127 return false
128 }
129 }
130 return true
131 }
132
133 func (g *graphPopulator) deletePod(obj interface{}) {
134 if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
135 obj = tombstone.Obj
136 }
137 pod, ok := obj.(*corev1.Pod)
138 if !ok {
139 klog.Infof("unexpected type %T", obj)
140 return
141 }
142 if len(pod.Spec.NodeName) == 0 {
143 klog.V(5).Infof("deletePod %s/%s, no node", pod.Namespace, pod.Name)
144 return
145 }
146
147 klog.V(4).Infof("deletePod %s/%s for node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
148 startTime := time.Now()
149 g.graph.DeletePod(pod.Name, pod.Namespace)
150 klog.V(5).Infof("deletePod %s/%s for node %s completed in %v", pod.Namespace, pod.Name, pod.Spec.NodeName, time.Since(startTime))
151 }
152
153 func (g *graphPopulator) addPV(obj interface{}) {
154 g.updatePV(nil, obj)
155 }
156
157 func (g *graphPopulator) updatePV(oldObj, obj interface{}) {
158 pv := obj.(*corev1.PersistentVolume)
159
160 g.graph.AddPV(pv)
161 }
162
163 func (g *graphPopulator) deletePV(obj interface{}) {
164 if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
165 obj = tombstone.Obj
166 }
167 pv, ok := obj.(*corev1.PersistentVolume)
168 if !ok {
169 klog.Infof("unexpected type %T", obj)
170 return
171 }
172 g.graph.DeletePV(pv.Name)
173 }
174
175 func (g *graphPopulator) addVolumeAttachment(obj interface{}) {
176 g.updateVolumeAttachment(nil, obj)
177 }
178
179 func (g *graphPopulator) updateVolumeAttachment(oldObj, obj interface{}) {
180 attachment := obj.(*storagev1.VolumeAttachment)
181 if oldObj != nil {
182
183 oldAttachment := oldObj.(*storagev1.VolumeAttachment)
184 if oldAttachment.Spec.NodeName == attachment.Spec.NodeName {
185 return
186 }
187 }
188 g.graph.AddVolumeAttachment(attachment.Name, attachment.Spec.NodeName)
189 }
190
191 func (g *graphPopulator) deleteVolumeAttachment(obj interface{}) {
192 if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
193 obj = tombstone.Obj
194 }
195 attachment, ok := obj.(*storagev1.VolumeAttachment)
196 if !ok {
197 klog.Infof("unexpected type %T", obj)
198 return
199 }
200 g.graph.DeleteVolumeAttachment(attachment.Name)
201 }
202
203 func (g *graphPopulator) addResourceSlice(obj interface{}) {
204 slice, ok := obj.(*resourcev1alpha2.ResourceSlice)
205 if !ok {
206 klog.Infof("unexpected type %T", obj)
207 return
208 }
209 g.graph.AddResourceSlice(slice.Name, slice.NodeName)
210 }
211
212 func (g *graphPopulator) deleteResourceSlice(obj interface{}) {
213 if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
214 obj = tombstone.Obj
215 }
216 slice, ok := obj.(*resourcev1alpha2.ResourceSlice)
217 if !ok {
218 klog.Infof("unexpected type %T", obj)
219 return
220 }
221 g.graph.DeleteResourceSlice(slice.Name)
222 }
223
View as plain text