...
1
16
17 package csi
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "google.golang.org/grpc"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
27 "k8s.io/kubernetes/pkg/volume"
28 volumeutil "k8s.io/kubernetes/pkg/volume/util"
29 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
30 )
31
32 var _ volume.MetricsProvider = &metricsCsi{}
33
34
35
36
37 type metricsCsi struct {
38
39 targetPath string
40
41
42 volumeID string
43
44
45 csiClientGetter
46 }
47
48
49 func NewMetricsCsi(volumeID string, targetPath string, driverName csiDriverName) volume.MetricsProvider {
50 mc := &metricsCsi{volumeID: volumeID, targetPath: targetPath}
51 mc.csiClientGetter.driverName = driverName
52 return mc
53 }
54
55 func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) {
56 startTime := time.Now()
57 defer servermetrics.CollectVolumeStatCalDuration(string(mc.csiClientGetter.driverName), startTime)
58 currentTime := metav1.Now()
59 ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
60 defer cancel()
61
62 csiClient, err := mc.csiClientGetter.Get()
63 if err != nil {
64
65
66 return nil, volumetypes.NewTransientOperationFailure(err.Error())
67 }
68
69 volumeStatsSet, err := csiClient.NodeSupportsVolumeStats(ctx)
70 if err != nil {
71 return nil, err
72 }
73
74
75 if !volumeStatsSet {
76 return nil, volume.NewNotSupportedErrorWithDriverName(
77 string(mc.csiClientGetter.driverName))
78 }
79
80 metrics, err := csiClient.NodeGetVolumeStats(ctx, mc.volumeID, mc.targetPath)
81 if err != nil {
82 return nil, err
83 }
84 if metrics == nil {
85 return nil, fmt.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", mc.volumeID)
86 }
87
88 metrics.Time = currentTime
89 return metrics, nil
90 }
91
92
// MetricsManager records latency metrics for CSI operations on behalf of a
// single driver.
type MetricsManager struct {
	// driverName is the driver name attached to every recorded metric.
	driverName string
}

// NewCSIMetricsManager returns a new MetricsManager that reports metrics
// under the given driver name.
func NewCSIMetricsManager(driverName string) *MetricsManager {
	return &MetricsManager{driverName: driverName}
}
104
type (
	// additionalInfo is the value stored in a context under
	// additionalInfoKey; RecordMetricsInterceptor reads its Migrated field
	// and passes it on when recording latency metrics.
	additionalInfo struct {
		Migrated string
	}

	// additionalInfoKeyType is the unexported type of the context key used
	// to look up additionalInfo.
	additionalInfoKeyType struct{}
)

// additionalInfoKey is the context key under which additionalInfo is looked up.
var additionalInfoKey = additionalInfoKeyType{}
111
112
113
114 func (cmm *MetricsManager) RecordMetricsInterceptor(
115 ctx context.Context,
116 method string,
117 req, reply interface{},
118 cc *grpc.ClientConn,
119 invoker grpc.UnaryInvoker,
120 opts ...grpc.CallOption) error {
121 start := time.Now()
122 err := invoker(ctx, method, req, reply, cc, opts...)
123 duration := time.Since(start)
124
125 additionalInfoVal := ctx.Value(additionalInfoKey)
126 migrated := "false"
127 if additionalInfoVal != nil {
128 additionalInfoVal, ok := additionalInfoVal.(additionalInfo)
129 if !ok {
130 return err
131 }
132 migrated = additionalInfoVal.Migrated
133 }
134
135 volumeutil.RecordCSIOperationLatencyMetrics(cmm.driverName, method, err, duration, migrated)
136
137 return err
138 }
139
View as plain text