1
16
17 package util
18
19 import (
20 "fmt"
21 "strconv"
22 "time"
23
24 "google.golang.org/grpc/codes"
25 "google.golang.org/grpc/status"
26 "k8s.io/component-base/metrics"
27 "k8s.io/component-base/metrics/legacyregistry"
28 "k8s.io/kubernetes/pkg/volume"
29 "k8s.io/kubernetes/pkg/volume/util/types"
30 )
31
32 const (
33 statusSuccess = "success"
34 statusFailUnknown = "fail-unknown"
35 )
36
37
45
46 var StorageOperationMetric = metrics.NewHistogramVec(
47 &metrics.HistogramOpts{
48 Name: "storage_operation_duration_seconds",
49 Help: "Storage operation duration",
50 Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
51 StabilityLevel: metrics.ALPHA,
52 },
53 []string{"volume_plugin", "operation_name", "status", "migrated"},
54 )
55
56 var storageOperationEndToEndLatencyMetric = metrics.NewHistogramVec(
57 &metrics.HistogramOpts{
58 Name: "volume_operation_total_seconds",
59 Help: "Storage operation end to end duration in seconds",
60 Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
61 StabilityLevel: metrics.ALPHA,
62 },
63 []string{"plugin_name", "operation_name"},
64 )
65
66 var csiOperationsLatencyMetric = metrics.NewHistogramVec(
67 &metrics.HistogramOpts{
68 Subsystem: "csi",
69 Name: "operations_seconds",
70 Help: "Container Storage Interface operation duration with gRPC error code status total",
71 Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
72 StabilityLevel: metrics.ALPHA,
73 },
74 []string{"driver_name", "method_name", "grpc_status_code", "migrated"},
75 )
76
77 func init() {
78 registerMetrics()
79 }
80
81 func registerMetrics() {
82
83
84 legacyregistry.MustRegister(StorageOperationMetric)
85 legacyregistry.MustRegister(storageOperationEndToEndLatencyMetric)
86 legacyregistry.MustRegister(csiOperationsLatencyMetric)
87 }
88
89
90 func OperationCompleteHook(plugin, operationName string) func(types.CompleteFuncParam) {
91 requestTime := time.Now()
92 opComplete := func(c types.CompleteFuncParam) {
93 timeTaken := time.Since(requestTime).Seconds()
94
95 status := statusSuccess
96 if *c.Err != nil {
97
98
99 status = statusFailUnknown
100 }
101 migrated := false
102 if c.Migrated != nil {
103 migrated = *c.Migrated
104 }
105 StorageOperationMetric.WithLabelValues(plugin, operationName, status, strconv.FormatBool(migrated)).Observe(timeTaken)
106 }
107 return opComplete
108 }
109
110
111 func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(types.CompleteFuncParam) {
112 return OperationCompleteHook(GetFullQualifiedPluginNameForVolume(plugin.GetPluginName(), spec), "volume_apply_access_control")
113 }
114
115
116
117
118
119
120 func GetFullQualifiedPluginNameForVolume(pluginName string, spec *volume.Spec) string {
121 if spec != nil {
122 if spec.Volume != nil && spec.Volume.CSI != nil {
123 return fmt.Sprintf("%s:%s", pluginName, spec.Volume.CSI.Driver)
124 }
125 if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil {
126 return fmt.Sprintf("%s:%s", pluginName, spec.PersistentVolume.Spec.CSI.Driver)
127 }
128 }
129 return pluginName
130 }
131
132
133
134 func RecordOperationLatencyMetric(plugin, operationName string, secondsTaken float64) {
135 storageOperationEndToEndLatencyMetric.WithLabelValues(plugin, operationName).Observe(secondsTaken)
136 }
137
138
139
140 func RecordCSIOperationLatencyMetrics(driverName string,
141 operationName string,
142 operationErr error,
143 operationDuration time.Duration,
144 migrated string) {
145 csiOperationsLatencyMetric.WithLabelValues(driverName, operationName, getErrorCode(operationErr), migrated).Observe(operationDuration.Seconds())
146 }
147
148 func getErrorCode(err error) string {
149 if err == nil {
150 return codes.OK.String()
151 }
152
153 st, ok := status.FromError(err)
154 if !ok {
155
156
157 return "unknown-non-grpc"
158 }
159
160 return st.Code().String()
161 }
162
View as plain text