// nolint: unparam
package server_test

import (
	"context"
	"sync"
	"testing"
	"time"

	server "edge-infra.dev/pkg/edge/psqlinjector"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/x/pstest"

	"cloud.google.com/go/pubsub"
	"github.com/stretchr/testify/require"
	"google.golang.org/api/option"
)

// TestReceiverMuxHappy exercises the happy path: the mux is started against a
// fixed set of banner projects, one message is published to each banner's
// topic, and the handler is expected to observe every message's attributes.
//
// NOTE(review): `f2f`, `setupConfig`, and `createKinformTopics` are not
// defined in this chunk — presumably package-level helpers / an f2 framework
// instance declared elsewhere in this package (e.g. TestMain). Verify.
//
// nolint: dupl
func TestReceiverMuxHappy(t *testing.T) {
	var cfg = new(server.Config)

	// Static banner list; pollFunc always returns the same IDs, so the mux
	// membership never changes during this test.
	var bannerProjectIDs = []string{"foo", "bar", "baz"}
	var pollFunc = func(_ context.Context) ([]string, error) {
		return bannerProjectIDs, nil
	}

	// got collects attributes seen by the handler. The embedded WaitGroup is
	// Add()ed once per published message and Done()d once per handled message,
	// so Wait() blocks until every expected message has been received.
	var got = struct {
		sync.Mutex
		sync.WaitGroup
		attrs map[string]string
	}{
		attrs: make(map[string]string),
	}
	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
		t.Logf("message %q with attributes %v", msg.ID, msg.Attributes)
		defer got.Done()
		got.Lock()
		defer got.Unlock()
		for k, v := range msg.Attributes {
			got.attrs[k] = v
		}
		return nil
	}

	// NOTE(review): this comment and the `start` variable appear copied from a
	// database-oriented test — nothing here touches clusters.infra_status_updated_at;
	// `start` is only logged. Likely stale; confirm and consider removing.
	// We set all the cluster's clusters.infra_status_updated_at to this time, then check that it's updated.
	var start = time.Now().UTC().Truncate(time.Microsecond) // truncated due to precision https://www.postgresql.org/docs/current/datatype-datetime.html
	t.Logf("start time %v", start)

	// teardown psqlinjector plumbing: cancelling doneCtx stops ReceiverMux.Run,
	// whose return value is delivered on runErrCh and checked in Teardown.
	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
	var runErrCh = make(chan error, 1)
	// created when the "run" test executes.
	var client *pubsub.Client

	var feat = f2.NewFeature(t.Name()).
		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
			return setupConfig(ctx, t, cfg)
		}).
		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
			return createKinformTopics(ctx, t, cfg, bannerProjectIDs...)
		}).
		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
			// Publisher-side client wired to the in-process pstest fake server.
			ps := pstest.FromContextT(ctx, t)
			var err error
			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			return ctx
		}).
		Test("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
			var bmcfg = server.ReceiverMuxConfig{
				ForemanProjectID:             cfg.ForemanProjectID,
				SubscriptionID:               cfg.SubscriptionID,
				TopicID:                      cfg.TopicID,
				Conn:                         cfg.TestPubSubConn,
				PollPeriod:                   cfg.PollBannersPeriod,
				PollMaxRetries:               cfg.PollBannersMaxRetries,
				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
				PollFunc:                     pollFunc,
				Handler:                      handlerFunc,
			}
			rm, err := server.NewReceiverMux(&bmcfg)
			require.NoError(t, err)
			// Run blocks until doneCtx is cancelled; its error is consumed in Teardown.
			go func() {
				err := rm.Run(doneCtx)
				runErrCh <- err
			}()
			return ctx
		}).
		Test("send message to each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
			for _, banner := range bannerProjectIDs {
				got.Add(1)
				var topic = client.TopicInProject(cfg.TopicID, banner)
				// sanity check
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				// Attribute key is the banner name, so the verify step can
				// confirm which subscription delivered each message.
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
					Attributes: map[string]string{
						banner: "",
					},
				})
			}
			return ctx
		}).
		Test("verify message from each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
			// Wait for every published message to be handled before inspecting.
			got.Wait()
			got.Lock()
			defer got.Unlock()
			for _, banner := range bannerProjectIDs {
				_, found := got.attrs[banner]
				require.True(t, found, "did not find attribute with banner")
			}
			return ctx
		}).
		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
			stopReceiverMux()
			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}

