...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/integration/daemon_test.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion/integration

     1  package integration
     2  
     3  import (
     4  	"fmt"
     5  	"testing"
     6  	"time"
     7  
     8  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
     9  	"edge-infra.dev/pkg/edge/datasync/cushion"
    10  	"edge-infra.dev/test/f2"
    11  	"edge-infra.dev/test/f2/x/ktest"
    12  
    13  	"github.com/go-kivik/kivik/v4"
    14  	"gotest.tools/v3/assert"
    15  
    16  	corev1 "k8s.io/api/core/v1"
    17  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    18  	"sigs.k8s.io/controller-runtime/pkg/client"
    19  )
    20  
    21  func TestPubSubReceiverHandle(t *testing.T) {
    22  	var daemon *cushion.Daemon
    23  
    24  	feat := f2.NewFeature("Daemon Test").
    25  		Setup("Daemon Setup", func(ctx f2.Context, t *testing.T) f2.Context {
    26  			log.Info("Daemon Setup")
    27  
    28  			err := cl.Create(ctx, &corev1.Namespace{
    29  				TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
    30  				ObjectMeta: metav1.ObjectMeta{Name: cushionNS},
    31  			})
    32  			assert.NilError(t, client.IgnoreAlreadyExists(err), "fail to create daemon")
    33  
    34  			daemon, err = cushion.NewDaemon(cfg, opt...)
    35  			assert.NilError(t, err, "fail to create daemon")
    36  
    37  			return ctx
    38  		}).
    39  		Test("Verify PubSubReceiverHandle", func(ctx f2.Context, t *testing.T) f2.Context {
    40  			log.Info("Verifying PubSubReceiverHandle")
    41  
    42  			go func() {
    43  				err := daemon.Run(ctx)
    44  				assert.NilError(t, err, "fail to run daemon")
    45  			}()
    46  
    47  			return ctx
    48  		}).
    49  		Test("Send Datasync PubSub  Message", func(ctx f2.Context, t *testing.T) f2.Context {
    50  			log.Info("Send PubSub Message")
    51  
    52  			err := publisher.Send(ctx, cfg.Topic, tlog, map[string]string{
    53  				"tenant_id": cfg.BannerBSLID,
    54  				"db_name":   dbName(ctx),
    55  				"entity_id": ctx.RunID,
    56  			})
    57  			assert.NilError(t, err, "failed to send message")
    58  
    59  			return ctx
    60  		}).
    61  		Test("CouchDBDatabase Created", func(ctx f2.Context, t *testing.T) f2.Context {
    62  			log.Info("CouchDBDatabase Created")
    63  
    64  			k := ktest.FromContextT(ctx, t)
    65  			couchDB := &dsapi.CouchDBDatabase{
    66  				TypeMeta: metav1.TypeMeta{
    67  					APIVersion: dsapi.GroupVersion.String(),
    68  					Kind:       "CouchDBDatabase",
    69  				},
    70  				ObjectMeta: metav1.ObjectMeta{
    71  					Name:      cushion.K8sDBName(dbName(ctx)),
    72  					Namespace: cushionNS,
    73  				},
    74  			}
    75  			k.WaitOn(t, k.ObjExists(couchDB))
    76  
    77  			return ctx
    78  		}).
    79  		Test("Verify Data In CouchDB", func(ctx f2.Context, t *testing.T) f2.Context {
    80  			log.Info("Verify Data In CouchDB")
    81  
    82  			k := ktest.FromContextT(ctx, t)
    83  
    84  			db := c.Client.DB(dbName(ctx))
    85  
    86  			timeout := time.After(k.Timeout)
    87  
    88  			var row *kivik.Document
    89  		loop:
    90  			for {
    91  				select {
    92  				case <-timeout:
    93  					log.Info("timeout waiting for DB", "timeout", k.Timeout)
    94  					break loop
    95  				default:
    96  					row = db.Get(ctx, ctx.RunID)
    97  					if row.Err() == nil {
    98  						break loop
    99  					}
   100  					time.Sleep(k.Tick)
   101  				}
   102  			}
   103  
   104  			assert.NilError(t, row.Err(), "invalid response from couchdb")
   105  
   106  			// todo compare doc with updated ID
   107  			return ctx
   108  		}).
   109  		Feature()
   110  	f.Test(t, feat)
   111  }
   112  
   113  func dbName(ctx f2.Context) string {
   114  	return fmt.Sprintf("db%s", ctx.RunID)
   115  }
   116  

View as plain text