...

Source file src/edge-infra.dev/pkg/edge/lighthouse/listener_test.go

Documentation: edge-infra.dev/pkg/edge/lighthouse

     1  package lighthouse
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"time"
     7  
     8  	"sigs.k8s.io/controller-runtime/pkg/client/fake"
     9  
    10  	pubSub "cloud.google.com/go/pubsub"
    11  
    12  	"edge-infra.dev/pkg/edge/lighthouse/config"
    13  	"edge-infra.dev/pkg/edge/lighthouse/event"
    14  	"edge-infra.dev/pkg/edge/lighthouse/testutils"
    15  	"edge-infra.dev/pkg/lib/gcp/pubsub"
    16  	"edge-infra.dev/pkg/lib/logging"
    17  	chariotTestutils "edge-infra.dev/test/framework/gcp/pubsub"
    18  )
    19  
    20  const (
    21  	clusterChannel = "cluster_events"
    22  	bannerChannel  = "banner_events"
    23  )
    24  
    25  var (
    26  	cl = fake.NewClientBuilder().Build()
    27  )
    28  
    29  func (s *Suite) TestNewWatchTower() {
    30  	lstner := NewWatchTower()
    31  	s.NotNil(lstner)
    32  }
    33  
    34  func (s *Suite) TestSetLivelinessProbe() {
    35  	lstner := NewWatchTower()
    36  	s.NotNil(lstner)
    37  	lstner.SetLivelinessProbe(time.Minute * 15)
    38  	s.Equal(lstner.GetLivelinessProbe(), time.Minute*15)
    39  }
    40  
    41  func (s *Suite) TestSetMaxConcurrent() {
    42  	lstner := NewWatchTower()
    43  	s.NotNil(lstner)
    44  	s.Equal(lstner.GetMaxConcurrent(), int64(DefaultMaxConcurrent))
    45  	lstner.SetMaxConcurrent(120)
    46  	s.Equal(lstner.GetMaxConcurrent(), int64(120))
    47  }
    48  
    49  func (s *Suite) TestSetProjectID() {
    50  	lstner := NewWatchTower()
    51  	s.NotNil(lstner)
    52  	lstner.SetProjectID(s.ProjectID)
    53  	s.Equal(lstner.GetProjectID(), s.ProjectID)
    54  }
    55  
    56  func (s *Suite) TestSetTopicID() {
    57  	lstner := NewWatchTower()
    58  	s.NotNil(lstner)
    59  	lstner.SetTopicID(s.TopicID)
    60  	s.Equal(lstner.GetTopicID(), s.TopicID)
    61  }
    62  
    63  func (s *Suite) TestRegisterChannel() {
    64  	dbCfg := s.SeededPostgres.EdgePostgres()
    65  	customDBCfg := testutils.DialMe{DB: dbCfg}
    66  	lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
    67  	s.NoError(lstner.RegisterChannel(clusterChannel))
    68  }
    69  
    70  func (s *Suite) TestUnregisterChannel() {
    71  	dbCfg := s.SeededPostgres.EdgePostgres()
    72  	customDBCfg := testutils.DialMe{DB: dbCfg}
    73  	lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
    74  	s.NoError(lstner.RegisterChannel(clusterChannel))
    75  	s.True(lstner.ChannelExists(clusterChannel))
    76  	s.NoError(lstner.UnregisterChannel(clusterChannel))
    77  	s.False(lstner.ChannelExists(clusterChannel))
    78  }
    79  
    80  func (s *Suite) TestUnregisterAllChannels() {
    81  	dbCfg := s.SeededPostgres.EdgePostgres()
    82  	customDBCfg := testutils.DialMe{DB: dbCfg}
    83  	lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
    84  	s.NoError(lstner.RegisterChannel(clusterChannel))
    85  	s.NoError(lstner.RegisterChannel(bannerChannel))
    86  	s.True(lstner.ChannelExists(clusterChannel))
    87  	s.True(lstner.ChannelExists(bannerChannel))
    88  	s.NoError(lstner.UnregisterAllChannels())
    89  	s.False(lstner.ChannelExists(clusterChannel))
    90  	s.False(lstner.ChannelExists(bannerChannel))
    91  }
    92  
    93  func (s *Suite) TestStream() {
    94  	ctx := context.Background()
    95  	insertOperation := "INSERT"
    96  	dbCfg := s.SeededPostgres.EdgePostgres()
    97  	customDBCfg := testutils.DialMe{DB: dbCfg}
    98  	_, err := config.Init(context.Background(), cl)
    99  	s.NoError(err)
   100  	lstner := NewWatchTower().AddListener(customDBCfg, customDBCfg.ConnectionString(), 0, 0, s.Log)
   101  	err = lstner.RegisterChannel(clusterChannel)
   102  	s.NoError(err)
   103  	s.True(lstner.ChannelExists(clusterChannel))
   104  	mockPubsub, err := chariotTestutils.NewMockPubsubServer()
   105  	s.NoError(err)
   106  	pubSubService, err := pubsub.NewWithOptions(context.Background(), s.ProjectID, mockPubsub)
   107  	s.NoError(err)
   108  	lstner.AddPubsubClient(&pubSubService)
   109  	pubsubTopic, err := chariotTestutils.CreateMockTopic(context.Background(), s.ProjectID, s.TopicID, mockPubsub)
   110  	s.NoError(err)
   111  	lstner.SetProjectID(s.ProjectID)
   112  	lstner.SetTopicID(pubsubTopic.ID())
   113  	go lstner.Stream(context.Background(), logging.NewLogger().WithName("lighthouse"))
   114  	_, err = s.DB.Exec("INSERT INTO clusters(cluster_name, project_id, registered, active, cluster_edge_id, banner_edge_id, bsl_site_id, location) VALUES(gen_random_uuid(), gen_random_uuid(), true, true, gen_random_uuid(), '3396a52c-6a22-4049-9593-5a63b596a100', 'bsl_site_id', 'us-east-1')")
   115  	s.NoError(err)
   116  	go func() {
   117  		for {
   118  			subscription, err := pubSubService.CreateSubscription(ctx, pubsubTopic.ID(), s.SubscriptionID, s.AckDeadline, "")
   119  			s.NoError(err)
   120  			err = subscription.Receive(ctx, func(_ context.Context, msg *pubSub.Message) {
   121  				evt := event.New()
   122  				err = json.Unmarshal(msg.Data, &evt)
   123  				s.NoError(err)
   124  				s.Equal(evt.Channel, clusterChannel)
   125  				s.Equal(evt.Operation, insertOperation)
   126  				s.NotNil(evt.ClusterInfraClusterEdgeID)
   127  				msg.Ack()
   128  			})
   129  			s.NoError(err)
   130  		}
   131  	}()
   132  }
   133  

View as plain text