// TestReceiverMuxDynamicAdd verifies that the mux picks up a banner project
// that is added AFTER the mux is already running: pollFunc starts returning
// the new banner, and messages published to the new topic must be received.
//
// nolint: dupl
func TestReceiverMuxDynamicAdd(t *testing.T) {
	var cfg = new(server.Config)

	// Mutable banner list: this test appends to ids mid-run, so access is
	// guarded by the embedded mutex (pollFunc runs on the mux's goroutine).
	var bannerProjectIDs = struct {
		sync.Mutex
		ids []string
	}{
		ids: []string{"foo", "bar", "baz"},
	}
	var pollFunc = func(_ context.Context) ([]string, error) {
		bannerProjectIDs.Lock()
		defer bannerProjectIDs.Unlock()
		return bannerProjectIDs.ids, nil
	}

	var got = struct {
		sync.Mutex
		sync.WaitGroup
		attrs map[string]string
	}{
		attrs: make(map[string]string),
	}
	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
		t.Logf("message %q with attributes %v", msg.ID, msg.Attributes)
		defer got.Done()
		got.Lock()
		defer got.Unlock()
		for k, v := range msg.Attributes {
			got.attrs[k] = v
		}
		return nil
	}

	// NOTE(review): as in TestReceiverMuxHappy, this comment/variable pair
	// looks stale — `start` is only logged here. Confirm.
	// We set all the cluster's clusters.infra_status_updated_at to this time, then check that it's updated.
	var start = time.Now().UTC().Truncate(time.Microsecond) // truncated due to precision https://www.postgresql.org/docs/current/datatype-datetime.html
	t.Logf("start time %v", start)

	// teardown psqlinjector plumbing
	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
	var runErrCh = make(chan error, 1)
	// created when the "run" test executes.
	var client *pubsub.Client

	var feat = f2.NewFeature(t.Name()).
		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
			return setupConfig(ctx, t, cfg)
		}).
		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
		}).
		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
			ps := pstest.FromContextT(ctx, t)
			var err error
			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			return ctx
		}).
		Test("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
			var bmcfg = server.ReceiverMuxConfig{
				ForemanProjectID:             cfg.ForemanProjectID,
				SubscriptionID:               cfg.SubscriptionID,
				Conn:                         cfg.TestPubSubConn,
				TopicID:                      cfg.TopicID,
				PollPeriod:                   cfg.PollBannersPeriod,
				PollMaxRetries:               cfg.PollBannersMaxRetries,
				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
				Handler:                      handlerFunc,
				PollFunc:                     pollFunc,
			}
			rm, err := server.NewReceiverMux(&bmcfg)
			require.NoError(t, err)
			go func() {
				err := rm.Run(doneCtx)
				runErrCh <- err
			}()
			return ctx
		}).
		Test("send message to each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
			var v = "one"
			for _, banner := range bannerProjectIDs.ids {
				got.Add(1)
				var topic = client.TopicInProject(cfg.TopicID, banner)
				// sanity check
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
					Attributes: map[string]string{
						banner: v,
					},
				})
				// Wait per message; WaitGroup completion gives the necessary
				// happens-before edge, so the unlocked attrs read below is safe.
				got.Wait()
				value, found := got.attrs[banner]
				require.True(t, found, "did not find attribute with banner")
				// NOTE(review): testify's signature is Equal(t, expected, actual);
				// here the actual value is passed first. Only affects failure
				// message wording, not the comparison itself.
				require.Equal(t, value, v)
			}
			return ctx
		}).
		Test("create new banner and verify messages", func(ctx f2.Context, t *testing.T) f2.Context {
			var newBanner = "dynamic"
			ctx = createKinformTopics(ctx, t, cfg, newBanner)
			bannerProjectIDs.Lock()
			bannerProjectIDs.ids = append(bannerProjectIDs.ids, newBanner)
			bannerProjectIDs.Unlock()
			// Give the mux one poll interval to notice the new banner.
			// NOTE(review): fixed 1s sleep assumes PollBannersPeriod < 1s — confirm
			// against setupConfig; otherwise this is flaky.
			<-time.After(time.Second)

			got.Add(len(bannerProjectIDs.ids))
			for _, banner := range bannerProjectIDs.ids {
				var topic = client.TopicInProject(cfg.TopicID, banner)
				// sanity check
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
					Attributes: map[string]string{
						banner: "two",
					},
				})
			}
			got.Wait()
			for _, banner := range bannerProjectIDs.ids {
				value, found := got.attrs[banner]
				require.True(t, found, "did not find attribute with banner")
				// NOTE(review): expected/actual order swapped (see above).
				require.Equal(t, value, "two")
			}
			return ctx
		}).
		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
			stopReceiverMux()
			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}

