1
16
17 package portworx
18
19 import (
20 "fmt"
21 "net"
22 "os"
23 "strconv"
24
25 "k8s.io/klog/v2"
26 "k8s.io/mount-utils"
27 utilstrings "k8s.io/utils/strings"
28
29 volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
30 v1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/resource"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/types"
34 utilfeature "k8s.io/apiserver/pkg/util/feature"
35 "k8s.io/kubernetes/pkg/features"
36 "k8s.io/kubernetes/pkg/volume"
37 "k8s.io/kubernetes/pkg/volume/util"
38 )
39
40 const (
41 attachContextKey = "context"
42 attachHostKey = "host"
43 )
44
45
46 func ProbeVolumePlugins() []volume.VolumePlugin {
47 return []volume.VolumePlugin{&portworxVolumePlugin{nil, nil}}
48 }
49
50 type portworxVolumePlugin struct {
51 host volume.VolumeHost
52 util *portworxVolumeUtil
53 }
54
55 var _ volume.VolumePlugin = &portworxVolumePlugin{}
56 var _ volume.PersistentVolumePlugin = &portworxVolumePlugin{}
57 var _ volume.DeletableVolumePlugin = &portworxVolumePlugin{}
58 var _ volume.ProvisionableVolumePlugin = &portworxVolumePlugin{}
59 var _ volume.ExpandableVolumePlugin = &portworxVolumePlugin{}
60
61 const (
62 portworxVolumePluginName = "kubernetes.io/portworx-volume"
63 )
64
65 func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
66 return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(portworxVolumePluginName), volName)
67 }
68
69 func (plugin *portworxVolumePlugin) IsMigratedToCSI() bool {
70 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
71 }
72
73 func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
74 client, err := volumeclient.NewDriverClient(
75 fmt.Sprintf("http://%s", net.JoinHostPort(host.GetHostName(), strconv.Itoa(osdMgmtDefaultPort))),
76 pxdDriverName, osdDriverVersion, pxDriverName)
77 if err != nil {
78 return err
79 }
80
81 plugin.host = host
82 plugin.util = &portworxVolumeUtil{
83 portworxClient: client,
84 }
85
86 return nil
87 }
88
89 func (plugin *portworxVolumePlugin) GetPluginName() string {
90 return portworxVolumePluginName
91 }
92
93 func (plugin *portworxVolumePlugin) GetVolumeName(spec *volume.Spec) (string, error) {
94 volumeSource, _, err := getVolumeSource(spec)
95 if err != nil {
96 return "", err
97 }
98
99 return volumeSource.VolumeID, nil
100 }
101
102 func (plugin *portworxVolumePlugin) CanSupport(spec *volume.Spec) bool {
103 return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume != nil) ||
104 (spec.Volume != nil && spec.Volume.PortworxVolume != nil)
105 }
106
107 func (plugin *portworxVolumePlugin) RequiresRemount(spec *volume.Spec) bool {
108 return false
109 }
110
111 func (plugin *portworxVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
112 return []v1.PersistentVolumeAccessMode{
113 v1.ReadWriteOnce,
114 v1.ReadWriteMany,
115 }
116 }
117
118 func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
119 return plugin.newMounterInternal(spec, pod.UID, plugin.util, plugin.host.GetMounter(plugin.GetPluginName()))
120 }
121
122 func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
123 pwx, readOnly, err := getVolumeSource(spec)
124 if err != nil {
125 return nil, err
126 }
127
128 volumeID := pwx.VolumeID
129 fsType := pwx.FSType
130
131 return &portworxVolumeMounter{
132 portworxVolume: &portworxVolume{
133 podUID: podUID,
134 volName: spec.Name(),
135 volumeID: volumeID,
136 manager: manager,
137 mounter: mounter,
138 plugin: plugin,
139 MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
140 },
141 fsType: fsType,
142 readOnly: readOnly,
143 diskMounter: util.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)}, nil
144 }
145
146 func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
147 return plugin.newUnmounterInternal(volName, podUID, plugin.util, plugin.host.GetMounter(plugin.GetPluginName()))
148 }
149
150 func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager,
151 mounter mount.Interface) (volume.Unmounter, error) {
152 return &portworxVolumeUnmounter{
153 &portworxVolume{
154 podUID: podUID,
155 volName: volName,
156 manager: manager,
157 mounter: mounter,
158 plugin: plugin,
159 MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
160 }}, nil
161 }
162
163 func (plugin *portworxVolumePlugin) NewDeleter(logger klog.Logger, spec *volume.Spec) (volume.Deleter, error) {
164 return plugin.newDeleterInternal(spec, plugin.util)
165 }
166
167 func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
168 if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
169 return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
170 }
171
172 return &portworxVolumeDeleter{
173 portworxVolume: &portworxVolume{
174 volName: spec.Name(),
175 volumeID: spec.PersistentVolume.Spec.PortworxVolume.VolumeID,
176 manager: manager,
177 plugin: plugin,
178 }}, nil
179 }
180
181 func (plugin *portworxVolumePlugin) NewProvisioner(logger klog.Logger, options volume.VolumeOptions) (volume.Provisioner, error) {
182 return plugin.newProvisionerInternal(options, plugin.util)
183 }
184
185 func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
186 return &portworxVolumeProvisioner{
187 portworxVolume: &portworxVolume{
188 manager: manager,
189 plugin: plugin,
190 },
191 options: options,
192 }, nil
193 }
194
195 func (plugin *portworxVolumePlugin) RequiresFSResize() bool {
196 return false
197 }
198
199 func (plugin *portworxVolumePlugin) ExpandVolumeDevice(
200 spec *volume.Spec,
201 newSize resource.Quantity,
202 oldSize resource.Quantity) (resource.Quantity, error) {
203 klog.V(4).Infof("Expanding: %s from %v to %v", spec.Name(), oldSize, newSize)
204 err := plugin.util.ResizeVolume(spec, newSize, plugin.host)
205 if err != nil {
206 return oldSize, err
207 }
208
209 klog.V(4).Infof("Successfully resized %s to %v", spec.Name(), newSize)
210 return newSize, nil
211 }
212
213 func (plugin *portworxVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
214 portworxVolume := &v1.Volume{
215 Name: volumeName,
216 VolumeSource: v1.VolumeSource{
217 PortworxVolume: &v1.PortworxVolumeSource{
218 VolumeID: volumeName,
219 },
220 },
221 }
222 return volume.ReconstructedVolume{
223 Spec: volume.NewSpecFromVolume(portworxVolume),
224 }, nil
225 }
226
227 func (plugin *portworxVolumePlugin) SupportsMountOption() bool {
228 return false
229 }
230
231 func (plugin *portworxVolumePlugin) SupportsBulkVolumeVerification() bool {
232 return false
233 }
234
235 func (plugin *portworxVolumePlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
236 return false, nil
237 }
238
239 func getVolumeSource(
240 spec *volume.Spec) (*v1.PortworxVolumeSource, bool, error) {
241 if spec.Volume != nil && spec.Volume.PortworxVolume != nil {
242 return spec.Volume.PortworxVolume, spec.Volume.PortworxVolume.ReadOnly, nil
243 } else if spec.PersistentVolume != nil &&
244 spec.PersistentVolume.Spec.PortworxVolume != nil {
245 return spec.PersistentVolume.Spec.PortworxVolume, spec.ReadOnly, nil
246 }
247
248 return nil, false, fmt.Errorf("Spec does not reference a Portworx Volume type")
249 }
250
251
252 type portworxManager interface {
253
254 CreateVolume(provisioner *portworxVolumeProvisioner) (volumeID string, volumeSizeGB int64, labels map[string]string, err error)
255
256 DeleteVolume(deleter *portworxVolumeDeleter) error
257
258 AttachVolume(mounter *portworxVolumeMounter, attachOptions map[string]string) (string, error)
259
260 DetachVolume(unmounter *portworxVolumeUnmounter) error
261
262 MountVolume(mounter *portworxVolumeMounter, mountDir string) error
263
264 UnmountVolume(unmounter *portworxVolumeUnmounter, mountDir string) error
265
266 ResizeVolume(spec *volume.Spec, newSize resource.Quantity, host volume.VolumeHost) error
267 }
268
269
270
271 type portworxVolume struct {
272 volName string
273 podUID types.UID
274
275 volumeID string
276
277 manager portworxManager
278
279 mounter mount.Interface
280 plugin *portworxVolumePlugin
281 volume.MetricsProvider
282 }
283
284 type portworxVolumeMounter struct {
285 *portworxVolume
286
287 fsType string
288
289 readOnly bool
290
291 diskMounter *mount.SafeFormatAndMount
292 }
293
294 var _ volume.Mounter = &portworxVolumeMounter{}
295
296 func (b *portworxVolumeMounter) GetAttributes() volume.Attributes {
297 return volume.Attributes{
298 ReadOnly: b.readOnly,
299 Managed: !b.readOnly,
300 SELinuxRelabel: false,
301 }
302 }
303
304
305 func (b *portworxVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
306 return b.SetUpAt(b.GetPath(), mounterArgs)
307 }
308
309
310 func (b *portworxVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
311 notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
312 klog.Infof("Portworx Volume set up. Dir: %s %v %v", dir, !notMnt, err)
313 if err != nil && !os.IsNotExist(err) {
314 klog.Errorf("Cannot validate mountpoint: %s", dir)
315 return err
316 }
317 if !notMnt {
318 return nil
319 }
320
321 attachOptions := make(map[string]string)
322 attachOptions[attachContextKey] = dir
323 attachOptions[attachHostKey] = b.plugin.host.GetHostName()
324 if _, err := b.manager.AttachVolume(b, attachOptions); err != nil {
325 return err
326 }
327
328 klog.V(4).Infof("Portworx Volume %s attached", b.volumeID)
329
330 if err := os.MkdirAll(dir, 0750); err != nil {
331 return err
332 }
333
334 if err := b.manager.MountVolume(b, dir); err != nil {
335 return err
336 }
337 if !b.readOnly {
338 volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
339 }
340 klog.Infof("Portworx Volume %s setup at %s", b.volumeID, dir)
341 return nil
342 }
343
344 func (pwx *portworxVolume) GetPath() string {
345 return getPath(pwx.podUID, pwx.volName, pwx.plugin.host)
346 }
347
348 type portworxVolumeUnmounter struct {
349 *portworxVolume
350 }
351
352 var _ volume.Unmounter = &portworxVolumeUnmounter{}
353
354
355
356 func (c *portworxVolumeUnmounter) TearDown() error {
357 return c.TearDownAt(c.GetPath())
358 }
359
360
361
362 func (c *portworxVolumeUnmounter) TearDownAt(dir string) error {
363 klog.Infof("Portworx Volume TearDown of %s", dir)
364
365 if err := c.manager.UnmountVolume(c, dir); err != nil {
366 return err
367 }
368
369
370 if err := c.manager.DetachVolume(c); err != nil {
371 return err
372 }
373
374 return nil
375 }
376
377 type portworxVolumeDeleter struct {
378 *portworxVolume
379 }
380
381 var _ volume.Deleter = &portworxVolumeDeleter{}
382
383 func (d *portworxVolumeDeleter) GetPath() string {
384 return getPath(d.podUID, d.volName, d.plugin.host)
385 }
386
387 func (d *portworxVolumeDeleter) Delete() error {
388 return d.manager.DeleteVolume(d)
389 }
390
391 type portworxVolumeProvisioner struct {
392 *portworxVolume
393 options volume.VolumeOptions
394 }
395
396 var _ volume.Provisioner = &portworxVolumeProvisioner{}
397
398 func (c *portworxVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
399 if !util.ContainsAllAccessModes(c.plugin.GetAccessModes(), c.options.PVC.Spec.AccessModes) {
400 return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", c.options.PVC.Spec.AccessModes, c.plugin.GetAccessModes())
401 }
402
403 if util.CheckPersistentVolumeClaimModeBlock(c.options.PVC) {
404 return nil, fmt.Errorf("%s does not support block volume provisioning", c.plugin.GetPluginName())
405 }
406
407 volumeID, sizeGiB, labels, err := c.manager.CreateVolume(c)
408 if err != nil {
409 return nil, err
410 }
411
412 pv := &v1.PersistentVolume{
413 ObjectMeta: metav1.ObjectMeta{
414 Name: c.options.PVName,
415 Labels: map[string]string{},
416 Annotations: map[string]string{
417 util.VolumeDynamicallyCreatedByKey: "portworx-volume-dynamic-provisioner",
418 },
419 },
420 Spec: v1.PersistentVolumeSpec{
421 PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy,
422 AccessModes: c.options.PVC.Spec.AccessModes,
423 Capacity: v1.ResourceList{
424 v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGiB)),
425 },
426 PersistentVolumeSource: v1.PersistentVolumeSource{
427 PortworxVolume: &v1.PortworxVolumeSource{
428 VolumeID: volumeID,
429 },
430 },
431 },
432 }
433
434 if len(labels) != 0 {
435 if pv.Labels == nil {
436 pv.Labels = make(map[string]string)
437 }
438 for k, v := range labels {
439 pv.Labels[k] = v
440 }
441 }
442
443 if len(c.options.PVC.Spec.AccessModes) == 0 {
444 pv.Spec.AccessModes = c.plugin.GetAccessModes()
445 }
446
447 return pv, nil
448 }
449
View as plain text