1
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "path"
23
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/wait"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/kubernetes/test/e2e/feature"
31 "k8s.io/kubernetes/test/e2e/framework"
32 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
33 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
34 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
35 "k8s.io/kubernetes/test/e2e/storage/utils"
36 imageutils "k8s.io/kubernetes/test/utils/image"
37 admissionapi "k8s.io/pod-security-admission/api"
38
39 "github.com/onsi/ginkgo/v2"
40 )
41
42 var (
43
44 BusyBoxImage = imageutils.GetE2EImage(imageutils.BusyBox)
45 durationForStuckMount = 110 * time.Second
46 )
47
48 var _ = utils.SIGDescribe(feature.Flexvolumes, "Detaching volumes", func() {
49 f := framework.NewDefaultFramework("flexvolume")
50 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
51
52
53
54 var cs clientset.Interface
55 var ns *v1.Namespace
56 var node *v1.Node
57 var suffix string
58
59 ginkgo.BeforeEach(func(ctx context.Context) {
60 e2eskipper.SkipUnlessProviderIs("gce", "local")
61 e2eskipper.SkipUnlessMasterOSDistroIs("debian", "ubuntu", "gci", "custom")
62 e2eskipper.SkipUnlessNodeOSDistroIs("debian", "ubuntu", "gci", "custom")
63 e2eskipper.SkipUnlessSSHKeyPresent()
64
65 cs = f.ClientSet
66 ns = f.Namespace
67 var err error
68 node, err = e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
69 framework.ExpectNoError(err)
70 suffix = ns.Name
71 })
72
73 f.It("should not work when mount is in progress", f.WithSlow(), func(ctx context.Context) {
74 e2eskipper.SkipUnlessSSHKeyPresent()
75
76 driver := "attachable-with-long-mount"
77 driverInstallAs := driver + "-" + suffix
78
79 ginkgo.By(fmt.Sprintf("installing flexvolume %s on node %s as %s", path.Join(driverDir, driver), node.Name, driverInstallAs))
80 installFlex(ctx, cs, node, "k8s", driverInstallAs, path.Join(driverDir, driver))
81 ginkgo.By(fmt.Sprintf("installing flexvolume %s on master as %s", path.Join(driverDir, driver), driverInstallAs))
82 installFlex(ctx, cs, nil, "k8s", driverInstallAs, path.Join(driverDir, driver))
83 volumeSource := v1.VolumeSource{
84 FlexVolume: &v1.FlexVolumeSource{
85 Driver: "k8s/" + driverInstallAs,
86 },
87 }
88
89 clientPod := getFlexVolumePod(volumeSource, node.Name)
90 ginkgo.By("Creating pod that uses slow format volume")
91 pod, err := cs.CoreV1().Pods(ns.Name).Create(ctx, clientPod, metav1.CreateOptions{})
92 framework.ExpectNoError(err)
93
94 uniqueVolumeName := getUniqueVolumeName(pod, driverInstallAs)
95
96 ginkgo.By("waiting for volumes to be attached to node")
97 err = waitForVolumesAttached(ctx, cs, node.Name, uniqueVolumeName)
98 framework.ExpectNoError(err, "while waiting for volume to attach to %s node", node.Name)
99
100 ginkgo.By("waiting for volume-in-use on the node after pod creation")
101 err = waitForVolumesInUse(ctx, cs, node.Name, uniqueVolumeName)
102 framework.ExpectNoError(err, "while waiting for volume in use")
103
104 ginkgo.By("waiting for kubelet to start mounting the volume")
105 time.Sleep(20 * time.Second)
106
107 ginkgo.By("Deleting the flexvolume pod")
108 err = e2epod.DeletePodWithWait(ctx, cs, pod)
109 framework.ExpectNoError(err, "in deleting the pod")
110
111
112 time.Sleep(30 * time.Second)
113
114 ginkgo.By("waiting for volume-in-use on the node after pod deletion")
115 err = waitForVolumesInUse(ctx, cs, node.Name, uniqueVolumeName)
116 framework.ExpectNoError(err, "while waiting for volume in use")
117
118
119
120 time.Sleep(durationForStuckMount)
121
122 ginkgo.By("waiting for volume to disappear from node in-use")
123 err = waitForVolumesNotInUse(ctx, cs, node.Name, uniqueVolumeName)
124 framework.ExpectNoError(err, "while waiting for volume to be removed from in-use")
125
126 ginkgo.By(fmt.Sprintf("uninstalling flexvolume %s from node %s", driverInstallAs, node.Name))
127 uninstallFlex(ctx, cs, node, "k8s", driverInstallAs)
128 ginkgo.By(fmt.Sprintf("uninstalling flexvolume %s from master", driverInstallAs))
129 uninstallFlex(ctx, cs, nil, "k8s", driverInstallAs)
130 })
131 })
132
133 func getUniqueVolumeName(pod *v1.Pod, driverName string) string {
134 return fmt.Sprintf("k8s/%s/%s", driverName, pod.Spec.Volumes[0].Name)
135 }
136
137 func waitForVolumesNotInUse(ctx context.Context, client clientset.Interface, nodeName, volumeName string) error {
138 waitErr := wait.PollUntilContextTimeout(ctx, 10*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
139 node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
140 if err != nil {
141 return false, fmt.Errorf("error fetching node %s with %v", nodeName, err)
142 }
143 volumeInUse := node.Status.VolumesInUse
144 for _, volume := range volumeInUse {
145 if string(volume) == volumeName {
146 return false, nil
147 }
148 }
149 return true, nil
150 })
151 if waitErr != nil {
152 return fmt.Errorf("error waiting for volumes to not be in use: %v", waitErr)
153 }
154 return nil
155 }
156
157 func waitForVolumesAttached(ctx context.Context, client clientset.Interface, nodeName, volumeName string) error {
158 waitErr := wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
159 node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
160 if err != nil {
161 return false, fmt.Errorf("error fetching node %s with %v", nodeName, err)
162 }
163 volumeAttached := node.Status.VolumesAttached
164 for _, volume := range volumeAttached {
165 if string(volume.Name) == volumeName {
166 return true, nil
167 }
168 }
169 return false, nil
170 })
171 if waitErr != nil {
172 return fmt.Errorf("error waiting for volume %v to attach to node %v: %v", volumeName, nodeName, waitErr)
173 }
174 return nil
175 }
176
177 func waitForVolumesInUse(ctx context.Context, client clientset.Interface, nodeName, volumeName string) error {
178 waitErr := wait.PollUntilContextTimeout(ctx, 10*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
179 node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
180 if err != nil {
181 return false, fmt.Errorf("error fetching node %s with %v", nodeName, err)
182 }
183 volumeInUse := node.Status.VolumesInUse
184 for _, volume := range volumeInUse {
185 if string(volume) == volumeName {
186 return true, nil
187 }
188 }
189 return false, nil
190 })
191 if waitErr != nil {
192 return fmt.Errorf("error waiting for volume %v to be in use on node %v: %v", volumeName, nodeName, waitErr)
193 }
194 return nil
195 }
196
197 func getFlexVolumePod(volumeSource v1.VolumeSource, nodeName string) *v1.Pod {
198 var gracePeriod int64
199 clientPod := &v1.Pod{
200 TypeMeta: metav1.TypeMeta{
201 Kind: "Pod",
202 APIVersion: "v1",
203 },
204 ObjectMeta: metav1.ObjectMeta{
205 Name: "flexvolume-detach-test" + "-client",
206 Labels: map[string]string{
207 "role": "flexvolume-detach-test" + "-client",
208 },
209 },
210 Spec: v1.PodSpec{
211 Containers: []v1.Container{
212 {
213 Name: "flexvolume-detach-test" + "-client",
214 Image: BusyBoxImage,
215 WorkingDir: "/opt",
216
217
218
219 Command: []string{
220 "/bin/sh",
221 "-c",
222 "while true ; do cat /opt/foo/index.html ; sleep 2 ; ls -altrh /opt/ ; sleep 2 ; done ",
223 },
224 VolumeMounts: []v1.VolumeMount{
225 {
226 Name: "test-long-detach-flex",
227 MountPath: "/opt/foo",
228 },
229 },
230 },
231 },
232 TerminationGracePeriodSeconds: &gracePeriod,
233 SecurityContext: &v1.PodSecurityContext{
234 SELinuxOptions: &v1.SELinuxOptions{
235 Level: "s0:c0,c1",
236 },
237 },
238 Volumes: []v1.Volume{
239 {
240 Name: "test-long-detach-flex",
241 VolumeSource: volumeSource,
242 },
243 },
244 NodeName: nodeName,
245 },
246 }
247 return clientPod
248 }
249
View as plain text