// TestReceiverMuxDynamicRemove verifies that when a banner disappears from the
// poll results, the mux stops receiving from that banner's subscription, while
// the remaining banners keep working.
//
// nolint: dupl
func TestReceiverMuxDynamicRemove(t *testing.T) {
	var cfg = new(server.Config)

	var bannerProjectIDs = struct {
		sync.Mutex
		ids []string
	}{
		ids: []string{"foo", "bar", "baz"},
	}
	var pollFunc = func(_ context.Context) ([]string, error) {
		bannerProjectIDs.Lock()
		defer bannerProjectIDs.Unlock()
		return bannerProjectIDs.ids, nil
	}

	var got = struct {
		sync.Mutex
		sync.WaitGroup
		attrs map[string]string
	}{
		attrs: make(map[string]string),
	}
	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
		defer got.Done()
		got.Lock()
		defer got.Unlock()
		for k, v := range msg.Attributes {
			got.attrs[k] = v
		}
		return nil
	}

	// teardown psqlinjector plumbing
	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
	var runErrCh = make(chan error, 1)
	// created when the "run" test executes.
	var client *pubsub.Client

	var feat = f2.NewFeature(t.Name()).
		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
			return setupConfig(ctx, t, cfg)
		}).
		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
		}).
		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
			ps := pstest.FromContextT(ctx, t)
			var err error
			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			return ctx
		}).
		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
			var bmcfg = server.ReceiverMuxConfig{
				SubscriptionID:               cfg.SubscriptionID,
				ForemanProjectID:             cfg.ForemanProjectID,
				Conn:                         cfg.TestPubSubConn,
				PollPeriod:                   cfg.PollBannersPeriod,
				PollMaxRetries:               cfg.PollBannersMaxRetries,
				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
				TopicID:                      cfg.TopicID,
				PollFunc:                     pollFunc,
				Handler:                      handlerFunc,
			}
			rm, err := server.NewReceiverMux(&bmcfg)
			require.NoError(t, err)
			go func() {
				err := rm.Run(doneCtx)
				runErrCh <- err
			}()
			return ctx
		}).
		Test("delete a banner and wait until messages are no longer received", func(ctx f2.Context, t *testing.T) f2.Context {
			var removed = bannerProjectIDs.ids[0]
			var topic = client.TopicInProject(cfg.TopicID, removed)
			exists, err := topic.Exists(ctx)
			require.NoError(t, err)
			require.True(t, exists)

			// Loop: publish to the to-be-removed banner; on the FIRST receipt,
			// drop the banner from the poll list (guarded by sync.Once). Keep
			// publishing until a message goes unreceived for a full second —
			// that is the signal that the mux has unsubscribed.
			var removeBannerOnce sync.Once
			var receivedCount int
			for {
				got.Add(1)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(removed),
				})
				var stillReceiving = make(chan struct{})
				go func() {
					got.Wait()
					close(stillReceiving)
					removeBannerOnce.Do(func() {
						bannerProjectIDs.Lock()
						bannerProjectIDs.ids = bannerProjectIDs.ids[1:]
						bannerProjectIDs.Unlock()
					})
				}()
				select {
				case <-time.After(time.Second):
					t.Logf("receiver mux dynamically unsubscribed")
					// verify receiver mux was receiving from the subscription before it was shut down.
					t.Logf("received count: %d", receivedCount)
					require.True(t, receivedCount > 0)
					// clean up goroutine, and block until this channel is closed, so that wg.Wait doesn't panic when it's called in the next case test.
					got.Done()
					<-stillReceiving
					return ctx
				case <-stillReceiving: // this occurs in a few milliseconds, so 1 second is more than enough time.
					receivedCount++
				}
			}
		}).
		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
			var v = "others still working"
			// ids now excludes the removed banner; unlocked reads are fine here
			// because only this goroutine mutates ids and pollFunc only reads.
			got.Add(len(bannerProjectIDs.ids))
			for _, banner := range bannerProjectIDs.ids {
				var topic = client.TopicInProject(cfg.TopicID, banner)
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
					Attributes: map[string]string{
						banner: v,
					},
				})
			}
			got.Wait()
			for _, banner := range bannerProjectIDs.ids {
				value, found := got.attrs[banner]
				require.True(t, found, "did not find attribute with banner")
				// NOTE(review): expected/actual order swapped (testify is
				// Equal(t, expected, actual)); comparison is still valid.
				require.Equal(t, value, v)
			}
			return ctx
		}).
		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
			stopReceiverMux()
			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}

