...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/integration/mux_test.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector/integration

     1  // nolint: unparam
     2  package server_test
     3  
     4  import (
     5  	"context"
     6  	"sync"
     7  	"testing"
     8  	"time"
     9  
    10  	server "edge-infra.dev/pkg/edge/psqlinjector"
    11  	"edge-infra.dev/test/f2"
    12  	"edge-infra.dev/test/f2/x/pstest"
    13  
    14  	"cloud.google.com/go/pubsub"
    15  	"github.com/stretchr/testify/require"
    16  	"google.golang.org/api/option"
    17  )
    18  
    19  // nolint: dupl
    20  func TestReceiverMuxHappy(t *testing.T) {
    21  	var cfg = new(server.Config)
    22  	var bannerProjectIDs = []string{"foo", "bar", "baz"}
    23  	var pollFunc = func(_ context.Context) ([]string, error) {
    24  		return bannerProjectIDs, nil
    25  	}
    26  
    27  	var got = struct {
    28  		sync.Mutex
    29  		sync.WaitGroup
    30  		attrs map[string]string
    31  	}{
    32  		attrs: make(map[string]string),
    33  	}
    34  	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
    35  		t.Logf("message %q with attributes %v", msg.ID, msg.Attributes)
    36  		defer got.Done()
    37  		got.Lock()
    38  		defer got.Unlock()
    39  		for k, v := range msg.Attributes {
    40  			got.attrs[k] = v
    41  		}
    42  		return nil
    43  	}
    44  
    45  	// We set all the cluster's clusters.infra_status_updated_at to this time, then check that it's updated.
    46  	var start = time.Now().UTC().Truncate(time.Microsecond) // truncated due to precision https://www.postgresql.org/docs/current/datatype-datetime.html
    47  	t.Logf("start time %v", start)
    48  
    49  	// teardown psqlinjector plumbing
    50  	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
    51  	var runErrCh = make(chan error, 1) // created when the "run" test executes.
    52  
    53  	var client *pubsub.Client
    54  
    55  	var feat = f2.NewFeature(t.Name()).
    56  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
    57  			return setupConfig(ctx, t, cfg)
    58  		}).
    59  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
    60  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs...)
    61  		}).
    62  		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
    63  			ps := pstest.FromContextT(ctx, t)
    64  			var err error
    65  			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
    66  			require.NoError(t, err)
    67  			return ctx
    68  		}).
    69  		Test("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
    70  			var bmcfg = server.ReceiverMuxConfig{
    71  				ForemanProjectID:             cfg.ForemanProjectID,
    72  				SubscriptionID:               cfg.SubscriptionID,
    73  				TopicID:                      cfg.TopicID,
    74  				Conn:                         cfg.TestPubSubConn,
    75  				PollPeriod:                   cfg.PollBannersPeriod,
    76  				PollMaxRetries:               cfg.PollBannersMaxRetries,
    77  				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
    78  
    79  				PollFunc: pollFunc,
    80  				Handler:  handlerFunc,
    81  			}
    82  			rm, err := server.NewReceiverMux(&bmcfg)
    83  			require.NoError(t, err)
    84  			go func() {
    85  				err := rm.Run(doneCtx)
    86  				runErrCh <- err
    87  			}()
    88  			return ctx
    89  		}).
    90  		Test("send message to each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
    91  			for _, banner := range bannerProjectIDs {
    92  				got.Add(1)
    93  
    94  				var topic = client.TopicInProject(cfg.TopicID, banner)
    95  
    96  				// sanity check
    97  				exists, err := topic.Exists(ctx)
    98  				require.NoError(t, err)
    99  				require.True(t, exists)
   100  
   101  				topic.Publish(ctx, &pubsub.Message{
   102  					Data: []byte(banner),
   103  					Attributes: map[string]string{
   104  						banner: "",
   105  					},
   106  				})
   107  			}
   108  			return ctx
   109  		}).
   110  		Test("verify message from each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
   111  			got.Wait()
   112  			got.Lock()
   113  			defer got.Unlock()
   114  			for _, banner := range bannerProjectIDs {
   115  				_, found := got.attrs[banner]
   116  				require.True(t, found, "did not find attribute with banner")
   117  			}
   118  			return ctx
   119  		}).
   120  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
   121  			stopReceiverMux()
   122  			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
   123  			return ctx
   124  		}).
   125  		Feature()
   126  
   127  	f2f.Test(t, feat)
   128  }
   129  
   130  // nolint: dupl
   131  func TestReceiverMuxDynamicAdd(t *testing.T) {
   132  	var cfg = new(server.Config)
   133  
   134  	var bannerProjectIDs = struct {
   135  		sync.Mutex
   136  		ids []string
   137  	}{
   138  		ids: []string{"foo", "bar", "baz"},
   139  	}
   140  	var pollFunc = func(_ context.Context) ([]string, error) {
   141  		bannerProjectIDs.Lock()
   142  		defer bannerProjectIDs.Unlock()
   143  		return bannerProjectIDs.ids, nil
   144  	}
   145  
   146  	var got = struct {
   147  		sync.Mutex
   148  		sync.WaitGroup
   149  		attrs map[string]string
   150  	}{
   151  		attrs: make(map[string]string),
   152  	}
   153  	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
   154  		t.Logf("message %q with attributes %v", msg.ID, msg.Attributes)
   155  		defer got.Done()
   156  		got.Lock()
   157  		defer got.Unlock()
   158  		for k, v := range msg.Attributes {
   159  			got.attrs[k] = v
   160  		}
   161  		return nil
   162  	}
   163  
   164  	// We set all the cluster's clusters.infra_status_updated_at to this time, then check that it's updated.
   165  	var start = time.Now().UTC().Truncate(time.Microsecond) // truncated due to precision https://www.postgresql.org/docs/current/datatype-datetime.html
   166  	t.Logf("start time %v", start)
   167  
   168  	// teardown psqlinjector plumbing
   169  	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
   170  	var runErrCh = make(chan error, 1) // created when the "run" test executes.
   171  
   172  	var client *pubsub.Client
   173  
   174  	var feat = f2.NewFeature(t.Name()).
   175  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
   176  			return setupConfig(ctx, t, cfg)
   177  		}).
   178  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
   179  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
   180  		}).
   181  		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
   182  			ps := pstest.FromContextT(ctx, t)
   183  			var err error
   184  			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
   185  			require.NoError(t, err)
   186  			return ctx
   187  		}).
   188  		Test("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
   189  			var bmcfg = server.ReceiverMuxConfig{
   190  				ForemanProjectID:             cfg.ForemanProjectID,
   191  				SubscriptionID:               cfg.SubscriptionID,
   192  				Conn:                         cfg.TestPubSubConn,
   193  				TopicID:                      cfg.TopicID,
   194  				PollPeriod:                   cfg.PollBannersPeriod,
   195  				PollMaxRetries:               cfg.PollBannersMaxRetries,
   196  				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
   197  
   198  				Handler:  handlerFunc,
   199  				PollFunc: pollFunc,
   200  			}
   201  			rm, err := server.NewReceiverMux(&bmcfg)
   202  			require.NoError(t, err)
   203  			go func() {
   204  				err := rm.Run(doneCtx)
   205  				runErrCh <- err
   206  			}()
   207  			return ctx
   208  		}).
   209  		Test("send message to each subscription", func(ctx f2.Context, t *testing.T) f2.Context {
   210  			var v = "one"
   211  			for _, banner := range bannerProjectIDs.ids {
   212  				got.Add(1)
   213  
   214  				var topic = client.TopicInProject(cfg.TopicID, banner)
   215  
   216  				// sanity check
   217  				exists, err := topic.Exists(ctx)
   218  				require.NoError(t, err)
   219  				require.True(t, exists)
   220  
   221  				topic.Publish(ctx, &pubsub.Message{
   222  					Data: []byte(banner),
   223  					Attributes: map[string]string{
   224  						banner: v,
   225  					},
   226  				})
   227  
   228  				got.Wait()
   229  				value, found := got.attrs[banner]
   230  				require.True(t, found, "did not find attribute with banner")
   231  				require.Equal(t, value, v)
   232  			}
   233  			return ctx
   234  		}).
   235  		Test("create new banner and verify messages", func(ctx f2.Context, t *testing.T) f2.Context {
   236  			var newBanner = "dynamic"
   237  			ctx = createKinformTopics(ctx, t, cfg, newBanner)
   238  
   239  			bannerProjectIDs.Lock()
   240  			bannerProjectIDs.ids = append(bannerProjectIDs.ids, newBanner)
   241  			bannerProjectIDs.Unlock()
   242  
   243  			<-time.After(time.Second)
   244  			got.Add(len(bannerProjectIDs.ids))
   245  			for _, banner := range bannerProjectIDs.ids {
   246  				var topic = client.TopicInProject(cfg.TopicID, banner)
   247  
   248  				// sanity check
   249  				exists, err := topic.Exists(ctx)
   250  				require.NoError(t, err)
   251  				require.True(t, exists)
   252  
   253  				topic.Publish(ctx, &pubsub.Message{
   254  					Data: []byte(banner),
   255  					Attributes: map[string]string{
   256  						banner: "two",
   257  					},
   258  				})
   259  			}
   260  
   261  			got.Wait()
   262  			for _, banner := range bannerProjectIDs.ids {
   263  				value, found := got.attrs[banner]
   264  				require.True(t, found, "did not find attribute with banner")
   265  				require.Equal(t, value, "two")
   266  			}
   267  
   268  			return ctx
   269  		}).
   270  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
   271  			stopReceiverMux()
   272  			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
   273  			return ctx
   274  		}).
   275  		Feature()
   276  
   277  	f2f.Test(t, feat)
   278  }
   279  
   280  // nolint: dupl
   281  func TestReceiverMuxDynamicRemove(t *testing.T) {
   282  	var cfg = new(server.Config)
   283  
   284  	var bannerProjectIDs = struct {
   285  		sync.Mutex
   286  		ids []string
   287  	}{
   288  		ids: []string{"foo", "bar", "baz"},
   289  	}
   290  	var pollFunc = func(_ context.Context) ([]string, error) {
   291  		bannerProjectIDs.Lock()
   292  		defer bannerProjectIDs.Unlock()
   293  		return bannerProjectIDs.ids, nil
   294  	}
   295  
   296  	var got = struct {
   297  		sync.Mutex
   298  		sync.WaitGroup
   299  		attrs map[string]string
   300  	}{
   301  		attrs: make(map[string]string),
   302  	}
   303  	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
   304  		defer got.Done()
   305  		got.Lock()
   306  		defer got.Unlock()
   307  		for k, v := range msg.Attributes {
   308  			got.attrs[k] = v
   309  		}
   310  		return nil
   311  	}
   312  
   313  	// teardown psqlinjector plumbing
   314  	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
   315  	var runErrCh = make(chan error, 1) // created when the "run" test executes.
   316  
   317  	var client *pubsub.Client
   318  
   319  	var feat = f2.NewFeature(t.Name()).
   320  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
   321  			return setupConfig(ctx, t, cfg)
   322  		}).
   323  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
   324  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
   325  		}).
   326  		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
   327  			ps := pstest.FromContextT(ctx, t)
   328  			var err error
   329  			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
   330  			require.NoError(t, err)
   331  			return ctx
   332  		}).
   333  		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
   334  			var bmcfg = server.ReceiverMuxConfig{
   335  				SubscriptionID:               cfg.SubscriptionID,
   336  				ForemanProjectID:             cfg.ForemanProjectID,
   337  				Conn:                         cfg.TestPubSubConn,
   338  				PollPeriod:                   cfg.PollBannersPeriod,
   339  				PollMaxRetries:               cfg.PollBannersMaxRetries,
   340  				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
   341  				TopicID:                      cfg.TopicID,
   342  
   343  				PollFunc: pollFunc,
   344  				Handler:  handlerFunc,
   345  			}
   346  			rm, err := server.NewReceiverMux(&bmcfg)
   347  			require.NoError(t, err)
   348  			go func() {
   349  				err := rm.Run(doneCtx)
   350  				runErrCh <- err
   351  			}()
   352  			return ctx
   353  		}).
   354  		Test("delete a banner and wait until messages are no longer received", func(ctx f2.Context, t *testing.T) f2.Context {
   355  			var removed = bannerProjectIDs.ids[0]
   356  			var topic = client.TopicInProject(cfg.TopicID, removed)
   357  			exists, err := topic.Exists(ctx)
   358  			require.NoError(t, err)
   359  			require.True(t, exists)
   360  
   361  			var removeBannerOnce sync.Once
   362  			var receivedCount int
   363  			for {
   364  				got.Add(1)
   365  				topic.Publish(ctx, &pubsub.Message{
   366  					Data: []byte(removed),
   367  				})
   368  
   369  				var stillReceiving = make(chan struct{})
   370  				go func() {
   371  					got.Wait()
   372  					close(stillReceiving)
   373  					removeBannerOnce.Do(func() {
   374  						bannerProjectIDs.Lock()
   375  						bannerProjectIDs.ids = bannerProjectIDs.ids[1:]
   376  						bannerProjectIDs.Unlock()
   377  					})
   378  				}()
   379  
   380  				select {
   381  				case <-time.After(time.Second):
   382  					t.Logf("receiver mux dynamically unsubscribed")
   383  					// verify receiver mux was receiving from the subscription before it was shut down.
   384  					t.Logf("received count: %d", receivedCount)
   385  					require.True(t, receivedCount > 0)
   386  					// clean up goroutine, and block until this channel is closed, so that wg.Wait doesn't panic when it's called in the next case test.
   387  					got.Done()
   388  					<-stillReceiving
   389  					return ctx
   390  				case <-stillReceiving:
   391  					// this occurs in a few milliseconds, so 1 second is more than enough time.
   392  					receivedCount++
   393  				}
   394  			}
   395  		}).
   396  		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
   397  			var v = "others still working"
   398  
   399  			got.Add(len(bannerProjectIDs.ids))
   400  			for _, banner := range bannerProjectIDs.ids {
   401  				var topic = client.TopicInProject(cfg.TopicID, banner)
   402  				exists, err := topic.Exists(ctx)
   403  				require.NoError(t, err)
   404  				require.True(t, exists)
   405  
   406  				topic.Publish(ctx, &pubsub.Message{
   407  					Data: []byte(banner),
   408  					Attributes: map[string]string{
   409  						banner: v,
   410  					},
   411  				})
   412  			}
   413  
   414  			got.Wait()
   415  			for _, banner := range bannerProjectIDs.ids {
   416  				value, found := got.attrs[banner]
   417  				require.True(t, found, "did not find attribute with banner")
   418  				require.Equal(t, value, v)
   419  			}
   420  			return ctx
   421  		}).
   422  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
   423  			stopReceiverMux()
   424  			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
   425  			return ctx
   426  		}).
   427  		Feature()
   428  
   429  	f2f.Test(t, feat)
   430  }
   431  
   432  // nolint: dupl
   433  func TestReceiverMuxDynamicRemoveOnReceiveError(t *testing.T) {
   434  	var cfg = new(server.Config)
   435  
   436  	var bannerProjectIDs = struct {
   437  		sync.Mutex
   438  		ids []string
   439  	}{
   440  		ids: []string{"foo", "bar", "baz"},
   441  	}
   442  	var pollFunc = func(_ context.Context) ([]string, error) {
   443  		bannerProjectIDs.Lock()
   444  		defer bannerProjectIDs.Unlock()
   445  		return bannerProjectIDs.ids, nil
   446  	}
   447  
   448  	var got struct {
   449  		sync.Mutex
   450  		sync.WaitGroup
   451  	}
   452  	const markDone = "markDone"
   453  	const panicCanary = "shouldBeDropped"
   454  	var handlerFunc = func(_ context.Context, msg *pubsub.Message) error {
   455  		for k := range msg.Attributes {
   456  			switch k {
   457  			case markDone:
   458  				defer got.Done()
   459  			case panicCanary:
   460  				panic("should not have received this message")
   461  			}
   462  		}
   463  		return nil
   464  	}
   465  
   466  	// teardown psqlinjector plumbing
   467  	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
   468  	var runErrCh = make(chan error, 1) // created when the "run" test executes.
   469  
   470  	var client *pubsub.Client
   471  
   472  	var feat = f2.NewFeature(t.Name()).
   473  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
   474  			return setupConfig(ctx, t, cfg)
   475  		}).
   476  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
   477  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids...)
   478  		}).
   479  		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
   480  			ps := pstest.FromContextT(ctx, t)
   481  			var err error
   482  			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
   483  			require.NoError(t, err)
   484  			return ctx
   485  		}).
   486  		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
   487  			var bmcfg = server.ReceiverMuxConfig{
   488  				ForemanProjectID:             cfg.ForemanProjectID,
   489  				SubscriptionID:               cfg.SubscriptionID,
   490  				TopicID:                      cfg.TopicID,
   491  				Conn:                         cfg.TestPubSubConn,
   492  				PollMaxRetries:               cfg.PollBannersMaxRetries,
   493  				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
   494  				PollFunc:                     pollFunc,
   495  				PollPeriod:                   cfg.PollBannersPeriod,
   496  				Handler:                      handlerFunc,
   497  			}
   498  			rm, err := server.NewReceiverMux(&bmcfg)
   499  			require.NoError(t, err)
   500  
   501  			go func() {
   502  				err := rm.Run(doneCtx)
   503  				runErrCh <- err
   504  			}()
   505  			return ctx
   506  		}).
   507  		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
   508  			got.Add(len(bannerProjectIDs.ids))
   509  			for _, banner := range bannerProjectIDs.ids {
   510  				var topic = client.TopicInProject(cfg.TopicID, banner)
   511  				exists, err := topic.Exists(ctx)
   512  				require.NoError(t, err)
   513  				require.True(t, exists)
   514  
   515  				topic.Publish(ctx, &pubsub.Message{
   516  					Data: []byte(banner),
   517  					Attributes: map[string]string{
   518  						markDone: "yes",
   519  					},
   520  				})
   521  			}
   522  
   523  			got.Wait()
   524  			return ctx
   525  		}).
   526  		Test("delete a banner subscription and send canary message", func(ctx f2.Context, t *testing.T) f2.Context {
   527  			var broken = bannerProjectIDs.ids[0]
   528  			var topic = client.TopicInProject(cfg.TopicID, broken)
   529  			exists, err := topic.Exists(ctx)
   530  			require.NoError(t, err)
   531  			require.True(t, exists)
   532  
   533  			err = client.SubscriptionInProject(cfg.SubscriptionID, broken).Delete(ctx)
   534  			require.NoError(t, err)
   535  
   536  			// this message should never be received.
   537  			topic.Publish(ctx, &pubsub.Message{
   538  				Data: []byte(broken),
   539  				Attributes: map[string]string{
   540  					panicCanary: "messages sent before the new subscription exists should not be received",
   541  				},
   542  			})
   543  			topic.Flush()
   544  
   545  			// give time for poll to elapse
   546  			<-time.After(1 * time.Second)
   547  
   548  			// this message should never be received.
   549  			topic.Publish(ctx, &pubsub.Message{
   550  				Data: []byte(broken),
   551  				Attributes: map[string]string{
   552  					panicCanary: "messages sent before the new subscription exists should not be received",
   553  				},
   554  			})
   555  			topic.Flush()
   556  
   557  			return ctx
   558  		}).
   559  		Test("recreate the banner subscription", func(ctx f2.Context, t *testing.T) f2.Context {
   560  			var broken = bannerProjectIDs.ids[0]
   561  
   562  			ps := pstest.FromContextT(ctx, t)
   563  			client2, err := pubsub.NewClient(ctx, broken, option.WithGRPCConn(ps.Conn))
   564  			require.NoError(t, err)
   565  
   566  			var topic = client2.Topic(cfg.TopicID)
   567  			exists, err := topic.Exists(ctx)
   568  			require.NoError(t, err)
   569  			require.True(t, exists)
   570  
   571  			sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
   572  				Topic: topic,
   573  			})
   574  			require.NoError(t, err)
   575  			exists, err = sub.Exists(ctx)
   576  			require.NoError(t, err)
   577  			require.True(t, exists)
   578  
   579  			return ctx
   580  		}).
   581  		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
   582  			got.Add(len(bannerProjectIDs.ids))
   583  			for _, banner := range bannerProjectIDs.ids {
   584  				var topic = client.TopicInProject(cfg.TopicID, banner)
   585  				exists, err := topic.Exists(ctx)
   586  				require.NoError(t, err)
   587  				require.True(t, exists)
   588  
   589  				topic.Publish(ctx, &pubsub.Message{
   590  					Data: []byte(banner),
   591  					Attributes: map[string]string{
   592  						markDone: "yes",
   593  					},
   594  				})
   595  			}
   596  
   597  			got.Wait()
   598  			t.Logf("receiver mux is receiving from all subscriptions again")
   599  			return ctx
   600  		}).
   601  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
   602  			stopReceiverMux()
   603  			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
   604  			return ctx
   605  		}).
   606  		Feature()
   607  
   608  	f2f.Test(t, feat)
   609  }
   610  
   611  // nolint: dupl
   612  func TestReceiverMuxSubDoesNotExistAtStartup(t *testing.T) {
   613  	var cfg = new(server.Config)
   614  
   615  	var bannerProjectIDs = struct {
   616  		sync.Mutex
   617  		ids []string
   618  	}{
   619  		ids: []string{"foo", "bar", "baz"},
   620  	}
   621  	var pollFunc = func(_ context.Context) ([]string, error) {
   622  		bannerProjectIDs.Lock()
   623  		defer bannerProjectIDs.Unlock()
   624  		return bannerProjectIDs.ids, nil
   625  	}
   626  
   627  	var got struct {
   628  		sync.Mutex
   629  		sync.WaitGroup
   630  	}
   631  	var handlerFunc = func(_ context.Context, _ *pubsub.Message) error {
   632  		got.Done()
   633  		return nil
   634  	}
   635  
   636  	// teardown psqlinjector plumbing
   637  	var doneCtx, stopReceiverMux = context.WithCancel(context.Background())
   638  	var runErrCh = make(chan error, 1) // created when the "run" test executes.
   639  
   640  	var client *pubsub.Client
   641  
   642  	var feat = f2.NewFeature(t.Name()).
   643  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
   644  			return setupConfig(ctx, t, cfg)
   645  		}).
   646  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
   647  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs.ids[1:]...)
   648  		}).
   649  		Setup("pubsub client", func(ctx f2.Context, t *testing.T) f2.Context {
   650  			ps := pstest.FromContextT(ctx, t)
   651  			var err error
   652  			client, err = pubsub.NewClient(ctx, "fake-foreman-project", option.WithGRPCConn(ps.Conn))
   653  			require.NoError(t, err)
   654  			return ctx
   655  		}).
   656  		Setup("topic without subscription", func(ctx f2.Context, t *testing.T) f2.Context {
   657  			ps := pstest.FromContextT(ctx, t)
   658  			client2, err := pubsub.NewClient(ctx, bannerProjectIDs.ids[0], option.WithGRPCConn(ps.Conn))
   659  			require.NoError(t, err)
   660  
   661  			topic, err := client2.CreateTopic(ctx, cfg.TopicID)
   662  			require.NoError(t, err, "error creating pubsub topic")
   663  
   664  			exists, err := topic.Exists(ctx)
   665  			require.NoError(t, err, "error checking if topic exists")
   666  			require.True(t, exists, "topic does not exist")
   667  
   668  			return ctx
   669  		}).
   670  		Setup("receiver mux", func(ctx f2.Context, t *testing.T) f2.Context {
   671  			var bmcfg = server.ReceiverMuxConfig{
   672  				ForemanProjectID:             cfg.ForemanProjectID,
   673  				SubscriptionID:               cfg.SubscriptionID,
   674  				TopicID:                      cfg.TopicID,
   675  				Conn:                         cfg.TestPubSubConn,
   676  				PollPeriod:                   cfg.PollBannersPeriod,
   677  				PollMaxRetries:               cfg.PollBannersMaxRetries,
   678  				PollSubscriptionExistsPeriod: cfg.PollSubscriptionExistsPeriod,
   679  				PollFunc:                     pollFunc,
   680  				Handler:                      handlerFunc,
   681  			}
   682  			rm, err := server.NewReceiverMux(&bmcfg)
   683  			require.NoError(t, err)
   684  			go func() {
   685  				err := rm.Run(doneCtx)
   686  				runErrCh <- err
   687  			}()
   688  			return ctx
   689  		}).
   690  		Test("send message to all banners", func(ctx f2.Context, t *testing.T) f2.Context {
   691  			got.Add(len(bannerProjectIDs.ids))
   692  			for _, banner := range bannerProjectIDs.ids {
   693  				var topic = client.TopicInProject(cfg.TopicID, banner)
   694  				exists, err := topic.Exists(ctx)
   695  				require.NoError(t, err)
   696  				require.True(t, exists)
   697  
   698  				topic.Publish(ctx, &pubsub.Message{
   699  					Data: []byte(banner),
   700  				})
   701  			}
   702  
   703  			// prove that the subscription does not exist yet.
   704  			go func() {
   705  				<-time.After(time.Second)
   706  				got.Done() // this will cause a panic if the subscription actually exists.
   707  			}()
   708  
   709  			got.Wait()
   710  			return ctx
   711  		}).
   712  		Test("create the missing banner subscription", func(ctx f2.Context, t *testing.T) f2.Context {
   713  			ps := pstest.FromContextT(ctx, t)
   714  			client2, err := pubsub.NewClient(ctx, bannerProjectIDs.ids[0], option.WithGRPCConn(ps.Conn))
   715  
   716  			require.NoError(t, err)
   717  
   718  			var topic = client2.Topic(cfg.TopicID)
   719  			exists, err := topic.Exists(ctx)
   720  			require.NoError(t, err)
   721  			require.True(t, exists)
   722  
   723  			sub, err := client2.CreateSubscription(ctx, cfg.SubscriptionID, pubsub.SubscriptionConfig{
   724  				Topic: topic,
   725  			})
   726  			require.NoError(t, err)
   727  			exists, err = sub.Exists(ctx)
   728  			require.NoError(t, err)
   729  			require.True(t, exists)
   730  
   731  			return ctx
   732  		}).
   733  		Test("send message to active banners", func(ctx f2.Context, t *testing.T) f2.Context {
   734  			got.Add(len(bannerProjectIDs.ids))
   735  			for _, banner := range bannerProjectIDs.ids {
   736  				var topic = client.TopicInProject(cfg.TopicID, banner)
   737  				exists, err := topic.Exists(ctx)
   738  				require.NoError(t, err)
   739  				require.True(t, exists)
   740  
   741  				topic.Publish(ctx, &pubsub.Message{
   742  					Data: []byte(banner),
   743  				})
   744  			}
   745  
   746  			got.Wait()
   747  			t.Logf("receiver mux is receiving from all subscriptions again")
   748  			return ctx
   749  		}).
   750  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
   751  			stopReceiverMux()
   752  			require.NoError(t, <-runErrCh, "ReceiverMux.Run returned unexpected error")
   753  			return ctx
   754  		}).
   755  		Feature()
   756  
   757  	f2f.Test(t, feat)
   758  }
   759  

View as plain text