1
16
17 package node
18
19 import (
20 "sync"
21 "time"
22
23 corev1 "k8s.io/api/core/v1"
24 "k8s.io/component-helpers/storage/ephemeral"
25 "k8s.io/dynamic-resource-allocation/resourceclaim"
26 pvutil "k8s.io/kubernetes/pkg/api/v1/persistentvolume"
27 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
28 "k8s.io/kubernetes/third_party/forked/gonum/graph"
29 "k8s.io/kubernetes/third_party/forked/gonum/graph/simple"
30 )
31
32
// namedVertex is the graph node used for every tracked API object. It is
// handed to the underlying graph as a graph.Node and remembers the type,
// namespace, and name of the object it stands for.
type namedVertex struct {
	// name is the object's name.
	name string
	// namespace is the object's namespace; it is empty for the cluster-scoped
	// vertices created in this file (nodes, PVs, volume attachments, slices).
	namespace string
	// id is the graph node identifier returned by ID().
	id int
	// vertexType identifies which kind of API object this vertex represents.
	vertexType vertexType
}
39
40 func newNamedVertex(vertexType vertexType, namespace, name string, id int) *namedVertex {
41 return &namedVertex{
42 vertexType: vertexType,
43 name: name,
44 namespace: namespace,
45 id: id,
46 }
47 }
48 func (n *namedVertex) ID() int {
49 return n.id
50 }
51 func (n *namedVertex) String() string {
52 if len(n.namespace) == 0 {
53 return vertexTypes[n.vertexType] + ":" + n.name
54 }
55 return vertexTypes[n.vertexType] + ":" + n.namespace + "/" + n.name
56 }
57
58
59
// destinationEdge is a directed graph edge (returned as a graph.Edge by
// newDestinationEdge) that additionally carries a reference to a destination
// vertex. In AddPod, edges that point at a pod record the pod's node vertex
// as their destination.
type destinationEdge struct {
	// F is the vertex the edge starts from.
	F graph.Node
	// T is the vertex the edge points to.
	T graph.Node
	// Destination is the vertex whose ID is reported by DestinationID.
	Destination graph.Node
}
65
66 func newDestinationEdge(from, to, destination graph.Node) graph.Edge {
67 return &destinationEdge{F: from, T: to, Destination: destination}
68 }
69 func (e *destinationEdge) From() graph.Node { return e.F }
70 func (e *destinationEdge) To() graph.Node { return e.T }
71 func (e *destinationEdge) Weight() float64 { return 0 }
72 func (e *destinationEdge) DestinationID() int { return e.Destination.ID() }
73
74
75
76
77
78
79
80
// Graph holds the vertices for tracked API objects together with a lookup
// from type/namespace/name to vertex. The edges created in this file are:
//
//	pod              -> node
//	serviceaccount,
//	secret, configmap,
//	pvc, resourceclaim -> pod
//	pv               -> pvc
//	secret           -> pv
//	volumeattachment -> node
//	resourceslice    -> node
type Graph struct {
	// lock guards every field below; the exported Add*/Delete* methods take
	// it for writing before touching the graph.
	lock sync.RWMutex
	// graph is the underlying directed acyclic graph of vertices and edges.
	graph *simple.DirectedAcyclicGraph

	// vertices is a map of type -> namespace -> name -> vertex.
	vertices map[vertexType]namespaceVertexMapping

	// destinationEdgeIndex maps a vertex ID to a counted set of the
	// destination IDs carried by that vertex's outgoing destination edges.
	destinationEdgeIndex map[int]*intSet

	// destinationEdgeThreshold is the vertex degree below which no index is
	// kept for that vertex (any existing entry is dropped).
	destinationEdgeThreshold int
}
92
93
// namespaceVertexMapping is a map of namespace -> name -> vertex.
type namespaceVertexMapping map[string]nameVertexMapping
95
96
// nameVertexMapping is a map of name -> vertex.
type nameVertexMapping map[string]*namedVertex
98
99 func NewGraph() *Graph {
100 return &Graph{
101 vertices: map[vertexType]namespaceVertexMapping{},
102 graph: simple.NewDirectedAcyclicGraph(0, 0),
103
104 destinationEdgeIndex: map[int]*intSet{},
105
106
107 destinationEdgeThreshold: 200,
108 }
109 }
110
111
112
// vertexType identifies the kind of API object a vertex represents.
type vertexType byte

// The known vertex kinds. Values are assigned by iota, so the order of this
// list determines each constant's numeric value.
const (
	configMapVertexType vertexType = iota
	sliceVertexType
	nodeVertexType
	podVertexType
	pvcVertexType
	pvVertexType
	resourceClaimVertexType
	secretVertexType
	vaVertexType
	serviceAccountVertexType
)

