...

Package etcdv3

import "github.com/go-kit/kit/sd/etcdv3"

Overview

Package etcdv3 provides an Instancer and Registrar implementation for etcd v3. If you use etcd v3 as your service discovery system, this package will help you implement the registration and client-side load balancing patterns.

Example

Code:

package etcdv3

import (
    "context"
    "io"
    "time"

    "github.com/go-kit/kit/endpoint"
    "github.com/go-kit/kit/sd"
    "github.com/go-kit/kit/sd/lb"
    "github.com/go-kit/log"
    "google.golang.org/grpc"
)

func Example() {
    // Let's say this is a service that means to register itself.
    // First, we will set up some context.
    var (
        etcdServer = "10.0.0.1:2379"      // unlike v2, no URL scheme is needed when connecting directly to an etcd v3 instance
        prefix     = "/services/foosvc/"  // known at compile time
        instance   = "1.2.3.4:8080"       // taken from runtime or platform, somehow
        key        = prefix + instance    // should be globally unique
        value      = "http://" + instance // based on our transport
        ctx        = context.Background()
    )

    options := ClientOptions{
        // Path to trusted ca file
        CACert: "",

        // Path to certificate
        Cert: "",

        // Path to private key
        Key: "",

        // Username if required
        Username: "",

        // Password if required
        Password: "",

        // If DialTimeout is 0, it defaults to 3s
        DialTimeout: time.Second * 3,

        // If DialKeepAlive is 0, it defaults to 3s
        DialKeepAlive: time.Second * 3,

        // Passing grpc.WithBlock() makes the dial block until the connection is established.
        DialOptions: []grpc.DialOption{grpc.WithBlock()},
    }

    // Build the client.
    client, err := NewClient(ctx, []string{etcdServer}, options)
    if err != nil {
        panic(err)
    }

    // Build the registrar.
    registrar := NewRegistrar(client, Service{
        Key:   key,
        Value: value,
    }, log.NewNopLogger())

    // Register our instance.
    registrar.Register()

    // At the end of our service lifecycle, for example at the end of func main,
    // we should make sure to deregister ourselves. This is important! Don't
    // accidentally skip this step by invoking a log.Fatal or os.Exit in the
    // interim, which bypasses the defer stack.
    defer registrar.Deregister()

    // It's likely that we'll also want to connect to other services and call
    // their methods. We can build an Instancer to listen for changes from etcd,
    // create an Endpointer, wrap it with a load-balancer to pick a single
    // endpoint, and finally wrap it with a retry strategy to get something that
    // can be used as an endpoint directly.
    barPrefix := "/services/barsvc"
    logger := log.NewNopLogger()
    instancer, err := NewInstancer(client, barPrefix, logger)
    if err != nil {
        panic(err)
    }
    endpointer := sd.NewEndpointer(instancer, barFactory, logger)
    balancer := lb.NewRoundRobin(endpointer)
    retry := lb.Retry(3, 3*time.Second, balancer)

    // And now retry can be used like any other endpoint.
    req := struct{}{}
    if _, err = retry(ctx, req); err != nil {
        panic(err)
    }
}

func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }

Variables

var (
    // ErrNoKey indicates a client method needs a key but receives none.
    ErrNoKey = errors.New("no key provided")

    // ErrNoValue indicates a client method needs a value but receives none.
    ErrNoValue = errors.New("no value provided")
)

type Client

Client is a wrapper around the etcd client.

type Client interface {
    // GetEntries queries the given prefix in etcd and returns a slice
    // containing the values of all keys found, recursively, underneath that
    // prefix.
    GetEntries(prefix string) ([]string, error)

    // WatchPrefix watches the given prefix in etcd for changes. When a change
    // is detected, it will signal on the passed channel. Clients are expected
    // to call GetEntries to update themselves with the latest set of complete
    // values. WatchPrefix will always send an initial sentinel value on the
    // channel after establishing the watch, to ensure that clients always
    // receive the latest set of values. WatchPrefix will block until the
    // context passed to the NewClient constructor is terminated.
    WatchPrefix(prefix string, ch chan struct{})

    // Register a service with etcd.
    Register(s Service) error

    // Deregister a service from etcd.
    Deregister(s Service) error

    // LeaseID returns the lease ID created for this service instance.
    LeaseID() int64
}
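The WatchPrefix contract described above (signals carry no data, so the caller re-reads the full set with GetEntries after each one) can be sketched without an etcd server. The following is a minimal, standard-library-only illustration. The names entryReader, memClient, put, notify, and nextEntries are hypothetical stand-ins invented for this sketch, not part of package etcdv3, and the fake signals on any change rather than filtering by prefix.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// entryReader mirrors the two read-side methods of the Client interface
// documented above. It is declared locally so the sketch runs without etcd.
type entryReader interface {
	GetEntries(prefix string) ([]string, error)
	WatchPrefix(prefix string, ch chan struct{})
}

// memClient is a hypothetical in-memory stand-in; it is not part of etcdv3.
type memClient struct {
	mu      sync.Mutex
	kv      map[string]string
	changed chan struct{} // signalled by put to simulate a change in etcd
	done    chan struct{} // closed to simulate termination of the client context
}

func newMemClient() *memClient {
	return &memClient{
		kv:      map[string]string{},
		changed: make(chan struct{}, 1),
		done:    make(chan struct{}),
	}
}

// put stores a value and records that a change is pending.
func (c *memClient) put(key, value string) {
	c.mu.Lock()
	c.kv[key] = value
	c.mu.Unlock()
	select {
	case c.changed <- struct{}{}:
	default: // a notification is already pending
	}
}

// GetEntries returns the sorted values of all keys under prefix.
func (c *memClient) GetEntries(prefix string) ([]string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var out []string
	for k, v := range c.kv {
		if strings.HasPrefix(k, prefix) {
			out = append(out, v)
		}
	}
	sort.Strings(out)
	return out, nil
}

// notify delivers one signal on ch, or reports false if done closes first.
func (c *memClient) notify(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	case <-c.done:
		return false
	}
}

// WatchPrefix sends an initial sentinel, then one signal per pending change,
// and blocks until done is closed. For brevity this fake signals on any
// change, regardless of prefix.
func (c *memClient) WatchPrefix(prefix string, ch chan struct{}) {
	if !c.notify(ch) {
		return
	}
	for {
		select {
		case <-c.changed:
			if !c.notify(ch) {
				return
			}
		case <-c.done:
			return
		}
	}
}

// nextEntries waits for one signal, then re-reads the complete set of values.
func nextEntries(c entryReader, prefix string, ch chan struct{}) ([]string, error) {
	<-ch
	return c.GetEntries(prefix)
}

func main() {
	c := newMemClient()
	c.put("/services/barsvc/1.2.3.4:8080", "http://1.2.3.4:8080")

	ch := make(chan struct{})
	go c.WatchPrefix("/services/barsvc/", ch)

	entries, err := nextEntries(c, "/services/barsvc/", ch)
	fmt.Println(entries, err)
	close(c.done)
}
```

Because each signal only means "something changed", several changes may collapse into one notification. Re-reading the whole prefix after every signal keeps the consumer correct in that case.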

func NewClient

func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error)

NewClient returns a Client with a connection to the named machines. It will return an error if a connection to the cluster cannot be made.

type ClientOptions

ClientOptions defines options for the etcd client. All values are optional. If any duration is not specified, a default of 3 seconds will be used.

type ClientOptions struct {
    Cert          string
    Key           string
    CACert        string
    DialTimeout   time.Duration
    DialKeepAlive time.Duration

    // DialOptions is a list of dial options for the gRPC client (e.g., for interceptors).
    // For example, pass grpc.WithBlock() to block until the underlying connection is up.
    // Without this, Dial returns immediately and connecting to the server happens in the background.
    DialOptions []grpc.DialOption

    Username string
    Password string
}

type Instancer

Instancer yields instances stored in a certain etcd keyspace. Any kind of change in that keyspace is watched and will update the Instancer's subscribers.

type Instancer struct {
    // contains filtered or unexported fields
}

func NewInstancer

func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error)

NewInstancer returns an etcd instancer. It will start watching the given prefix for changes, and update the subscribers.

func (*Instancer) Deregister

func (s *Instancer) Deregister(ch chan<- sd.Event)

Deregister implements sd.Instancer.

func (*Instancer) Register

func (s *Instancer) Register(ch chan<- sd.Event)

Register implements sd.Instancer.

func (*Instancer) Stop

func (s *Instancer) Stop()

Stop terminates the Instancer.

type Registrar

Registrar registers service instance liveness information to etcd.

type Registrar struct {
    // contains filtered or unexported fields
}

func NewRegistrar

func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar

NewRegistrar returns an etcd Registrar acting on the provided catalog registration (service).

func (*Registrar) Deregister

func (r *Registrar) Deregister()

Deregister implements the sd.Registrar interface. Call it when you want your service to be deregistered from etcd, typically just prior to shutdown.
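One way to make sure Deregister runs before the process exits is to keep the service lifecycle in a function that returns an error, and to call os.Exit only after that function has returned. The following is a minimal sketch of that shape using only the standard library. The registrar interface, logRegistrar, run, and errServe are hypothetical names introduced for illustration; the interface simply mirrors the Register and Deregister methods documented here.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// registrar mirrors the method set documented for *Registrar. It is declared
// locally so the sketch compiles without etcd.
type registrar interface {
	Register()
	Deregister()
}

// errServe is a hypothetical error standing in for a failed server loop.
var errServe = errors.New("serve failed")

// run registers the instance, runs serve until it returns, and deregisters
// on the way out. The deferred call executes whether serve succeeds or fails.
func run(r registrar, serve func() error) error {
	r.Register()
	defer r.Deregister()
	return serve()
}

// logRegistrar is a hypothetical stand-in that records the calls it receives.
type logRegistrar struct{ calls []string }

func (l *logRegistrar) Register()   { l.calls = append(l.calls, "register") }
func (l *logRegistrar) Deregister() { l.calls = append(l.calls, "deregister") }

func main() {
	r := &logRegistrar{}
	err := run(r, func() error { return nil }) // stand-in for a real server loop
	fmt.Println(r.calls, err)
	if err != nil {
		// Exiting here is safe: run has returned, so Deregister has already run.
		os.Exit(1)
	}
}
```

The design choice is that main holds no defers of its own that matter, so a non-zero exit code does not skip the deregistration.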

func (*Registrar) Register

func (r *Registrar) Register()

Register implements the sd.Registrar interface. Call it when you want your service to be registered in etcd, typically at startup.

type Service

Service holds the instance identifying data you want to publish to etcd. Key must be unique, and value is the string returned to subscribers, typically called the "instance" string in other parts of package sd.

type Service struct {
    Key   string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
    Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
    TTL   *TTLOption
}
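The Key and Value fields are typically derived from a service prefix and the instance's address, as in the package example. The following is a small sketch of that derivation. instanceKV is a hypothetical helper, not part of package etcdv3, and the scheme argument is an assumption about the transport in use.

```go
package main

import (
	"fmt"
	"strings"
)

// instanceKV is a hypothetical helper that builds the Key and Value strings
// for a Service from a prefix, a URL scheme, and a host:port address. It
// tolerates a prefix with or without a trailing slash.
func instanceKV(prefix, scheme, hostport string) (key, value string) {
	key = strings.TrimSuffix(prefix, "/") + "/" + hostport
	value = scheme + "://" + hostport
	return key, value
}

func main() {
	key, value := instanceKV("/services/foosvc/", "http", "1.2.3.4:8080")
	fmt.Println(key)   // /services/foosvc/1.2.3.4:8080
	fmt.Println(value) // http://1.2.3.4:8080
}
```

The resulting strings would then be assigned to Service.Key and Service.Value before calling NewRegistrar.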

type TTLOption

TTLOption allows setting a key with a TTL. This option will be used by a loop goroutine which regularly refreshes the lease of the key.

type TTLOption struct {
    // contains filtered or unexported fields
}

func NewTTLOption

func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption

NewTTLOption returns a TTLOption that contains proper TTL settings. Heartbeat is used to refresh the lease of the key periodically; its value should be at least 500ms. TTL defines the lease of the key; its value should be significantly greater than heartbeat.

Good default values might be 3s heartbeat, 10s TTL.
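The guidance above can be turned into a pre-flight check that runs before calling NewTTLOption. The following is a minimal sketch. checkTTL and its constants are hypothetical, and the factor of 3 is only one assumed reading of "significantly greater"; whether NewTTLOption itself adjusts out-of-range values is not stated here, so the check is purely advisory.

```go
package main

import (
	"fmt"
	"time"
)

const (
	minHeartbeat     = 500 * time.Millisecond // lower bound stated in the documentation
	defaultHeartbeat = 3 * time.Second        // suggested default heartbeat
	defaultTTL       = 10 * time.Second       // suggested default TTL
	ttlFactor        = 3                      // assumption: TTL should be at least 3x heartbeat
)

// checkTTL is a hypothetical helper that reports whether a heartbeat/TTL
// pair follows the documented guidance.
func checkTTL(heartbeat, ttl time.Duration) error {
	if heartbeat < minHeartbeat {
		return fmt.Errorf("heartbeat %v is below the minimum of %v", heartbeat, minHeartbeat)
	}
	if ttl < ttlFactor*heartbeat {
		return fmt.Errorf("ttl %v is less than %dx heartbeat %v", ttl, ttlFactor, heartbeat)
	}
	return nil
}

func main() {
	fmt.Println(checkTTL(defaultHeartbeat, defaultTTL))       // passes the check
	fmt.Println(checkTTL(100*time.Millisecond, defaultTTL))   // heartbeat too short
	fmt.Println(checkTTL(defaultHeartbeat, defaultHeartbeat)) // TTL too close to heartbeat
}
```

With a 3s heartbeat and a 10s TTL, roughly three refreshes fit inside one lease period, so a single missed heartbeat does not expire the key.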