1
16
17 package csi
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "net"
25 "strconv"
26 "sync"
27
28 csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/credentials/insecure"
32 "google.golang.org/grpc/status"
33 api "k8s.io/api/core/v1"
34 "k8s.io/apimachinery/pkg/api/resource"
35 utilfeature "k8s.io/apiserver/pkg/util/feature"
36 "k8s.io/klog/v2"
37 "k8s.io/kubernetes/pkg/features"
38 "k8s.io/kubernetes/pkg/volume"
39 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
40 )
41
42 type csiClient interface {
43 NodeGetInfo(ctx context.Context) (
44 nodeID string,
45 maxVolumePerNode int64,
46 accessibleTopology map[string]string,
47 err error)
48
49
50
51
52 NodePublishVolume(
53 ctx context.Context,
54 volumeid string,
55 readOnly bool,
56 stagingTargetPath string,
57 targetPath string,
58 accessMode api.PersistentVolumeAccessMode,
59 publishContext map[string]string,
60 volumeContext map[string]string,
61 secrets map[string]string,
62 fsType string,
63 mountOptions []string,
64 fsGroup *int64,
65 ) error
66
67 NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error)
68 NodeUnpublishVolume(
69 ctx context.Context,
70 volID string,
71 targetPath string,
72 ) error
73
74
75
76
77 NodeStageVolume(ctx context.Context,
78 volID string,
79 publishVolumeInfo map[string]string,
80 stagingTargetPath string,
81 fsType string,
82 accessMode api.PersistentVolumeAccessMode,
83 secrets map[string]string,
84 volumeContext map[string]string,
85 mountOptions []string,
86 fsGroup *int64,
87 ) error
88
89 NodeGetVolumeStats(
90 ctx context.Context,
91 volID string,
92 targetPath string,
93 ) (*volume.Metrics, error)
94 NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
95 NodeSupportsStageUnstage(ctx context.Context) (bool, error)
96 NodeSupportsNodeExpand(ctx context.Context) (bool, error)
97 NodeSupportsVolumeStats(ctx context.Context) (bool, error)
98 NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error)
99 NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error)
100 }
101
102
type (
	// csiAddr is the endpoint address of a CSI driver; it is used as the
	// dial target when a gRPC connection to the driver is opened.
	csiAddr string

	// csiDriverName is the name under which a CSI driver is registered.
	csiDriverName string
)
107
108
109 type csiDriverClient struct {
110 driverName csiDriverName
111 addr csiAddr
112 metricsManager *MetricsManager
113 nodeV1ClientCreator nodeV1ClientCreator
114 }
115
116 type csiResizeOptions struct {
117 volumeID string
118 volumePath string
119 stagingTargetPath string
120 fsType string
121 accessMode api.PersistentVolumeAccessMode
122 newSize resource.Quantity
123 mountOptions []string
124 secrets map[string]string
125 }
126
127 var _ csiClient = &csiDriverClient{}
128
129 type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
130 nodeClient csipbv1.NodeClient,
131 closer io.Closer,
132 err error,
133 )
134
135 type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode
136
137
138
139
140
141
142 func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
143 var conn *grpc.ClientConn
144 conn, err = newGrpcConn(addr, metricsManager)
145 if err != nil {
146 return nil, nil, err
147 }
148
149 nodeClient = csipbv1.NewNodeClient(conn)
150 return nodeClient, conn, nil
151 }
152
153 func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
154 if driverName == "" {
155 return nil, fmt.Errorf("driver name is empty")
156 }
157
158 existingDriver, driverExists := csiDrivers.Get(string(driverName))
159 if !driverExists {
160 return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
161 }
162
163 nodeV1ClientCreator := newV1NodeClient
164 return &csiDriverClient{
165 driverName: driverName,
166 addr: csiAddr(existingDriver.endpoint),
167 nodeV1ClientCreator: nodeV1ClientCreator,
168 metricsManager: NewCSIMetricsManager(string(driverName)),
169 }, nil
170 }
171
172 func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
173 nodeID string,
174 maxVolumePerNode int64,
175 accessibleTopology map[string]string,
176 err error) {
177 klog.V(4).InfoS(log("calling NodeGetInfo rpc"))
178
179 var getNodeInfoError error
180 nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
181 if getNodeInfoError != nil {
182 klog.InfoS("Error calling CSI NodeGetInfo()", "err", getNodeInfoError.Error())
183 }
184 return nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError
185 }
186
187 func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
188 nodeID string,
189 maxVolumePerNode int64,
190 accessibleTopology map[string]string,
191 err error) {
192
193 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
194 if err != nil {
195 return "", 0, nil, err
196 }
197 defer closer.Close()
198
199 res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
200 if err != nil {
201 return "", 0, nil, err
202 }
203
204 topology := res.GetAccessibleTopology()
205 if topology != nil {
206 accessibleTopology = topology.Segments
207 }
208 return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
209 }
210
211 func (c *csiDriverClient) NodePublishVolume(
212 ctx context.Context,
213 volID string,
214 readOnly bool,
215 stagingTargetPath string,
216 targetPath string,
217 accessMode api.PersistentVolumeAccessMode,
218 publishContext map[string]string,
219 volumeContext map[string]string,
220 secrets map[string]string,
221 fsType string,
222 mountOptions []string,
223 fsGroup *int64,
224 ) error {
225 klog.V(4).InfoS(log("calling NodePublishVolume rpc"), "volID", volID, "targetPath", targetPath)
226 if volID == "" {
227 return errors.New("missing volume id")
228 }
229 if targetPath == "" {
230 return errors.New("missing target path")
231 }
232
233 if c.nodeV1ClientCreator == nil {
234 return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
235 }
236
237 accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
238 if err != nil {
239 return err
240 }
241
242 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
243 if err != nil {
244 return err
245 }
246 defer closer.Close()
247
248 req := &csipbv1.NodePublishVolumeRequest{
249 VolumeId: volID,
250 TargetPath: targetPath,
251 Readonly: readOnly,
252 PublishContext: publishContext,
253 VolumeContext: volumeContext,
254 Secrets: secrets,
255 VolumeCapability: &csipbv1.VolumeCapability{
256 AccessMode: &csipbv1.VolumeCapability_AccessMode{
257 Mode: accessModeMapper(accessMode),
258 },
259 },
260 }
261 if stagingTargetPath != "" {
262 req.StagingTargetPath = stagingTargetPath
263 }
264
265 if fsType == fsTypeBlockName {
266 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
267 Block: &csipbv1.VolumeCapability_BlockVolume{},
268 }
269 } else {
270 mountVolume := &csipbv1.VolumeCapability_MountVolume{
271 FsType: fsType,
272 MountFlags: mountOptions,
273 }
274 if fsGroup != nil {
275 mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 )
276 }
277 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
278 Mount: mountVolume,
279 }
280 }
281
282 _, err = nodeClient.NodePublishVolume(ctx, req)
283 if err != nil && !isFinalError(err) {
284 return volumetypes.NewUncertainProgressError(err.Error())
285 }
286 return err
287 }
288
289 func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
290 if c.nodeV1ClientCreator == nil {
291 return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
292 }
293
294 if opts.volumeID == "" {
295 return opts.newSize, errors.New("missing volume id")
296 }
297 if opts.volumePath == "" {
298 return opts.newSize, errors.New("missing volume path")
299 }
300
301 if opts.newSize.Value() < 0 {
302 return opts.newSize, errors.New("size can not be less than 0")
303 }
304
305 accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
306 if err != nil {
307 return opts.newSize, err
308 }
309
310 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
311 if err != nil {
312 return opts.newSize, err
313 }
314 defer closer.Close()
315
316 req := &csipbv1.NodeExpandVolumeRequest{
317 VolumeId: opts.volumeID,
318 VolumePath: opts.volumePath,
319 CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
320 VolumeCapability: &csipbv1.VolumeCapability{
321 AccessMode: &csipbv1.VolumeCapability_AccessMode{
322 Mode: accessModeMapper(opts.accessMode),
323 },
324 },
325 Secrets: opts.secrets,
326 }
327
328
329
330 if opts.stagingTargetPath != "" {
331 req.StagingTargetPath = opts.stagingTargetPath
332 }
333
334 if opts.fsType == fsTypeBlockName {
335 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
336 Block: &csipbv1.VolumeCapability_BlockVolume{},
337 }
338 } else {
339 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
340 Mount: &csipbv1.VolumeCapability_MountVolume{
341 FsType: opts.fsType,
342 MountFlags: opts.mountOptions,
343 },
344 }
345 }
346
347 resp, err := nodeClient.NodeExpandVolume(ctx, req)
348 if err != nil {
349 if !isFinalError(err) {
350 return opts.newSize, volumetypes.NewUncertainProgressError(err.Error())
351 }
352 return opts.newSize, err
353 }
354
355 updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
356 return *updatedQuantity, nil
357 }
358
359 func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
360 klog.V(4).InfoS(log("calling NodeUnpublishVolume rpc"), "volID", volID, "targetPath", targetPath)
361 if volID == "" {
362 return errors.New("missing volume id")
363 }
364 if targetPath == "" {
365 return errors.New("missing target path")
366 }
367 if c.nodeV1ClientCreator == nil {
368 return errors.New("nodeV1ClientCreate is nil")
369 }
370
371 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
372 if err != nil {
373 return err
374 }
375 defer closer.Close()
376
377 req := &csipbv1.NodeUnpublishVolumeRequest{
378 VolumeId: volID,
379 TargetPath: targetPath,
380 }
381
382 _, err = nodeClient.NodeUnpublishVolume(ctx, req)
383 return err
384 }
385
386 func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
387 volID string,
388 publishContext map[string]string,
389 stagingTargetPath string,
390 fsType string,
391 accessMode api.PersistentVolumeAccessMode,
392 secrets map[string]string,
393 volumeContext map[string]string,
394 mountOptions []string,
395 fsGroup *int64,
396 ) error {
397 klog.V(4).InfoS(log("calling NodeStageVolume rpc"), "volID", volID, "stagingTargetPath", stagingTargetPath)
398 if volID == "" {
399 return errors.New("missing volume id")
400 }
401 if stagingTargetPath == "" {
402 return errors.New("missing staging target path")
403 }
404 if c.nodeV1ClientCreator == nil {
405 return errors.New("nodeV1ClientCreate is nil")
406 }
407
408 accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
409 if err != nil {
410 return err
411 }
412
413 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
414 if err != nil {
415 return err
416 }
417 defer closer.Close()
418
419 req := &csipbv1.NodeStageVolumeRequest{
420 VolumeId: volID,
421 PublishContext: publishContext,
422 StagingTargetPath: stagingTargetPath,
423 VolumeCapability: &csipbv1.VolumeCapability{
424 AccessMode: &csipbv1.VolumeCapability_AccessMode{
425 Mode: accessModeMapper(accessMode),
426 },
427 },
428 Secrets: secrets,
429 VolumeContext: volumeContext,
430 }
431
432 if fsType == fsTypeBlockName {
433 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
434 Block: &csipbv1.VolumeCapability_BlockVolume{},
435 }
436 } else {
437 mountVolume := &csipbv1.VolumeCapability_MountVolume{
438 FsType: fsType,
439 MountFlags: mountOptions,
440 }
441 if fsGroup != nil {
442 mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 )
443 }
444 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
445 Mount: mountVolume,
446 }
447 }
448
449 _, err = nodeClient.NodeStageVolume(ctx, req)
450 if err != nil && !isFinalError(err) {
451 return volumetypes.NewUncertainProgressError(err.Error())
452 }
453 return err
454 }
455
456 func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
457 klog.V(4).InfoS(log("calling NodeUnstageVolume rpc"), "volID", volID, "stagingTargetPath", stagingTargetPath)
458 if volID == "" {
459 return errors.New("missing volume id")
460 }
461 if stagingTargetPath == "" {
462 return errors.New("missing staging target path")
463 }
464 if c.nodeV1ClientCreator == nil {
465 return errors.New("nodeV1ClientCreate is nil")
466 }
467
468 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
469 if err != nil {
470 return err
471 }
472 defer closer.Close()
473
474 req := &csipbv1.NodeUnstageVolumeRequest{
475 VolumeId: volID,
476 StagingTargetPath: stagingTargetPath,
477 }
478 _, err = nodeClient.NodeUnstageVolume(ctx, req)
479 return err
480 }
481
482 func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
483 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
484 }
485
486 func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
487 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
488 }
489
490 func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) {
491 supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx)
492 if err != nil {
493 return nil, err
494 }
495 if supported {
496 return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil
497 }
498 return asCSIAccessModeV1, nil
499 }
500
501 func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
502 switch am {
503 case api.ReadWriteOnce:
504 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
505 case api.ReadOnlyMany:
506 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
507 case api.ReadWriteMany:
508 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
509
510
511
512 case api.ReadWriteOncePod:
513 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
514 }
515 return csipbv1.VolumeCapability_AccessMode_UNKNOWN
516 }
517
518 func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
519 switch am {
520 case api.ReadWriteOnce:
521 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER
522 case api.ReadOnlyMany:
523 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
524 case api.ReadWriteMany:
525 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
526 case api.ReadWriteOncePod:
527 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
528 }
529 return csipbv1.VolumeCapability_AccessMode_UNKNOWN
530 }
531
532 func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) {
533 network := "unix"
534 klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr)
535
536 return grpc.Dial(
537 string(addr),
538 grpc.WithAuthority("localhost"),
539 grpc.WithTransportCredentials(insecure.NewCredentials()),
540 grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
541 return (&net.Dialer{}).DialContext(ctx, network, target)
542 }),
543 grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor),
544 )
545 }
546
547
548
549
550
551
552
553 type csiClientGetter struct {
554 sync.RWMutex
555 csiClient csiClient
556 driverName csiDriverName
557 }
558
559 func (c *csiClientGetter) Get() (csiClient, error) {
560 c.RLock()
561 if c.csiClient != nil {
562 c.RUnlock()
563 return c.csiClient, nil
564 }
565 c.RUnlock()
566 c.Lock()
567 defer c.Unlock()
568
569 if c.csiClient != nil {
570 return c.csiClient, nil
571 }
572 csi, err := newCsiDriverClient(c.driverName)
573 if err != nil {
574 return nil, err
575 }
576 c.csiClient = csi
577 return c.csiClient, nil
578 }
579
580 func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
581 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
582 }
583
584 func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
585 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
586 }
587
588 func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
589 klog.V(4).InfoS(log("calling NodeGetVolumeStats rpc"), "volID", volID, "targetPath", targetPath)
590 if volID == "" {
591 return nil, errors.New("missing volume id")
592 }
593 if targetPath == "" {
594 return nil, errors.New("missing target path")
595 }
596 if c.nodeV1ClientCreator == nil {
597 return nil, errors.New("nodeV1ClientCreate is nil")
598 }
599
600 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
601 if err != nil {
602 return nil, err
603 }
604 defer closer.Close()
605
606 req := &csipbv1.NodeGetVolumeStatsRequest{
607 VolumeId: volID,
608 VolumePath: targetPath,
609 }
610
611 resp, err := nodeClient.NodeGetVolumeStats(ctx, req)
612 if err != nil {
613 return nil, err
614 }
615 usages := resp.GetUsage()
616 if usages == nil {
617 return nil, fmt.Errorf("failed to get usage from response. usage is nil")
618 }
619 metrics := &volume.Metrics{
620 Used: resource.NewQuantity(int64(0), resource.BinarySI),
621 Capacity: resource.NewQuantity(int64(0), resource.BinarySI),
622 Available: resource.NewQuantity(int64(0), resource.BinarySI),
623 InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI),
624 Inodes: resource.NewQuantity(int64(0), resource.BinarySI),
625 InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
626 }
627
628 if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
629 isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
630 if err != nil {
631 return nil, err
632 }
633
634 if isSupportNodeVolumeCondition {
635 abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
636 metrics.Abnormal, metrics.Message = &abnormal, &message
637 }
638 }
639
640 for _, usage := range usages {
641 if usage == nil {
642 continue
643 }
644 unit := usage.GetUnit()
645 switch unit {
646 case csipbv1.VolumeUsage_BYTES:
647 metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
648 metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
649 metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
650 case csipbv1.VolumeUsage_INODES:
651 metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
652 metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
653 metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
654 default:
655 klog.ErrorS(nil, "unknown unit in VolumeUsage", "unit", unit.String())
656 }
657
658 }
659 return metrics, nil
660 }
661
662 func (c *csiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
663 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
664 }
665
666 func (c *csiDriverClient) NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) {
667 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP)
668 }
669
670 func (c *csiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
671 klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if the node service has %s capability", capabilityType))
672 capabilities, err := c.nodeGetCapabilities(ctx)
673 if err != nil {
674 return false, err
675 }
676
677 for _, capability := range capabilities {
678 if capability == nil || capability.GetRpc() == nil {
679 continue
680 }
681 if capability.GetRpc().GetType() == capabilityType {
682 return true, nil
683 }
684 }
685 return false, nil
686 }
687
688 func (c *csiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
689 if c.nodeV1ClientCreator == nil {
690 return []*csipbv1.NodeServiceCapability{}, errors.New("nodeV1ClientCreate is nil")
691 }
692
693 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
694 if err != nil {
695 return []*csipbv1.NodeServiceCapability{}, err
696 }
697 defer closer.Close()
698
699 req := &csipbv1.NodeGetCapabilitiesRequest{}
700 resp, err := nodeClient.NodeGetCapabilities(ctx, req)
701 if err != nil {
702 return []*csipbv1.NodeServiceCapability{}, err
703 }
704 return resp.GetCapabilities(), nil
705 }
706
707 func isFinalError(err error) bool {
708
709
710
711 st, ok := status.FromError(err)
712 if !ok {
713
714
715
716 return false
717 }
718 switch st.Code() {
719 case codes.Canceled,
720 codes.DeadlineExceeded,
721 codes.Unavailable,
722 codes.ResourceExhausted,
723 codes.Aborted:
724 return false
725 }
726
727
728 return true
729 }
730
View as plain text