// nolint: unparam // nolint: dupl package server_test import ( "context" "fmt" "sync" "testing" "time" server "edge-infra.dev/pkg/edge/psqlinjector" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/pstest" "cloud.google.com/go/pubsub" "github.com/google/uuid" "github.com/stretchr/testify/require" "google.golang.org/api/option" ) func TestReceiverMuxHealthCheck(t *testing.T) { var cfg = new(server.Config) var bannerProjectIDs struct { sync.Mutex ids []string } for range 20 { bannerProjectIDs.ids = append(bannerProjectIDs.ids, fmt.Sprintf("sub%s", uuid.New())) } var pollFunc = func(_ context.Context) ([]string, error) { bannerProjectIDs.Lock() defer bannerProjectIDs.Unlock() return bannerProjectIDs.ids, nil } var got struct { sync.Mutex sync.WaitGroup } const markNacked = "nacked" var handlerFunc = func(_ context.Context, msg *pubsub.Message) error { defer got.Done() for k := range msg.Attributes { if k == markNacked { return fmt.Errorf("nacked") } } return nil } // teardown psqlinjector plumbing var doneCtx, stopReceiverMux = context.WithCancel(context.Background()) var runErrCh = make(chan error, 1) // created when the "run" test executes. var client *pubsub.Client var getHealthCheck func(*testing.T) server.ReceiverMuxHealthCheck var feat = f2.NewFeature(t.Name()). Setup("config", func(ctx f2.Context, t *testing.T) f2.Context { return setupConfig(ctx, t, cfg) }). Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context { return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...) }). Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context { ps := pstest.FromContextT(ctx, t) var err error client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn)) require.NoError(t, err) return ctx }). Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context { var bmcfg = server.ReceiverMuxConfig{ ForemanProjectID: cfg.ForemanProjectID, SubscriptionID: cfg.SubscriptionID, TopicID: cfg.TopicID, Conn: cfg.TestPubSubConn, PollPeriod: cfg.PollBannersPeriod, PollMaxRetries: cfg.PollBannersMaxRetries, PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod, PollFunc: pollFunc, Handler: handlerFunc, } rm, err := server.NewReceiverMux(&bmcfg) require.NoError(t, err) getHealthCheck = func(t *testing.T) server.ReceiverMuxHealthCheck { var bmhc = rm.HealthCheck() /* var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(bmhc); err != nil { require.NoError(t, err) } t.Logf("health check for %s: %s", t.Name(), buf.Bytes()) */ require.Equal(t, bmhc.Count, len(bannerProjectIDs.ids)) return bmhc } go func() { err := rm.Run(doneCtx) runErrCh <- err }() return ctx }). Test("health check", func(ctx f2.Context, t *testing.T) f2.Context { for _, banner := range bannerProjectIDs.ids { var topic = client.TopicInProject(cfg.TopicID, banner) exists, err := topic.Exists(ctx) require.NoError(t, err) require.True(t, exists) got.Add(1) topic.Publish(ctx, &pubsub.Message{ Data: []byte(banner), // marked Acked }) } got.Wait() <-time.After(time.Second) var bmhc = getHealthCheck(t) require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count) require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount) require.Equal(t, 0, bmhc.UnhealthyCount) for _, projectID := range bannerProjectIDs.ids { _, found := bmhc.Receivers[projectID] require.True(t, found) } for _, rhc := range bmhc.Receivers { require.True(t, rhc.Healthy) require.Equal(t, rhc.RestartCount, 0) require.Equal(t, rhc.TotalMessagesProcessed, 1) require.Equal(t, rhc.CurrentMessagesProcessed, 1) require.Equal(t, rhc.TotalMessagesAcked, 1) require.Equal(t, rhc.CurrentMessagesAcked, 1) require.Equal(t, rhc.TotalMessagesNacked, 0) require.Equal(t, rhc.CurrentMessagesNacked, 0) require.False(t, rhc.TotalReceiveDuration == 0) require.False(t, rhc.CurrentReceiveDuration == 0) require.Equal(t, rhc.TotalReceiveDuration, rhc.CurrentReceiveDuration) } /* delete half of the subscriptions */ var broken = make(map[string]bool) for i, projectID := range bannerProjectIDs.ids { if i%2 == 0 { continue } broken[projectID] = true var topic = client.TopicInProject(cfg.TopicID, projectID) exists, err := topic.Exists(ctx) require.NoError(t, err) require.True(t, exists) err = client.SubscriptionInProject(cfg.SubscriptionID, projectID).Delete(ctx) require.NoError(t, err) } // give time for poll to elapse bmhc = getHealthCheck(t) for bmhc.UnhealthyCount != len(broken) { <-time.After(500 * time.Millisecond) bmhc = getHealthCheck(t) } require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count) require.Equal(t, len(bannerProjectIDs.ids)-len(broken), bmhc.HealthyCount) require.Equal(t, len(broken), bmhc.UnhealthyCount) for _, projectID := range bannerProjectIDs.ids { _, found := bmhc.Receivers[projectID] require.True(t, found) } for projectID, rhc := range bmhc.Receivers { require.Equal(t, rhc.TotalMessagesProcessed, 1) require.Equal(t, rhc.TotalMessagesAcked, 1) require.Equal(t, rhc.TotalMessagesNacked, 0) require.Equal(t, rhc.CurrentMessagesNacked, 0) if !broken[projectID] { require.True(t, rhc.Healthy) require.Equal(t, rhc.RestartCount, 0) require.Equal(t, rhc.CurrentMessagesProcessed, 1) require.Equal(t, rhc.CurrentMessagesAcked, 1) require.True(t, rhc.TotalReceiveDuration == rhc.CurrentReceiveDuration) require.True(t, rhc.CurrentReceiveDuration != 0) continue } require.False(t, rhc.Healthy) require.Equal(t, rhc.RestartCount, 1) require.Equal(t, rhc.CurrentMessagesProcessed, 0) require.Equal(t, rhc.CurrentMessagesAcked, 0) require.True(t, rhc.TotalReceiveDuration != rhc.CurrentReceiveDuration) require.True(t, rhc.CurrentReceiveDuration == 0) } /* recreate the banner subscriptions */ for projectID := range broken { ps := pstest.FromContextT(ctx, t) client2, err := pubsub.NewClient(ctx, projectID, option.WithGRPCConn(ps.Conn)) require.NoError(t, err) var topic = client2.Topic(cfg.TopicID) exists, err := topic.Exists(ctx) require.NoError(t, err) require.True(t, exists) sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{ Topic: topic, }) require.NoError(t, err) exists, err = sub.Exists(ctx) require.NoError(t, err) require.True(t, exists) } // wait for all receivers to recover bmhc = getHealthCheck(t) for bmhc.UnhealthyCount != 0 { <-time.After(500 * time.Millisecond) bmhc = getHealthCheck(t) } require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count) require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount) require.Equal(t, 0, bmhc.UnhealthyCount) for _, projectID := range bannerProjectIDs.ids { _, found := bmhc.Receivers[projectID] require.True(t, found) } /* send message to active banners */ for _, banner := range bannerProjectIDs.ids { var topic = client.TopicInProject(cfg.TopicID, banner) exists, err := topic.Exists(ctx) require.NoError(t, err) require.True(t, exists) got.Add(1) topic.Publish(ctx, &pubsub.Message{ Data: []byte(banner), }) got.Add(1) topic.Publish(ctx, &pubsub.Message{ Data: []byte(banner), }) } got.Wait() <-time.After(time.Second) // give time for poll to elapse bmhc = getHealthCheck(t) require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count) require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount) require.Equal(t, 0, bmhc.UnhealthyCount) for _, projectID := range bannerProjectIDs.ids { _, found := bmhc.Receivers[projectID] require.True(t, found) } for projectID, rhc := range bmhc.Receivers { require.True(t, rhc.Healthy) require.Equal(t, rhc.TotalMessagesProcessed, 3) require.Equal(t, rhc.TotalMessagesAcked, 3) require.Equal(t, rhc.TotalMessagesNacked, 0) require.Equal(t, rhc.CurrentMessagesNacked, 0) require.True(t, rhc.CurrentReceiveDuration != 0) if broken[projectID] { // the receiver with the broken subscription has slightly different values. require.Equal(t, rhc.RestartCount, 1) require.Equal(t, rhc.CurrentMessagesProcessed, 2) require.Equal(t, rhc.CurrentMessagesAcked, 2) require.True(t, rhc.TotalReceiveDuration != rhc.CurrentReceiveDuration) } else { require.Equal(t, rhc.RestartCount, 0) require.Equal(t, rhc.CurrentMessagesProcessed, 3) require.Equal(t, rhc.CurrentMessagesAcked, 3) require.True(t, rhc.TotalReceiveDuration == rhc.CurrentReceiveDuration) } } t.Logf("receiver mux is receiving from all subscriptions again and HealthCheck is correct") return ctx }). Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context { stopReceiverMux() require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error") return ctx }). Feature() f2f.Test(t, feat) }