1 package etcd
2
3 import (
4 "context"
5 "crypto/tls"
6 "crypto/x509"
7 "errors"
8 "io/ioutil"
9 "net"
10 "net/http"
11 "time"
12
13 etcd "go.etcd.io/etcd/client/v2"
14 )
15
16 var (
17
18 ErrNoKey = errors.New("no key provided")
19
20
21 ErrNoValue = errors.New("no value provided")
22 )
23
24
25 type Client interface {
26
27
28
29 GetEntries(prefix string) ([]string, error)
30
31
32
33
34
35
36
37
38 WatchPrefix(prefix string, ch chan struct{})
39
40
41 Register(s Service) error
42
43
44 Deregister(s Service) error
45 }
46
47 type client struct {
48 keysAPI etcd.KeysAPI
49 ctx context.Context
50 }
51
52
53
54 type ClientOptions struct {
55 Cert string
56 Key string
57 CACert string
58 DialTimeout time.Duration
59 DialKeepAlive time.Duration
60 HeaderTimeoutPerRequest time.Duration
61 }
62
63
64
65
66
67 func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
68 if options.DialTimeout == 0 {
69 options.DialTimeout = 3 * time.Second
70 }
71 if options.DialKeepAlive == 0 {
72 options.DialKeepAlive = 3 * time.Second
73 }
74 if options.HeaderTimeoutPerRequest == 0 {
75 options.HeaderTimeoutPerRequest = 3 * time.Second
76 }
77
78 transport := etcd.DefaultTransport
79 if options.Cert != "" && options.Key != "" {
80 tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
81 if err != nil {
82 return nil, err
83 }
84 caCertCt, err := ioutil.ReadFile(options.CACert)
85 if err != nil {
86 return nil, err
87 }
88 caCertPool := x509.NewCertPool()
89 caCertPool.AppendCertsFromPEM(caCertCt)
90 transport = &http.Transport{
91 TLSClientConfig: &tls.Config{
92 Certificates: []tls.Certificate{tlsCert},
93 RootCAs: caCertPool,
94 },
95 Dial: func(network, address string) (net.Conn, error) {
96 return (&net.Dialer{
97 Timeout: options.DialTimeout,
98 KeepAlive: options.DialKeepAlive,
99 }).Dial(network, address)
100 },
101 }
102 }
103
104 ce, err := etcd.New(etcd.Config{
105 Endpoints: machines,
106 Transport: transport,
107 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
108 })
109 if err != nil {
110 return nil, err
111 }
112
113 return &client{
114 keysAPI: etcd.NewKeysAPI(ce),
115 ctx: ctx,
116 }, nil
117 }
118
119
120 func (c *client) GetEntries(key string) ([]string, error) {
121 resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true})
122 if err != nil {
123 return nil, err
124 }
125
126
127
128
129 if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
130 return []string{resp.Node.Value}, nil
131 }
132
133 entries := make([]string, len(resp.Node.Nodes))
134 for i, node := range resp.Node.Nodes {
135 entries[i] = node.Value
136 }
137 return entries, nil
138 }
139
140
141 func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
142 watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
143 ch <- struct{}{}
144 for {
145 if _, err := watch.Next(c.ctx); err != nil {
146 return
147 }
148 ch <- struct{}{}
149 }
150 }
151
152 func (c *client) Register(s Service) error {
153 if s.Key == "" {
154 return ErrNoKey
155 }
156 if s.Value == "" {
157 return ErrNoValue
158 }
159 var err error
160 if s.TTL != nil {
161 _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{
162 PrevExist: etcd.PrevIgnore,
163 TTL: s.TTL.ttl,
164 })
165 } else {
166 _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
167 }
168 return err
169 }
170
171 func (c *client) Deregister(s Service) error {
172 if s.Key == "" {
173 return ErrNoKey
174 }
175 _, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions)
176 return err
177 }
178
View as plain text