...

Source file src/github.com/go-kit/kit/sd/etcdv3/integration_test.go

Documentation: github.com/go-kit/kit/sd/etcdv3

     1  //go:build flaky_integration
     2  // +build flaky_integration
     3  
     4  package etcdv3
     5  
     6  import (
     7  	"context"
     8  	"io"
     9  	"os"
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/go-kit/kit/endpoint"
    14  	"github.com/go-kit/kit/sd"
    15  	"github.com/go-kit/log"
    16  )
    17  
    18  func runIntegration(settings integrationSettings, client Client, service Service, t *testing.T) {
    19  	// Verify test data is initially empty.
    20  	entries, err := client.GetEntries(settings.key)
    21  	if err != nil {
    22  		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
    23  	}
    24  	if len(entries) > 0 {
    25  		t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
    26  	}
    27  	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
    28  
    29  	// Instantiate a new Registrar, passing in test data.
    30  	registrar := NewRegistrar(
    31  		client,
    32  		service,
    33  		log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
    34  	)
    35  
    36  	// Register our instance.
    37  	registrar.Register()
    38  	t.Log("Registered")
    39  
    40  	// Retrieve entries from etcd manually.
    41  	entries, err = client.GetEntries(settings.key)
    42  	if err != nil {
    43  		t.Fatalf("client.GetEntries(%q): %v", settings.key, err)
    44  	}
    45  	if want, have := 1, len(entries); want != have {
    46  		t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have)
    47  	}
    48  	if want, have := settings.value, entries[0]; want != have {
    49  		t.Fatalf("want %q, have %q", want, have)
    50  	}
    51  
    52  	instancer, err := NewInstancer(
    53  		client,
    54  		settings.prefix,
    55  		log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
    56  	)
    57  	if err != nil {
    58  		t.Fatalf("NewInstancer: %v", err)
    59  	}
    60  	t.Log("Constructed Instancer OK")
    61  	defer instancer.Stop()
    62  
    63  	endpointer := sd.NewEndpointer(
    64  		instancer,
    65  		func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
    66  		log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
    67  	)
    68  	t.Log("Constructed Endpointer OK")
    69  	defer endpointer.Close()
    70  
    71  	if !within(time.Second, func() bool {
    72  		endpoints, err := endpointer.Endpoints()
    73  		return err == nil && len(endpoints) == 1
    74  	}) {
    75  		t.Fatal("Endpointer didn't see Register in time")
    76  	}
    77  	t.Log("Endpointer saw Register OK")
    78  
    79  	// Deregister first instance of test data.
    80  	registrar.Deregister()
    81  	t.Log("Deregistered")
    82  
    83  	// Check it was deregistered.
    84  	if !within(time.Second, func() bool {
    85  		endpoints, err := endpointer.Endpoints()
    86  		t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
    87  		return err == nil && len(endpoints) == 0
    88  	}) {
    89  		t.Fatalf("Endpointer didn't see Deregister in time")
    90  	}
    91  
    92  	// Verify test data no longer exists in etcd.
    93  	entries, err = client.GetEntries(settings.key)
    94  	if err != nil {
    95  		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
    96  	}
    97  	if len(entries) > 0 {
    98  		t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries)
    99  	}
   100  	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
   101  }
   102  
   103  type integrationSettings struct {
   104  	addr     string
   105  	prefix   string
   106  	instance string
   107  	key      string
   108  	value    string
   109  }
   110  
   111  func testIntegrationSettings(t *testing.T) integrationSettings {
   112  	var settings integrationSettings
   113  
   114  	settings.addr = os.Getenv("ETCD_ADDR")
   115  	if settings.addr == "" {
   116  		t.Skip("ETCD_ADDR not set; skipping integration test")
   117  	}
   118  
   119  	settings.prefix = "/services/foosvc/" // known at compile time
   120  	settings.instance = "1.2.3.4:8080"    // taken from runtime or platform, somehow
   121  	settings.key = settings.prefix + settings.instance
   122  	settings.value = "http://" + settings.instance // based on our transport
   123  
   124  	return settings
   125  }
   126  
   127  // Package sd/etcd provides a wrapper around the etcd key/value store. This
   128  // example assumes the user has an instance of etcd installed and running
   129  // locally on port 2379.
   130  func TestIntegration(t *testing.T) {
   131  	settings := testIntegrationSettings(t)
   132  	client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
   133  		DialTimeout:   2 * time.Second,
   134  		DialKeepAlive: 2 * time.Second,
   135  	})
   136  	if err != nil {
   137  		t.Fatalf("NewClient(%q): %v", settings.addr, err)
   138  	}
   139  
   140  	service := Service{
   141  		Key:   settings.key,
   142  		Value: settings.value,
   143  	}
   144  
   145  	runIntegration(settings, client, service, t)
   146  }
   147  
   148  func TestIntegrationTTL(t *testing.T) {
   149  	settings := testIntegrationSettings(t)
   150  	client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
   151  		DialTimeout:   2 * time.Second,
   152  		DialKeepAlive: 2 * time.Second,
   153  	})
   154  	if err != nil {
   155  		t.Fatalf("NewClient(%q): %v", settings.addr, err)
   156  	}
   157  
   158  	service := Service{
   159  		Key:   settings.key,
   160  		Value: settings.value,
   161  		TTL:   NewTTLOption(time.Second*3, time.Second*10),
   162  	}
   163  	defer client.Deregister(service)
   164  
   165  	runIntegration(settings, client, service, t)
   166  }
   167  
   168  func TestIntegrationRegistrarOnly(t *testing.T) {
   169  	settings := testIntegrationSettings(t)
   170  	client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
   171  		DialTimeout:   2 * time.Second,
   172  		DialKeepAlive: 2 * time.Second,
   173  	})
   174  	if err != nil {
   175  		t.Fatalf("NewClient(%q): %v", settings.addr, err)
   176  	}
   177  
   178  	service := Service{
   179  		Key:   settings.key,
   180  		Value: settings.value,
   181  		TTL:   NewTTLOption(time.Second*3, time.Second*10),
   182  	}
   183  	defer client.Deregister(service)
   184  
   185  	// Verify test data is initially empty.
   186  	entries, err := client.GetEntries(settings.key)
   187  	if err != nil {
   188  		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
   189  	}
   190  	if len(entries) > 0 {
   191  		t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
   192  	}
   193  	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
   194  
   195  	// Instantiate a new Registrar, passing in test data.
   196  	registrar := NewRegistrar(
   197  		client,
   198  		service,
   199  		log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
   200  	)
   201  
   202  	// Register our instance.
   203  	registrar.Register()
   204  	t.Log("Registered")
   205  
   206  	// Deregister our instance. (so we test registrar only scenario)
   207  	registrar.Deregister()
   208  	t.Log("Deregistered")
   209  
   210  }
   211  
   212  func within(d time.Duration, f func() bool) bool {
   213  	deadline := time.Now().Add(d)
   214  	for time.Now().Before(deadline) {
   215  		if f() {
   216  			return true
   217  		}
   218  		time.Sleep(d / 10)
   219  	}
   220  	return false
   221  }
   222  

View as plain text