...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/testutil_fake_test.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"io/ioutil"
     8  	"net/http"
     9  	"os"
    10  	"reflect"
    11  	"sync"
    12  	"sync/atomic"
    13  	"testing"
    14  	"time"
    15  
    16  	"github.com/datawire/dlib/dexec"
    17  	"github.com/datawire/dlib/dgroup"
    18  	"github.com/datawire/dlib/dlog"
    19  	"github.com/emissary-ingress/emissary/v3/cmd/entrypoint/internal/testqueue"
    20  	"github.com/emissary-ingress/emissary/v3/pkg/ambex"
    21  	v3bootstrap "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/bootstrap/v3"
    22  	amb "github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
    23  	"github.com/emissary-ingress/emissary/v3/pkg/consulwatch"
    24  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    25  	"github.com/emissary-ingress/emissary/v3/pkg/snapshot/v1"
    26  )
    27  
    28  // The Fake struct is a test harness for edgestack. Its goals are to help us fill out our test
    29  // pyramid by making it super easy to create unit-like tests directly from the snapshots, bug
    30  // reports, and other inputs provided by users who find regressions and/or encounter other problems
    31  // in the field. Since we have no shortage of these reports, if we make it easy to create tests from
    32  // them, we will fill out our test pyramid quickly and hopefully reduce our rate of
    33  // regressions. This also means the tests produced this way need to scale well both in terms of
    34  // execution time/parallelism as well as flakiness since we will quickly have a large number of
    35  // these tests.
    36  //
    37  // The way this works is by isolating via dependency injection the key portions of the control plane
    38  // where the bulk of our business logic is implemented. The Fake utilities directly feed this
    39  // lightweight control plane its input as specified by the test code without passing the resources
    40  // all the way through a real kubernetes API server and/or a real consul deployment. This is not
    41  // only significantly more efficient than spinning up real kubernetes and/or consul deployments, but
    42  // it also lets us precisely control the order of events thereby a) removing the nondeterminism that
    43  // leads to flaky tests, and b) also allowing us to deliberately create/recreate the sort of low
    44  // probability sequence of events that are often at the root of heisenbugs.
    45  //
    46  // The key to being able to build tests this way is expressing our business logic as "hermetically
    47  // sealed" libraries, i.e. libraries with no/few hardcoded dependencies. This doesn't have to be
    48  // done in a fancy/elegant way, it is well worth practicing "stupidly mechanical dependency
    49  // injection" in order to quickly excise some business logic of its hardcoded dependencies and
    50  // enable this sort of testing.
    51  //
    52  // See TestFakeHello, TestFakeHelloWithEnvoyConfig, and TestFakeHelloConsul for examples of how to
    53  // get started using this struct to write tests.
    54  type Fake struct {
    55  	// These are all read only fields. They implement the dependencies that get injected into
    56  	// the watcher loop.
    57  	config FakeConfig
    58  	T      *testing.T
    59  	group  *dgroup.Group
    60  	cancel context.CancelFunc
    61  
    62  	k8sSource       *fakeK8sSource
    63  	watcher         *fakeWatcher
    64  	istioCertSource *fakeIstioCertSource
    65  	// This group of fields are used to store kubernetes resources and consul endpoint data and
    66  	// provide explicit control over when changes to that data are sent to the control plane.
    67  	k8sStore       *K8sStore
    68  	consulStore    *ConsulStore
    69  	k8sNotifier    *Notifier
    70  	consulNotifier *Notifier
    71  
    72  	// This holds the current snapshot.
    73  	currentSnapshot *atomic.Value
    74  
    75  	fastpath     *testqueue.Queue // All fastpath snapshots that have been produced.
    76  	snapshots    *testqueue.Queue // All snapshots that have been produced.
    77  	envoyConfigs *testqueue.Queue // All envoyConfigs that have been produced.
    78  
    79  	// This is used to make Teardown idempotent.
    80  	teardownOnce sync.Once
    81  
    82  	ambassadorMeta *snapshot.AmbassadorMetaInfo
    83  
    84  	DiagdBindPort string
    85  }
    86  
    87  // FakeConfig provides option when constructing a new Fake.
    88  type FakeConfig struct {
    89  	EnvoyConfig bool          // If true then the Fake will produce envoy configs in addition to Snapshots.
    90  	DiagdDebug  bool          // If true then diagd will have debugging enabled
    91  	Timeout     time.Duration // How long to wait for snapshots and/or envoy configs to become available.
    92  }
    93  
    94  func (fc *FakeConfig) fillDefaults() {
    95  	if fc.Timeout == 0 {
    96  		fc.Timeout = 10 * time.Second
    97  	}
    98  }
    99  
   100  // NewFake will construct a new Fake object. See RunFake for a convenient way to handle construct,
   101  // Setup, and Teardown of a Fake with one line of code.
   102  func NewFake(t *testing.T, config FakeConfig) *Fake {
   103  	config.fillDefaults()
   104  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   105  	k8sStore := NewK8sStore()
   106  	consulStore := NewConsulStore()
   107  
   108  	fake := &Fake{
   109  		config: config,
   110  		T:      t,
   111  		cancel: cancel,
   112  		group:  dgroup.NewGroup(ctx, dgroup.GroupConfig{EnableWithSoftness: true}),
   113  
   114  		k8sStore:       k8sStore,
   115  		consulStore:    consulStore,
   116  		k8sNotifier:    NewNotifier(),
   117  		consulNotifier: NewNotifier(),
   118  
   119  		currentSnapshot: &atomic.Value{},
   120  
   121  		fastpath:     testqueue.NewQueue(t, config.Timeout),
   122  		snapshots:    testqueue.NewQueue(t, config.Timeout),
   123  		envoyConfigs: testqueue.NewQueue(t, config.Timeout),
   124  	}
   125  
   126  	fake.k8sSource = &fakeK8sSource{fake: fake, store: k8sStore}
   127  	fake.watcher = &fakeWatcher{fake: fake, store: consulStore}
   128  	fake.istioCertSource = &fakeIstioCertSource{}
   129  
   130  	return fake
   131  }
   132  
   133  // RunFake will create a new fake, invoke its Setup method and register its Teardown method as a
   134  // Cleanup function with the test object.
   135  func RunFake(t *testing.T, config FakeConfig, ambMeta *snapshot.AmbassadorMetaInfo) *Fake {
   136  	fake := NewFake(t, config)
   137  	fake.SetAmbassadorMeta(ambMeta)
   138  	fake.Setup()
   139  	fake.T.Cleanup(fake.Teardown)
   140  	return fake
   141  }
   142  
   143  // Setup will start up all the goroutines needed for this fake edgestack instance. Depending on the
   144  // FakeConfig supplied wen constructing the Fake, this may also involve launching external
   145  // processes, you should therefore ensure that you call Teardown whenever you call Setup.
   146  func (f *Fake) Setup() {
   147  	if f.config.EnvoyConfig {
   148  		_, err := dexec.LookPath("diagd")
   149  		if err != nil {
   150  			f.T.Fatal("unable to find diagd, cannot run")
   151  		}
   152  
   153  		f.group.Go("snapshot_server", func(ctx context.Context) error {
   154  			return snapshotServer(ctx, f.currentSnapshot)
   155  		})
   156  
   157  		f.DiagdBindPort = GetDiagdBindPort()
   158  
   159  		f.group.Go("diagd", func(ctx context.Context) error {
   160  			args := []string{
   161  				"diagd",
   162  				"/tmp",
   163  				"/tmp/bootstrap-ads.json",
   164  				"/tmp/envoy.json",
   165  				"--no-envoy",
   166  				"--host", "127.0.0.1",
   167  				"--port", f.DiagdBindPort,
   168  			}
   169  
   170  			if f.config.DiagdDebug {
   171  				args = append(args, "--debug")
   172  			}
   173  
   174  			cmd := dexec.CommandContext(ctx, args[0], args[1:]...)
   175  			if envbool("DEV_SHUTUP_DIAGD") {
   176  				devnull, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
   177  				cmd.Stdout = devnull
   178  				cmd.Stderr = devnull
   179  			}
   180  			err := cmd.Run()
   181  			if err != nil {
   182  				exErr, ok := err.(*dexec.ExitError)
   183  				if ok {
   184  					f.T.Logf("diagd exited with error: %+v", exErr)
   185  					return nil
   186  				}
   187  			}
   188  			return err
   189  		})
   190  	}
   191  	f.group.Go("fake-watcher", f.runWatcher)
   192  
   193  }
   194  
   195  // GetFeatures grabs features from diagd. Yup, it's a little ugly.
   196  func (f *Fake) GetFeatures(ctx context.Context, features interface{}) error {
   197  	// If EnvoyConfig isn't set, we're not running diagd, so there's no way to get the
   198  	// features. Just return an error immediately.
   199  	if !f.config.EnvoyConfig {
   200  		return fmt.Errorf("Features are not available with EnvoyConfig false")
   201  	}
   202  
   203  	// The way we get the features is by making a request to diagd. Why, you ask, is the
   204  	// features dict not just always part of the IR dump? It's basically a performance thing
   205  	// at present.
   206  	//
   207  	// TODO(Flynn): That's a stupid reason and we should fix it.
   208  	featuresURL := fmt.Sprintf("http://localhost:%s/_internal/v0/features", f.DiagdBindPort)
   209  
   210  	req, err := http.NewRequestWithContext(ctx, "GET", featuresURL, nil)
   211  
   212  	if err != nil {
   213  		return err
   214  	}
   215  
   216  	// This is test code, so we can just always force X-Ambassador-Diag-IP to 127.0.0.1,
   217  	// so that diagd will trust the request.
   218  	req.Header.Set("X-Ambassador-Diag-IP", "127.0.0.1")
   219  	req.Header.Set("content-type", "application/json")
   220  	resp, err := http.DefaultClient.Do(req)
   221  
   222  	if err != nil {
   223  		return err
   224  	}
   225  
   226  	defer resp.Body.Close()
   227  
   228  	if resp.StatusCode != 200 {
   229  		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
   230  	}
   231  
   232  	body, err := ioutil.ReadAll(resp.Body)
   233  
   234  	if err != nil {
   235  		return err
   236  	}
   237  
   238  	// err = ioutil.WriteFile("/tmp/ambassador-features.json", body, 0644)
   239  	//
   240  	// if err != nil { return err }
   241  
   242  	// Trust our caller to have passed in something that we can unmarshal into. This is
   243  	// particularly relevant right now because there isn't a real Go type for the Features
   244  	// dict, so our caller is probably handing in something to look at just a subset.
   245  	//
   246  	// TODO(Flynn): Really, we should just have an IR Features type...
   247  	return json.Unmarshal(body, features)
   248  }
   249  
   250  // Teardown will clean up anything that Setup has started. It is idempotent. Note that if you use
   251  // RunFake Setup will be called and Teardown will be automatically registered as a Cleanup function
   252  // with the supplied testing.T
   253  func (f *Fake) Teardown() {
   254  	f.teardownOnce.Do(func() {
   255  		f.cancel()
   256  		err := f.group.Wait()
   257  		if err != nil && err != context.Canceled {
   258  			f.T.Fatalf("fake edgestack errored out: %+v", err)
   259  		}
   260  	})
   261  }
   262  
   263  func (f *Fake) runWatcher(ctx context.Context) error {
   264  	interestingTypes := GetInterestingTypes(ctx, nil)
   265  	queries := GetQueries(ctx, interestingTypes)
   266  
   267  	return watchAllTheThingsInternal(
   268  		ctx,
   269  		f.currentSnapshot, // encoded
   270  		f.k8sSource,
   271  		queries,
   272  		f.watcher.Watch, // watchConsulFunc
   273  		f.istioCertSource,
   274  		f.notifySnapshot,
   275  		f.notifyFastpath,
   276  		f.ambassadorMeta,
   277  	)
   278  }
   279  
   280  func (f *Fake) notifyFastpath(ctx context.Context, fastpath *ambex.FastpathSnapshot) {
   281  	f.fastpath.Add(f.T, fastpath)
   282  }
   283  
   284  func (f *Fake) GetEndpoints(predicate func(*ambex.Endpoints) bool) (*ambex.Endpoints, error) {
   285  	f.T.Helper()
   286  	untyped, err := f.fastpath.Get(f.T, func(obj interface{}) bool {
   287  		fastpath := obj.(*ambex.FastpathSnapshot)
   288  		return predicate(fastpath.Endpoints)
   289  	})
   290  	if err != nil {
   291  		return nil, err
   292  	}
   293  	return untyped.(*ambex.FastpathSnapshot).Endpoints, nil
   294  }
   295  
   296  func (f *Fake) AssertEndpointsEmpty(timeout time.Duration) {
   297  	f.T.Helper()
   298  	f.fastpath.AssertEmpty(f.T, timeout, "endpoints queue not empty")
   299  }
   300  
   301  type SnapshotEntry struct {
   302  	Disposition SnapshotDisposition
   303  	Snapshot    *snapshot.Snapshot
   304  }
   305  
   306  func (entry SnapshotEntry) String() string {
   307  	snapshot := "nil"
   308  	if entry.Snapshot != nil {
   309  		snapshot = fmt.Sprintf("&%#v", *entry.Snapshot)
   310  	}
   311  	return fmt.Sprintf("{Disposition: %v, Snapshot: %s}", entry.Disposition, snapshot)
   312  }
   313  
   314  // We pass this into the watcher loop to get notified when a snapshot is produced.
   315  func (f *Fake) notifySnapshot(ctx context.Context, disp SnapshotDisposition, snapJSON []byte) error {
   316  	if disp == SnapshotReady && f.config.EnvoyConfig {
   317  		if err := notifyReconfigWebhooksFunc(ctx, &noopNotable{}, false); err != nil {
   318  			return err
   319  		}
   320  		f.appendEnvoyConfig(ctx)
   321  	}
   322  
   323  	var snap *snapshot.Snapshot
   324  	err := json.Unmarshal(snapJSON, &snap)
   325  	if err != nil {
   326  		f.T.Fatalf("error decoding snapshot: %+v", err)
   327  	}
   328  
   329  	f.snapshots.Add(f.T, SnapshotEntry{disp, snap})
   330  	return nil
   331  }
   332  
   333  // GetSnapshotEntry will return the next SnapshotEntry that satisfies the supplied predicate.
   334  func (f *Fake) GetSnapshotEntry(predicate func(SnapshotEntry) bool) (SnapshotEntry, error) {
   335  	f.T.Helper()
   336  	untyped, err := f.snapshots.Get(f.T, func(obj interface{}) bool {
   337  		entry := obj.(SnapshotEntry)
   338  		return predicate(entry)
   339  	})
   340  	if err != nil {
   341  		return SnapshotEntry{}, err
   342  	}
   343  	return untyped.(SnapshotEntry), nil
   344  }
   345  
   346  // GetSnapshot will return the next snapshot that satisfies the supplied predicate.
   347  func (f *Fake) GetSnapshot(predicate func(*snapshot.Snapshot) bool) (*snapshot.Snapshot, error) {
   348  	f.T.Helper()
   349  	entry, err := f.GetSnapshotEntry(func(entry SnapshotEntry) bool {
   350  		return entry.Disposition == SnapshotReady && predicate(entry.Snapshot)
   351  	})
   352  	if err != nil {
   353  		return nil, err
   354  	}
   355  	return entry.Snapshot, nil
   356  }
   357  
   358  func (f *Fake) appendEnvoyConfig(ctx context.Context) {
   359  	msg, err := ambex.Decode(ctx, "/tmp/envoy.json")
   360  	if err != nil {
   361  		f.T.Fatalf("error decoding envoy.json after sending snapshot to python: %+v", err)
   362  	}
   363  	bs := msg.(*v3bootstrap.Bootstrap)
   364  	f.envoyConfigs.Add(f.T, bs)
   365  }
   366  
   367  // GetEnvoyConfig will return the next envoy config that satisfies the supplied predicate.
   368  func (f *Fake) GetEnvoyConfig(predicate func(*v3bootstrap.Bootstrap) bool) (*v3bootstrap.Bootstrap, error) {
   369  	f.T.Helper()
   370  	untyped, err := f.envoyConfigs.Get(f.T, func(obj interface{}) bool {
   371  		return predicate(obj.(*v3bootstrap.Bootstrap))
   372  	})
   373  	if err != nil {
   374  		return nil, err
   375  	}
   376  	return untyped.(*v3bootstrap.Bootstrap), nil
   377  }
   378  
   379  // AutoFlush will cause a flush whenever any inputs are modified.
   380  func (f *Fake) AutoFlush(enabled bool) {
   381  	f.k8sNotifier.AutoNotify(enabled)
   382  	f.consulNotifier.AutoNotify(enabled)
   383  }
   384  
   385  // Feed will cause inputs from all datasources to be delivered to the control plane.
   386  func (f *Fake) Flush() {
   387  	f.k8sNotifier.Notify()
   388  	f.consulNotifier.Notify()
   389  }
   390  
   391  // sets the ambassador meta info that should get sent in each snapshot
   392  func (f *Fake) SetAmbassadorMeta(ambMeta *snapshot.AmbassadorMetaInfo) {
   393  	f.ambassadorMeta = ambMeta
   394  }
   395  
   396  // UpsertFile will parse the contents of the file as yaml and feed them into the control plane
   397  // created or updating any overlapping resources that exist.
   398  func (f *Fake) UpsertFile(filename string) error {
   399  	if err := f.k8sStore.UpsertFile(filename); err != nil {
   400  		return err
   401  	}
   402  	f.k8sNotifier.Changed()
   403  	return nil
   404  }
   405  
   406  // UpsertYAML will parse the provided YAML and feed the resources in it into the control plane,
   407  // creating or updating any overlapping resources that exist.
   408  func (f *Fake) UpsertYAML(yaml string) error {
   409  	if err := f.k8sStore.UpsertYAML(yaml); err != nil {
   410  		return err
   411  	}
   412  	f.k8sNotifier.Changed()
   413  	return nil
   414  }
   415  
   416  // Upsert will update (or if necessary create) the supplied resource in the fake k8s datastore.
   417  func (f *Fake) Upsert(resource kates.Object) error {
   418  	if err := f.k8sStore.Upsert(resource); err != nil {
   419  		return err
   420  	}
   421  	f.k8sNotifier.Changed()
   422  	return nil
   423  }
   424  
   425  // Delete will removes the specified resource from the fake k8s datastore.
   426  func (f *Fake) Delete(kind, namespace, name string) error {
   427  	if err := f.k8sStore.Delete(kind, namespace, name); err != nil {
   428  		return err
   429  	}
   430  	f.k8sNotifier.Changed()
   431  	return nil
   432  }
   433  
   434  // ConsulEndpoint stores the supplied consul endpoint data.
   435  func (f *Fake) ConsulEndpoint(datacenter, service, address string, port int, tags ...string) {
   436  	f.consulStore.ConsulEndpoint(datacenter, service, address, port, tags...)
   437  	f.consulNotifier.Changed()
   438  }
   439  
   440  // SendIstioCertUpdate sends the supplied Istio certificate update.
   441  func (f *Fake) SendIstioCertUpdate(update IstioCertUpdate) {
   442  	f.istioCertSource.updateChannel <- update
   443  }
   444  
   445  type fakeK8sSource struct {
   446  	fake  *Fake
   447  	store *K8sStore
   448  }
   449  
   450  func (fs *fakeK8sSource) Watch(ctx context.Context, queries ...kates.Query) (K8sWatcher, error) {
   451  	fw := &fakeK8sWatcher{fs.store.Cursor(), make(chan struct{}), queries}
   452  	fs.fake.k8sNotifier.Listen(func() {
   453  		go func() {
   454  			fw.notifyCh <- struct{}{}
   455  		}()
   456  	})
   457  	return fw, nil
   458  }
   459  
   460  type fakeK8sWatcher struct {
   461  	cursor   *K8sStoreCursor
   462  	notifyCh chan struct{}
   463  	queries  []kates.Query
   464  }
   465  
   466  func (f *fakeK8sWatcher) Changed() <-chan struct{} {
   467  	return f.notifyCh
   468  }
   469  
   470  func (f *fakeK8sWatcher) FilteredUpdate(_ context.Context, target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) (bool, error) {
   471  	byname := map[string][]kates.Object{}
   472  	resources, newDeltas, err := f.cursor.Get()
   473  	if err != nil {
   474  		return false, err
   475  	}
   476  	for _, obj := range resources {
   477  		for _, q := range f.queries {
   478  			var un *kates.Unstructured
   479  			err := convert(obj, &un)
   480  			if err != nil {
   481  				return false, err
   482  			}
   483  			doesMatch, err := matches(q, obj)
   484  			if err != nil {
   485  				return false, err
   486  			}
   487  			if doesMatch && predicate(un) {
   488  				byname[q.Name] = append(byname[q.Name], obj)
   489  			}
   490  		}
   491  	}
   492  
   493  	// XXX: this stuff is copied from kates/accumulator.go
   494  	targetVal := reflect.ValueOf(target)
   495  	targetType := targetVal.Type().Elem()
   496  	for _, q := range f.queries {
   497  		name := q.Name
   498  		v := byname[q.Name]
   499  		fieldEntry, ok := targetType.FieldByName(name)
   500  		if !ok {
   501  			return false, fmt.Errorf("no such field: %q", name)
   502  		}
   503  		val := reflect.New(fieldEntry.Type)
   504  		err := convert(v, val.Interface())
   505  		if err != nil {
   506  			return false, err
   507  		}
   508  		targetVal.Elem().FieldByName(name).Set(reflect.Indirect(val))
   509  	}
   510  
   511  	*deltas = newDeltas
   512  
   513  	return len(newDeltas) > 0, nil
   514  }
   515  
   516  func matches(query kates.Query, obj kates.Object) (bool, error) {
   517  	queryKind, err := canon(query.Kind)
   518  	if err != nil {
   519  		return false, err
   520  	}
   521  	objKind, err := canon(obj.GetObjectKind().GroupVersionKind().Kind)
   522  	if err != nil {
   523  		return false, err
   524  	}
   525  	return queryKind == objKind, nil
   526  }
   527  
   528  type fakeWatcher struct {
   529  	fake  *Fake
   530  	store *ConsulStore
   531  }
   532  
   533  func (f *fakeWatcher) Watch(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) (Stopper, error) {
   534  	var sent consulwatch.Endpoints
   535  	stop := f.fake.consulNotifier.Listen(func() {
   536  		ep, ok := f.store.Get(resolver.Spec.Datacenter, svc)
   537  		if ok && !reflect.DeepEqual(ep, sent) {
   538  			endpoints <- ep
   539  			sent = ep
   540  		}
   541  	})
   542  	return &fakeStopper{stop}, nil
   543  }
   544  
   545  type fakeStopper struct {
   546  	stop StopFunc
   547  }
   548  
   549  func (f *fakeStopper) Stop() {
   550  	f.stop()
   551  }
   552  
   553  type fakeIstioCertSource struct {
   554  	updateChannel chan IstioCertUpdate
   555  }
   556  
   557  func (src *fakeIstioCertSource) Watch(ctx context.Context) (IstioCertWatcher, error) {
   558  	src.updateChannel = make(chan IstioCertUpdate)
   559  
   560  	return &istioCertWatcher{
   561  		updateChannel: src.updateChannel,
   562  	}, nil
   563  }
   564  

View as plain text