...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/couchctl_test.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"path/filepath"
     9  	"strconv"
    10  	"strings"
    11  	"testing"
    12  	"time"
    13  
    14  	"github.com/google/uuid"
    15  
    16  	appsv1 "k8s.io/api/apps/v1"
    17  	corev1 "k8s.io/api/core/v1"
    18  	netv1 "k8s.io/api/networking/v1"
    19  	k8errors "k8s.io/apimachinery/pkg/api/errors"
    20  	"k8s.io/apimachinery/pkg/api/resource"
    21  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    22  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    23  	"k8s.io/apimachinery/pkg/types"
    24  	"k8s.io/client-go/rest"
    25  	"k8s.io/client-go/tools/clientcmd"
    26  	"k8s.io/client-go/util/homedir"
    27  	ctrl "sigs.k8s.io/controller-runtime"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    30  
    31  	"edge-infra.dev/pkg/edge/bsl"
    32  	"edge-infra.dev/pkg/edge/constants/api/cluster"
    33  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    34  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    35  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    36  	"edge-infra.dev/pkg/edge/info"
    37  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    38  	"edge-infra.dev/pkg/k8s/testing/kmp"
    39  	"edge-infra.dev/pkg/lib/fog"
    40  	v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
    41  	nodemeta "edge-infra.dev/pkg/sds/ien/node"
    42  	"edge-infra.dev/test/f2"
    43  	"edge-infra.dev/test/f2/integration"
    44  	"edge-infra.dev/test/f2/x/ktest"
    45  )
    46  
const (
	// _fleetType and _clusterType are the keys under which TestMain stores
	// the configured fleet and cluster type in f2.Labels.
	_fleetType   = "fleet-type"
	_clusterType = "cluster-type"
	// defaultHost is the URI assigned to the CouchDB servers when the test
	// resources are mocked locally (see localTestResources).
	defaultHost = "localhost"
)
    52  
