package linkerd import ( "context" "fmt" "os" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gotest.tools/v3/poll" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/edge/linkerd" l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/ktest" "edge-infra.dev/test/f2/x/ktest/envtest" ) var f f2.Framework func TestMain(m *testing.M) { ctrl.SetLogger(fog.New()) f = f2.New(context.Background(), f2.WithExtensions( ktest.New( ktest.SkipNamespaceCreation(), ktest.WithCtrlManager(createManager), ktest.WithEnvtestOptions(envtest.WithoutCRDs()), ), ), ). Setup(func(ctx f2.Context) (f2.Context, error) { k, err := ktest.FromContext(ctx) if err != nil { return ctx, err } // delete all linkerd workloadinjections err = k.Client.DeleteAllOf(ctx, &l5dv1alpha1.LinkerdWorkloadInjection{}, &client.DeleteAllOfOptions{DeleteOptions: client.DeleteOptions{GracePeriodSeconds: new(int64)}}) if err != nil { return ctx, err } l5d := &l5dv1alpha1.Linkerd{} err = k.Client.Get(ctx, types.NamespacedName{Name: l5dv1alpha1.Name}, l5d) if err != nil { return ctx, err } // disable LinkerdWorkloadInjection/linkerd l5d.Annotations[l5dv1alpha1.WorkloadInjectionDisabledAnnotation] = "true" return ctx, k.Client.Update(ctx, l5d) }). Teardown(func(ctx f2.Context) (f2.Context, error) { k, err := ktest.FromContext(ctx) if err != nil { return ctx, err } l5d := &l5dv1alpha1.Linkerd{} err = k.Client.Get(ctx, types.NamespacedName{Name: l5dv1alpha1.Name}, l5d) if err != nil { return ctx, err } // re-enable LinkerdWorkloadInjection/linkerd delete(l5d.Annotations, l5dv1alpha1.WorkloadInjectionDisabledAnnotation) return ctx, k.Client.Update(ctx, l5d) }). WithLabel("dsds", "true") os.Exit(f.Run(m)) } func TestSpecificNamespaceWorkloadinjection(t *testing.T) { const specificNamespaceWLI = "specific-namespace" var k *ktest.K8s var beforeRestart time.Time feature := f2.NewFeature("workloadinjection"). Setup("create workloadinjection", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) beforeRestart = time.Now() require.NoError(t, k.Client.Create(ctx, &l5dv1alpha1.LinkerdWorkloadInjection{ ObjectMeta: metav1.ObjectMeta{Name: specificNamespaceWLI}, Spec: l5dv1alpha1.LinkerdWorkloadInjectionSpec{ Namespaces: []string{"corednsctl"}, Force: true, }, })) return ctx }). Test("workloadinjection completed", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) wli := l5dv1alpha1.LinkerdWorkloadInjection{} require.Eventually(t, func() bool { if err := k.Client.Get(ctx, types.NamespacedName{Name: specificNamespaceWLI}, &wli); err != nil { return false } return wli.IsCompleted() && conditions.IsReady(&wli) && wli.Status.Inventory != nil && len(wli.Status.Inventory.Entries) == 1 }, ktest.Timeout*3, ktest.Tick, "Wait for linkerd reinjection job to complete") assert.Empty(t, wli.Status.FailedInventory) return ctx }). Test("specific namespace is restarted", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) // check pods restarted k.WaitOn(t, func(_ poll.LogT) poll.Result { podList := &corev1.PodList{} require.NoError(t, k.Client.List(ctx, podList, client.InNamespace("corednsctl"))) for _, pod := range podList.Items { // ignore these pods if !pod.DeletionTimestamp.IsZero() { continue } if pod.Status.Phase == corev1.PodRunning && !pod.CreationTimestamp.After(beforeRestart) { return poll.Continue("pod still running: %s", pod.Name) } } return poll.Success() }, poll.WithDelay(ktest.Tick), poll.WithTimeout(ktest.Timeout*5)) return ctx }). Teardown("delete created workloadinjection", func(ctx f2.Context, t *testing.T) f2.Context { deleteWLI(ctx, t, specificNamespaceWLI) return ctx }). Feature() f.Test(t, feature) } func TestAllNamespacesWorkloadinjection(t *testing.T) { const allNamespacesWLI = "all-namespaces" var k *ktest.K8s var beforeRestart time.Time feature := f2.NewFeature("workloadinjection"). Setup("create workloadinjection", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) beforeRestart = time.Now() require.NoError(t, k.Client.Create(ctx, &l5dv1alpha1.LinkerdWorkloadInjection{ ObjectMeta: metav1.ObjectMeta{Name: allNamespacesWLI}, Spec: l5dv1alpha1.LinkerdWorkloadInjectionSpec{ Namespaces: []string{}, Force: true, }, })) return ctx }). Test("workloadinjection completed", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) wli := l5dv1alpha1.LinkerdWorkloadInjection{} require.Eventually(t, func() bool { if err := k.Client.Get(ctx, types.NamespacedName{Name: allNamespacesWLI}, &wli); err != nil { return false } return wli.IsCompleted() && conditions.IsReady(&wli) && wli.Status.Inventory != nil }, ktest.Timeout*5, ktest.Tick, "Wait for linkerd reinjection job to complete") assert.True(t, len(wli.Status.Inventory.Entries) > 1) assert.Empty(t, wli.Status.FailedInventory) return ctx }). Test("all namespaces are restarted", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) // check pods restarted k.WaitOn(t, func(_ poll.LogT) poll.Result { podList := &corev1.PodList{} require.NoError(t, k.Client.List(ctx, podList)) stalePods := []string{} for _, pod := range podList.Items { // ignore these pods if pod.Namespace == "fluent-operator" || pod.Spec.HostNetwork || !isLinkerdEnabled(pod) || !pod.DeletionTimestamp.IsZero() { continue } if pod.Status.Phase == corev1.PodRunning && !pod.CreationTimestamp.After(beforeRestart) { stalePods = append(stalePods, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)) } } if len(stalePods) == 0 { return poll.Success() } return poll.Continue("pods still running: %v", stalePods) }, poll.WithDelay(ktest.Tick), poll.WithTimeout(ktest.Timeout*5)) return ctx }). Teardown("delete created workloadinjection", func(ctx f2.Context, t *testing.T) f2.Context { deleteWLI(ctx, t, allNamespacesWLI) return ctx }). Feature() f.Test(t, feature) } func deleteWLI(ctx f2.Context, t *testing.T, name string) { k := ktest.FromContextT(ctx, t) require.NoError(t, k.Client.Delete(ctx, &l5dv1alpha1.LinkerdWorkloadInjection{ObjectMeta: metav1.ObjectMeta{Name: name}})) require.Eventually(t, func() bool { err := k.Client.Get(ctx, types.NamespacedName{Name: name}, &l5dv1alpha1.LinkerdWorkloadInjection{}) return apierrors.IsNotFound(err) }, ktest.Timeout*5, ktest.Tick, "wait for workloadinjection to delete") } func createManager(opts ...controller.Option) (ctrl.Manager, error) { mgrCfg, mgrOpts := controller.ProcessOptions(opts...) mgrOpts.Scheme = createScheme() return ctrl.NewManager(mgrCfg, mgrOpts) } func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(l5dv1alpha1.AddToScheme(scheme)) return scheme } func isLinkerdEnabled(pod corev1.Pod) bool { if inject, found := pod.GetAnnotations()[linkerd.InjectionAnnotation]; found && inject == "enabled" { return true } return false }