// TestReceiverMuxDynamicRemoveOnReceiveError verifies that when a banner's
// subscription is deleted out from under the mux, the mux drops that receiver
// (messages published meanwhile are NOT delivered — the handler panics on the
// canary attribute if they are), and that it resumes receiving once the
// subscription is recreated.
//
// nolint: dupl
func TestReceiverMuxDynamicRemoveOnReceiveError(t *testing.T) {
	var cfg = new(server.Config)

	var bannerProjectIDs = struct {
		sync.Mutex
		ids []string
	}{
		ids: []string{"foo", "bar", "baz"},
	}
	var pollFunc = func(_ context.Context) ([]string, error) {
		bannerProjectIDs.Lock()
		defer bannerProjectIDs.Unlock()
		return bannerProjectIDs.ids, nil
	}

	// No attrs map needed here — only the WaitGroup counting matters.
	var got struct {
		sync.Mutex
		sync.WaitGroup
	}
	// markDone messages count toward the WaitGroup; panicCanary messages must
	// never be delivered (the handler panics to fail loudly if one is).
	const markDone = "markDone"
	const panicCanary = "shouldBeDropped"
	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
		for k := range msg.Attributes {
			switch k {
			case markDone:
				defer got.Done()
			case panicCanary:
				panic("should not have received this message")
			}
		}
		return nil
	}

	// teardown psqlinjector plumbing
	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
	var runErrCh = make(chan error, 1)
	// created when the "run" test executes.
	var client *pubsub.Client

	var feat = f2.NewFeature(t.Name()).
		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
			return setupConfig(ctx, t, cfg)
		}).
		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
		}).
		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
			ps := pstest.FromContextT(ctx, t)
			var err error
			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			return ctx
		}).
		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
			var bmcfg = server.ReceiverMuxConfig{
				ForemanProjectID:             cfg.ForemanProjectID,
				SubscriptionID:               cfg.SubscriptionID,
				TopicID:                      cfg.TopicID,
				Conn:                         cfg.TestPubSubConn,
				PollMaxRetries:               cfg.PollBannersMaxRetries,
				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
				PollFunc:                     pollFunc,
				PollPeriod:                   cfg.PollBannersPeriod,
				Handler:                      handlerFunc,
			}
			rm, err := server.NewReceiverMux(&bmcfg)
			require.NoError(t, err)
			go func() {
				err := rm.Run(doneCtx)
				runErrCh <- err
			}()
			return ctx
		}).
		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
			// Baseline: prove all banners are being received before breaking one.
			got.Add(len(bannerProjectIDs.ids))
			for _, banner := range bannerProjectIDs.ids {
				var topic = client.TopicInProject(cfg.TopicID, banner)
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
					Attributes: map[string]string{
						markDone: "yes",
					},
				})
			}
			got.Wait()
			return ctx
		}).
		Test("delete a banner subscription and send canary message", func(ctx f2.Context, t *testing.T) f2.Context {
			var broken = bannerProjectIDs.ids[0]
			var topic = client.TopicInProject(cfg.TopicID, broken)
			exists, err := topic.Exists(ctx)
			require.NoError(t, err)
			require.True(t, exists)
			// Delete the subscription while the mux is actively receiving,
			// forcing a receive error inside the mux.
			err = client.SubscriptionInProject(cfg.SubscriptionID, broken).Delete(ctx)
			require.NoError(t, err)
			// this message should never be received.
			topic.Publish(ctx, &pubsub.Message{
				Data: []byte(broken),
				Attributes: map[string]string{
					panicCanary: "messages sent before the new subscription exists should not be received",
				},
			})
			topic.Flush()
			// give time for poll to elapse
			<-time.After(1 * time.Second)
			// this message should never be received.
			topic.Publish(ctx, &pubsub.Message{
				Data: []byte(broken),
				Attributes: map[string]string{
					panicCanary: "messages sent before the new subscription exists should not be received",
				},
			})
			topic.Flush()
			return ctx
		}).
		Test("recreate the banner subscription", func(ctx f2.Context, t *testing.T) f2.Context {
			var broken = bannerProjectIDs.ids[0]
			// A second client scoped to the broken banner's project is needed
			// to create the subscription in that project.
			ps := pstest.FromContextT(ctx, t)
			client2, err := pubsub.NewClient(ctx, broken, option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			var topic = client2.Topic(cfg.TopicID)
			exists, err := topic.Exists(ctx)
			require.NoError(t, err)
			require.True(t, exists)
			sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
				Topic: topic,
			})
			require.NoError(t, err)
			exists, err = sub.Exists(ctx)
			require.NoError(t, err)
			require.True(t, exists)
			return ctx
		}).
		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
			// The mux should have re-attached to the recreated subscription;
			// all banners must deliver again.
			got.Add(len(bannerProjectIDs.ids))
			for _, banner := range bannerProjectIDs.ids {
				var topic = client.TopicInProject(cfg.TopicID, banner)
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
					Attributes: map[string]string{
						markDone: "yes",
					},
				})
			}
			got.Wait()
			t.Logf("receiver mux is receiving from all subscriptions again")
			return ctx
		}).
		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
			stopReceiverMux()
			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}

