1 package etcdv3
2
3 import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "time"
8
9 "go.etcd.io/etcd/client/pkg/v3/transport"
10 clientv3 "go.etcd.io/etcd/client/v3"
11 "google.golang.org/grpc"
12 )
13
14 var (
15
16 ErrNoKey = errors.New("no key provided")
17
18
19 ErrNoValue = errors.New("no value provided")
20 )
21
22
23 type Client interface {
24
25
26
27 GetEntries(prefix string) ([]string, error)
28
29
30
31
32
33
34
35
36 WatchPrefix(prefix string, ch chan struct{})
37
38
39 Register(s Service) error
40
41
42 Deregister(s Service) error
43
44
45 LeaseID() int64
46 }
47
48 type client struct {
49 cli *clientv3.Client
50 ctx context.Context
51
52 kv clientv3.KV
53
54
55 watcher clientv3.Watcher
56
57 wctx context.Context
58
59 wcf context.CancelFunc
60
61
62 leaseID clientv3.LeaseID
63
64 hbch <-chan *clientv3.LeaseKeepAliveResponse
65
66 leaser clientv3.Lease
67 }
68
69
70
71 type ClientOptions struct {
72 Cert string
73 Key string
74 CACert string
75 DialTimeout time.Duration
76 DialKeepAlive time.Duration
77
78
79
80
81 DialOptions []grpc.DialOption
82
83 Username string
84 Password string
85 }
86
87
88
89 func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
90 if options.DialTimeout == 0 {
91 options.DialTimeout = 3 * time.Second
92 }
93 if options.DialKeepAlive == 0 {
94 options.DialKeepAlive = 3 * time.Second
95 }
96
97 var err error
98 var tlscfg *tls.Config
99
100 if options.Cert != "" && options.Key != "" {
101 tlsInfo := transport.TLSInfo{
102 CertFile: options.Cert,
103 KeyFile: options.Key,
104 TrustedCAFile: options.CACert,
105 }
106 tlscfg, err = tlsInfo.ClientConfig()
107 if err != nil {
108 return nil, err
109 }
110 }
111
112 cli, err := clientv3.New(clientv3.Config{
113 Context: ctx,
114 Endpoints: machines,
115 DialTimeout: options.DialTimeout,
116 DialKeepAliveTime: options.DialKeepAlive,
117 DialOptions: options.DialOptions,
118 TLS: tlscfg,
119 Username: options.Username,
120 Password: options.Password,
121 })
122 if err != nil {
123 return nil, err
124 }
125
126 return &client{
127 cli: cli,
128 ctx: ctx,
129 kv: clientv3.NewKV(cli),
130 }, nil
131 }
132
133 func (c *client) LeaseID() int64 { return int64(c.leaseID) }
134
135
136 func (c *client) GetEntries(key string) ([]string, error) {
137 resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix())
138 if err != nil {
139 return nil, err
140 }
141
142 entries := make([]string, len(resp.Kvs))
143 for i, kv := range resp.Kvs {
144 entries[i] = string(kv.Value)
145 }
146
147 return entries, nil
148 }
149
150
151 func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
152 c.wctx, c.wcf = context.WithCancel(c.ctx)
153 c.watcher = clientv3.NewWatcher(c.cli)
154
155 wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0))
156 ch <- struct{}{}
157 for wr := range wch {
158 if wr.Canceled {
159 return
160 }
161 ch <- struct{}{}
162 }
163 }
164
165 func (c *client) Register(s Service) error {
166 var err error
167
168 if s.Key == "" {
169 return ErrNoKey
170 }
171 if s.Value == "" {
172 return ErrNoValue
173 }
174
175 if c.leaser != nil {
176 c.leaser.Close()
177 }
178 c.leaser = clientv3.NewLease(c.cli)
179
180 if c.watcher != nil {
181 c.watcher.Close()
182 }
183 c.watcher = clientv3.NewWatcher(c.cli)
184 if c.kv == nil {
185 c.kv = clientv3.NewKV(c.cli)
186 }
187
188 if s.TTL == nil {
189 s.TTL = NewTTLOption(time.Second*3, time.Second*10)
190 }
191
192 grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds()))
193 if err != nil {
194 return err
195 }
196 c.leaseID = grantResp.ID
197
198 _, err = c.kv.Put(
199 c.ctx,
200 s.Key,
201 s.Value,
202 clientv3.WithLease(c.leaseID),
203 )
204 if err != nil {
205 return err
206 }
207
208
209
210 c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID)
211 if err != nil {
212 return err
213 }
214
215
216
217 go func() {
218 for {
219 select {
220 case r := <-c.hbch:
221
222 if r == nil {
223 return
224 }
225 case <-c.ctx.Done():
226 return
227 }
228 }
229 }()
230
231 return nil
232 }
233
234 func (c *client) Deregister(s Service) error {
235 defer c.close()
236
237 if s.Key == "" {
238 return ErrNoKey
239 }
240 if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil {
241 return err
242 }
243
244 return nil
245 }
246
247
248
249 func (c *client) close() {
250 if c.leaser != nil {
251 c.leaser.Close()
252 }
253 if c.watcher != nil {
254 c.watcher.Close()
255 }
256 if c.wcf != nil {
257 c.wcf()
258 }
259 }
260
View as plain text