1
16
17 package testsuites
18
19 import (
20 "context"
21 "flag"
22 "strings"
23
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/util/sets"
26 clientset "k8s.io/client-go/kubernetes"
27 "k8s.io/client-go/rest"
28 "k8s.io/component-base/metrics/testutil"
29 csitrans "k8s.io/csi-translation-lib"
30 "k8s.io/kubernetes/test/e2e/framework"
31 e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
32 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
33 storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
34 )
35
36 var migratedPlugins *string
37
38 func init() {
39 migratedPlugins = flag.String("storage.migratedPlugins", "", "comma separated list of in-tree plugin names of form 'kubernetes.io/{pluginName}' migrated to CSI")
40 }
41
42 type opCounts map[string]int64
43
44
45 type migrationOpCheck struct {
46 cs clientset.Interface
47 config *rest.Config
48 pluginName string
49 skipCheck bool
50
51
52 oldInTreeOps opCounts
53 oldMigratedOps opCounts
54 }
55
56
57 var BaseSuites = []func() storageframework.TestSuite{
58 InitCapacityTestSuite,
59 InitVolumesTestSuite,
60 InitVolumeIOTestSuite,
61 InitVolumeModeTestSuite,
62 InitSubPathTestSuite,
63 InitProvisioningTestSuite,
64 InitMultiVolumeTestSuite,
65 InitVolumeExpandTestSuite,
66 InitDisruptiveTestSuite,
67 InitVolumeLimitsTestSuite,
68 InitTopologyTestSuite,
69 InitVolumeStressTestSuite,
70 InitFsGroupChangePolicyTestSuite,
71 func() storageframework.TestSuite {
72 return InitCustomEphemeralTestSuite(GenericEphemeralTestPatterns())
73 },
74 }
75
76
77 var CSISuites = append(BaseSuites,
78 func() storageframework.TestSuite {
79 return InitCustomEphemeralTestSuite(CSIEphemeralTestPatterns())
80 },
81 InitSnapshottableTestSuite,
82 InitSnapshottableStressTestSuite,
83 InitVolumePerformanceTestSuite,
84 InitReadWriteOncePodTestSuite,
85 )
86
87 func getVolumeOpsFromMetricsForPlugin(ms testutil.Metrics, pluginName string) opCounts {
88 totOps := opCounts{}
89
90 for method, samples := range ms {
91 switch method {
92 case "storage_operation_status_count":
93 for _, sample := range samples {
94 plugin := string(sample.Metric["volume_plugin"])
95 if pluginName != plugin {
96 continue
97 }
98 opName := string(sample.Metric["operation_name"])
99 if opName == "verify_controller_attached_volume" {
100
101
102 continue
103 }
104 totOps[opName] = totOps[opName] + int64(sample.Value)
105 }
106 }
107 }
108 return totOps
109 }
110
111 func getVolumeOpCounts(ctx context.Context, c clientset.Interface, config *rest.Config, pluginName string) opCounts {
112 if !framework.ProviderIs("gce", "gke", "aws") {
113 return opCounts{}
114 }
115
116 nodeLimit := 25
117
118 metricsGrabber, err := e2emetrics.NewMetricsGrabber(ctx, c, nil, config, true, false, true, false, false, false)
119
120 if err != nil {
121 framework.ExpectNoError(err, "Error creating metrics grabber: %v", err)
122 }
123
124 if !metricsGrabber.HasControlPlanePods() {
125 framework.Logf("Warning: Environment does not support getting controller-manager metrics")
126 return opCounts{}
127 }
128
129 controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
130 framework.ExpectNoError(err, "Error getting c-m metrics : %v", err)
131 totOps := getVolumeOpsFromMetricsForPlugin(testutil.Metrics(controllerMetrics), pluginName)
132
133 framework.Logf("Node name not specified for getVolumeOpCounts, falling back to listing nodes from API Server")
134 nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
135 framework.ExpectNoError(err, "Error listing nodes: %v", err)
136 if len(nodes.Items) <= nodeLimit {
137
138
139
140 for _, node := range nodes.Items {
141 nodeMetrics, err := metricsGrabber.GrabFromKubelet(ctx, node.GetName())
142 framework.ExpectNoError(err, "Error getting Kubelet %v metrics: %v", node.GetName(), err)
143 totOps = addOpCounts(totOps, getVolumeOpsFromMetricsForPlugin(testutil.Metrics(nodeMetrics), pluginName))
144 }
145 } else {
146 framework.Logf("Skipping operation metrics gathering from nodes in getVolumeOpCounts, greater than %v nodes", nodeLimit)
147 }
148
149 return totOps
150 }
151
152 func addOpCounts(o1 opCounts, o2 opCounts) opCounts {
153 totOps := opCounts{}
154 seen := sets.NewString()
155 for op, count := range o1 {
156 seen.Insert(op)
157 totOps[op] = totOps[op] + count + o2[op]
158 }
159 for op, count := range o2 {
160 if !seen.Has(op) {
161 totOps[op] = totOps[op] + count
162 }
163 }
164 return totOps
165 }
166
167 func getMigrationVolumeOpCounts(ctx context.Context, cs clientset.Interface, config *rest.Config, pluginName string) (opCounts, opCounts) {
168 if len(pluginName) > 0 {
169 var migratedOps opCounts
170 l := csitrans.New()
171 csiName, err := l.GetCSINameFromInTreeName(pluginName)
172 if err != nil {
173 framework.Logf("Could not find CSI Name for in-tree plugin %v", pluginName)
174 migratedOps = opCounts{}
175 } else {
176 csiName = "kubernetes.io/csi:" + csiName
177 migratedOps = getVolumeOpCounts(ctx, cs, config, csiName)
178 }
179 return getVolumeOpCounts(ctx, cs, config, pluginName), migratedOps
180 }
181
182 framework.Logf("Test running for native CSI Driver, not checking metrics")
183 return opCounts{}, opCounts{}
184 }
185
186 func newMigrationOpCheck(ctx context.Context, cs clientset.Interface, config *rest.Config, pluginName string) *migrationOpCheck {
187 moc := migrationOpCheck{
188 cs: cs,
189 config: config,
190 pluginName: pluginName,
191 }
192 if len(pluginName) == 0 {
193
194 moc.skipCheck = true
195 return &moc
196 }
197
198 if !sets.NewString(strings.Split(*migratedPlugins, ",")...).Has(pluginName) {
199
200 framework.Logf("In-tree plugin %v is not migrated, not validating any metrics", pluginName)
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215 moc.skipCheck = true
216 return &moc
217 }
218
219
220
221 if framework.NodeOSDistroIs("windows") {
222 moc.skipCheck = true
223 return &moc
224 }
225
226 moc.oldInTreeOps, moc.oldMigratedOps = getMigrationVolumeOpCounts(ctx, cs, config, pluginName)
227 return &moc
228 }
229
230 func (moc *migrationOpCheck) validateMigrationVolumeOpCounts(ctx context.Context) {
231 if moc.skipCheck {
232 return
233 }
234
235 newInTreeOps, _ := getMigrationVolumeOpCounts(ctx, moc.cs, moc.config, moc.pluginName)
236
237 for op, count := range newInTreeOps {
238 if count != moc.oldInTreeOps[op] {
239 framework.Failf("In-tree plugin %v migrated to CSI Driver, however found %v %v metrics for in-tree plugin", moc.pluginName, count-moc.oldInTreeOps[op], op)
240 }
241 }
242
243
244 }
245
246
247 func skipVolTypePatterns(pattern storageframework.TestPattern, driver storageframework.TestDriver, skipVolTypes map[storageframework.TestVolType]bool) {
248 _, supportsProvisioning := driver.(storageframework.DynamicPVTestDriver)
249 if supportsProvisioning && skipVolTypes[pattern.VolType] {
250 e2eskipper.Skipf("Driver supports dynamic provisioning, skipping %s pattern", pattern.VolType)
251 }
252 }
253
View as plain text