// vertexTypes maps each vertex kind to the label used as the prefix of
// namedVertex.String().
var vertexTypes = map[vertexType]string{
	configMapVertexType:      "configmap",
	sliceVertexType:          "resourceslice",
	nodeVertexType:           "node",
	podVertexType:            "pod",
	pvcVertexType:            "pvc",
	pvVertexType:             "pv",
	resourceClaimVertexType:  "resourceclaim",
	secretVertexType:         "secret",
	vaVertexType:             "volumeattachment",
	serviceAccountVertexType: "serviceAccount",
}
140
141
142 func (g *Graph) getOrCreateVertex_locked(vertexType vertexType, namespace, name string) *namedVertex {
143 if vertex, exists := g.getVertex_rlocked(vertexType, namespace, name); exists {
144 return vertex
145 }
146 return g.createVertex_locked(vertexType, namespace, name)
147 }
148
149
150 func (g *Graph) getVertex_rlocked(vertexType vertexType, namespace, name string) (*namedVertex, bool) {
151 vertex, exists := g.vertices[vertexType][namespace][name]
152 return vertex, exists
153 }
154
155
156 func (g *Graph) createVertex_locked(vertexType vertexType, namespace, name string) *namedVertex {
157 typedVertices, exists := g.vertices[vertexType]
158 if !exists {
159 typedVertices = namespaceVertexMapping{}
160 g.vertices[vertexType] = typedVertices
161 }
162
163 namespacedVertices, exists := typedVertices[namespace]
164 if !exists {
165 namespacedVertices = map[string]*namedVertex{}
166 typedVertices[namespace] = namespacedVertices
167 }
168
169 vertex := newNamedVertex(vertexType, namespace, name, g.graph.NewNodeID())
170 namespacedVertices[name] = vertex
171 g.graph.AddNode(vertex)
172
173 return vertex
174 }
175
176
177 func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name string) {
178 vertex, exists := g.getVertex_rlocked(vertexType, namespace, name)
179 if !exists {
180 return
181 }
182
183
184 neighborsToRemove := []graph.Node{}
185 edgesToRemoveFromIndexes := []graph.Edge{}
186 g.graph.VisitFrom(vertex, func(neighbor graph.Node) bool {
187
188 if g.graph.Degree(neighbor) == 1 {
189 neighborsToRemove = append(neighborsToRemove, neighbor)
190 }
191 return true
192 })
193 g.graph.VisitTo(vertex, func(neighbor graph.Node) bool {
194 if g.graph.Degree(neighbor) == 1 {
195
196 neighborsToRemove = append(neighborsToRemove, neighbor)
197 } else {
198
199 edgesToRemoveFromIndexes = append(edgesToRemoveFromIndexes, g.graph.EdgeBetween(vertex, neighbor))
200 }
201 return true
202 })
203
204
205 g.removeVertex_locked(vertex)
206
207
208 for _, neighbor := range neighborsToRemove {
209 g.removeVertex_locked(neighbor.(*namedVertex))
210 }
211
212
213 for _, edge := range edgesToRemoveFromIndexes {
214 g.removeEdgeFromDestinationIndex_locked(edge)
215 }
216 }
217
218
219
220
221 func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toName string) {
222
223 toVert, exists := g.getVertex_rlocked(toType, toNamespace, toName)
224 if !exists {
225 return
226 }
227
228
229 neighborsToRemove := []*namedVertex{}
230 edgesToRemove := []graph.Edge{}
231 g.graph.VisitTo(toVert, func(from graph.Node) bool {
232 fromVert := from.(*namedVertex)
233 if fromVert.vertexType != fromType {
234 return true
235 }
236
237 if g.graph.Degree(fromVert) == 1 {
238 neighborsToRemove = append(neighborsToRemove, fromVert)
239 } else {
240 edgesToRemove = append(edgesToRemove, g.graph.EdgeBetween(from, toVert))
241 }
242 return true
243 })
244
245
246 for _, v := range neighborsToRemove {
247 g.removeVertex_locked(v)
248 }
249
250
251 for _, edge := range edgesToRemove {
252 g.graph.RemoveEdge(edge)
253 g.removeEdgeFromDestinationIndex_locked(edge)
254 }
255 }
256
257
258 func (g *Graph) removeEdgeFromDestinationIndex_locked(e graph.Edge) {
259 n := e.From()
260
261 edgeCount := g.graph.Degree(n)
262 if edgeCount < g.destinationEdgeThreshold {
263 delete(g.destinationEdgeIndex, n.ID())
264 return
265 }
266
267
268 index := g.destinationEdgeIndex[n.ID()]
269 if index == nil {
270 return
271 }
272 if destinationEdge, ok := e.(*destinationEdge); ok {
273 index.decrement(destinationEdge.DestinationID())
274 }
275 }
276
277
278 func (g *Graph) addEdgeToDestinationIndex_locked(e graph.Edge) {
279 n := e.From()
280 index := g.destinationEdgeIndex[n.ID()]
281 if index == nil {
282
283 g.recomputeDestinationIndex_locked(n)
284 return
285 }
286
287 if destinationEdge, ok := e.(*destinationEdge); ok {
288 index.increment(destinationEdge.DestinationID())
289 }
290 }
291
292
293
294
295 func (g *Graph) removeVertex_locked(v *namedVertex) {
296 g.graph.RemoveNode(v)
297 delete(g.destinationEdgeIndex, v.ID())
298 delete(g.vertices[v.vertexType][v.namespace], v.name)
299 if len(g.vertices[v.vertexType][v.namespace]) == 0 {
300 delete(g.vertices[v.vertexType], v.namespace)
301 }
302 }
303
304
305
306 func (g *Graph) recomputeDestinationIndex_locked(n graph.Node) {
307
308 edgeCount := g.graph.Degree(n)
309 if edgeCount < g.destinationEdgeThreshold {
310 delete(g.destinationEdgeIndex, n.ID())
311 return
312 }
313
314
315 index := g.destinationEdgeIndex[n.ID()]
316 if index == nil {
317 index = newIntSet()
318 } else {
319 index.reset()
320 }
321
322
323 g.graph.VisitFrom(n, func(dest graph.Node) bool {
324 if destinationEdge, ok := g.graph.EdgeBetween(n, dest).(*destinationEdge); ok {
325 index.increment(destinationEdge.DestinationID())
326 }
327 return true
328 })
329 g.destinationEdgeIndex[n.ID()] = index
330 }
331
332
333
334
335
336
337
338
339
340 func (g *Graph) AddPod(pod *corev1.Pod) {
341 start := time.Now()
342 defer func() {
343 graphActionsDuration.WithLabelValues("AddPod").Observe(time.Since(start).Seconds())
344 }()
345 g.lock.Lock()
346 defer g.lock.Unlock()
347
348 g.deleteVertex_locked(podVertexType, pod.Namespace, pod.Name)
349 podVertex := g.getOrCreateVertex_locked(podVertexType, pod.Namespace, pod.Name)
350 nodeVertex := g.getOrCreateVertex_locked(nodeVertexType, "", pod.Spec.NodeName)
351 g.graph.SetEdge(newDestinationEdge(podVertex, nodeVertex, nodeVertex))
352
353
354
355
356 if _, isMirrorPod := pod.Annotations[corev1.MirrorPodAnnotationKey]; isMirrorPod {
357 return
358 }
359
360
361
362
363
364 if len(pod.Spec.ServiceAccountName) > 0 {
365 serviceAccountVertex := g.getOrCreateVertex_locked(serviceAccountVertexType, pod.Namespace, pod.Spec.ServiceAccountName)
366 e := newDestinationEdge(serviceAccountVertex, podVertex, nodeVertex)
367 g.graph.SetEdge(e)
368 g.addEdgeToDestinationIndex_locked(e)
369 }
370
371 podutil.VisitPodSecretNames(pod, func(secret string) bool {
372 secretVertex := g.getOrCreateVertex_locked(secretVertexType, pod.Namespace, secret)
373 e := newDestinationEdge(secretVertex, podVertex, nodeVertex)
374 g.graph.SetEdge(e)
375 g.addEdgeToDestinationIndex_locked(e)
376 return true
377 })
378
379 podutil.VisitPodConfigmapNames(pod, func(configmap string) bool {
380 configmapVertex := g.getOrCreateVertex_locked(configMapVertexType, pod.Namespace, configmap)
381 e := newDestinationEdge(configmapVertex, podVertex, nodeVertex)
382 g.graph.SetEdge(e)
383 g.addEdgeToDestinationIndex_locked(e)
384 return true
385 })
386
387 for _, v := range pod.Spec.Volumes {
388 claimName := ""
389 if v.PersistentVolumeClaim != nil {
390 claimName = v.PersistentVolumeClaim.ClaimName
391 } else if v.Ephemeral != nil {
392 claimName = ephemeral.VolumeClaimName(pod, &v)
393 }
394 if claimName != "" {
395 pvcVertex := g.getOrCreateVertex_locked(pvcVertexType, pod.Namespace, claimName)
396 e := newDestinationEdge(pvcVertex, podVertex, nodeVertex)
397 g.graph.SetEdge(e)
398 g.addEdgeToDestinationIndex_locked(e)
399 }
400 }
401
402 for _, podResourceClaim := range pod.Spec.ResourceClaims {
403 claimName, _, err := resourceclaim.Name(pod, &podResourceClaim)
404
405
406
407
408 if err == nil && claimName != nil {
409 claimVertex := g.getOrCreateVertex_locked(resourceClaimVertexType, pod.Namespace, *claimName)
410 e := newDestinationEdge(claimVertex, podVertex, nodeVertex)
411 g.graph.SetEdge(e)
412 g.addEdgeToDestinationIndex_locked(e)
413 }
414 }
415 }
416 func (g *Graph) DeletePod(name, namespace string) {
417 start := time.Now()
418 defer func() {
419 graphActionsDuration.WithLabelValues("DeletePod").Observe(time.Since(start).Seconds())
420 }()
421 g.lock.Lock()
422 defer g.lock.Unlock()
423 g.deleteVertex_locked(podVertexType, namespace, name)
424 }
425
426
427
428
429
430
431 func (g *Graph) AddPV(pv *corev1.PersistentVolume) {
432 start := time.Now()
433 defer func() {
434 graphActionsDuration.WithLabelValues("AddPV").Observe(time.Since(start).Seconds())
435 }()
436 g.lock.Lock()
437 defer g.lock.Unlock()
438
439
440 g.deleteVertex_locked(pvVertexType, "", pv.Name)
441
442
443 if pv.Spec.ClaimRef != nil {
444 pvVertex := g.getOrCreateVertex_locked(pvVertexType, "", pv.Name)
445
446
447 g.graph.SetEdge(simple.Edge{F: pvVertex, T: g.getOrCreateVertex_locked(pvcVertexType, pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)})
448 pvutil.VisitPVSecretNames(pv, func(namespace, secret string, kubeletVisible bool) bool {
449
450 if kubeletVisible {
451 g.graph.SetEdge(simple.Edge{F: g.getOrCreateVertex_locked(secretVertexType, namespace, secret), T: pvVertex})
452 }
453 return true
454 })
455 }
456 }
457 func (g *Graph) DeletePV(name string) {
458 start := time.Now()
459 defer func() {
460 graphActionsDuration.WithLabelValues("DeletePV").Observe(time.Since(start).Seconds())
461 }()
462 g.lock.Lock()
463 defer g.lock.Unlock()
464 g.deleteVertex_locked(pvVertexType, "", name)
465 }
466
467
468
469
470 func (g *Graph) AddVolumeAttachment(attachmentName, nodeName string) {
471 start := time.Now()
472 defer func() {
473 graphActionsDuration.WithLabelValues("AddVolumeAttachment").Observe(time.Since(start).Seconds())
474 }()
475 g.lock.Lock()
476 defer g.lock.Unlock()
477
478
479 g.deleteVertex_locked(vaVertexType, "", attachmentName)
480
481
482 if len(nodeName) > 0 {
483 vaVertex := g.getOrCreateVertex_locked(vaVertexType, "", attachmentName)
484 nodeVertex := g.getOrCreateVertex_locked(nodeVertexType, "", nodeName)
485 g.graph.SetEdge(newDestinationEdge(vaVertex, nodeVertex, nodeVertex))
486 }
487 }
488 func (g *Graph) DeleteVolumeAttachment(name string) {
489 start := time.Now()
490 defer func() {
491 graphActionsDuration.WithLabelValues("DeleteVolumeAttachment").Observe(time.Since(start).Seconds())
492 }()
493 g.lock.Lock()
494 defer g.lock.Unlock()
495 g.deleteVertex_locked(vaVertexType, "", name)
496 }
497
498
499
500
501 func (g *Graph) AddResourceSlice(sliceName, nodeName string) {
502 start := time.Now()
503 defer func() {
504 graphActionsDuration.WithLabelValues("AddResourceSlice").Observe(time.Since(start).Seconds())
505 }()
506 g.lock.Lock()
507 defer g.lock.Unlock()
508
509
510 g.deleteVertex_locked(sliceVertexType, "", sliceName)
511
512
513 if len(nodeName) > 0 {
514 sliceVertex := g.getOrCreateVertex_locked(sliceVertexType, "", sliceName)
515 nodeVertex := g.getOrCreateVertex_locked(nodeVertexType, "", nodeName)
516 g.graph.SetEdge(newDestinationEdge(sliceVertex, nodeVertex, nodeVertex))
517 }
518 }
519 func (g *Graph) DeleteResourceSlice(sliceName string) {
520 start := time.Now()
521 defer func() {
522 graphActionsDuration.WithLabelValues("DeleteResourceSlice").Observe(time.Since(start).Seconds())
523 }()
524 g.lock.Lock()
525 defer g.lock.Unlock()
526 g.deleteVertex_locked(sliceVertexType, "", sliceName)
527 }
528
View as plain text