...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/integration/heartbeat_test.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector/integration

     1  package server_test
     2  
     3  import (
     4  	"context"
     5  	"testing"
     6  	"time"
     7  
     8  	server "edge-infra.dev/pkg/edge/psqlinjector"
     9  	"edge-infra.dev/pkg/f8n/kinform/model"
    10  	"edge-infra.dev/test/f2"
    11  
    12  	"cloud.google.com/go/pubsub"
    13  	"github.com/google/uuid"
    14  	"github.com/stretchr/testify/require"
    15  )
    16  
    17  func TestInjectorClusterHeartbeat(t *testing.T) {
    18  	var cfg = new(server.Config)
    19  	var bannerProjectIDs = []string{"foo", "bar", "baz"}
    20  
    21  	// We set all the cluster's clusters.infra_status_updated_at to this time, then check that it's updated.
    22  	var start = time.Now().UTC().Truncate(time.Microsecond) // truncated due to precision https://www.postgresql.org/docs/current/datatype-datetime.html
    23  	t.Logf("start time %v", start)
    24  
    25  	// map[project_id]
    26  	var clusterEdgeIDs map[string]uuid.UUID
    27  
    28  	// teardown psqlinjector plumbing
    29  	var doneCtx, stopInjector = context.WithCancel(context.Background())
    30  	var runErrCh chan error // created when the "run" test executes.
    31  
    32  	var feat = f2.NewFeature(t.Name()).
    33  		Setup("config", func(ctx f2.Context, t *testing.T) f2.Context {
    34  			return setupConfig(ctx, t, cfg)
    35  		}).
    36  		Setup("topics", func(ctx f2.Context, t *testing.T) f2.Context {
    37  			return createKinformTopics(ctx, t, cfg, bannerProjectIDs...)
    38  		}).
    39  		Setup("db", func(ctx f2.Context, t *testing.T) f2.Context {
    40  			clusterEdgeIDs = populateTablesWithProjectIDs(ctx, t, bannerProjectIDs...)
    41  			return ctx
    42  		}).
    43  		Test("run", func(ctx f2.Context, t *testing.T) f2.Context {
    44  			pi, err := server.New(cfg)
    45  			require.NoError(t, err, "could not create injector")
    46  
    47  			runErrCh = make(chan error, 1)
    48  			go func() {
    49  				runErrCh <- pi.Run(doneCtx)
    50  			}()
    51  			return ctx
    52  		}).
    53  		Test("send heartbeat", func(ctx f2.Context, t *testing.T) f2.Context {
    54  			return sendHeartbeats(ctx, t, cfg, start, clusterEdgeIDs)
    55  		}).
    56  		Test("verify infra_status_updated_at times", func(ctx f2.Context, t *testing.T) f2.Context {
    57  			// allow time for the messages to flow from pubsub to psqlinjector to the database.
    58  			var timeout = time.After(5 * time.Second)
    59  			for {
    60  				select {
    61  				case <-time.After(50 * time.Millisecond):
    62  				case <-timeout:
    63  					t.Fatalf("did not update status times quick enough")
    64  				}
    65  
    66  				var updated = true
    67  				var updatedTimes = selectInfraStatusUpdatedAt(ctx, t, clusterEdgeIDs)
    68  				for _, updatedAt := range updatedTimes {
    69  					if !start.Equal(updatedAt) {
    70  						updated = false
    71  					}
    72  				}
    73  				if updated {
    74  					t.Logf("found updated times %v", updatedTimes)
    75  					return ctx
    76  				}
    77  			}
    78  		}).
    79  		Teardown("stop", func(ctx f2.Context, t *testing.T) f2.Context {
    80  			stopInjector()
    81  
    82  			// before receiving from the runErrCh channel, check that the "run" test created the channel.
    83  			require.NotNil(t, runErrCh, "psqlinjector never started")
    84  
    85  			// When managers stop due to context canceled they don't return an error.
    86  			var err = <-runErrCh
    87  			require.NoErrorf(t, err, "psqlinjector.Run returned unexpected error: %v", err)
    88  
    89  			return ctx
    90  		}).
    91  		Feature()
    92  
    93  	f2f.Test(t, feat)
    94  }
    95  
    96  func sendHeartbeats(ctx f2.Context, t *testing.T, cfg *server.Config, timestamp time.Time, clusterEdgeIDs map[string]uuid.UUID) f2.Context {
    97  	// These checks ensure that the timestamp will equal the time scanned from the database.
    98  	require.True(t, timestamp.Equal(timestamp.Truncate(time.Microsecond)), "postgres has a time precision of 1 microsecond")
    99  	require.Equal(t, timestamp.Location(), time.UTC, "postgres uses UTC")
   100  
   101  	var sessionID = uuid.New()
   102  	for p, ceid := range clusterEdgeIDs {
   103  		var client = createPubSubClient(ctx, t, p)
   104  
   105  		var msg = model.ClusterHeartbeat{
   106  			Cluster:   ceid,
   107  			Timestamp: timestamp,
   108  			SessionID: sessionID,
   109  		}
   110  
   111  		var data, _ = msg.Data()
   112  
   113  		result := client.Topic(cfg.TopicID).Publish(ctx, &pubsub.Message{
   114  			Attributes: map[string]string{
   115  				model.AttrPayloadType: msg.PayloadType(),
   116  				model.AttrClusterUUID: ceid.String(),
   117  			},
   118  			Data: data,
   119  		})
   120  
   121  		_, err := result.Get(ctx)
   122  		require.NoError(t, err, "error publishing heartbeat")
   123  
   124  		client.Topic(cfg.TopicID).Flush()
   125  	}
   126  	return ctx
   127  }
   128  

View as plain text