...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/replication_event_test.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"strings"
     6  	"testing"
     7  
     8  	"github.com/go-logr/logr"
     9  	"github.com/google/uuid"
    10  
    11  	"gotest.tools/v3/assert"
    12  	"gotest.tools/v3/poll"
    13  
    14  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    15  	"edge-infra.dev/pkg/lib/fog"
    16  
    17  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    18  	"k8s.io/client-go/util/workqueue"
    19  
    20  	"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
    21  )
    22  
    23  var (
    24  	username         = "admin"
    25  	password         = "password"
    26  	couchURL         = "http://localhost:5984"
    27  	_        Changes = (*ChangesIter)(nil)
    28  )
    29  
// ChangesResults is the container for a list of change entries, serialized
// under the "results" JSON key.
// NOTE(review): presumably mirrors the body of a CouchDB _changes response —
// confirm against the production client.
type ChangesResults struct {
	Results []ChangesResult `json:"results"`
}
    33  
// ChangesResult is a single change entry: a sequence marker, the ID of the
// changed document, the revisions reported for it, and the document body as a
// flat string map.
type ChangesResult struct {
	Seq     string `json:"seq"`
	ID      string `json:"id"`
	Changes []struct {
		Rev string `json:"rev"`
	} `json:"changes"`
	Doc map[string]string `json:"doc"`
}
    42  
// ChangesIter is an in-memory test double for Changes that walks a fixed list
// of ChangesResult entries.
type ChangesIter struct {
	// changes holds the canned entries to iterate over.
	changes ChangesResults
	// index is the cursor into changes.Results; Next advances it by one.
	index int
	// err is the value reported by Err.
	err error
}
    48  
    49  func NewChangesIter(results ...ChangesResult) *ChangesIter {
    50  	return &ChangesIter{
    51  		changes: ChangesResults{
    52  			Results: results,
    53  		},
    54  		index: -1,
    55  	}
    56  }
    57  
    58  func (f *ChangesIter) Next() bool {
    59  	f.index++
    60  	if len(f.changes.Results) == 0 ||
    61  		f.index >= len(f.changes.Results) {
    62  		return false
    63  	}
    64  	return true
    65  }
    66  
// Err returns the iterator's stored error. NewChangesIter leaves it unset, so
// iterators built that way report nil.
func (f *ChangesIter) Err() error {
	return f.err
}
    70  
    71  func (f *ChangesIter) ID() string {
    72  	return f.changes.Results[f.index].ID
    73  }
    74  
    75  func (f *ChangesIter) Changes() []string {
    76  	changes := f.changes.Results[f.index].Changes
    77  	results := make([]string, len(changes))
    78  	for i, c := range changes {
    79  		results[i] = c.Rev
    80  	}
    81  	return results
    82  }
    83  
    84  func (f *ChangesIter) AsChangesFunc() ChangesFunc {
    85  	return func(_ context.Context, _, _, _ string) (Changes, error) {
    86  		return f, nil
    87  	}
    88  }
    89  
    90  func TestReplicationEvent(t *testing.T) {
    91  	cfg := Config{BannerEdgeID: uuid.NewString()}
    92  
    93  	ctx, cancel := context.WithCancel(context.Background())
    94  	ctx = logr.NewContext(ctx, fog.New())
    95  	q := &controllertest.Queue{Interface: workqueue.New()}
    96  
    97  	re := NewReplicationEvent(&cfg)
    98  	re.changesFunc = NewChangesIter(mockChangesResults()...).AsChangesFunc()
    99  
   100  	err := re.Start(ctx, q)
   101  	assert.NilError(t, err, "failed to start replication event")
   102  
   103  	assert.Equal(t, q.Len(), 0, "queue should be empty before listen is called")
   104  
   105  	repl1 := &dsapi.CouchDBReplicationSet{ObjectMeta: metav1.ObjectMeta{Name: "test1", Namespace: "test"}}
   106  	err = re.Listen(repl1, username, password, couchURL)
   107  	assert.NilError(t, err, "replication changes failed")
   108  
   109  	poll.WaitOn(t, func(_ poll.LogT) poll.Result {
   110  		re.Lock()
   111  		defer re.Unlock()
   112  		l := len(re.cache)
   113  		if l == 1 {
   114  			return poll.Success()
   115  		}
   116  		return poll.Continue("invalid queue length: %d, wanted 1", l)
   117  	})
   118  	assert.Equal(t, re.cache[repl1.Namespace+"/"+repl1.Name], username+":"+password+"@"+couchURL)
   119  
   120  	re.changesFunc = NewChangesIter(mockChangesResults()...).AsChangesFunc()
   121  	repl2 := &dsapi.CouchDBReplicationSet{ObjectMeta: metav1.ObjectMeta{Name: "test2", Namespace: "test"}}
   122  	err = re.Listen(repl2, username, password, couchURL)
   123  	assert.NilError(t, err, "failed to listen to replication changes")
   124  
   125  	poll.WaitOn(t, func(_ poll.LogT) poll.Result {
   126  		re.Lock()
   127  		defer re.Unlock()
   128  		l := len(re.cache)
   129  		if l == 2 {
   130  			return poll.Success()
   131  		}
   132  		return poll.Continue("invalid queue length: %d, wanted 2", l)
   133  	})
   134  	assert.Equal(t, re.cache[repl2.Namespace+"/"+repl2.Name], username+":"+password+"@"+couchURL)
   135  
   136  	poll.WaitOn(t, func(_ poll.LogT) poll.Result {
   137  		if q.Len() == 2 {
   138  			return poll.Success()
   139  		}
   140  		return poll.Continue("queue length should be 2. length: %d", q.Len())
   141  	})
   142  
   143  	cancel()
   144  	poll.WaitOn(t, func(_ poll.LogT) poll.Result {
   145  		re.Lock()
   146  		defer re.Unlock()
   147  		l := len(re.cache)
   148  		if l == 0 {
   149  			return poll.Success()
   150  		}
   151  		return poll.Continue("on cancel, cache should be empty. length: %d", l)
   152  	})
   153  }
   154  
   155  func mockChangesResults() []ChangesResult {
   156  	var results []ChangesResult
   157  
   158  	uid := strings.Replace(uuid.NewString(), "-", "", -1)
   159  	results = append(results, ChangesResult{
   160  		Seq: "1",
   161  		ID:  "repl_doc",
   162  		Changes: []struct {
   163  			Rev string `json:"rev"`
   164  		}{
   165  			{Rev: "1-" + uid},
   166  		},
   167  		Doc: map[string]string{"_id": "repl_doc", "_rev": "1-" + uid},
   168  	})
   169  
   170  	uid = strings.Replace(uuid.NewString(), "-", "", -1)
   171  	results = append(results, ChangesResult{
   172  		Seq: "2",
   173  		ID:  "repl_doc",
   174  		Changes: []struct {
   175  			Rev string `json:"rev"`
   176  		}{
   177  			{Rev: "2-" + uid},
   178  		},
   179  		Doc: map[string]string{"_id": "repl_doc", "_rev": "2-" + uid},
   180  	})
   181  
   182  	return results
   183  }
   184  

View as plain text