...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/integration/cushion_test.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion/integration

     1  package integration
     2  
     3  import (
     4  	"context"
     5  	_ "embed"
     6  	"fmt"
     7  	"os"
     8  	"strings"
     9  	"testing"
    10  	"time"
    11  
    12  	"cloud.google.com/go/pubsub"
    13  
    14  	"edge-infra.dev/pkg/edge/chariot"
    15  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    16  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    17  	"edge-infra.dev/pkg/edge/datasync/cushion"
    18  	"edge-infra.dev/pkg/lib/fog"
    19  	epubsub "edge-infra.dev/pkg/lib/gcp/pubsub"
    20  	"edge-infra.dev/test/f2"
    21  	"edge-infra.dev/test/f2/integration"
    22  	"edge-infra.dev/test/f2/x/ktest"
    23  
    24  	kouchdb "github.com/go-kivik/kivik/v4/couchdb"
    25  	"github.com/go-logr/logr"
    26  	"github.com/google/uuid"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    29  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    30  	"sigs.k8s.io/controller-runtime/pkg/client"
    31  )
    32  
    33  const cushionNS = "cushion"
    34  
    35  var (
    36  	log logr.Logger
    37  	f   f2.Framework
    38  	_   chariot.IPubSubReceiver = (*PubSubMock)(nil)
    39  	_   epubsub.Publisher       = (*PubSubMock)(nil)
    40  
    41  	cfg *cushion.Config
    42  	cl  client.WithWatch
    43  	c   *couchdb.CouchDB
    44  	opt []cushion.Option
    45  
    46  	psm       *PubSubMock
    47  	publisher epubsub.Publisher
    48  
    49  	//go:embed testdata/tlog.json
    50  	tlog []byte
    51  )
    52  
    53  func TestMain(m *testing.M) {
    54  	log = fog.New().WithName("cushion_integration_test")
    55  
    56  	f = f2.New(
    57  		context.Background(),
    58  		f2.WithExtensions(
    59  			ktest.New(
    60  				ktest.WithScheme(createScheme()),
    61  			),
    62  		),
    63  	)
    64  
    65  	f.Setup(func(ctx f2.Context) (f2.Context, error) {
    66  		k, err := ktest.FromContext(ctx)
    67  		if err != nil {
    68  			log.Error(err, "fail to get k8s from context")
    69  			return ctx, err
    70  		}
    71  
    72  		log.Info("Initial setup done")
    73  
    74  		cl, err = client.NewWithWatch(k.Env.Config, client.Options{Scheme: createScheme()})
    75  		if err != nil {
    76  			log.Error(err, "fail to create client")
    77  			return ctx, err
    78  		}
    79  
    80  		if integration.IsL1() {
    81  			err = mockCushionDaemonOptions()
    82  		} else {
    83  			err = buildCushionDaemonOptions(ctx)
    84  		}
    85  
    86  		if err != nil {
    87  			log.Error(err, "fail to get cushion daemon options", "isL1", integration.IsL1())
    88  			return ctx, err
    89  		}
    90  
    91  		return ctx, nil
    92  	}).Teardown(func(ctx f2.Context) (f2.Context, error) {
    93  		if psm != nil {
    94  			psm.Close()
    95  		}
    96  		return ctx, nil
    97  	})
    98  
    99  	os.Exit(f.Run(m))
   100  }
   101  
   102  type PubSubMock struct{ msg chan chariot.IPubSubMessage }
   103  
   104  func NewPubSubReceiver() *PubSubMock {
   105  	return &PubSubMock{
   106  		msg: make(chan chariot.IPubSubMessage),
   107  	}
   108  }
   109  
   110  func (l PubSubMock) Receive(ctx context.Context, f func(context.Context, chariot.IPubSubMessage)) error {
   111  	for message := range l.msg {
   112  		f(ctx, message)
   113  	}
   114  	return nil
   115  }
   116  
   117  // Send pub/sub message
   118  func (l PubSubMock) Send(_ context.Context, _ string, message []byte, attributes map[string]string) error {
   119  	l.msg <- chariot.NewPubSubMessageFromMessage(&pubsub.Message{
   120  		ID:         attributes["entity_id"],
   121  		Data:       message,
   122  		Attributes: attributes,
   123  	})
   124  	return nil
   125  }
   126  
   127  func (l PubSubMock) Close() {
   128  	close(l.msg)
   129  }
   130  
   131  func mockCushionDaemonOptions() error {
   132  	couchServer := NewMockCouchDBServer()
   133  	cfg = mockConfig(couchServer.Server.URL)
   134  	psm = NewPubSubReceiver()
   135  	publisher = psm
   136  	c = &couchdb.CouchDB{}
   137  	err := c.NewFromURL(cfg.CouchUsername, cfg.CouchPassword, cfg.CouchURL, kouchdb.OptionNoRequestCompression())
   138  	if err != nil {
   139  		return err
   140  	}
   141  
   142  	opt = []cushion.Option{
   143  		cushion.OptionLogger(log),
   144  		cushion.OptionPubSubReceiver(psm),
   145  		cushion.OptionKubeClient(cl),
   146  		cushion.OptionCouchDBStorage(c.Client),
   147  		cushion.OptionDatabaseStatus(cushion.DatabaseStatusFunc(
   148  			func(_ context.Context, k8sDB *dsapi.CouchDBDatabase, _ time.Duration) error {
   149  				log.Info("database status called", "k8sDB", k8sDB.Namespace, "db", k8sDB.Spec.Name)
   150  				couchServer.MockDB(k8sDB.Spec.Name)
   151  				return c.CreateDB(context.Background(), k8sDB.Spec.Name)
   152  			})),
   153  	}
   154  	return nil
   155  }
   156  
   157  func buildCushionDaemonOptions(ctx context.Context) error {
   158  	var err error
   159  	cfg, err = cushion.NewConfig()
   160  	if err != nil {
   161  		log.Error(err, "fail to create new config")
   162  		return err
   163  	}
   164  	opt, err = cushion.DaemonOptions(cfg, log)
   165  	if err != nil {
   166  		log.Error(err, "fail to create daemon options")
   167  		return err
   168  	}
   169  	publisher, err = epubsub.New(ctx, cfg.ProjectID)
   170  	if err != nil {
   171  		log.Error(err, "fail to create new pubsub sender")
   172  		return err
   173  	}
   174  	c = &couchdb.CouchDB{}
   175  	couchURL := fmt.Sprintf("http://%s:%s", cfg.CouchURL, cfg.CouchPort)
   176  	return c.NewFromURL(cfg.CouchUsername, cfg.CouchPassword, couchURL)
   177  }
   178  
   179  func createScheme() *runtime.Scheme {
   180  	scheme := runtime.NewScheme()
   181  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   182  	utilruntime.Must(dsapi.AddToScheme(scheme))
   183  	return scheme
   184  }
   185  
   186  func mockConfig(url string) *cushion.Config {
   187  	uid := uuid.NewString()
   188  	id := strings.ReplaceAll(uid, "-", "")
   189  	return &cushion.Config{
   190  		Port:            8080,
   191  		BannerBSLID:     id,
   192  		BannerEdgeID:    uid,
   193  		ProjectID:       "ret-edge-test-foreman",
   194  		Topic:           "data-sync-c2e",
   195  		Subscription:    id,
   196  		MaxMessages:     10,
   197  		CouchURL:        url,
   198  		CouchPort:       "5984",
   199  		CouchUsername:   cushionNS,
   200  		CouchPassword:   "randompassword",
   201  		BulkSize:        1,
   202  		MaxWaitInterval: 1 * time.Second,
   203  	}
   204  }
   205  

View as plain text