// TestReceiverMuxSubDoesNotExistAtStartup verifies that the mux tolerates a
// banner whose subscription does not exist when the mux starts: the other
// banners work normally, and once the missing subscription is created the mux
// begins receiving from it as well.
//
// nolint: dupl
func TestReceiverMuxSubDoesNotExistAtStartup(t *testing.T) {
	var cfg = new(server.Config)

	var bannerProjectIDs = struct {
		sync.Mutex
		ids []string
	}{
		ids: []string{"foo", "bar", "baz"},
	}
	var pollFunc = func(_ context.Context) ([]string, error) {
		bannerProjectIDs.Lock()
		defer bannerProjectIDs.Unlock()
		return bannerProjectIDs.ids, nil
	}

	var got struct {
		sync.Mutex
		sync.WaitGroup
	}
	var handlerFunc = func(_ context.Context, _ *pubsub.Message) error {
		got.Done()
		return nil
	}

	// teardown psqlinjector plumbing
	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
	var runErrCh = make(chan error, 1)
	// created when the "run" test executes.
	var client *pubsub.Client

	var feat = f2.NewFeature(t.Name()).
		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
			return setupConfig(ctx, t, cfg)
		}).
		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
			// Topic+subscription for every banner EXCEPT ids[0].
			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids[1:]...)
		}).
		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
			ps := pstest.FromContextT(ctx, t)
			var err error
			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			return ctx
		}).
		Setup("topic without subscription", func(ctx f2.Context, t *testing.T) f2.Context {
			// ids[0] gets only a topic — deliberately no subscription yet.
			ps := pstest.FromContextT(ctx, t)
			client2, err := pubsub.NewClient(ctx, bannerProjectIDs.ids[0], option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			topic, err := client2.CreateTopic(ctx, cfg.TopicID)
			require.NoError(t, err, "error creating pubsub topic")
			exists, err := topic.Exists(ctx)
			require.NoError(t, err, "error checking if topic exists")
			require.True(t, exists, "topic does not exist")
			return ctx
		}).
		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
			var bmcfg = server.ReceiverMuxConfig{
				ForemanProjectID:             cfg.ForemanProjectID,
				SubscriptionID:               cfg.SubscriptionID,
				TopicID:                      cfg.TopicID,
				Conn:                         cfg.TestPubSubConn,
				PollPeriod:                   cfg.PollBannersPeriod,
				PollMaxRetries:               cfg.PollBannersMaxRetries,
				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
				PollFunc:                     pollFunc,
				Handler:                      handlerFunc,
			}
			rm, err := server.NewReceiverMux(&bmcfg)
			require.NoError(t, err)
			go func() {
				err := rm.Run(doneCtx)
				runErrCh <- err
			}()
			return ctx
		}).
		Test("send message to all banners", func(ctx f2.Context, t *testing.T) f2.Context {
			// Adds one Wait slot per banner (3), but only 2 subscriptions exist,
			// so only 2 handler Done()s can occur.
			got.Add(len(bannerProjectIDs.ids))
			for _, banner := range bannerProjectIDs.ids {
				var topic = client.TopicInProject(cfg.TopicID, banner)
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
				})
			}
			// prove that the subscription does not exist yet.
			go func() {
				<-time.After(time.Second)
				got.Done() // this will cause a panic if the subscription actually exists.
			}()
			got.Wait()
			return ctx
		}).
		Test("create the missing banner subscription", func(ctx f2.Context, t *testing.T) f2.Context {
			ps := pstest.FromContextT(ctx, t)
			client2, err := pubsub.NewClient(ctx, bannerProjectIDs.ids[0], option.WithGRPCConn(ps.Conn))
			require.NoError(t, err)
			var topic = client2.Topic(cfg.TopicID)
			exists, err := topic.Exists(ctx)
			require.NoError(t, err)
			require.True(t, exists)
			sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
				Topic: topic,
			})
			require.NoError(t, err)
			exists, err = sub.Exists(ctx)
			require.NoError(t, err)
			require.True(t, exists)
			return ctx
		}).
		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
			// The mux's PollSubscriptionExistsPeriod retry should have attached
			// to the newly created subscription; all 3 banners now deliver.
			got.Add(len(bannerProjectIDs.ids))
			for _, banner := range bannerProjectIDs.ids {
				var topic = client.TopicInProject(cfg.TopicID, banner)
				exists, err := topic.Exists(ctx)
				require.NoError(t, err)
				require.True(t, exists)
				topic.Publish(ctx, &pubsub.Message{
					Data: []byte(banner),
				})
			}
			got.Wait()
			t.Logf("receiver mux is receiving from all subscriptions again")
			return ctx
		}).
		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
			stopReceiverMux()
			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
			return ctx
		}).
		Feature()

	f2f.Test(t, feat)
}