...

Source file src/github.com/datawire/ambassador/v2/pkg/consulwatch/servicewatcher.go

Documentation: github.com/datawire/ambassador/v2/pkg/consulwatch

     1  package consulwatch
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	consulapi "github.com/hashicorp/consul/api"
     8  	"github.com/hashicorp/consul/api/watch"
     9  
    10  	"github.com/datawire/dlib/dlog"
    11  )
    12  
    13  type ServiceWatcher struct {
    14  	ServiceName string
    15  	consul      *consulapi.Client
    16  	plan        *watch.Plan
    17  }
    18  
    19  func New(client *consulapi.Client, datacenter string, service string, onlyHealthy bool) (*ServiceWatcher, error) {
    20  	// NOTE plombardi@datawire.io, 2019-03-04
    21  	// ======================================
    22  	//
    23  	// Technically we can watch on a specific "tag" for a Consul service. And in theory Consul via its CLI allows
    24  	// watching multiple tags, however, the watch API only allows watching one specific tag which makes it kind of
    25  	// useless unless you want to setup a watch per tag. The better approach and the reason the "tag" argument is not
    26  	// supplied below is because it is conceptually simpler to post-process the array of Endpoints returned during a
    27  	// watch and construct a map of tag names to an array of endpoints.
    28  	plan, err := watch.Parse(map[string]interface{}{
    29  		"type":        "service",
    30  		"datacenter":  datacenter,
    31  		"service":     service,
    32  		"passingonly": onlyHealthy,
    33  	})
    34  
    35  	if err != nil {
    36  		return nil, err
    37  	}
    38  
    39  	return &ServiceWatcher{consul: client, ServiceName: service, plan: plan}, nil
    40  }
    41  
    42  func (w *ServiceWatcher) Watch(handler func(endpoints Endpoints, err error)) {
    43  	w.plan.HybridHandler = func(val watch.BlockingParamVal, raw interface{}) {
    44  		endpoints := Endpoints{Service: w.ServiceName, Endpoints: []Endpoint{}}
    45  
    46  		if raw == nil {
    47  			handler(endpoints, fmt.Errorf("unexpected empty/nil response from consul"))
    48  			return
    49  		}
    50  
    51  		v, ok := raw.([]*consulapi.ServiceEntry)
    52  		if !ok {
    53  			handler(endpoints, fmt.Errorf("unexpected raw type expected=%T, actual=%T", []*consulapi.ServiceEntry{}, raw))
    54  			return
    55  		}
    56  
    57  		endpoints.Endpoints = make([]Endpoint, 0)
    58  		for _, item := range v {
    59  			tags := make([]string, 0)
    60  			if item.Service.Tags != nil {
    61  				tags = item.Service.Tags
    62  			}
    63  
    64  			// Some Consul services, especially those outside of Kubernetes, will not be registered with a `ServiceAddress`.
    65  			// Per Consul HTTP API documentation, this okay and we should fallback to the IP of the node in the `Address` field.
    66  			endpointAddress := item.Service.Address
    67  			if endpointAddress == "" {
    68  				endpointAddress = item.Node.Address
    69  			}
    70  
    71  			endpoints.Endpoints = append(endpoints.Endpoints, Endpoint{
    72  				Service:  item.Service.Service,
    73  				SystemID: fmt.Sprintf("consul::%s", item.Node.ID),
    74  				ID:       item.Service.ID,
    75  				Address:  endpointAddress,
    76  				Port:     item.Service.Port,
    77  				Tags:     tags,
    78  			})
    79  		}
    80  
    81  		handler(endpoints, nil)
    82  	}
    83  }
    84  
    85  func (w *ServiceWatcher) Start(ctx context.Context) error {
    86  	return w.plan.RunWithClientAndLogger(w.consul, dlog.StdLogger(ctx, dlog.LogLevelInfo))
    87  }
    88  
    89  func (w *ServiceWatcher) Stop() {
    90  	w.plan.Stop()
    91  }
    92  

View as plain text