...
1
16
17 package state
18
19 import (
20 "fmt"
21 "sync"
22
23 resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
24 "k8s.io/apimachinery/pkg/types"
25 "k8s.io/apimachinery/pkg/util/sets"
26 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
27 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
28 )
29
30 var _ CheckpointState = &stateCheckpoint{}
31
32
33 type CheckpointState interface {
34 GetOrCreate() (ClaimInfoStateList, error)
35 Store(ClaimInfoStateList) error
36 }
37
38
39 type ClaimInfoState struct {
40
41 DriverName string
42
43
44 ClassName string
45
46
47 ClaimUID types.UID
48
49
50 ClaimName string
51
52
53 Namespace string
54
55
56 PodUIDs sets.Set[string]
57
58
59 ResourceHandles []resourcev1alpha2.ResourceHandle
60
61
62
63 CDIDevices map[string][]string
64 }
65
66
67
68 type ClaimInfoStateWithoutResourceHandles struct {
69
70 DriverName string
71
72
73 ClassName string
74
75
76 ClaimUID types.UID
77
78
79 ClaimName string
80
81
82 Namespace string
83
84
85 PodUIDs sets.Set[string]
86
87
88
89 CDIDevices map[string][]string
90 }
91
92 type stateCheckpoint struct {
93 sync.RWMutex
94 checkpointManager checkpointmanager.CheckpointManager
95 checkpointName string
96 }
97
98
99 func NewCheckpointState(stateDir, checkpointName string) (*stateCheckpoint, error) {
100 if len(checkpointName) == 0 {
101 return nil, fmt.Errorf("received empty string instead of checkpointName")
102 }
103
104 checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
105 if err != nil {
106 return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
107 }
108 stateCheckpoint := &stateCheckpoint{
109 checkpointManager: checkpointManager,
110 checkpointName: checkpointName,
111 }
112
113 return stateCheckpoint, nil
114 }
115
116
117 func (sc *stateCheckpoint) GetOrCreate() (ClaimInfoStateList, error) {
118 sc.Lock()
119 defer sc.Unlock()
120
121 checkpoint := NewDRAManagerCheckpoint()
122 err := sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
123 if err == errors.ErrCheckpointNotFound {
124 sc.store(ClaimInfoStateList{})
125 return ClaimInfoStateList{}, nil
126 }
127 if err != nil {
128 return nil, fmt.Errorf("failed to get checkpoint %v: %v", sc.checkpointName, err)
129 }
130
131 return checkpoint.Entries, nil
132 }
133
134
135 func (sc *stateCheckpoint) Store(claimInfoStateList ClaimInfoStateList) error {
136 sc.Lock()
137 defer sc.Unlock()
138
139 return sc.store(claimInfoStateList)
140 }
141
142
143 func (sc *stateCheckpoint) store(claimInfoStateList ClaimInfoStateList) error {
144 checkpoint := NewDRAManagerCheckpoint()
145 checkpoint.Entries = claimInfoStateList
146
147 err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
148 if err != nil {
149 return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
150 }
151 return nil
152 }
153
View as plain text