1
16
17 package portworx
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "strconv"
24
25 osdapi "github.com/libopenstorage/openstorage/api"
26 osdclient "github.com/libopenstorage/openstorage/api/client"
27 volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
28 osdspec "github.com/libopenstorage/openstorage/api/spec"
29 volumeapi "github.com/libopenstorage/openstorage/volume"
30 v1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/resource"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 volumehelpers "k8s.io/cloud-provider/volume/helpers"
34 "k8s.io/klog/v2"
35
36 api "k8s.io/kubernetes/pkg/apis/core"
37 "k8s.io/kubernetes/pkg/volume"
38 )
39
40 const (
41 osdMgmtDefaultPort = 9001
42 osdDriverVersion = "v1"
43 pxdDriverName = "pxd"
44 pvcClaimLabel = "pvc"
45 pvcNamespaceLabel = "namespace"
46 pxServiceName = "portworx-service"
47 pxDriverName = "pxd-sched"
48 )
49
50 type portworxVolumeUtil struct {
51 portworxClient *osdclient.Client
52 }
53
54
55 func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
56 driver, err := util.getPortworxDriver(p.plugin.host)
57 if err != nil || driver == nil {
58 klog.Errorf("Failed to get portworx driver. Err: %v", err)
59 return "", 0, nil, err
60 }
61
62 klog.Infof("Creating Portworx volume for PVC: %v", p.options.PVC.Name)
63
64 capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
65
66 requestGiB, err := volumehelpers.RoundUpToGiB(capacity)
67 if err != nil {
68 return "", 0, nil, err
69 }
70
71
72
73
74
75 specHandler := osdspec.NewSpecHandler()
76 spec, locator, source, _ := specHandler.SpecFromOpts(p.options.Parameters)
77 if spec == nil {
78 spec = specHandler.DefaultSpec()
79 }
80
81
82 if spec.VolumeLabels == nil {
83 spec.VolumeLabels = make(map[string]string, 0)
84 }
85
86 for k, v := range p.options.Parameters {
87 spec.VolumeLabels[k] = v
88 }
89
90
91 spec.Size = uint64(requestGiB * volumehelpers.GiB)
92
93
94 if locator == nil {
95 locator = &osdapi.VolumeLocator{
96 VolumeLabels: make(map[string]string),
97 }
98 }
99 locator.Name = p.options.PVName
100
101
102 locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
103 locator.VolumeLabels[pvcNamespaceLabel] = p.options.PVC.Namespace
104
105 for k, v := range p.options.PVC.Annotations {
106 if _, present := spec.VolumeLabels[k]; present {
107 klog.Warningf("not saving annotation: %s=%s in spec labels due to an existing key", k, v)
108 continue
109 }
110 spec.VolumeLabels[k] = v
111 }
112
113 volumeID, err := driver.Create(locator, source, spec)
114 if err != nil {
115 klog.Errorf("Error creating Portworx Volume : %v", err)
116 return "", 0, nil, err
117 }
118
119 klog.Infof("Successfully created Portworx volume for PVC: %v", p.options.PVC.Name)
120 return volumeID, requestGiB, nil, err
121 }
122
123
124 func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
125 driver, err := util.getPortworxDriver(d.plugin.host)
126 if err != nil || driver == nil {
127 klog.Errorf("Failed to get portworx driver. Err: %v", err)
128 return err
129 }
130
131 err = driver.Delete(d.volumeID)
132 if err != nil {
133 klog.Errorf("Error deleting Portworx Volume (%v): %v", d.volName, err)
134 return err
135 }
136 return nil
137 }
138
139
140 func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
141 driver, err := util.getLocalPortworxDriver(m.plugin.host)
142 if err != nil || driver == nil {
143 klog.Errorf("Failed to get portworx driver. Err: %v", err)
144 return "", err
145 }
146
147 devicePath, err := driver.Attach(m.volName, attachOptions)
148 if err != nil {
149 klog.Errorf("Error attaching Portworx Volume (%v): %v", m.volName, err)
150 return "", err
151 }
152 return devicePath, nil
153 }
154
155
156 func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
157 driver, err := util.getLocalPortworxDriver(u.plugin.host)
158 if err != nil || driver == nil {
159 klog.Errorf("Failed to get portworx driver. Err: %v", err)
160 return err
161 }
162
163 err = driver.Detach(u.volName, false )
164 if err != nil {
165 klog.Errorf("Error detaching Portworx Volume (%v): %v", u.volName, err)
166 return err
167 }
168 return nil
169 }
170
171
172 func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
173 driver, err := util.getLocalPortworxDriver(m.plugin.host)
174 if err != nil || driver == nil {
175 klog.Errorf("Failed to get portworx driver. Err: %v", err)
176 return err
177 }
178
179 err = driver.Mount(m.volName, mountPath)
180 if err != nil {
181 klog.Errorf("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
182 return err
183 }
184 return nil
185 }
186
187
188 func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
189 driver, err := util.getLocalPortworxDriver(u.plugin.host)
190 if err != nil || driver == nil {
191 klog.Errorf("Failed to get portworx driver. Err: %v", err)
192 return err
193 }
194
195 err = driver.Unmount(u.volName, mountPath)
196 if err != nil {
197 klog.Errorf("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
198 return err
199 }
200 return nil
201 }
202
203 func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
204 driver, err := util.getPortworxDriver(volumeHost)
205 if err != nil || driver == nil {
206 klog.Errorf("Failed to get portworx driver. Err: %v", err)
207 return err
208 }
209
210 vols, err := driver.Inspect([]string{spec.Name()})
211 if err != nil {
212 return err
213 }
214
215 if len(vols) != 1 {
216 return fmt.Errorf("failed to inspect Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
217 }
218
219 vol := vols[0]
220 tBytes, err := volumehelpers.RoundUpToB(newSize)
221 if err != nil {
222 return err
223 }
224 newSizeInBytes := uint64(tBytes)
225 if vol.Spec.Size >= newSizeInBytes {
226 klog.Infof("Portworx volume: %s already at size: %d greater than or equal to new "+
227 "requested size: %d. Skipping resize.", spec.Name(), vol.Spec.Size, newSizeInBytes)
228 return nil
229 }
230
231 vol.Spec.Size = newSizeInBytes
232 err = driver.Set(spec.Name(), vol.Locator, vol.Spec)
233 if err != nil {
234 return err
235 }
236
237
238 vols, err = driver.Inspect([]string{spec.Name()})
239 if err != nil {
240 return err
241 }
242
243 if len(vols) != 1 {
244 return fmt.Errorf("failed to inspect resized Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
245 }
246
247 updatedVol := vols[0]
248 if updatedVol.Spec.Size < vol.Spec.Size {
249 return fmt.Errorf("Portworx volume: %s doesn't match expected size after resize. expected:%v actual:%v",
250 spec.Name(), vol.Spec.Size, updatedVol.Spec.Size)
251 }
252
253 return nil
254 }
255
256 func isClientValid(client *osdclient.Client) (bool, error) {
257 if client == nil {
258 return false, nil
259 }
260
261 _, err := client.Versions(osdapi.OsdVolumePath)
262 if err != nil {
263 klog.Errorf("portworx client failed driver versions check. Err: %v", err)
264 return false, err
265 }
266
267 return true, nil
268 }
269
270 func createDriverClient(hostname string, port int32) (*osdclient.Client, error) {
271 client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s", net.JoinHostPort(hostname, strconv.Itoa(int(port)))),
272 pxdDriverName, osdDriverVersion, pxDriverName)
273 if err != nil {
274 return nil, err
275 }
276
277 isValid, err := isClientValid(client)
278 if isValid {
279 return client, nil
280 }
281 return nil, err
282 }
283
284
285
286
287
288
289 func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
290
291 if isValid, _ := isClientValid(util.portworxClient); isValid {
292 return volumeclient.VolumeDriver(util.portworxClient), nil
293 }
294
295
296 var err error
297 util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort)
298 if err != nil || util.portworxClient == nil {
299
300 svc, err := getPortworxService(volumeHost)
301 if err != nil {
302 return nil, err
303 }
304
305
306 util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort)
307 if err != nil || util.portworxClient == nil {
308 klog.Errorf("Failed to connect to portworx service. Err: %v", err)
309 return nil, err
310 }
311
312 klog.Infof("Using portworx cluster service at: %v:%d as api endpoint",
313 svc.Spec.ClusterIP, osdMgmtDefaultPort)
314 } else {
315 klog.Infof("Using portworx service at: %v:%d as api endpoint",
316 volumeHost.GetHostName(), osdMgmtDefaultPort)
317 }
318
319 return volumeclient.VolumeDriver(util.portworxClient), nil
320 }
321
322
323
324
325
326
327
328 func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
329 if util.portworxClient != nil {
330
331 if isValid, _ := isClientValid(util.portworxClient); isValid {
332 return volumeclient.VolumeDriver(util.portworxClient), nil
333 }
334 }
335
336
337 svc, err := getPortworxService(volumeHost)
338 if err != nil {
339 return nil, err
340 }
341
342 osgMgmtPort := lookupPXAPIPortFromService(svc)
343 util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort)
344 if err != nil {
345 return nil, err
346 }
347
348 klog.Infof("Using portworx local service at: %v:%d as api endpoint",
349 volumeHost.GetHostName(), osgMgmtPort)
350 return volumeclient.VolumeDriver(util.portworxClient), nil
351 }
352
353
354
355 func lookupPXAPIPortFromService(svc *v1.Service) int32 {
356 for _, p := range svc.Spec.Ports {
357 if p.Port == osdMgmtDefaultPort {
358 return p.TargetPort.IntVal
359 }
360 }
361 return osdMgmtDefaultPort
362 }
363
364
365 func getPortworxService(host volume.VolumeHost) (*v1.Service, error) {
366 kubeClient := host.GetKubeClient()
367 if kubeClient == nil {
368 err := fmt.Errorf("failed to get kubeclient when creating portworx client")
369 klog.Errorf(err.Error())
370 return nil, err
371 }
372
373 opts := metav1.GetOptions{}
374 svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(context.TODO(), pxServiceName, opts)
375 if err != nil {
376 klog.Errorf("Failed to get service. Err: %v", err)
377 return nil, err
378 }
379
380 if svc == nil {
381 err = fmt.Errorf("service: %v not found. Consult Portworx docs to deploy it", pxServiceName)
382 klog.Errorf(err.Error())
383 return nil, err
384 }
385
386 return svc, nil
387 }
388
View as plain text