1
16
17 package dra
18
19 import (
20 "context"
21 "fmt"
22
23 v1 "k8s.io/api/core/v1"
24 resourceapi "k8s.io/api/resource/v1alpha2"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/types"
27 clientset "k8s.io/client-go/kubernetes"
28 "k8s.io/dynamic-resource-allocation/resourceclaim"
29 "k8s.io/klog/v2"
30 drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
31 dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
32 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
33 )
34
35
const (
	// draManagerStateFileName is the file name, inside the state directory
	// handed to NewManagerImpl, that the claim info cache is checkpointed to.
	draManagerStateFileName = "dra_manager_state"
)
37
38
// ManagerImpl is the dynamic resource allocation (DRA) manager. It keeps the
// state of prepared resource claims in a checkpointed claim info cache and
// reads ResourceClaim objects from the API server.
type ManagerImpl struct {
	// cache holds one claim info per prepared claim, looked up by claim name
	// and namespace, and is persisted with syncToCheckpoint.
	cache *claimInfoCache

	// kubeClient is used to fetch the ResourceClaim objects referenced by pods.
	kubeClient clientset.Interface
}
46
47
48 func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
49 klog.V(2).InfoS("Creating DRA manager")
50
51 claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
52 if err != nil {
53 return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
54 }
55
56 manager := &ManagerImpl{
57 cache: claimInfoCache,
58 kubeClient: kubeClient,
59 }
60
61 return manager, nil
62 }
63
64
65
66
67
68 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
69 batches := make(map[string][]*drapb.Claim)
70 claimInfos := make(map[types.UID]*ClaimInfo)
71 for i := range pod.Spec.ResourceClaims {
72 podClaim := &pod.Spec.ResourceClaims[i]
73 klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
74 claimName, mustCheckOwner, err := resourceclaim.Name(pod, podClaim)
75 if err != nil {
76 return fmt.Errorf("prepare resource claim: %v", err)
77 }
78
79 if claimName == nil {
80
81 continue
82 }
83
84 resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
85 context.TODO(),
86 *claimName,
87 metav1.GetOptions{})
88 if err != nil {
89 return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", *claimName, pod.Name, err)
90 }
91
92 if mustCheckOwner {
93 if err = resourceclaim.IsForPod(pod, resourceClaim); err != nil {
94 return err
95 }
96 }
97
98
99 if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
100 return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
101 pod.Name, pod.UID, *claimName, resourceClaim.UID)
102 }
103
104
105
106 if !claimIsUsedByPod(podClaim, pod) {
107 klog.V(5).InfoS("Skipping unused resource", "claim", claimName, "pod", pod.Name)
108 continue
109 }
110
111 claimInfo := m.cache.get(*claimName, pod.Namespace)
112 if claimInfo == nil {
113
114
115 claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
116 }
117
118
119
120
121
122
123
124
125
126 claimInfo.addPodReference(pod.UID)
127
128 if claimInfo.prepared {
129
130 continue
131 }
132
133
134 for _, resourceHandle := range claimInfo.ResourceHandles {
135
136
137 pluginName := resourceHandle.DriverName
138 if pluginName == "" {
139 pluginName = resourceClaim.Status.DriverName
140 }
141 claim := &drapb.Claim{
142 Namespace: resourceClaim.Namespace,
143 Uid: string(resourceClaim.UID),
144 Name: resourceClaim.Name,
145 ResourceHandle: resourceHandle.Data,
146 }
147 if resourceHandle.StructuredData != nil {
148 claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
149 }
150 batches[pluginName] = append(batches[pluginName], claim)
151 }
152 claimInfos[resourceClaim.UID] = claimInfo
153 }
154
155
156
157
158 for pluginName, claims := range batches {
159
160 client, err := dra.NewDRAPluginClient(pluginName)
161 if err != nil {
162 return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
163 }
164 response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
165 if err != nil {
166
167 return fmt.Errorf("NodePrepareResources failed: %v", err)
168 }
169 for claimUID, result := range response.Claims {
170 reqClaim := lookupClaimRequest(claims, claimUID)
171 if reqClaim == nil {
172 return fmt.Errorf("NodePrepareResources returned result for unknown claim UID %s", claimUID)
173 }
174 if result.Error != "" {
175 return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
176 }
177
178 claimInfo := claimInfos[types.UID(claimUID)]
179
180
181
182 err = claimInfo.addCDIDevices(pluginName, result.CDIDevices)
183 if err != nil {
184 return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
185 }
186
187 claimInfo.prepared = true
188
189
190
191
192
193
194
195
196
197
198 m.cache.add(claimInfo)
199 }
200
201
202
203 err = m.cache.syncToCheckpoint()
204 if err != nil {
205 return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
206 }
207
208 unfinished := len(claims) - len(response.Claims)
209 if unfinished != 0 {
210 return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
211 }
212 }
213
214 err := m.cache.syncToCheckpoint()
215 if err != nil {
216 return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
217 }
218 return nil
219 }
220
221 func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
222 for _, claim := range claims {
223 if claim.Uid == claimUID {
224 return claim
225 }
226 }
227 return nil
228 }
229
230 func claimIsUsedByPod(podClaim *v1.PodResourceClaim, pod *v1.Pod) bool {
231 if claimIsUsedByContainers(podClaim, pod.Spec.InitContainers) {
232 return true
233 }
234 if claimIsUsedByContainers(podClaim, pod.Spec.Containers) {
235 return true
236 }
237 return false
238 }
239
240 func claimIsUsedByContainers(podClaim *v1.PodResourceClaim, containers []v1.Container) bool {
241 for i := range containers {
242 if claimIsUsedByContainer(podClaim, &containers[i]) {
243 return true
244 }
245 }
246 return false
247 }
248
249 func claimIsUsedByContainer(podClaim *v1.PodResourceClaim, container *v1.Container) bool {
250 for _, c := range container.Resources.Claims {
251 if c.Name == podClaim.Name {
252 return true
253 }
254 }
255 return false
256 }
257
258
259
260 func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
261 annotations := []kubecontainer.Annotation{}
262 cdiDevices := []kubecontainer.CDIDevice{}
263
264 for i, podResourceClaim := range pod.Spec.ResourceClaims {
265 claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
266 if err != nil {
267 return nil, fmt.Errorf("list resource claims: %v", err)
268 }
269
270
271
272 if claimName == nil {
273 continue
274 }
275 for _, claim := range container.Resources.Claims {
276 if podResourceClaim.Name != claim.Name {
277 continue
278 }
279
280 claimInfo := m.cache.get(*claimName, pod.Namespace)
281 if claimInfo == nil {
282 return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
283 }
284
285 claimInfo.RLock()
286 claimAnnotations := claimInfo.annotationsAsList()
287 klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
288 annotations = append(annotations, claimAnnotations...)
289 for _, devices := range claimInfo.CDIDevices {
290 for _, device := range devices {
291 cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
292 }
293 }
294 claimInfo.RUnlock()
295 }
296 }
297
298 return &ContainerInfo{Annotations: annotations, CDIDevices: cdiDevices}, nil
299 }
300
301
302
303
304
305 func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
306 batches := make(map[string][]*drapb.Claim)
307 claimInfos := make(map[types.UID]*ClaimInfo)
308 for i := range pod.Spec.ResourceClaims {
309 claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
310 if err != nil {
311 return fmt.Errorf("unprepare resource claim: %v", err)
312 }
313
314
315
316
317 if claimName == nil {
318 continue
319 }
320
321 claimInfo := m.cache.get(*claimName, pod.Namespace)
322
323
324 if claimInfo == nil {
325 continue
326 }
327
328
329 if len(claimInfo.PodUIDs) > 1 {
330
331
332
333
334
335 claimInfo.deletePodReference(pod.UID)
336 continue
337 }
338
339
340 for _, resourceHandle := range claimInfo.ResourceHandles {
341
342
343 pluginName := resourceHandle.DriverName
344 if pluginName == "" {
345 pluginName = claimInfo.DriverName
346 }
347
348 claim := &drapb.Claim{
349 Namespace: claimInfo.Namespace,
350 Uid: string(claimInfo.ClaimUID),
351 Name: claimInfo.ClaimName,
352 ResourceHandle: resourceHandle.Data,
353 }
354 if resourceHandle.StructuredData != nil {
355 claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
356 }
357 batches[pluginName] = append(batches[pluginName], claim)
358 }
359 claimInfos[claimInfo.ClaimUID] = claimInfo
360 }
361
362
363
364
365 for pluginName, claims := range batches {
366
367 client, err := dra.NewDRAPluginClient(pluginName)
368 if err != nil {
369 return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
370 }
371 response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
372 if err != nil {
373
374 return fmt.Errorf("NodeUnprepareResources failed: %v", err)
375 }
376
377 for claimUID, result := range response.Claims {
378 reqClaim := lookupClaimRequest(claims, claimUID)
379 if reqClaim == nil {
380 return fmt.Errorf("NodeUnprepareResources returned result for unknown claim UID %s", claimUID)
381 }
382 if result.Error != "" {
383 return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
384 }
385
386
387
388
389
390 claimInfo := claimInfos[types.UID(claimUID)]
391 claimInfo.deletePodReference(pod.UID)
392 m.cache.delete(claimInfo.ClaimName, pod.Namespace)
393 }
394
395
396 err = m.cache.syncToCheckpoint()
397 if err != nil {
398 return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
399 }
400
401 unfinished := len(claims) - len(response.Claims)
402 if unfinished != 0 {
403 return fmt.Errorf("NodeUnprepareResources left out %d claims", unfinished)
404 }
405 }
406
407
408 err := m.cache.syncToCheckpoint()
409 if err != nil {
410 return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
411 }
412 return nil
413 }
414
415
416
417 func (m *ManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
418 return m.cache.hasPodReference(UID)
419 }
420
421
422 func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
423 claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
424
425 for i, podResourceClaim := range pod.Spec.ResourceClaims {
426 claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
427 if err != nil {
428 return nil, fmt.Errorf("determine resource claim information: %v", err)
429 }
430
431 for _, claim := range container.Resources.Claims {
432 if podResourceClaim.Name != claim.Name {
433 continue
434 }
435 claimInfo := m.cache.get(*claimName, pod.Namespace)
436 if claimInfo == nil {
437 return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
438 }
439 claimInfos = append(claimInfos, claimInfo)
440 }
441 }
442 return claimInfos, nil
443 }
444
View as plain text