...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/integration/mux_healthcheck_test.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector/integration

     1  // nolint: unparam
     2  // nolint: dupl
     3  package server_test
     4  
     5  import (
     6  	"context"
     7  	"fmt"
     8  	"sync"
     9  	"testing"
    10  	"time"
    11  
    12  	server "edge-infra.dev/pkg/edge/psqlinjector"
    13  	"edge-infra.dev/test/f2"
    14  	"edge-infra.dev/test/f2/x/pstest"
    15  
    16  	"cloud.google.com/go/pubsub"
    17  	"github.com/google/uuid"
    18  	"github.com/stretchr/testify/require"
    19  	"google.golang.org/api/option"
    20  )
    21  
    22  func TestReceiverMuxHealthCheck(t *testing.T) {
    23  	var cfg = new(server.Config)
    24  
    25  	var bannerProjectIDs struct {
    26  		sync.Mutex
    27  		ids []string
    28  	}
    29  
    30  	for range 20 {
    31  		bannerProjectIDs.ids = append(bannerProjectIDs.ids, fmt.Sprintf("sub%s", uuid.New()))
    32  	}
    33  
    34  	var pollFunc = func(_ context.Context) ([]string, error) {
    35  		bannerProjectIDs.Lock()
    36  		defer bannerProjectIDs.Unlock()
    37  		return bannerProjectIDs.ids, nil
    38  	}
    39  
    40  	var got struct {
    41  		sync.Mutex
    42  		sync.WaitGroup
    43  	}
    44  	const markNacked = "nacked"
    45  	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
    46  		defer got.Done()
    47  		for k := range msg.Attributes {
    48  			if k == markNacked {
    49  				return fmt.Errorf("nacked")
    50  			}
    51  		}
    52  		return nil
    53  	}
    54  
    55  	// teardown psqlinjector plumbing
    56  	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
    57  	var runErrCh = make(chan error, 1) // created when the "run" test executes.
    58  
    59  	var client *pubsub.Client
    60  
    61  	var getHealthCheck func(*testing.T) server.ReceiverMuxHealthCheck
    62  
    63  	var feat = f2.NewFeature(t.Name()).
    64  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
    65  			return setupConfig(ctx, t, cfg)
    66  		}).
    67  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
    68  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
    69  		}).
    70  		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
    71  			ps := pstest.FromContextT(ctx, t)
    72  			var err error
    73  			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
    74  			require.NoError(t, err)
    75  			return ctx
    76  		}).
    77  		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
    78  			var bmcfg = server.ReceiverMuxConfig{
    79  				ForemanProjectID:             cfg.ForemanProjectID,
    80  				SubscriptionID:               cfg.SubscriptionID,
    81  				TopicID:                      cfg.TopicID,
    82  				Conn:                         cfg.TestPubSubConn,
    83  				PollPeriod:                   cfg.PollBannersPeriod,
    84  				PollMaxRetries:               cfg.PollBannersMaxRetries,
    85  				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
    86  				PollFunc:                     pollFunc,
    87  				Handler:                      handlerFunc,
    88  			}
    89  			rm, err := server.NewReceiverMux(&bmcfg)
    90  			require.NoError(t, err)
    91  
    92  			getHealthCheck = func(t *testing.T) server.ReceiverMuxHealthCheck {
    93  				var bmhc = rm.HealthCheck()
    94  				/*
    95  					var buf bytes.Buffer
    96  					if err := json.NewEncoder(&buf).Encode(bmhc); err != nil {
    97  						require.NoError(t, err)
    98  					}
    99  					t.Logf("health check for %s: %s", t.Name(), buf.Bytes())
   100  				*/
   101  				require.Equal(t, bmhc.Count, len(bannerProjectIDs.ids))
   102  				return bmhc
   103  			}
   104  
   105  			go func() {
   106  				err := rm.Run(doneCtx)
   107  				runErrCh <- err
   108  			}()
   109  			return ctx
   110  		}).
   111  		Test("health check", func(ctx f2.Context, t *testing.T) f2.Context {
   112  			for _, banner := range bannerProjectIDs.ids {
   113  				var topic = client.TopicInProject(cfg.TopicID, banner)
   114  				exists, err := topic.Exists(ctx)
   115  				require.NoError(t, err)
   116  				require.True(t, exists)
   117  
   118  				got.Add(1)
   119  				topic.Publish(ctx, &pubsub.Message{
   120  					Data: []byte(banner),
   121  					// marked Acked
   122  				})
   123  			}
   124  
   125  			got.Wait()
   126  			<-time.After(time.Second)
   127  
   128  			var bmhc = getHealthCheck(t)
   129  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
   130  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount)
   131  			require.Equal(t, 0, bmhc.UnhealthyCount)
   132  			for _, projectID := range bannerProjectIDs.ids {
   133  				_, found := bmhc.Receivers[projectID]
   134  				require.True(t, found)
   135  			}
   136  
   137  			for _, rhc := range bmhc.Receivers {
   138  				require.True(t, rhc.Healthy)
   139  				require.Equal(t, rhc.RestartCount, 0)
   140  
   141  				require.Equal(t, rhc.TotalMessagesProcessed, 1)
   142  				require.Equal(t, rhc.CurrentMessagesProcessed, 1)
   143  
   144  				require.Equal(t, rhc.TotalMessagesAcked, 1)
   145  				require.Equal(t, rhc.CurrentMessagesAcked, 1)
   146  
   147  				require.Equal(t, rhc.TotalMessagesNacked, 0)
   148  				require.Equal(t, rhc.CurrentMessagesNacked, 0)
   149  
   150  				require.False(t, rhc.TotalReceiveDuration == 0)
   151  				require.False(t, rhc.CurrentReceiveDuration == 0)
   152  				require.Equal(t, rhc.TotalReceiveDuration, rhc.CurrentReceiveDuration)
   153  			}
   154  
   155  			/*
   156  				delete half of the subscriptions
   157  			*/
   158  			var broken = make(map[string]bool)
   159  			for i, projectID := range bannerProjectIDs.ids {
   160  				if i%2 == 0 {
   161  					continue
   162  				}
   163  				broken[projectID] = true
   164  				var topic = client.TopicInProject(cfg.TopicID, projectID)
   165  				exists, err := topic.Exists(ctx)
   166  				require.NoError(t, err)
   167  				require.True(t, exists)
   168  
   169  				err = client.SubscriptionInProject(cfg.SubscriptionID, projectID).Delete(ctx)
   170  				require.NoError(t, err)
   171  			}
   172  
   173  			// give time for poll to elapse
   174  			bmhc = getHealthCheck(t)
   175  			for bmhc.UnhealthyCount != len(broken) {
   176  				<-time.After(500 * time.Millisecond)
   177  				bmhc = getHealthCheck(t)
   178  			}
   179  
   180  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
   181  			require.Equal(t, len(bannerProjectIDs.ids)-len(broken), bmhc.HealthyCount)
   182  			require.Equal(t, len(broken), bmhc.UnhealthyCount)
   183  			for _, projectID := range bannerProjectIDs.ids {
   184  				_, found := bmhc.Receivers[projectID]
   185  				require.True(t, found)
   186  			}
   187  
   188  			for projectID, rhc := range bmhc.Receivers {
   189  				require.Equal(t, rhc.TotalMessagesProcessed, 1)
   190  				require.Equal(t, rhc.TotalMessagesAcked, 1)
   191  				require.Equal(t, rhc.TotalMessagesNacked, 0)
   192  				require.Equal(t, rhc.CurrentMessagesNacked, 0)
   193  
   194  				if !broken[projectID] {
   195  					require.True(t, rhc.Healthy)
   196  					require.Equal(t, rhc.RestartCount, 0)
   197  					require.Equal(t, rhc.CurrentMessagesProcessed, 1)
   198  					require.Equal(t, rhc.CurrentMessagesAcked, 1)
   199  					require.True(t, rhc.TotalReceiveDuration == rhc.CurrentReceiveDuration)
   200  					require.True(t, rhc.CurrentReceiveDuration != 0)
   201  					continue
   202  				}
   203  				require.False(t, rhc.Healthy)
   204  				require.Equal(t, rhc.RestartCount, 1)
   205  				require.Equal(t, rhc.CurrentMessagesProcessed, 0)
   206  				require.Equal(t, rhc.CurrentMessagesAcked, 0)
   207  				require.True(t, rhc.TotalReceiveDuration != rhc.CurrentReceiveDuration)
   208  				require.True(t, rhc.CurrentReceiveDuration == 0)
   209  			}
   210  
   211  			/*
   212  				recreate the banner subscriptions
   213  			*/
   214  
   215  			for projectID := range broken {
   216  				ps := pstest.FromContextT(ctx, t)
   217  				client2, err := pubsub.NewClient(ctx, projectID, option.WithGRPCConn(ps.Conn))
   218  				require.NoError(t, err)
   219  
   220  				var topic = client2.Topic(cfg.TopicID)
   221  				exists, err := topic.Exists(ctx)
   222  				require.NoError(t, err)
   223  				require.True(t, exists)
   224  
   225  				sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
   226  					Topic: topic,
   227  				})
   228  				require.NoError(t, err)
   229  				exists, err = sub.Exists(ctx)
   230  				require.NoError(t, err)
   231  				require.True(t, exists)
   232  			}
   233  
   234  			// wait for all receivers to recover
   235  			bmhc = getHealthCheck(t)
   236  			for bmhc.UnhealthyCount != 0 {
   237  				<-time.After(500 * time.Millisecond)
   238  				bmhc = getHealthCheck(t)
   239  			}
   240  
   241  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
   242  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount)
   243  			require.Equal(t, 0, bmhc.UnhealthyCount)
   244  			for _, projectID := range bannerProjectIDs.ids {
   245  				_, found := bmhc.Receivers[projectID]
   246  				require.True(t, found)
   247  			}
   248  
   249  			/*
   250  				send message to active banners
   251  			*/
   252  			for _, banner := range bannerProjectIDs.ids {
   253  				var topic = client.TopicInProject(cfg.TopicID, banner)
   254  				exists, err := topic.Exists(ctx)
   255  				require.NoError(t, err)
   256  				require.True(t, exists)
   257  
   258  				got.Add(1)
   259  				topic.Publish(ctx, &pubsub.Message{
   260  					Data: []byte(banner),
   261  				})
   262  
   263  				got.Add(1)
   264  				topic.Publish(ctx, &pubsub.Message{
   265  					Data: []byte(banner),
   266  				})
   267  			}
   268  
   269  			got.Wait()
   270  			<-time.After(time.Second) // give time for poll to elapse
   271  
   272  			bmhc = getHealthCheck(t)
   273  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.Count)
   274  			require.Equal(t, len(bannerProjectIDs.ids), bmhc.HealthyCount)
   275  			require.Equal(t, 0, bmhc.UnhealthyCount)
   276  			for _, projectID := range bannerProjectIDs.ids {
   277  				_, found := bmhc.Receivers[projectID]
   278  				require.True(t, found)
   279  			}
   280  
   281  			for projectID, rhc := range bmhc.Receivers {
   282  				require.True(t, rhc.Healthy)
   283  				require.Equal(t, rhc.TotalMessagesProcessed, 3)
   284  				require.Equal(t, rhc.TotalMessagesAcked, 3)
   285  				require.Equal(t, rhc.TotalMessagesNacked, 0)
   286  				require.Equal(t, rhc.CurrentMessagesNacked, 0)
   287  				require.True(t, rhc.CurrentReceiveDuration != 0)
   288  
   289  				if broken[projectID] {
   290  					// the receiver with the broken subscription has slightly different values.
   291  					require.Equal(t, rhc.RestartCount, 1)
   292  					require.Equal(t, rhc.CurrentMessagesProcessed, 2)
   293  					require.Equal(t, rhc.CurrentMessagesAcked, 2)
   294  					require.True(t, rhc.TotalReceiveDuration != rhc.CurrentReceiveDuration)
   295  				} else {
   296  					require.Equal(t, rhc.RestartCount, 0)
   297  					require.Equal(t, rhc.CurrentMessagesProcessed, 3)
   298  					require.Equal(t, rhc.CurrentMessagesAcked, 3)
   299  					require.True(t, rhc.TotalReceiveDuration == rhc.CurrentReceiveDuration)
   300  				}
   301  			}
   302  
   303  			t.Logf("receiver mux is receiving from all subscriptions again and HealthCheck is correct")
   304  			return ctx
   305  		}).
   306  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
   307  			stopReceiverMux()
   308  			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
   309  			return ctx
   310  		}).
   311  		Feature()
   312  
   313  	f2f.Test(t, feat)
   314  }
   315  

View as plain text