/* Copyright 2020 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package storageversion import ( "context" "fmt" "strings" "testing" "time" apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1" coordinationv1 "k8s.io/api/coordination/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controller/storageversiongc" "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/test/integration/framework" "k8s.io/utils/pointer" ) const ( svName = "storageversion.integration.test.foos" idA = "id-1" idB = "id-2" idNonExist = "id-non-exist" ) func TestStorageVersionGarbageCollection(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer result.TearDownFn() kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) if err != nil { t.Fatalf("Unexpected error: %v", err) } informers := informers.NewSharedInformerFactory(kubeclient, time.Second) leaseInformer := informers.Coordination().V1().Leases() storageVersionInformer := informers.Internal().V1alpha1().StorageVersions() _, ctx := ktesting.NewTestContext(t) controller := storageversiongc.NewStorageVersionGC(ctx, kubeclient, leaseInformer, storageVersionInformer) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go leaseInformer.Informer().Run(ctx.Done()) go storageVersionInformer.Informer().Run(ctx.Done()) go controller.Run(ctx) createTestAPIServerIdentityLease(t, kubeclient, idA) createTestAPIServerIdentityLease(t, kubeclient, idB) t.Run("storage version with non-existing id should be GC'ed", func(t *testing.T) { createTestStorageVersion(t, kubeclient, idNonExist) assertStorageVersionDeleted(t, kubeclient) }) t.Run("storage version with valid id should not be GC'ed", func(t *testing.T) { createTestStorageVersion(t, kubeclient, idA) time.Sleep(10 * time.Second) sv, err := kubeclient.InternalV1alpha1().StorageVersions().Get( context.TODO(), svName, metav1.GetOptions{}) if err != nil { t.Fatalf("failed to retrieve valid storage version: %v", err) } if len(sv.Status.StorageVersions) != 1 { t.Errorf("unexpected number of storage version entries, expected 1, got: %v", sv.Status.StorageVersions) } expectedID := idA if sv.Status.StorageVersions[0].APIServerID != expectedID { t.Errorf("unexpected storage version entry id, expected %v, got: %v", expectedID, sv.Status.StorageVersions[0].APIServerID) } assertCommonEncodingVersion(t, kubeclient, pointer.String(idToVersion(t, idA))) if err := kubeclient.InternalV1alpha1().StorageVersions().Delete( context.TODO(), svName, metav1.DeleteOptions{}); err != nil { t.Fatalf("failed to cleanup valid storage version: %v", err) } }) t.Run("deleting an id should delete a storage version entry that it owns", func(t *testing.T) { createTestStorageVersion(t, kubeclient, idA, idB) assertStorageVersionEntries(t, kubeclient, 2, idA) assertCommonEncodingVersion(t, kubeclient, nil) deleteTestAPIServerIdentityLease(t, kubeclient, idA) assertStorageVersionEntries(t, kubeclient, 1, idB) assertCommonEncodingVersion(t, kubeclient, pointer.String(idToVersion(t, idB))) }) t.Run("deleting an id should delete a storage version object that it owns entirely", func(t *testing.T) { deleteTestAPIServerIdentityLease(t, kubeclient, idB) assertStorageVersionDeleted(t, kubeclient) }) } func createTestStorageVersion(t *testing.T, client kubernetes.Interface, ids ...string) { sv := &apiserverinternalv1alpha1.StorageVersion{ ObjectMeta: metav1.ObjectMeta{ Name: svName, }, } for _, id := range ids { version := idToVersion(t, id) v := apiserverinternalv1alpha1.ServerStorageVersion{ APIServerID: id, EncodingVersion: version, DecodableVersions: []string{version}, } sv.Status.StorageVersions = append(sv.Status.StorageVersions, v) } // every id is unique and creates a different version. We know we have a common encoding // version when there is only one id. Pick it if len(ids) == 1 { sv.Status.CommonEncodingVersion = pointer.String(sv.Status.StorageVersions[0].EncodingVersion) } createdSV, err := client.InternalV1alpha1().StorageVersions().Create(context.TODO(), sv, metav1.CreateOptions{}) if err != nil { t.Fatalf("failed to create storage version %s: %v", svName, err) } // update the created sv with intended status createdSV.Status = sv.Status if _, err := client.InternalV1alpha1().StorageVersions().UpdateStatus( context.TODO(), createdSV, metav1.UpdateOptions{}); err != nil { t.Fatalf("failed to update store version status: %v", err) } } func assertStorageVersionDeleted(t *testing.T, client kubernetes.Interface) { if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { _, err := client.InternalV1alpha1().StorageVersions().Get( context.TODO(), svName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return true, nil } if err != nil { return false, err } return false, nil }); err != nil { t.Fatalf("failed to wait for storageversion garbage collection: %v", err) } } func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) { lease := &coordinationv1.Lease{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: metav1.NamespaceSystem, Labels: map[string]string{ controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer, }, }, Spec: coordinationv1.LeaseSpec{ HolderIdentity: pointer.String(name), LeaseDurationSeconds: pointer.Int32(3600), // create fresh leases AcquireTime: &metav1.MicroTime{Time: time.Now()}, RenewTime: &metav1.MicroTime{Time: time.Now()}, }, } if _, err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Create( context.TODO(), lease, metav1.CreateOptions{}); err != nil { t.Fatalf("failed to create apiserver identity lease %s: %v", name, err) } } func deleteTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) { if err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Delete( context.TODO(), name, metav1.DeleteOptions{}); err != nil { t.Fatalf("failed to delete apiserver identity lease %s: %v", name, err) } } func assertStorageVersionEntries(t *testing.T, client kubernetes.Interface, numEntries int, firstID string) { var lastErr error if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { sv, err := client.InternalV1alpha1().StorageVersions().Get( context.TODO(), svName, metav1.GetOptions{}) if err != nil { return false, err } if len(sv.Status.StorageVersions) != numEntries { lastErr = fmt.Errorf("unexpected number of storage version entries, expected %v, got: %v", numEntries, len(sv.Status.StorageVersions)) return false, nil } if sv.Status.StorageVersions[0].APIServerID != firstID { lastErr = fmt.Errorf("unexpected first storage version entry id, expected %v, got: %v", firstID, sv.Status.StorageVersions[0].APIServerID) return false, nil } return true, nil }); err != nil { t.Fatalf("failed to get expected storage verion entries: %v, last error: %v", err, lastErr) } } func assertCommonEncodingVersion(t *testing.T, client kubernetes.Interface, e *string) { sv, err := client.InternalV1alpha1().StorageVersions().Get( context.TODO(), svName, metav1.GetOptions{}) if err != nil { t.Fatalf("failed to retrieve storage version: %v", err) } if e == nil { if sv.Status.CommonEncodingVersion != nil { t.Errorf("unexpected non-nil common encoding version: %v", sv.Status.CommonEncodingVersion) } return } if sv.Status.CommonEncodingVersion == nil || *sv.Status.CommonEncodingVersion != *e { t.Errorf("unexpected common encoding version, expected: %v, got %v", e, sv.Status.CommonEncodingVersion) } } func idToVersion(t *testing.T, id string) string { // TODO(roycaihw): rewrite the test, use a id-version table if !strings.HasPrefix(id, "id-") { t.Fatalf("should not happen: test using id without id- prefix: %s", id) } return fmt.Sprintf("v%s", strings.TrimPrefix(id, "id-")) }