package syncedobject import ( "context" crypto "crypto/rand" "encoding/base64" "encoding/json" "fmt" "math/rand" "os" "testing" "time" "cloud.google.com/go/pubsub" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "google.golang.org/api/option" "gopkg.in/yaml.v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" soapi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1" "edge-infra.dev/pkg/edge/chariot" chariotClientApi "edge-infra.dev/pkg/edge/chariot/client" "edge-infra.dev/pkg/edge/controllers/syncedobject" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/lib/promassert" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/ktest" "edge-infra.dev/test/f2/x/pstest" ) const DefaultSecretValue = "mock secret value" var testConfig = &syncedobject.Config{ PublishTopic: "chariot-rides", GCPProjectID: "test-foreman0", MetricsAddr: "127.0.0.1:8080", } func init() { addr := fmt.Sprintf("http://%s/metrics", testConfig.MetricsAddr) promassert.ScrapePrometheusMetrics = promassert.ScrapeHTTP(addr) } var f f2.Framework const timeout = 10 * time.Second const tick = 50 * time.Millisecond func TestMain(m *testing.M) { var ps = pstest.New( pstest.WithProjectID(testConfig.GCPProjectID), ) var k = ktest.New(ktest.WithCtrlManager(testConfig.CreateMgr), ktest.WithKonfigKonnector()) f = f2.New(context.Background(), f2.WithExtensions(k, ps)).Component("syncedobjectctl") os.Exit(f.Run(m)) } func TestSyncedObjectReconciliation(t *testing.T) { var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Test("reconciles", func(ctx f2.Context, t *testing.T) f2.Context { var so = randomSyncedObject() so.Spec.Directory = nil createSyncedObject(ctx, t, so) return ctx }). Test("reconciles with dir", func(ctx f2.Context, t *testing.T) f2.Context { var so = randomSyncedObject() var dir = "myDir" so.Spec.Directory = &dir createSyncedObject(ctx, t, so) return ctx }).Feature() f.Test(t, feat) } func TestSyncedObjectInvalidSpecStalled(t *testing.T) { var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Test("reconciles to stalled condition", func(ctx f2.Context, t *testing.T) f2.Context { var so = randomSyncedObject() so.Spec.Object = "" // an invalid spec. require.NoError(t, ktest.FromContextT(ctx, t).Client.Create(ctx, &so)) var kso soapi.SyncedObject require.Eventually(t, func() bool { err := ktest.FromContextT(ctx, t).Client.Get(ctx, types.NamespacedName{ Name: so.Name, Namespace: so.Namespace, }, &kso) return err == nil && conditions.IsStalled(&kso) }, timeout, tick) return ctx }).Feature() f.Test(t, feat) } func TestSyncedObjectExpireAtDeletesExpiredObjects(t *testing.T) { var so = randomSyncedObject() so.Spec.ExpireAt = &metav1.Time{ Time: time.Now().Add(3 * time.Second), } var reqs = make(chan chariot.Request, 2) //nolint:dupl var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Setup("receive", receiveMessages(reqs)). Test("reconcile", func(ctx f2.Context, t *testing.T) f2.Context { createSyncedObjectAndValidateChariotMessage(ctx, t, reqs, so) return ctx }). Test("is deleted when expired", func(ctx f2.Context, t *testing.T) f2.Context { require.Eventually(t, func() bool { var kso soapi.SyncedObject err := ktest.FromContextT(ctx, t).Client.Get(ctx, types.NamespacedName{ Name: so.Name, Namespace: so.Namespace, }, &kso) if time.Now().Before(so.Spec.ExpireAt.Time) { // The synced object should still exist before it expires. require.NoError(t, err) } return err != nil && client.IgnoreNotFound(err) == nil }, timeout, tick, "SyncedObject never got deleted") return ctx }). Test("sent delete pubsub message", func(ctx f2.Context, t *testing.T) f2.Context { select { case <-time.After(timeout): t.Fatal("never got delete chariot request for expired synced object") case req := <-reqs: validateChariotMessage(t, chariotClientApi.Delete, so, req) } return ctx }).Feature() f.Test(t, feat) } func TestSyncedObjectAbandonAnnotation(t *testing.T) { var so = randomSyncedObject() so.ObjectMeta.Annotations = map[string]string{ soapi.AnnotationDeletionPolicy: soapi.AnnotationDeletionPolicyAbandon, } so.Spec.ExpireAt = &metav1.Time{ Time: time.Now().Add(time.Second), } var reqs = make(chan chariot.Request, 2) //nolint:dupl var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Setup("receive", receiveMessages(reqs)). Test("reconcile", func(ctx f2.Context, t *testing.T) f2.Context { createSyncedObjectAndValidateChariotMessage(ctx, t, reqs, so) return ctx }). Test("is deleted when expired", func(ctx f2.Context, t *testing.T) f2.Context { require.Eventually(t, func() bool { var kso soapi.SyncedObject err := ktest.FromContextT(ctx, t).Client.Get(ctx, types.NamespacedName{ Name: so.Name, Namespace: so.Namespace, }, &kso) if time.Now().Before(so.Spec.ExpireAt.Time) { // The synced object should still exist before it expires. require.NoError(t, err) } return err != nil && client.IgnoreNotFound(err) == nil }, timeout, tick, "SyncedObject never got deleted") return ctx }). Test("delete pubsub message is not sent", func(ctx f2.Context, t *testing.T) f2.Context { select { case <-time.After(2 * time.Second): case req := <-reqs: t.Fatalf("should not get delete chariot request for expired synced object with abandon annotation: %v", req) } return ctx }).Feature() f.Test(t, feat) } func TestSyncedObjectAbandonAnnotationForOutdated(t *testing.T) { var so = randomSyncedObject() so.ObjectMeta.Annotations = map[string]string{ soapi.AnnotationDeletionPolicy: soapi.AnnotationDeletionPolicyAbandon, } var reqs = make(chan chariot.Request, 2) var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Setup("receive", receiveMessages(reqs)). Test("reconcile", func(ctx f2.Context, t *testing.T) f2.Context { createSyncedObjectAndValidateChariotMessage(ctx, t, reqs, so) return ctx }). Test("update the cluster to change the storage location", func(ctx f2.Context, t *testing.T) f2.Context { require.NoError(t, so.SetStatusDetails()) so.Spec.Cluster = uuid.New().String() storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.True(t, storageLocationOutdated) updateSyncedObject(ctx, t, so) return ctx }). Test("delete pubsub message is not sent", func(ctx f2.Context, t *testing.T) f2.Context { select { case <-time.After(timeout): t.Fatal("timed out waiting for create message") case req := <-reqs: validateChariotMessage(t, chariotClientApi.Create, so, req) } select { case <-time.After(2 * time.Second): case req := <-reqs: t.Fatalf("should not get delete chariot request for updated synced object with abandon annotation: %v", req) } return ctx }).Feature() f.Test(t, feat) } func TestSyncedObjectMetricsWereSet(t *testing.T) { var so = randomSyncedObject() // gather metrics before test. const met = "edge_soctl_reconcile_condition_status" var labels = prometheus.Labels{"status": "True"} var initialMetricValue = promassert.Gauge(met).With(labels).TryFold() var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Test("metrics test", func(ctx f2.Context, t *testing.T) f2.Context { createSyncedObject(ctx, t, so) return ctx }). Test("metrics test", func(ctx f2.Context, t *testing.T) f2.Context { // Check that the metric value increased by 1 after applying and reconciling the syncedobject crd. require.Eventually(t, func() bool { finalMetricValue := promassert.Gauge(met).With(labels).TryFold() return 1 == finalMetricValue-initialMetricValue }, timeout, time.Second, "Metric was never set") return ctx }).Feature() f.Test(t, feat) } func TestSyncedObjectSpecUpdate(t *testing.T) { var ( origBanner = "ret-edge-original-banner" origCluster = uuid.New().String() origObj = randomBase64EncodedYamlObject() origChariotID, err = syncedobject.CreateChariotID(origObj) origStorageLocation = chariot.FmtStorageLocation(origBanner, origCluster, "", origChariotID) origNN = types.NamespacedName{ Name: "guid-abc-12345", Namespace: "default", } ) require.NoError(t, err) var origSo = soapi.SyncedObject{ ObjectMeta: metav1.ObjectMeta{ Name: origNN.Name, Namespace: origNN.Namespace, }, Spec: soapi.SyncedObjectSpec{ Banner: origBanner, Cluster: origCluster, Object: origObj, Directory: nil, ExpireAt: nil, }, Status: soapi.SyncedObjectStatus{ StoredObject: origObj, StorageLocation: origStorageLocation, Banner: origBanner, Cluster: origCluster, Directory: nil, }, } var reqs = make(chan chariot.Request, 2) var feat = f2.NewFeature(t.Name()). Setup("topic", setupTopic). Setup("receive", receiveMessages(reqs)). Test("reconcile", func(ctx f2.Context, t *testing.T) f2.Context { var so = origSo storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.False(t, storageLocationOutdated) so.Status = soapi.SyncedObjectStatus{} // clear copy for create. createSyncedObjectAndValidateChariotMessage(ctx, t, reqs, so) return ctx }). Test("validate banner changed", func(ctx f2.Context, t *testing.T) f2.Context { var so = origSo so.Spec.Banner = "ret-edge-updated-banner" storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.True(t, storageLocationOutdated) updateAndValidateChariotMessages(ctx, t, reqs, so) updateAndValidateChariotMessages(ctx, t, reqs, origSo) return ctx }). Test("validate cluster changed", func(ctx f2.Context, t *testing.T) f2.Context { var so = origSo so.Spec.Cluster = uuid.New().String() storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.True(t, storageLocationOutdated) updateAndValidateChariotMessages(ctx, t, reqs, so) updateAndValidateChariotMessages(ctx, t, reqs, origSo) return ctx }). Test("validate dir changed", func(ctx f2.Context, t *testing.T) f2.Context { var so = origSo var updatedDir = "McRibIsBack" so.Spec.Directory = &updatedDir storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.True(t, storageLocationOutdated) updateAndValidateChariotMessages(ctx, t, reqs, so) updateAndValidateChariotMessages(ctx, t, reqs, origSo) return ctx }). Test("validate object changed", func(ctx f2.Context, t *testing.T) f2.Context { var so = origSo so.Spec.Object = randomBase64EncodedYamlObject() storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.True(t, storageLocationOutdated) updateAndValidateChariotMessages(ctx, t, reqs, so) updateAndValidateChariotMessages(ctx, t, reqs, origSo) return ctx }). Test("validate object changed without affecting storage location", func(ctx f2.Context, t *testing.T) f2.Context { // The storage location only changes when the object's GVKNN is modified. // Changing the object without affecting its GVKNN must not produce a delete message. var so = origSo var fr, err = DecodeFakeResourceBase64(so.Spec.Object) require.NoError(t, err) fr.Spec = FakeSpec{ Foo: "some new foo", Bar: "bar changed too", Baz: "will this cause a deletion snafu", } so.Spec.Object = fr.EncodeBase64() storageLocationOutdated, err := syncedobject.IsStorageLocationOutdated(&so) require.NoError(t, err) require.False(t, storageLocationOutdated) updateSyncedObject(ctx, t, so) select { case <-time.After(timeout): t.Fatal("timed out waiting for create message") case req := <-reqs: validateChariotMessage(t, chariotClientApi.Create, so, req) } return ctx }).Feature() f.Test(t, feat) } func setupTopic(ctx f2.Context, t *testing.T) f2.Context { var conn = pstest.FromContextT(ctx, t).Conn client, err := pubsub.NewClient(context.Background(), testConfig.GCPProjectID, option.WithGRPCConn(conn)) require.NoError(t, err) testConfig.PubsubTopic = client.Topic(testConfig.PublishTopic) exists, err := testConfig.PubsubTopic.Exists(ctx) require.NoError(t, err) if !exists { testConfig.PubsubTopic, err = client.CreateTopic(ctx, testConfig.PublishTopic) require.NoError(t, err) } return ctx } func receiveMessages(ch chan chariot.Request) func(f2.Context, *testing.T) f2.Context { return func(ctx f2.Context, t *testing.T) f2.Context { pstest.WithNewSubscription("test", testConfig.PublishTopic)(ctx, t) var ps = pstest.FromContextT(ctx, t) go func() { //nolint: errcheck // error will be returned when the Feature is done. ps.Subscribe(context.Background(), "test", func(_ context.Context, msg *pubsub.Message) { var req chariot.Request if err := json.Unmarshal(msg.Data, &req); err != nil { t.Fatal(err) } // uncomment for debugging: //t.Logf("received chariot message: %s", msg.Data) msg.Ack() ch <- req }) close(ch) }() return ctx } } func createSyncedObject(ctx f2.Context, t *testing.T, so soapi.SyncedObject) { require.NoError(t, ktest.FromContextT(ctx, t).Client.Create(ctx, &so)) waitForStatusToUpdate(ctx, t, &so) } func createSyncedObjectAndValidateChariotMessage(ctx f2.Context, t *testing.T, reqs chan chariot.Request, so soapi.SyncedObject) { createSyncedObject(ctx, t, so) select { case <-time.After(timeout): t.Fatal("timed out waiting for chariot message") case req := <-reqs: validateChariotMessage(t, chariotClientApi.Create, so, req) } } func validateChariotMessage(t *testing.T, operation chariotClientApi.Operation, so soapi.SyncedObject, req chariot.Request) { var dir string if so.Spec.Directory != nil { dir = *so.Spec.Directory } require.Equal(t, operation.String(), req.Operation) require.Equal(t, so.Spec.Banner, req.Banner) require.Equal(t, so.Spec.Cluster, req.Cluster) require.Equal(t, so.Spec.Object, base64.StdEncoding.EncodeToString(req.Objects[0])) require.Equal(t, dir, req.Dir) require.Equal(t, syncedobject.SyncedObjectChariotOwner, req.Owner) } func checkSpec(expected, actual soapi.SyncedObjectSpec) bool { switch { case expected.Directory != nil && actual.Directory == nil: return false case expected.Directory == nil && actual.Directory != nil: return false case expected.Directory != nil && actual.Directory != nil && *expected.Directory != *actual.Directory: return false case expected.ExpireAt != nil && actual.ExpireAt == nil: return false case expected.ExpireAt == nil && actual.ExpireAt != nil: return false case expected.ExpireAt != nil && actual.ExpireAt != nil && !expected.ExpireAt.Equal(actual.ExpireAt): return false case expected.Banner != actual.Banner: return false case expected.Cluster != actual.Cluster: return false case expected.Object != actual.Object: return false } return true } func waitForStatusToUpdate(ctx f2.Context, t *testing.T, expected *soapi.SyncedObject) { require.Eventually(t, func() bool { var actual soapi.SyncedObject var err = ktest.FromContextT(ctx, t).Client.Get(ctx, types.NamespacedName{ Name: expected.ObjectMeta.Name, Namespace: expected.ObjectMeta.Namespace, }, &actual) if err != nil { return false } else if !conditions.IsReady(&actual) { return false } else if !controllerutil.ContainsFinalizer(&actual, soapi.Finalizer) { return false } var expectedDir, actualStatusDir string if expected.Spec.Directory != nil { expectedDir = *expected.Spec.Directory } if actual.Status.Directory != nil { actualStatusDir = *actual.Status.Directory } chariotID, err := syncedobject.CreateChariotID(expected.Spec.Object) require.NoError(t, err) switch { case err != nil: return false case actual.Status.Banner != expected.Spec.Banner: return false case actual.Status.Cluster != expected.Spec.Cluster: return false case actual.Status.StoredObject != expected.Spec.Object: return false case actual.Status.StorageLocation != chariot.FmtStorageLocation(expected.Spec.Banner, expected.Spec.Cluster, expectedDir, chariotID): return false case actualStatusDir != expectedDir: return false } return checkSpec(expected.Spec, actual.Spec) }, timeout, tick, "SyncedObject never became Ready") } func updateSyncedObject(ctx f2.Context, t *testing.T, so soapi.SyncedObject) (before, after soapi.SyncedObject) { var currentSo soapi.SyncedObject require.NoError(t, ktest.FromContextT(ctx, t).Client.Get(ctx, types.NamespacedName{ Name: so.ObjectMeta.Name, Namespace: so.ObjectMeta.Namespace, }, ¤tSo)) var updateSo = currentSo updateSo.Spec = so.Spec // only update the spec. require.NoError(t, ktest.FromContextT(ctx, t).Client.Update(ctx, &updateSo)) var dir string if so.Spec.Directory != nil { dir = *so.Spec.Directory } var expectedSo = updateSo expectedChariotID, err := syncedobject.CreateChariotID(so.Spec.Object) expectedSo.Status = soapi.SyncedObjectStatus{ StoredObject: so.Spec.Object, StorageLocation: chariot.FmtStorageLocation(so.Spec.Banner, so.Spec.Cluster, dir, expectedChariotID), Banner: so.Spec.Banner, Cluster: so.Spec.Cluster, Directory: so.Spec.Directory, } require.NoError(t, err) waitForStatusToUpdate(ctx, t, &expectedSo) var afterSo soapi.SyncedObject require.NoError(t, ktest.FromContextT(ctx, t).Client.Get(ctx, types.NamespacedName{ Name: so.ObjectMeta.Name, Namespace: so.ObjectMeta.Namespace, }, &afterSo)) return currentSo, afterSo } func updateAndValidateChariotMessages(ctx f2.Context, t *testing.T, reqs chan chariot.Request, so soapi.SyncedObject) { before, after := updateSyncedObject(ctx, t, so) select { case <-time.After(timeout): t.Fatal("timed out getting delete message for outdated object on update") case req := <-reqs: validateChariotMessage(t, chariotClientApi.Delete, before, req) } select { case <-time.After(timeout): t.Fatal("timed out getting create message for new object on update") case req := <-reqs: validateChariotMessage(t, chariotClientApi.Create, after, req) } } func randomSyncedObject() soapi.SyncedObject { var dir *string //nolint:gosec // not doing cryptography if rand.Int63()%2 == 0 { var d = uuid.New().String() dir = &d } var bannerBytes [8]byte crypto.Read(bannerBytes[:]) //nolint:errcheck return soapi.SyncedObject{ ObjectMeta: metav1.ObjectMeta{ Name: uuid.New().String(), Namespace: "default", }, Spec: soapi.SyncedObjectSpec{ Banner: fmt.Sprintf("ret-edge-%x", bannerBytes), Cluster: uuid.New().String(), Object: randomBase64EncodedYamlObject(), Directory: dir, }, } } type FakeSpec struct { Foo string `yaml:"foo"` Bar string `yaml:"bar"` Baz string `yaml:"baz"` } type FakeResource struct { APIVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata metav1.ObjectMeta `yaml:"metadata"` Spec FakeSpec `yaml:"spec"` } func (fr *FakeResource) EncodeBase64() string { b, _ := yaml.Marshal(fr) //nolint:errcheck return base64.StdEncoding.EncodeToString(b) } func DecodeFakeResourceBase64(object string) (*FakeResource, error) { var fr FakeResource y, _ := base64.StdEncoding.DecodeString(object) //nolint:errcheck if err := yaml.Unmarshal(y, &fr); err != nil { return nil, err } return &fr, nil } // randomBase64EncodedYamlObject creates an object with GVKNN data (and some labels). func randomBase64EncodedYamlObject() string { // Group/Version var groupVersion = "clusterregistry.k8s.io/v1alpha1" // Kind var kinds = [3]string{"Cluster", "Namespace", "Tenant"} var kind = kinds[rand.Int()%len(kinds)] //nolint:gosec // not doing cryptography // Name var nameBytes [16]byte //nolint:errcheck // not doing cryptography crypto.Read(nameBytes[:]) //nolint:errcheck var name = fmt.Sprintf("%x", nameBytes) // Namespace var namespace string if rand.Int()%2 == 0 { //nolint:gosec // not doing cryptography // half probability of no namespace var namespaces = [5]string{"bulldozer", "ci", "godoc", "jack-bot", "policy-bot"} namespace = namespaces[rand.Int()%len(namespaces)] //nolint:gosec // not doing cryptography } // Label var labels map[string]string if rand.Int()%2 == 0 { //nolint:gosec // not doing cryptography // half probability of no labels var clusterNameBytes [4]byte var clusterTypeBytes [4]byte var fleetNameBytes [4]byte crypto.Read(clusterNameBytes[:]) //nolint:errcheck crypto.Read(clusterTypeBytes[:]) //nolint:errcheck crypto.Read(fleetNameBytes[:]) //nolint:errcheck labels = map[string]string{ "cluster.edge.ncr.com": fmt.Sprintf("%x", clusterNameBytes), "cluster.edge.ncr.com/type": fmt.Sprintf("%x", clusterTypeBytes), "fleet.edge.ncr.com": fmt.Sprintf("%x", fleetNameBytes), } } // Spec var specBytes [64]byte crypto.Read(specBytes[:]) //nolint:errcheck // Build the Yaml object var resource = FakeResource{ APIVersion: groupVersion, Kind: kind, Metadata: metav1.ObjectMeta{ Name: name, Namespace: namespace, Labels: labels, }, Spec: FakeSpec{ Foo: fmt.Sprintf("%x", specBytes[:16]), Bar: fmt.Sprintf("%x", specBytes[16:32]), Baz: fmt.Sprintf("%x", specBytes[32:64]), }, } fmt.Println("synced object base64:", resource.EncodeBase64()) return resource.EncodeBase64() }