1
16
17 package storageversion
18
19 import (
20 "context"
21 "fmt"
22 "strings"
23 "testing"
24 "time"
25
26 apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
27 coordinationv1 "k8s.io/api/coordination/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/apiserver/pkg/features"
32 utilfeature "k8s.io/apiserver/pkg/util/feature"
33 "k8s.io/client-go/informers"
34 "k8s.io/client-go/kubernetes"
35 featuregatetesting "k8s.io/component-base/featuregate/testing"
36 "k8s.io/klog/v2/ktesting"
37 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
38 "k8s.io/kubernetes/pkg/controller/storageversiongc"
39 "k8s.io/kubernetes/pkg/controlplane"
40 "k8s.io/kubernetes/test/integration/framework"
41 "k8s.io/utils/pointer"
42 )
43
44 const (
45 svName = "storageversion.integration.test.foos"
46 idA = "id-1"
47 idB = "id-2"
48 idNonExist = "id-non-exist"
49 )
50
51 func TestStorageVersionGarbageCollection(t *testing.T) {
52 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
53 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
54 result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
55 defer result.TearDownFn()
56
57 kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
58 if err != nil {
59 t.Fatalf("Unexpected error: %v", err)
60 }
61
62 informers := informers.NewSharedInformerFactory(kubeclient, time.Second)
63 leaseInformer := informers.Coordination().V1().Leases()
64 storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
65
66 _, ctx := ktesting.NewTestContext(t)
67 controller := storageversiongc.NewStorageVersionGC(ctx, kubeclient, leaseInformer, storageVersionInformer)
68
69 ctx, cancel := context.WithCancel(context.Background())
70 defer cancel()
71 go leaseInformer.Informer().Run(ctx.Done())
72 go storageVersionInformer.Informer().Run(ctx.Done())
73 go controller.Run(ctx)
74
75 createTestAPIServerIdentityLease(t, kubeclient, idA)
76 createTestAPIServerIdentityLease(t, kubeclient, idB)
77
78 t.Run("storage version with non-existing id should be GC'ed", func(t *testing.T) {
79 createTestStorageVersion(t, kubeclient, idNonExist)
80 assertStorageVersionDeleted(t, kubeclient)
81 })
82
83 t.Run("storage version with valid id should not be GC'ed", func(t *testing.T) {
84 createTestStorageVersion(t, kubeclient, idA)
85 time.Sleep(10 * time.Second)
86 sv, err := kubeclient.InternalV1alpha1().StorageVersions().Get(
87 context.TODO(), svName, metav1.GetOptions{})
88 if err != nil {
89 t.Fatalf("failed to retrieve valid storage version: %v", err)
90 }
91 if len(sv.Status.StorageVersions) != 1 {
92 t.Errorf("unexpected number of storage version entries, expected 1, got: %v",
93 sv.Status.StorageVersions)
94 }
95 expectedID := idA
96 if sv.Status.StorageVersions[0].APIServerID != expectedID {
97 t.Errorf("unexpected storage version entry id, expected %v, got: %v",
98 expectedID, sv.Status.StorageVersions[0].APIServerID)
99 }
100 assertCommonEncodingVersion(t, kubeclient, pointer.String(idToVersion(t, idA)))
101 if err := kubeclient.InternalV1alpha1().StorageVersions().Delete(
102 context.TODO(), svName, metav1.DeleteOptions{}); err != nil {
103 t.Fatalf("failed to cleanup valid storage version: %v", err)
104 }
105 })
106
107 t.Run("deleting an id should delete a storage version entry that it owns", func(t *testing.T) {
108 createTestStorageVersion(t, kubeclient, idA, idB)
109 assertStorageVersionEntries(t, kubeclient, 2, idA)
110 assertCommonEncodingVersion(t, kubeclient, nil)
111 deleteTestAPIServerIdentityLease(t, kubeclient, idA)
112 assertStorageVersionEntries(t, kubeclient, 1, idB)
113 assertCommonEncodingVersion(t, kubeclient, pointer.String(idToVersion(t, idB)))
114 })
115
116 t.Run("deleting an id should delete a storage version object that it owns entirely", func(t *testing.T) {
117 deleteTestAPIServerIdentityLease(t, kubeclient, idB)
118 assertStorageVersionDeleted(t, kubeclient)
119 })
120 }
121
122 func createTestStorageVersion(t *testing.T, client kubernetes.Interface, ids ...string) {
123 sv := &apiserverinternalv1alpha1.StorageVersion{
124 ObjectMeta: metav1.ObjectMeta{
125 Name: svName,
126 },
127 }
128 for _, id := range ids {
129 version := idToVersion(t, id)
130 v := apiserverinternalv1alpha1.ServerStorageVersion{
131 APIServerID: id,
132 EncodingVersion: version,
133 DecodableVersions: []string{version},
134 }
135 sv.Status.StorageVersions = append(sv.Status.StorageVersions, v)
136 }
137
138
139 if len(ids) == 1 {
140 sv.Status.CommonEncodingVersion = pointer.String(sv.Status.StorageVersions[0].EncodingVersion)
141 }
142
143 createdSV, err := client.InternalV1alpha1().StorageVersions().Create(context.TODO(), sv, metav1.CreateOptions{})
144 if err != nil {
145 t.Fatalf("failed to create storage version %s: %v", svName, err)
146 }
147
148 createdSV.Status = sv.Status
149 if _, err := client.InternalV1alpha1().StorageVersions().UpdateStatus(
150 context.TODO(), createdSV, metav1.UpdateOptions{}); err != nil {
151 t.Fatalf("failed to update store version status: %v", err)
152 }
153 }
154
155 func assertStorageVersionDeleted(t *testing.T, client kubernetes.Interface) {
156 if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
157 _, err := client.InternalV1alpha1().StorageVersions().Get(
158 context.TODO(), svName, metav1.GetOptions{})
159 if apierrors.IsNotFound(err) {
160 return true, nil
161 }
162 if err != nil {
163 return false, err
164 }
165 return false, nil
166 }); err != nil {
167 t.Fatalf("failed to wait for storageversion garbage collection: %v", err)
168 }
169 }
170
171 func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) {
172 lease := &coordinationv1.Lease{
173 ObjectMeta: metav1.ObjectMeta{
174 Name: name,
175 Namespace: metav1.NamespaceSystem,
176 Labels: map[string]string{
177 controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
178 },
179 },
180 Spec: coordinationv1.LeaseSpec{
181 HolderIdentity: pointer.String(name),
182 LeaseDurationSeconds: pointer.Int32(3600),
183
184 AcquireTime: &metav1.MicroTime{Time: time.Now()},
185 RenewTime: &metav1.MicroTime{Time: time.Now()},
186 },
187 }
188 if _, err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Create(
189 context.TODO(), lease, metav1.CreateOptions{}); err != nil {
190 t.Fatalf("failed to create apiserver identity lease %s: %v", name, err)
191 }
192 }
193
194 func deleteTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) {
195 if err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(
196 context.TODO(), name, metav1.DeleteOptions{}); err != nil {
197 t.Fatalf("failed to delete apiserver identity lease %s: %v", name, err)
198 }
199 }
200
201 func assertStorageVersionEntries(t *testing.T, client kubernetes.Interface,
202 numEntries int, firstID string) {
203 var lastErr error
204 if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
205 sv, err := client.InternalV1alpha1().StorageVersions().Get(
206 context.TODO(), svName, metav1.GetOptions{})
207 if err != nil {
208 return false, err
209 }
210 if len(sv.Status.StorageVersions) != numEntries {
211 lastErr = fmt.Errorf("unexpected number of storage version entries, expected %v, got: %v",
212 numEntries, len(sv.Status.StorageVersions))
213 return false, nil
214 }
215 if sv.Status.StorageVersions[0].APIServerID != firstID {
216 lastErr = fmt.Errorf("unexpected first storage version entry id, expected %v, got: %v",
217 firstID, sv.Status.StorageVersions[0].APIServerID)
218 return false, nil
219 }
220 return true, nil
221 }); err != nil {
222 t.Fatalf("failed to get expected storage verion entries: %v, last error: %v", err, lastErr)
223 }
224 }
225
226 func assertCommonEncodingVersion(t *testing.T, client kubernetes.Interface, e *string) {
227 sv, err := client.InternalV1alpha1().StorageVersions().Get(
228 context.TODO(), svName, metav1.GetOptions{})
229 if err != nil {
230 t.Fatalf("failed to retrieve storage version: %v", err)
231 }
232 if e == nil {
233 if sv.Status.CommonEncodingVersion != nil {
234 t.Errorf("unexpected non-nil common encoding version: %v", sv.Status.CommonEncodingVersion)
235 }
236 return
237 }
238 if sv.Status.CommonEncodingVersion == nil || *sv.Status.CommonEncodingVersion != *e {
239 t.Errorf("unexpected common encoding version, expected: %v, got %v", e, sv.Status.CommonEncodingVersion)
240 }
241 }
242
243 func idToVersion(t *testing.T, id string) string {
244
245 if !strings.HasPrefix(id, "id-") {
246 t.Fatalf("should not happen: test using id without id- prefix: %s", id)
247 }
248 return fmt.Sprintf("v%s", strings.TrimPrefix(id, "id-"))
249 }
250
View as plain text