1
16
17 package reconciler
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/api/resource"
26 "k8s.io/apimachinery/pkg/types"
27 clientset "k8s.io/client-go/kubernetes"
28 "k8s.io/klog/v2"
29 "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
30 "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
31 volumepkg "k8s.io/kubernetes/pkg/volume"
32 "k8s.io/kubernetes/pkg/volume/util/hostutil"
33 "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
34 "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
35 "k8s.io/mount-utils"
36 )
37
38
39
40
41
42
43
// Reconciler is the interface returned by NewReconciler. The concrete
// implementation in this file compares a desired state of the world with an
// actual state of the world and starts mount, unmount, attach and detach
// operations through an operation executor to bring the two in line.
type Reconciler interface {
	// Run receives a stop channel. Its implementation is not part of this
	// section of the file.
	// NOTE(review): presumably runs the reconciliation loop until stopCh is
	// closed — confirm against the implementation.
	Run(stopCh <-chan struct{})

	// StatesHasBeenSynced reports a boolean synchronization status. Its
	// implementation is not part of this section of the file.
	// NOTE(review): presumably true once the state sync has run at least
	// once — confirm against the implementation.
	StatesHasBeenSynced() bool
}
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 func NewReconciler(
91 kubeClient clientset.Interface,
92 controllerAttachDetachEnabled bool,
93 loopSleepDuration time.Duration,
94 waitForAttachTimeout time.Duration,
95 nodeName types.NodeName,
96 desiredStateOfWorld cache.DesiredStateOfWorld,
97 actualStateOfWorld cache.ActualStateOfWorld,
98 populatorHasAddedPods func() bool,
99 operationExecutor operationexecutor.OperationExecutor,
100 mounter mount.Interface,
101 hostutil hostutil.HostUtils,
102 volumePluginMgr *volumepkg.VolumePluginMgr,
103 kubeletPodsDir string) Reconciler {
104 return &reconciler{
105 kubeClient: kubeClient,
106 controllerAttachDetachEnabled: controllerAttachDetachEnabled,
107 loopSleepDuration: loopSleepDuration,
108 waitForAttachTimeout: waitForAttachTimeout,
109 nodeName: nodeName,
110 desiredStateOfWorld: desiredStateOfWorld,
111 actualStateOfWorld: actualStateOfWorld,
112 populatorHasAddedPods: populatorHasAddedPods,
113 operationExecutor: operationExecutor,
114 mounter: mounter,
115 hostutil: hostutil,
116 skippedDuringReconstruction: map[v1.UniqueVolumeName]*globalVolumeInfo{},
117 volumePluginMgr: volumePluginMgr,
118 kubeletPodsDir: kubeletPodsDir,
119 timeOfLastSync: time.Time{},
120 volumesFailedReconstruction: make([]podVolume, 0),
121 volumesNeedUpdateFromNodeStatus: make([]v1.UniqueVolumeName, 0),
122 volumesNeedReportedInUse: make([]v1.UniqueVolumeName, 0),
123 }
124 }
125
// reconciler is the concrete type returned by NewReconciler. It holds the two
// state caches being reconciled, the executor used to start volume
// operations, and bookkeeping for state reconstruction.
type reconciler struct {
	// kubeClient is the API client supplied to NewReconciler; it is not used
	// in this section of the file.
	kubeClient clientset.Interface
	// controllerAttachDetachEnabled selects between verifying attachment and
	// marking volumes detached locally (true) versus having this reconciler
	// start attach/detach operations itself for attachable plugins (false).
	controllerAttachDetachEnabled bool
	// loopSleepDuration is supplied to NewReconciler.
	// NOTE(review): presumably the pause between reconciliation passes — confirm.
	loopSleepDuration time.Duration
	// waitForAttachTimeout is passed to operationExecutor.MountVolume.
	waitForAttachTimeout time.Duration
	// nodeName is passed to the attach and attach-verification operations.
	nodeName types.NodeName
	// desiredStateOfWorld is the cache of pod/volume pairs that should be mounted.
	desiredStateOfWorld cache.DesiredStateOfWorld
	// actualStateOfWorld is the cache of volumes currently attached/mounted;
	// it is also handed to the operation executor with every operation.
	actualStateOfWorld cache.ActualStateOfWorld
	// populatorHasAddedPods is a callback supplied to NewReconciler; it is
	// not called in this section of the file.
	populatorHasAddedPods func() bool
	// operationExecutor starts the mount, unmount, attach, detach, verify and
	// expand operations.
	operationExecutor operationexecutor.OperationExecutor
	// mounter is supplied to NewReconciler; not used in this section of the file.
	mounter mount.Interface
	// hostutil is passed to operationExecutor.UnmountDevice.
	hostutil hostutil.HostUtils
	// volumePluginMgr is supplied to NewReconciler; not used in this section
	// of the file.
	volumePluginMgr *volumepkg.VolumePluginMgr
	// skippedDuringReconstruction is keyed by unique volume name and starts
	// out empty.
	// NOTE(review): presumably volumes deferred during state reconstruction — confirm.
	skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
	// kubeletPodsDir is passed to operationExecutor.UnmountVolume.
	kubeletPodsDir string

	// NOTE(review): timeOfLastSyncLock presumably guards timeOfLastSync — confirm
	// against the code that reads and writes it.
	timeOfLastSyncLock sync.Mutex
	// timeOfLastSync starts as the zero time.
	timeOfLastSync time.Time
	// The three slices below start out empty; they are not read or written in
	// this section of the file.
	volumesFailedReconstruction []podVolume
	volumesNeedUpdateFromNodeStatus []v1.UniqueVolumeName
	volumesNeedReportedInUse []v1.UniqueVolumeName
}
148
149 func (rc *reconciler) unmountVolumes() {
150
151 for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
152 if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName, mountedVolume.SELinuxMountContext) {
153
154 klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
155 err := rc.operationExecutor.UnmountVolume(
156 mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
157 if err != nil && !isExpectedError(err) {
158 klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
159 }
160 if err == nil {
161 klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
162 }
163 }
164 }
165 }
166
167 func (rc *reconciler) mountOrAttachVolumes() {
168
169 for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
170 volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
171 volumeToMount.DevicePath = devicePath
172 if cache.IsSELinuxMountMismatchError(err) {
173
174
175
176 rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
177 continue
178 } else if cache.IsVolumeNotAttachedError(err) {
179 rc.waitForVolumeAttach(volumeToMount)
180 } else if !volMounted || cache.IsRemountRequiredError(err) {
181 rc.mountAttachedVolumes(volumeToMount, err)
182 } else if cache.IsFSResizeRequiredError(err) {
183 fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
184 rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
185 }
186 }
187 }
188
189 func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
190 klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
191 err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
192
193 if err != nil && !isExpectedError(err) {
194 klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
195 }
196
197 if err == nil {
198 klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
199 }
200 }
201
202 func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
203
204 remountingLogStr := ""
205 isRemount := cache.IsRemountRequiredError(podExistError)
206 if isRemount {
207 remountingLogStr = "Volume is already mounted to pod, but remount was requested."
208 }
209 klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
210 err := rc.operationExecutor.MountVolume(
211 rc.waitForAttachTimeout,
212 volumeToMount.VolumeToMount,
213 rc.actualStateOfWorld,
214 isRemount)
215 if err != nil && !isExpectedError(err) {
216 klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
217 }
218 if err == nil {
219 if remountingLogStr == "" {
220 klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
221 } else {
222 klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
223 }
224 }
225 }
226
227 func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
228 logger := klog.TODO()
229 if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
230
231 if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
232 klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
233 return
234 }
235
236
237 klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
238 err := rc.operationExecutor.VerifyControllerAttachedVolume(
239 logger,
240 volumeToMount.VolumeToMount,
241 rc.nodeName,
242 rc.actualStateOfWorld)
243 if err != nil && !isExpectedError(err) {
244 klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
245 }
246 if err == nil {
247 klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
248 }
249 } else {
250
251
252 volumeToAttach := operationexecutor.VolumeToAttach{
253 VolumeName: volumeToMount.VolumeName,
254 VolumeSpec: volumeToMount.VolumeSpec,
255 NodeName: rc.nodeName,
256 }
257 klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
258 err := rc.operationExecutor.AttachVolume(logger, volumeToAttach, rc.actualStateOfWorld)
259 if err != nil && !isExpectedError(err) {
260 klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
261 }
262 if err == nil {
263 klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
264 }
265 }
266 }
267
268 func (rc *reconciler) unmountDetachDevices() {
269 for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
270
271 if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName, attachedVolume.SELinuxMountContext) &&
272 !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
273 if attachedVolume.DeviceMayBeMounted() {
274
275 klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
276 err := rc.operationExecutor.UnmountDevice(
277 attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
278 if err != nil && !isExpectedError(err) {
279 klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
280 }
281 if err == nil {
282 klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
283 }
284 } else {
285
286
287 if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
288 rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
289 klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
290 } else {
291
292 klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
293 err := rc.operationExecutor.DetachVolume(
294 klog.TODO(), attachedVolume.AttachedVolume, false , rc.actualStateOfWorld)
295 if err != nil && !isExpectedError(err) {
296 klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
297 }
298 if err == nil {
299 klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
300 }
301 }
302 }
303 }
304 }
305 }
306
307
308 func isExpectedError(err error) bool {
309 return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
310 }
311
View as plain text