...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/k8s/watcher_test.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/k8s

     1  package k8s_test
     2  
     3  import (
     4  	"context"
     5  	"testing"
     6  	"time"
     7  
     8  	"github.com/stretchr/testify/require"
     9  
    10  	"github.com/datawire/dlib/dlog"
    11  	"github.com/emissary-ingress/emissary/v3/pkg/dtest"
    12  	"github.com/emissary-ingress/emissary/v3/pkg/k8s"
    13  )
    14  
    15  const (
    16  	delay = 10 * time.Second
    17  )
    18  
    19  func fetch(ctx context.Context, w *k8s.Watcher, resource, qname string) (result k8s.Resource, err error) {
    20  	go func() {
    21  		time.Sleep(delay)
    22  		w.Stop()
    23  	}()
    24  
    25  	err = w.WatchQuery(k8s.Query{Kind: resource, Namespace: k8s.NamespaceAll}, func(w *k8s.Watcher) error {
    26  		list, err := w.List(resource)
    27  		if err != nil {
    28  			return err
    29  		}
    30  		for _, r := range list {
    31  			if r.QName() == qname {
    32  				result = r
    33  				w.Stop()
    34  			}
    35  		}
    36  		return nil
    37  	})
    38  	if err != nil {
    39  		return nil, err
    40  	}
    41  
    42  	if err := w.Wait(ctx); err != nil {
    43  		return nil, err
    44  	}
    45  	return result, nil
    46  }
    47  
    48  func info(ctx context.Context) *k8s.KubeInfo {
    49  	return k8s.NewKubeInfo(dtest.KubeVersionConfig(ctx, dtest.Kube22), "", "")
    50  }
    51  
    52  func TestUpdateStatus(t *testing.T) {
    53  	t.Parallel()
    54  	ctx := dlog.NewTestContext(t, false)
    55  	w, err := k8s.NewWatcher(info(ctx))
    56  	require.NoError(t, err)
    57  
    58  	svc, err := fetch(ctx, w, "services", "kubernetes.default")
    59  	require.NoError(t, err)
    60  	svc.Status()["loadBalancer"].(map[string]interface{})["ingress"] = []map[string]interface{}{{"hostname": "foo", "ip": "1.2.3.4"}}
    61  	result, err := w.UpdateStatus(ctx, svc)
    62  	if err != nil {
    63  		t.Error(err)
    64  		return
    65  	} else {
    66  		t.Logf("updated %s status, result: %v\n", svc.QName(), result.ResourceVersion())
    67  	}
    68  
    69  	w2, err := k8s.NewWatcher(info(ctx))
    70  	require.NoError(t, err)
    71  	svc, err = fetch(ctx, w2, "services", "kubernetes.default")
    72  	require.NoError(t, err)
    73  	ingresses := svc.Status()["loadBalancer"].(map[string]interface{})["ingress"].([]interface{})
    74  	ingress := ingresses[0].(map[string]interface{})
    75  	if ingress["hostname"] != "foo" {
    76  		t.Error("expected foo")
    77  	}
    78  
    79  	if ingress["ip"] != "1.2.3.4" {
    80  		t.Error("expected 1.2.3.4")
    81  	}
    82  }
    83  
    84  func TestWatchCustom(t *testing.T) {
    85  	t.Parallel()
    86  	ctx := dlog.NewTestContext(t, false)
    87  	w, err := k8s.NewWatcher(info(ctx))
    88  	require.NoError(t, err)
    89  
    90  	// XXX: we can only watch custom resources... k8s doesn't
    91  	// support status for CRDs until 1.12
    92  	xmas, err := fetch(ctx, w, "customs", "xmas.default")
    93  	require.NoError(t, err)
    94  	if xmas == nil {
    95  		t.Error("couldn't find xmas")
    96  	} else {
    97  		spec := xmas.Spec()
    98  		if spec["deck"] != "the halls" {
    99  			t.Errorf("expected the halls, got %v", spec["deck"])
   100  		}
   101  	}
   102  }
   103  
   104  func TestWatchCustomCollision(t *testing.T) {
   105  	t.Parallel()
   106  	ctx := dlog.NewTestContext(t, false)
   107  	w, err := k8s.NewWatcher(info(ctx))
   108  	require.NoError(t, err)
   109  
   110  	easter, err := fetch(ctx, w, "csrv", "easter.default")
   111  	require.NoError(t, err)
   112  	require.NotNil(t, easter)
   113  	t.Logf("easter: %#v", easter)
   114  	require.Equal(t, "the lawn", easter.Spec()["deck"])
   115  }
   116  
   117  func TestWatchQuery(t *testing.T) {
   118  	t.Skip("FIXME(lukeshu): This test is notoriously flakey, and the code under test hasn't changed in ages.  Write better tests!")
   119  	t.Parallel()
   120  	ctx := dlog.NewTestContext(t, false)
   121  	w, err := k8s.NewWatcher(info(ctx))
   122  	require.NoError(t, err)
   123  
   124  	services := []string{}
   125  	err = w.WatchQuery(k8s.Query{
   126  		Kind:          "services",
   127  		Namespace:     k8s.NamespaceAll,
   128  		FieldSelector: "metadata.name=kubernetes",
   129  	}, func(w *k8s.Watcher) error {
   130  		list, err := w.List("services")
   131  		if err != nil {
   132  			return err
   133  		}
   134  		for _, r := range list {
   135  			services = append(services, r.QName())
   136  		}
   137  		return nil
   138  	})
   139  	require.NoError(t, err)
   140  	time.AfterFunc(1*time.Second, func() {
   141  		w.Stop()
   142  	})
   143  	require.NoError(t, w.Wait(ctx))
   144  	require.Equal(t, services, []string{"kubernetes.default"})
   145  }
   146  

View as plain text