package couchctl import ( "context" "strings" "testing" "github.com/go-logr/logr" "github.com/google/uuid" "gotest.tools/v3/assert" "gotest.tools/v3/poll" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/lib/fog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" ) var ( username = "admin" password = "password" couchURL = "http://localhost:5984" _ Changes = (*ChangesIter)(nil) ) type ChangesResults struct { Results []ChangesResult `json:"results"` } type ChangesResult struct { Seq string `json:"seq"` ID string `json:"id"` Changes []struct { Rev string `json:"rev"` } `json:"changes"` Doc map[string]string `json:"doc"` } type ChangesIter struct { changes ChangesResults index int err error } func NewChangesIter(results ...ChangesResult) *ChangesIter { return &ChangesIter{ changes: ChangesResults{ Results: results, }, index: -1, } } func (f *ChangesIter) Next() bool { f.index++ if len(f.changes.Results) == 0 || f.index >= len(f.changes.Results) { return false } return true } func (f *ChangesIter) Err() error { return f.err } func (f *ChangesIter) ID() string { return f.changes.Results[f.index].ID } func (f *ChangesIter) Changes() []string { changes := f.changes.Results[f.index].Changes results := make([]string, len(changes)) for i, c := range changes { results[i] = c.Rev } return results } func (f *ChangesIter) AsChangesFunc() ChangesFunc { return func(_ context.Context, _, _, _ string) (Changes, error) { return f, nil } } func TestReplicationEvent(t *testing.T) { cfg := Config{BannerEdgeID: uuid.NewString()} ctx, cancel := context.WithCancel(context.Background()) ctx = logr.NewContext(ctx, fog.New()) q := &controllertest.Queue{Interface: workqueue.New()} re := NewReplicationEvent(&cfg) re.changesFunc = NewChangesIter(mockChangesResults()...).AsChangesFunc() err := re.Start(ctx, q) assert.NilError(t, err, "failed to start replication event") assert.Equal(t, q.Len(), 0, "queue should be empty before listen is called") repl1 := &dsapi.CouchDBReplicationSet{ObjectMeta: metav1.ObjectMeta{Name: "test1", Namespace: "test"}} err = re.Listen(repl1, username, password, couchURL) assert.NilError(t, err, "replication changes failed") poll.WaitOn(t, func(_ poll.LogT) poll.Result { re.Lock() defer re.Unlock() l := len(re.cache) if l == 1 { return poll.Success() } return poll.Continue("invalid queue length: %d, wanted 1", l) }) assert.Equal(t, re.cache[repl1.Namespace+"/"+repl1.Name], username+":"+password+"@"+couchURL) re.changesFunc = NewChangesIter(mockChangesResults()...).AsChangesFunc() repl2 := &dsapi.CouchDBReplicationSet{ObjectMeta: metav1.ObjectMeta{Name: "test2", Namespace: "test"}} err = re.Listen(repl2, username, password, couchURL) assert.NilError(t, err, "failed to listen to replication changes") poll.WaitOn(t, func(_ poll.LogT) poll.Result { re.Lock() defer re.Unlock() l := len(re.cache) if l == 2 { return poll.Success() } return poll.Continue("invalid queue length: %d, wanted 2", l) }) assert.Equal(t, re.cache[repl2.Namespace+"/"+repl2.Name], username+":"+password+"@"+couchURL) poll.WaitOn(t, func(_ poll.LogT) poll.Result { if q.Len() == 2 { return poll.Success() } return poll.Continue("queue length should be 2. length: %d", q.Len()) }) cancel() poll.WaitOn(t, func(_ poll.LogT) poll.Result { re.Lock() defer re.Unlock() l := len(re.cache) if l == 0 { return poll.Success() } return poll.Continue("on cancel, cache should be empty. length: %d", l) }) } func mockChangesResults() []ChangesResult { var results []ChangesResult uid := strings.Replace(uuid.NewString(), "-", "", -1) results = append(results, ChangesResult{ Seq: "1", ID: "repl_doc", Changes: []struct { Rev string `json:"rev"` }{ {Rev: "1-" + uid}, }, Doc: map[string]string{"_id": "repl_doc", "_rev": "1-" + uid}, }) uid = strings.Replace(uuid.NewString(), "-", "", -1) results = append(results, ChangesResult{ Seq: "2", ID: "repl_doc", Changes: []struct { Rev string `json:"rev"` }{ {Rev: "2-" + uid}, }, Doc: map[string]string{"_id": "repl_doc", "_rev": "2-" + uid}, }) return results }