var (
	// f is the shared f2 test framework instance created in TestMain.
	f f2.Framework

	// flags
	// laneNumber is the touchpoint lane number ("lane-number" flag).
	laneNumber string
	//couchdbPort string
	// clean requests deletion of existing couchdb data before the tests
	// start ("clean" flag).
	clean bool

	// couchCtlConfig is the controller configuration shared by every helper
	// in this file; it is allocated in init and completed by
	// ensureValidConfig.
	couchCtlConfig *Config
	// The resources below are populated by the TestMain setup step.
	couchDBServer         *dsapi.CouchDBServer
	touchpointSever       *dsapi.CouchDBServer
	couchDBDatabase       *dsapi.CouchDBDatabase
	couchDBDUser          *dsapi.CouchDBUser
	couchDBIndex          *dsapi.CouchDBIndex
	couchDBDesignDoc      *dsapi.CouchDBDesignDoc
	couchDBReplicationSet *dsapi.CouchDBReplicationSet
	couchDBPersistence    *dsapi.CouchDBPersistence
	// _secretManager is a mock for local (L1) runs and a gcpSecretManager
	// otherwise.
	_secretManager secretManager
)
    72  
    73  func init() {
    74  	couchCtlConfig = &Config{}
    75  	couchCtlConfig.BindFlags(f2.Flags)
    76  	f2.Flags.StringVar(&laneNumber, "lane-number", "1", "the lane number for touchpoint")
    77  	f2.Flags.BoolVar(&clean, "clean", false, "delete couchdb data before starting testing")
    78  }
    79  
    80  func TestMain(m *testing.M) {
    81  	ctrl.SetLogger(fog.New())
    82  
    83  	ctxMgr := contextAwareManager{cfg: couchCtlConfig}
    84  
    85  	f = f2.New(context.Background(), f2.WithExtensions(ktest.New(ktest.WithCtrlManager(ctxMgr.createManager))))
    86  	// f2 parses command line args, but not env variables
    87  	if err := ensureValidConfig(); err != nil {
    88  		ctrl.Log.Error(err, "invalid config values:")
    89  		os.Exit(1)
    90  	}
    91  	if f2.Labels == nil {
    92  		f2.Labels = map[string]string{}
    93  	}
    94  	f2.Labels[_fleetType] = couchCtlConfig.FleetType
    95  	f2.Labels[_clusterType] = couchCtlConfig.ClusterType
    96  
    97  	cctx, cancel := context.WithCancel(context.Background())
    98  	defer cancel()
    99  
   100  	var replicationEvent *ReplicationEvent
   101  
   102  	f.Setup(func(ctx f2.Context) (f2.Context, error) {
   103  		k, err := ktest.FromContext(ctx)
   104  		if err != nil {
   105  			return ctx, err
   106  		}
   107  		resourceName := fmt.Sprintf("couchdb-%s", couchCtlConfig.NodeUID)
   108  		laneUID := uuid.NewString()
   109  		ctrl.Log.Info("Setting up Test", "resource", resourceName, "touchpoint", laneUID)
   110  
   111  		couchDBServer = newCouchdbServer(resourceName)
   112  		touchpointSever = dsapi.NewTouchpointCouchDBServer(laneUID, laneNumber)
   113  		couchDBDatabase = newCouchDBDatabase(resourceName, couchDBServer)
   114  		couchDBDUser = newCouchDBUser(resourceName, fmt.Sprintf("username-%s", ctx.RunID), couchDBServer)
   115  		couchDBReplicationSet = newCouchDBReplicationSet(resourceName, couchDBServer)
   116  		couchDBPersistence = newCouchDBPersistence(ctx.RunID)
   117  		couchDBIndex = newCouchDBIndex(resourceName, couchDBDatabase)
   118  		couchDBDesignDoc = newCouchDBDesignDoc(resourceName, couchDBDatabase)
   119  
   120  		server := mockInterlockClient(cctx)
   121  
   122  		// Mock resources if running locally
   123  		if integration.IsL1() { //nolint:nestif
   124  			if err = localTestResources(ctx, k.Client, couchDBServer); err != nil {
   125  				return ctx, err
   126  			}
   127  		} else {
   128  			_secretManager = &gcpSecretManager{}
   129  			// Suspend CouchCTL Pallet
   130  			err := suspendPallet(ctx, k.Client, "couchctl-", true)
   131  			if client.IgnoreNotFound(err) != nil {
   132  				return ctx, err
   133  			}
   134  			err = scaleDeployment(ctx, k.Client, types.NamespacedName{
   135  				Name:      couchCtlConfig.CouchCTLNamespace,
   136  				Namespace: couchCtlConfig.CouchCTLNamespace,
   137  			}, 0)
   138  			if client.IgnoreNotFound(err) != nil {
   139  				return ctx, err
   140  			}
   141  
   142  			// delete resources created by couchdb
   143  			if clean {
   144  				err = cleanUpBeforeTest(ctx, k.Client, k.Timeout, k.Tick)
   145  			} else {
   146  				err = resetBeforeTest(ctx, k.Client)
   147  			}
   148  			if err != nil {
   149  				return ctx, err
   150  			}
   151  
   152  			err = client.IgnoreAlreadyExists(k.Client.Create(ctx, couchDBServer))
   153  			if err != nil {
   154  				return ctx, err
   155  			}
   156  		}
   157  		couchCtlConfig.InterlockAPIURL = server.URL
   158  		rle := PersistenceLeaderElectorFunc(func() bool { return true })
   159  		nrp := NodeResourcePredicateFunc(func(*Config, client.Object) bool { return true })
   160  		replicationEvent = NewReplicationEvent(couchCtlConfig)
   161  		replicationEvent.changesFunc = func(_ context.Context, _, _, _ string) (Changes, error) {
   162  			return NewChangesIter(mockChangesResults()...), nil
   163  		}
   164  		replicationEvent.isRetryableError = func(_ error) bool { return false }
   165  		err = setupControllers(k.Manager, couchCtlConfig, _secretManager, rle, nrp, replicationEvent)
   166  		if err != nil {
   167  			return ctx, err
   168  		}
   169  		return ctx, nil
   170  	}).Teardown(func(ctx f2.Context) (f2.Context, error) {
   171  		if replicationEvent != nil {
   172  			replicationEvent.Stop()
   173  		}
   174  		// if its an L1 test dont teardown since its being mocked
   175  		if integration.IsL1() {
   176  			return ctx, nil
   177  		}
   178  
   179  		k, err := ktest.FromContext(ctx)
   180  		if err != nil {
   181  			return ctx, err
   182  		}
   183  		err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBReplicationSet))
   184  		if err != nil {
   185  			return ctx, err
   186  		}
   187  		err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBDUser))
   188  		if err != nil {
   189  			return ctx, err
   190  		}
   191  		err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBDatabase))
   192  		if err != nil {
   193  			return ctx, err
   194  		}
   195  		err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBServer))
   196  		if err != nil {
   197  			return ctx, err
   198  		}
   199  		err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBPersistence))
   200  		if err != nil {
   201  			return ctx, err
   202  		}
   203  
   204  		return ctx, client.IgnoreNotFound(suspendPallet(ctx, k.Client, "couchctl-", false))
   205  	})
   206  
   207  	os.Exit(f.Run(m))
   208  }
   209  
   210  func newCouchDBDesignDoc(name string, database *dsapi.CouchDBDatabase) *dsapi.CouchDBDesignDoc {
   211  	return &dsapi.CouchDBDesignDoc{
   212  		ObjectMeta: metav1.ObjectMeta{
   213  			Name:      name,
   214  			Namespace: database.Namespace,
   215  		},
   216  		Spec: dsapi.CouchDBDesignDocSpec{
   217  			DB: database.Name,
   218  			ID: "test",
   219  			DesignDoc: dsapi.DesignDoc{
   220  				Views: map[string]map[string]string{
   221  					"test": {
   222  						"map": "function(doc) { if (doc.type === 'test') { emit(doc._id, doc); } }",
   223  					},
   224  				},
   225  			},
   226  		},
   227  	}
   228  }
   229  
   230  func newCouchDBIndex(name string, database *dsapi.CouchDBDatabase) *dsapi.CouchDBIndex {
   231  	return &dsapi.CouchDBIndex{
   232  		ObjectMeta: metav1.ObjectMeta{
   233  			Name:      name,
   234  			Namespace: database.Namespace,
   235  		},
   236  		Spec: dsapi.CouchDBIndexSpec{
   237  			DB:   database.Name,
   238  			DDoc: "test",
   239  			Name: "test",
   240  			Type: "json",
   241  			Index: dsapi.Index{
   242  				Fields:                []string{"referenceId"},
   243  				PartialFilterSelector: `{"referenceId": {"$gt": "10000"},"limit": 2,"skip": 0}`,
   244  			},
   245  		},
   246  	}
   247  }
   248  
   249  func testReady(o client.Object) f2.StepFn {
   250  	return func(ctx f2.Context, t *testing.T) f2.Context {
   251  		k := ktest.FromContextT(ctx, t)
   252  		k.WaitOn(t, k.Check(o, kmp.IsReady()))
   253  		return ctx
   254  	}
   255  }
   256  
   257  func newCouchdbServer(name string) *dsapi.CouchDBServer {
   258  	var server *dsapi.CouchDBServer
   259  	if couchCtlConfig.FleetType == fleet.CouchDB {
   260  		server = dsapi.NewAdminCouchDBServer()
   261  	} else {
   262  		server = dsapi.NewStoreCouchDBServer()
   263  		server.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
   264  		server.Labels[couchdb.NodeUIDLabel] = couchCtlConfig.NodeUID
   265  	}
   266  	server.Name = name
   267  	// touchpoint CouchDBServer is created by PersistenceController
   268  	return server
   269  }
   270  
   271  func localTestResources(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) error {
   272  	server.Spec.URI = defaultHost
   273  	touchpointSever.Spec.URI = defaultHost
   274  
   275  	nsCouchDB := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: server.Namespace}}
   276  	err := client.IgnoreAlreadyExists(cl.Create(ctx, nsCouchDB))
   277  	if err != nil {
   278  		return err
   279  	}
   280  
   281  	nsCouchCTL := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ControllerNamespace}}
   282  	err = client.IgnoreAlreadyExists(cl.Create(ctx, nsCouchCTL))
   283  	if err != nil {
   284  		return err
   285  	}
   286  
   287  	if couchCtlConfig.FleetType == fleet.CouchDB {
   288  		err = cloudIngress(ctx, cl, server)
   289  		if err != nil {
   290  			return err
   291  		}
   292  	} else {
   293  		secret, err := cloudReplicationCredentials(ctx, cl)
   294  		if err != nil {
   295  			return err
   296  		}
   297  		err = client.IgnoreAlreadyExists(cl.Create(ctx, secret))
   298  		if err != nil {
   299  			return err
   300  		}
   301  	}
   302  
   303  	// for stores, CouchDBServer is created by CouchDBPersistence
   304  	err = client.IgnoreAlreadyExists(cl.Create(ctx, couchDBServer))
   305  	if err != nil {
   306  		return err
   307  	}
   308  
   309  	totalNodes := 1
   310  	if couchCtlConfig.ClusterType == cluster.DSDS {
   311  		// 1 store sever + 1 lane server
   312  		totalNodes = 2
   313  		err = fakePodsForCouchdbServer(ctx, cl, touchpointSever)
   314  		if err != nil {
   315  			return err
   316  		}
   317  	}
   318  	err = nodes(ctx, cl, totalNodes)
   319  	if err != nil {
   320  		return err
   321  	}
   322  
   323  	err = fakePodsForCouchdbServer(ctx, cl, couchDBServer)
   324  	if err != nil {
   325  		return err
   326  	}
   327  	couchDBCluster, err := NewMockCouchDBCluster(couchCtlConfig, cl,
   328  		couchDBServer,
   329  		couchDBDatabase,
   330  		couchDBDUser,
   331  		couchDBIndex,
   332  		couchDBDesignDoc,
   333  		couchDBReplicationSet)
   334  	if err != nil {
   335  		return err
   336  	}
   337  	couchCtlConfig.CouchDBPort = couchDBCluster.Port()
   338  	_secretManager = &mockSecretManager{
   339  		clients: make(map[string]*mockSecretManagerClient),
   340  	}
   341  	return nil
   342  }
   343  
   344  // cloudReplicationCredentials cloud to store replication information
   345  func cloudReplicationCredentials(ctx context.Context, cl client.Client) (*corev1.Secret, error) {
   346  	uri := fmt.Sprintf("https://%s.%s", couchCtlConfig.BannerEdgeID, couchCtlConfig.DatasyncDNSName)
   347  	replCreds := &couchdb.ReplicationCredentials{
   348  		UserCredentials: couchdb.UserCredentials{
   349  			UsernamePassword: couchdb.UsernamePassword{
   350  				Username: []byte("replication-user"),
   351  				Password: []byte("replication-password"),
   352  			},
   353  			URI: []byte(uri),
   354  		},
   355  		DBName: []byte(couchCtlConfig.ReplicationDB()),
   356  	}
   357  	ref := dsapi.CloudReplicationCredentials()
   358  	return replCreds.ToSecret(ctx, cl, types.NamespacedName{
   359  		Name:      ref.Name,
   360  		Namespace: ref.Namespace,
   361  	})
   362  }
   363  
   364  // couchdbServerClient get the http client for a CouchDBServer
   365  // TODO replace with helper function from pr #8029
   366  func couchdbServerClient(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) (*couchdb.CouchDB, error) {
   367  	creds, err := serverAdminCreds(ctx, cl, couchDBServer)
   368  	if err != nil {
   369  		return nil, err
   370  	}
   371  	cc := &couchdb.CouchDB{}
   372  	return cc, cc.New(couchdb.Driver, string(creds.Username), string(creds.Password),
   373  		server.Spec.URI, couchCtlConfig.CouchDBPort)
   374  }
   375  
   376  // resetBeforeTest delete status to enable clean run
   377  func resetBeforeTest(ctx context.Context, cl client.Client) error {
   378  	apiVersion := dsapi.GroupVersion.String()
   379  	err := resetStatus(ctx, cl, apiVersion, "CouchDBPersistence", client.ObjectKeyFromObject(couchDBPersistence))
   380  	if err != nil {
   381  		return err
   382  	}
   383  	err = resetStatus(ctx, cl, apiVersion, "CouchDBServer", client.ObjectKeyFromObject(couchDBServer))
   384  	if err != nil {
   385  		return err
   386  	}
   387  	err = resetStatus(ctx, cl, apiVersion, "CouchDBDatabase", client.ObjectKeyFromObject(couchDBDatabase))
   388  	if err != nil {
   389  		return err
   390  	}
   391  	err = resetStatus(ctx, cl, apiVersion, "CouchDBUser", client.ObjectKeyFromObject(couchDBDUser))
   392  	if err != nil {
   393  		return err
   394  	}
   395  	err = resetStatus(ctx, cl, apiVersion, "CouchDBReplicationSet", client.ObjectKeyFromObject(couchDBReplicationSet))
   396  	if err != nil {
   397  		return err
   398  	}
   399  	return nil
   400  }
   401  
   402  //nolint:gocyclo
   403  func cleanUpBeforeTest(ctx context.Context, cl client.Client, tick, timeout time.Duration) error {
   404  	if err := client.IgnoreNotFound(suspendPallet(ctx, cl, "couchdb-", true)); err != nil {
   405  		return err
   406  	}
   407  
   408  	replications := &dsapi.CouchDBReplicationSetList{}
   409  	err := cl.List(ctx, replications, client.InNamespace(couchCtlConfig.CouchNamespace))
   410  	if err != nil {
   411  		return err
   412  	}
   413  	for i := range replications.Items {
   414  		item := replications.Items[i]
   415  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   416  		if err != nil {
   417  			return err
   418  		}
   419  	}
   420  
   421  	users := &dsapi.CouchDBUserList{}
   422  	err = cl.List(ctx, users, client.InNamespace(couchCtlConfig.CouchNamespace))
   423  	if err != nil {
   424  		return err
   425  	}
   426  	for i := range users.Items {
   427  		item := users.Items[i]
   428  		controllerutil.RemoveFinalizer(&item, DatasyncFinalizer)
   429  		err = client.IgnoreNotFound(cl.Update(ctx, &item))
   430  		if err != nil {
   431  			return err
   432  		}
   433  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   434  		if err != nil {
   435  			return err
   436  		}
   437  	}
   438  
   439  	databases := &dsapi.CouchDBDatabaseList{}
   440  	err = cl.List(ctx, databases, client.InNamespace(couchCtlConfig.CouchNamespace))
   441  	if err != nil {
   442  		return err
   443  	}
   444  	for i := range databases.Items {
   445  		item := databases.Items[i]
   446  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   447  		if err != nil {
   448  			return err
   449  		}
   450  	}
   451  
   452  	servers := &dsapi.CouchDBServerList{}
   453  	err = cl.List(ctx, servers, client.InNamespace(couchCtlConfig.CouchNamespace))
   454  	if err != nil {
   455  		return err
   456  	}
   457  	for i := range servers.Items {
   458  		item := servers.Items[i]
   459  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   460  		if err != nil {
   461  			return err
   462  		}
   463  	}
   464  
   465  	// NOTE: since we added finalizers deletion order matters
   466  	pl := &dsapi.CouchDBPersistenceList{}
   467  	err = cl.List(ctx, pl, client.InNamespace(couchCtlConfig.CouchNamespace))
   468  	if err != nil {
   469  		return err
   470  	}
   471  	for i := range pl.Items {
   472  		item := pl.Items[i]
   473  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   474  		if err != nil {
   475  			return err
   476  		}
   477  	}
   478  
   479  	sts := &appsv1.StatefulSetList{}
   480  	err = cl.List(ctx, sts, client.InNamespace(couchCtlConfig.CouchNamespace))
   481  	if err != nil {
   482  		return err
   483  	}
   484  	for i := range sts.Items {
   485  		item := sts.Items[i]
   486  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   487  		if err != nil {
   488  			return err
   489  		}
   490  	}
   491  
   492  	pvc := &corev1.PersistentVolumeClaimList{}
   493  	err = cl.List(ctx, pvc, client.InNamespace(couchCtlConfig.CouchNamespace))
   494  	if err != nil {
   495  		return err
   496  	}
   497  	for i := range pvc.Items {
   498  		item := pvc.Items[i]
   499  		item.Finalizers = nil
   500  		err = client.IgnoreNotFound(cl.Update(ctx, &item))
   501  		if err != nil {
   502  			return err
   503  		}
   504  		err = client.IgnoreNotFound(cl.Delete(ctx, &item))
   505  		if err != nil {
   506  			return err
   507  		}
   508  	}
   509  
   510  	apiVersion := dsapi.GroupVersion.String()
   511  	err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBPersistence")
   512  	if err != nil {
   513  		return err
   514  	}
   515  	err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBReplicationSet")
   516  	if err != nil {
   517  		return err
   518  	}
   519  	err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBUser")
   520  	if err != nil {
   521  		return err
   522  	}
   523  	err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBDatabase")
   524  	if err != nil {
   525  		return err
   526  	}
   527  	err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBServer")
   528  	if err != nil {
   529  		return err
   530  	}
   531  
   532  	err = waitForDeletedResources(cl, tick, timeout, corev1.SchemeGroupVersion.String(), "Pod")
   533  	if err != nil {
   534  		return err
   535  	}
   536  	err = waitForDeletedResources(cl, tick, timeout, corev1.SchemeGroupVersion.String(), "PersistentVolumeClaim")
   537  	if err != nil {
   538  		return err
   539  	}
   540  
   541  	if client.IgnoreNotFound(suspendPallet(ctx, cl, "couchdb-", false)) != nil {
   542  		return err
   543  	}
   544  
   545  	return nil
   546  }
   547  
   548  func scaleDeployment(ctx context.Context, cl client.Client, nn types.NamespacedName, replicas int32) error {
   549  	deployment := &appsv1.Deployment{}
   550  	err := cl.Get(ctx, nn, deployment)
   551  	if err != nil {
   552  		return err
   553  	}
   554  	if *deployment.Spec.Replicas == replicas {
   555  		return nil
   556  	}
   557  	patch := client.MergeFrom(deployment.DeepCopy())
   558  	deployment.Spec.Replicas = &replicas
   559  	return cl.Patch(ctx, deployment, patch)
   560  }
   561  
   562  func suspendPallet(ctx context.Context, cl client.Client, prefix string, suspend bool) error {
   563  	pallets := &whv1.UnpackedPalletList{}
   564  	err := cl.List(ctx, pallets)
   565  	if err != nil {
   566  		return err
   567  	}
   568  	palletName := ""
   569  	for _, item := range pallets.Items {
   570  		if strings.HasPrefix(item.Name, prefix) {
   571  			palletName = item.Name
   572  			break
   573  		}
   574  	}
   575  	if palletName == "" {
   576  		return nil
   577  	}
   578  
   579  	pallet := &whv1.UnpackedPallet{}
   580  	err = cl.Get(ctx, client.ObjectKey{Name: palletName}, pallet)
   581  	if err != nil {
   582  		return err
   583  	}
   584  	if pallet.Spec.Suspend == suspend {
   585  		return nil
   586  	}
   587  	patch := client.MergeFrom(pallet.DeepCopy())
   588  	pallet.Spec.Suspend = suspend
   589  	return cl.Patch(ctx, pallet, patch)
   590  }
   591  
   592  func waitForDeletedResources(cl client.Client, tick, timeout time.Duration, apiVersion, kind string) error {
   593  	list := &unstructured.UnstructuredList{}
   594  	list.SetAPIVersion(apiVersion)
   595  	list.SetKind(kind)
   596  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   597  	defer cancel()
   598  	for {
   599  		select {
   600  		default:
   601  			err := cl.List(ctx, list, client.InNamespace(couchCtlConfig.CouchNamespace))
   602  			if err != nil {
   603  				return err
   604  			}
   605  			if len(list.Items) == 0 { // objects are deleted
   606  				return nil
   607  			}
   608  			time.Sleep(tick)
   609  		case <-ctx.Done():
   610  			return fmt.Errorf("%s:%s in namespace %s not deleted on time", apiVersion, kind, couchCtlConfig.CouchNamespace)
   611  		}
   612  	}
   613  }
   614  
   615  func resetStatus(ctx context.Context, cl client.Client, apiVersion, kind string, key client.ObjectKey) error {
   616  	un := &unstructured.Unstructured{}
   617  	un.SetAPIVersion(apiVersion)
   618  	un.SetKind(kind)
   619  	err := cl.Get(ctx, key, un)
   620  	switch {
   621  	case k8errors.IsNotFound(err):
   622  		return nil
   623  	case err != nil:
   624  		return err
   625  	}
   626  	unstructured.RemoveNestedField(un.UnstructuredContent(), "status")
   627  	err = cl.Status().Update(ctx, un)
   628  	return client.IgnoreNotFound(err)
   629  }
   630  
   631  func cloudIngress(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) error {
   632  	ingress := &netv1.Ingress{
   633  		ObjectMeta: metav1.ObjectMeta{
   634  			Name:      couchdb.CouchIngressName,
   635  			Namespace: server.Namespace,
   636  		},
   637  		Spec: netv1.IngressSpec{
   638  			DefaultBackend: &netv1.IngressBackend{
   639  				Service: &netv1.IngressServiceBackend{
   640  					Name: couchdb.Name,
   641  					Port: netv1.ServiceBackendPort{
   642  						Number: 5984,
   643  					},
   644  				},
   645  			},
   646  		},
   647  	}
   648  	err := client.IgnoreAlreadyExists(cl.Create(ctx, ingress))
   649  	if err != nil {
   650  		return err
   651  	}
   652  	patch := client.MergeFrom(ingress.DeepCopy())
   653  	ingress.Status = netv1.IngressStatus{
   654  		LoadBalancer: netv1.IngressLoadBalancerStatus{
   655  			Ingress: []netv1.IngressLoadBalancerIngress{
   656  				{
   657  					IP: "127.0.0.1",
   658  				},
   659  			},
   660  		},
   661  	}
   662  	return cl.Status().Patch(ctx, ingress, patch)
   663  }
   664  
   665  func nodes(ctx context.Context, cl client.Client, totalNodes int) error {
   666  	for i := 0; i < totalNodes; i++ {
   667  		nodeName := couchdb.StoreServerName
   668  		class := v1ien.Server
   669  		role := v1ien.ControlPlane
   670  		laneNumber := fmt.Sprintf("%d", i)
   671  		nodeUID := couchCtlConfig.NodeUID
   672  		if i > 0 {
   673  			nodeName = fmt.Sprintf("%s-%s", couchdb.TouchpointName, laneNumber)
   674  			class = v1ien.Touchpoint
   675  			role = v1ien.Worker
   676  			nodeUID = touchpointSever.Labels[couchdb.NodeUIDLabel]
   677  		}
   678  		node := &corev1.Node{
   679  			TypeMeta: metav1.TypeMeta{
   680  				Kind:       "Node",
   681  				APIVersion: "v1",
   682  			},
   683  			ObjectMeta: metav1.ObjectMeta{
   684  				UID:  types.UID(nodeUID),
   685  				Name: nodeName,
   686  				Labels: map[string]string{
   687  					nodemeta.ClassLabel:    string(class),
   688  					nodemeta.RoleLabel:     string(role),
   689  					nodemeta.LaneLabel:     laneNumber,
   690  					nodemeta.HostnameLabel: "ien-" + laneNumber,
   691  				},
   692  			},
   693  			Spec: corev1.NodeSpec{},
   694  		}
   695  		err := client.IgnoreAlreadyExists(cl.Create(ctx, node))
   696  		if err != nil {
   697  			return err
   698  		}
   699  		mem := "1Gi"
   700  		if i == 0 {
   701  			mem = "4Gi" // LEADER IS SELECTED BY MEMORY AVAILABLE
   702  		}
   703  		node.Status.Allocatable = corev1.ResourceList{
   704  			corev1.ResourceMemory: resource.MustParse(mem),
   705  		}
   706  		err = cl.Status().Update(ctx, node)
   707  		if err != nil {
   708  			return err
   709  		}
   710  	}
   711  	return nil
   712  }
   713  
   714  func ensureValidConfig() error {
   715  	// for testing disable reconciliation
   716  	couchCtlConfig.PollingInterval = 24 * time.Hour
   717  	// couchdb should already be enabled for integration tests
   718  	couchCtlConfig.EnablementWatchInterval = 24 * time.Hour
   719  	couchCtlConfig.ReplicationPollingInterval = 24 * time.Hour
   720  
   721  	if integration.IsL1() {
   722  		uid := uuid.NewString()
   723  		couchCtlConfig.BannerEdgeID = uuid.NewString()
   724  		couchCtlConfig.ProjectID = "ret-edge-dev-test"
   725  
   726  		couchCtlConfig.RequeueTime = 1 * time.Second
   727  		couchCtlConfig.ServerNotReady = 1 * time.Second
   728  		couchCtlConfig.DatabaseNotFound = 1 * time.Second
   729  		couchCtlConfig.IngressNotReady = 1 * time.Second
   730  
   731  		couchCtlConfig.CouchCTLNamespace = "couchctl"
   732  		couchCtlConfig.CouchNamespace = "data-sync-couchdb"
   733  
   734  		couchCtlConfig.CouchDBPort = strconv.Itoa(5984)
   735  
   736  		couchCtlConfig.NodeUID = uid
   737  		couchCtlConfig.NodeRole = string(v1ien.ControlPlane)
   738  		couchCtlConfig.NodeClass = string(v1ien.Server)
   739  		return couchCtlConfig.Validate()
   740  	}
   741  
   742  	// f2 framework does not parse ENV variables
   743  	// TODO update f2 framework to parse ENV variables
   744  	config, err := rest.InClusterConfig()
   745  	if errors.Is(err, rest.ErrNotInCluster) {
   746  		// for local testing
   747  		config, err = clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
   748  		if err != nil {
   749  			return err
   750  		}
   751  	}
   752  	cl, err := client.New(config, client.Options{})
   753  	if err != nil {
   754  		return err
   755  	}
   756  	ctx := context.Background()
   757  	ei, err := info.FromClient(ctx, cl)
   758  	if err != nil {
   759  		return err
   760  	}
   761  
   762  	couchCtlConfig.ProjectID = ei.ProjectID
   763  	couchCtlConfig.BannerEdgeID = ei.BannerEdgeID
   764  	couchCtlConfig.FleetType = ei.Fleet
   765  	couchCtlConfig.ClusterType = ei.ClusterType
   766  	bi, err := bsl.FromClient(ctx, cl)
   767  	if err == nil { // optional
   768  		couchCtlConfig.SiteID = bi.ID
   769  	}
   770  	return couchCtlConfig.Validate()
   771  }
   772  

View as plain text