1
16
17 package reconciler
18
19 import (
20 "fmt"
21 "io/fs"
22 "os"
23 "path/filepath"
24 "time"
25
26 "github.com/go-logr/logr"
27 v1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/klog/v2"
31 "k8s.io/kubernetes/pkg/kubelet/config"
32 "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
33 volumepkg "k8s.io/kubernetes/pkg/volume"
34 "k8s.io/kubernetes/pkg/volume/util"
35 "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
36 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
37 utilpath "k8s.io/utils/path"
38 utilstrings "k8s.io/utils/strings"
39 )
40
41
42
43 var _ logr.Marshaler = podVolume{}
44 var _ logr.Marshaler = reconstructedVolume{}
45 var _ logr.Marshaler = globalVolumeInfo{}
46
47 type podVolume struct {
48 podName volumetypes.UniquePodName
49 volumeSpecName string
50 volumePath string
51 pluginName string
52 volumeMode v1.PersistentVolumeMode
53 }
54
55 func (p podVolume) MarshalLog() interface{} {
56 return struct {
57 PodName string `json:"podName"`
58 VolumeSpecName string `json:"volumeSpecName"`
59 VolumePath string `json:"volumePath"`
60 PluginName string `json:"pluginName"`
61 VolumeMode string `json:"volumeMode"`
62 }{
63 PodName: string(p.podName),
64 VolumeSpecName: p.volumeSpecName,
65 VolumePath: p.volumePath,
66 PluginName: p.pluginName,
67 VolumeMode: string(p.volumeMode),
68 }
69 }
70
71 type reconstructedVolume struct {
72 volumeName v1.UniqueVolumeName
73 podName volumetypes.UniquePodName
74 volumeSpec *volumepkg.Spec
75 outerVolumeSpecName string
76 pod *v1.Pod
77 volumeGidValue string
78 devicePath string
79 mounter volumepkg.Mounter
80 deviceMounter volumepkg.DeviceMounter
81 blockVolumeMapper volumepkg.BlockVolumeMapper
82 seLinuxMountContext string
83 }
84
85 func (rv reconstructedVolume) MarshalLog() interface{} {
86 return struct {
87 VolumeName string `json:"volumeName"`
88 PodName string `json:"podName"`
89 VolumeSpecName string `json:"volumeSpecName"`
90 OuterVolumeSpecName string `json:"outerVolumeSpecName"`
91 PodUID string `json:"podUID"`
92 VolumeGIDValue string `json:"volumeGIDValue"`
93 DevicePath string `json:"devicePath"`
94 SeLinuxMountContext string `json:"seLinuxMountContext"`
95 }{
96 VolumeName: string(rv.volumeName),
97 PodName: string(rv.podName),
98 VolumeSpecName: rv.volumeSpec.Name(),
99 OuterVolumeSpecName: rv.outerVolumeSpecName,
100 PodUID: string(rv.pod.UID),
101 VolumeGIDValue: rv.volumeGidValue,
102 DevicePath: rv.devicePath,
103 SeLinuxMountContext: rv.seLinuxMountContext,
104 }
105 }
106
107
108
109 type globalVolumeInfo struct {
110 volumeName v1.UniqueVolumeName
111 volumeSpec *volumepkg.Spec
112 devicePath string
113 mounter volumepkg.Mounter
114 deviceMounter volumepkg.DeviceMounter
115 blockVolumeMapper volumepkg.BlockVolumeMapper
116 podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
117 }
118
119 func (gvi globalVolumeInfo) MarshalLog() interface{} {
120 podVolumes := make(map[volumetypes.UniquePodName]v1.UniqueVolumeName)
121 for podName, volume := range gvi.podVolumes {
122 podVolumes[podName] = volume.volumeName
123 }
124
125 return struct {
126 VolumeName string `json:"volumeName"`
127 VolumeSpecName string `json:"volumeSpecName"`
128 DevicePath string `json:"devicePath"`
129 PodVolumes map[volumetypes.UniquePodName]v1.UniqueVolumeName `json:"podVolumes"`
130 }{
131 VolumeName: string(gvi.volumeName),
132 VolumeSpecName: gvi.volumeSpec.Name(),
133 DevicePath: gvi.devicePath,
134 PodVolumes: podVolumes,
135 }
136 }
137
138 func (rc *reconciler) updateLastSyncTime() {
139 rc.timeOfLastSyncLock.Lock()
140 defer rc.timeOfLastSyncLock.Unlock()
141 rc.timeOfLastSync = time.Now()
142 }
143
144 func (rc *reconciler) StatesHasBeenSynced() bool {
145 rc.timeOfLastSyncLock.Lock()
146 defer rc.timeOfLastSyncLock.Unlock()
147 return !rc.timeOfLastSync.IsZero()
148 }
149
150 func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
151 if gvi.podVolumes == nil {
152 gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
153 }
154 gvi.podVolumes[rcv.podName] = rcv
155 }
156
157 func (rc *reconciler) cleanupMounts(volume podVolume) {
158 klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
159 mountedVolume := operationexecutor.MountedVolume{
160 PodName: volume.podName,
161
162
163 VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
164 InnerVolumeSpecName: volume.volumeSpecName,
165 PluginName: volume.pluginName,
166 PodUID: types.UID(volume.podName),
167 }
168 metrics.ForceCleanedFailedVolumeOperationsTotal.Inc()
169
170
171 err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
172 if err != nil {
173 metrics.ForceCleanedFailedVolumeOperationsErrorsTotal.Inc()
174 klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
175 return
176 }
177 }
178
179
180
181
182 func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
183 if gvi.blockVolumeMapper != nil {
184
185 return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
186 } else if gvi.deviceMounter != nil {
187
188 return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
189 } else {
190 return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
191 }
192 }
193
194
195
196
197 func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
198 podsDirInfo, err := os.ReadDir(podDir)
199 if err != nil {
200 return nil, err
201 }
202 volumes := []podVolume{}
203 for i := range podsDirInfo {
204 if !podsDirInfo[i].IsDir() {
205 continue
206 }
207 podName := podsDirInfo[i].Name()
208 podDir := filepath.Join(podDir, podName)
209
210
211
212 volumesDirs := map[v1.PersistentVolumeMode]string{
213 v1.PersistentVolumeFilesystem: filepath.Join(podDir, config.DefaultKubeletVolumesDirName),
214 }
215
216
217 volumesDirs[v1.PersistentVolumeBlock] = filepath.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
218
219 for volumeMode, volumesDir := range volumesDirs {
220 var volumesDirInfo []fs.DirEntry
221 if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
222
223 continue
224 }
225 for _, volumeDir := range volumesDirInfo {
226 pluginName := volumeDir.Name()
227 volumePluginPath := filepath.Join(volumesDir, pluginName)
228 volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
229 if err != nil {
230 klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
231 continue
232 }
233 unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
234 for _, volumeName := range volumePluginDirs {
235 volumePath := filepath.Join(volumePluginPath, volumeName)
236 klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
237 volumes = append(volumes, podVolume{
238 podName: volumetypes.UniquePodName(podName),
239 volumeSpecName: volumeName,
240 volumePath: volumePath,
241 pluginName: unescapePluginName,
242 volumeMode: volumeMode,
243 })
244 }
245 }
246 }
247 }
248 for _, volume := range volumes {
249 klog.V(4).InfoS("Get volume from pod directory", "path", podDir, "volume", volume)
250 }
251 return volumes, nil
252 }
253
254
255 func (rc *reconciler) reconstructVolume(volume podVolume) (rvolume *reconstructedVolume, rerr error) {
256 metrics.ReconstructVolumeOperationsTotal.Inc()
257 defer func() {
258 if rerr != nil {
259 metrics.ReconstructVolumeOperationsErrorsTotal.Inc()
260 }
261 }()
262
263
264 plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
265 if err != nil {
266 return nil, err
267 }
268
269
270 pod := &v1.Pod{
271 ObjectMeta: metav1.ObjectMeta{
272 UID: types.UID(volume.podName),
273 },
274 }
275 mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
276 if err != nil {
277 return nil, err
278 }
279 if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
280 return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
281 }
282
283 reconstructed, err := rc.operationExecutor.ReconstructVolumeOperation(
284 volume.volumeMode,
285 plugin,
286 mapperPlugin,
287 pod.UID,
288 volume.podName,
289 volume.volumeSpecName,
290 volume.volumePath,
291 volume.pluginName)
292 if err != nil {
293 return nil, err
294 }
295 volumeSpec := reconstructed.Spec
296 if volumeSpec == nil {
297 return nil, fmt.Errorf("failed to reconstruct volume for plugin %q (spec.Name: %q) pod %q (UID: %q): got nil", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
298 }
299
300
301
302
303
304
305 deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
306 if err != nil {
307 return nil, err
308 }
309
310
311
312 needsNameFromSpec := deviceMountablePlugin != nil
313 if !needsNameFromSpec {
314
315
316
317
318 attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
319 if err != nil {
320 return nil, err
321 }
322 needsNameFromSpec = attachablePlugin != nil
323 }
324
325 var uniqueVolumeName v1.UniqueVolumeName
326 if needsNameFromSpec {
327 uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
328 if err != nil {
329 return nil, err
330 }
331 } else {
332 uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
333 }
334
335 var volumeMapper volumepkg.BlockVolumeMapper
336 var volumeMounter volumepkg.Mounter
337 var deviceMounter volumepkg.DeviceMounter
338
339 if volume.volumeMode == v1.PersistentVolumeBlock {
340 var newMapperErr error
341 volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
342 volumeSpec,
343 pod,
344 volumepkg.VolumeOptions{})
345 if newMapperErr != nil {
346 return nil, fmt.Errorf(
347 "reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
348 uniqueVolumeName,
349 volumeSpec.Name(),
350 volume.podName,
351 pod.UID,
352 newMapperErr)
353 }
354 } else {
355 var err error
356 volumeMounter, err = plugin.NewMounter(
357 volumeSpec,
358 pod,
359 volumepkg.VolumeOptions{})
360 if err != nil {
361 return nil, fmt.Errorf(
362 "reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
363 uniqueVolumeName,
364 volumeSpec.Name(),
365 volume.podName,
366 pod.UID,
367 err)
368 }
369 if deviceMountablePlugin != nil {
370 deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
371 if err != nil {
372 return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
373 uniqueVolumeName,
374 volumeSpec.Name(),
375 volume.podName,
376 pod.UID,
377 err)
378 }
379 }
380 }
381
382 reconstructedVolume := &reconstructedVolume{
383 volumeName: uniqueVolumeName,
384 podName: volume.podName,
385 volumeSpec: volumeSpec,
386
387
388
389
390 outerVolumeSpecName: volume.volumeSpecName,
391 pod: pod,
392 deviceMounter: deviceMounter,
393 volumeGidValue: "",
394
395
396 devicePath: "",
397 mounter: volumeMounter,
398 blockVolumeMapper: volumeMapper,
399 seLinuxMountContext: reconstructed.SELinuxMountContext,
400 }
401 return reconstructedVolume, nil
402 }
403
View as plain text