...
1 package consulwatch
2
3 import (
4 "context"
5 "fmt"
6
7 consulapi "github.com/hashicorp/consul/api"
8 "github.com/hashicorp/consul/api/watch"
9
10 "github.com/datawire/dlib/dlog"
11 )
12
13 type ServiceWatcher struct {
14 ServiceName string
15 consul *consulapi.Client
16 plan *watch.Plan
17 }
18
19 func New(client *consulapi.Client, datacenter string, service string, onlyHealthy bool) (*ServiceWatcher, error) {
20
21
22
23
24
25
26
27
28 plan, err := watch.Parse(map[string]interface{}{
29 "type": "service",
30 "datacenter": datacenter,
31 "service": service,
32 "passingonly": onlyHealthy,
33 })
34
35 if err != nil {
36 return nil, err
37 }
38
39 return &ServiceWatcher{consul: client, ServiceName: service, plan: plan}, nil
40 }
41
42 func (w *ServiceWatcher) Watch(handler func(endpoints Endpoints, err error)) {
43 w.plan.HybridHandler = func(val watch.BlockingParamVal, raw interface{}) {
44 endpoints := Endpoints{Service: w.ServiceName, Endpoints: []Endpoint{}}
45
46 if raw == nil {
47 handler(endpoints, fmt.Errorf("unexpected empty/nil response from consul"))
48 return
49 }
50
51 v, ok := raw.([]*consulapi.ServiceEntry)
52 if !ok {
53 handler(endpoints, fmt.Errorf("unexpected raw type expected=%T, actual=%T", []*consulapi.ServiceEntry{}, raw))
54 return
55 }
56
57 endpoints.Endpoints = make([]Endpoint, 0)
58 for _, item := range v {
59 tags := make([]string, 0)
60 if item.Service.Tags != nil {
61 tags = item.Service.Tags
62 }
63
64
65
66 endpointAddress := item.Service.Address
67 if endpointAddress == "" {
68 endpointAddress = item.Node.Address
69 }
70
71 endpoints.Endpoints = append(endpoints.Endpoints, Endpoint{
72 Service: item.Service.Service,
73 SystemID: fmt.Sprintf("consul::%s", item.Node.ID),
74 ID: item.Service.ID,
75 Address: endpointAddress,
76 Port: item.Service.Port,
77 Tags: tags,
78 })
79 }
80
81 handler(endpoints, nil)
82 }
83 }
84
85 func (w *ServiceWatcher) Start(ctx context.Context) error {
86 return w.plan.RunWithClientAndLogger(w.consul, dlog.StdLogger(ctx, dlog.LogLevelInfo))
87 }
88
89 func (w *ServiceWatcher) Stop() {
90 w.plan.Stop()
91 }
92
View as plain text