...
1 package etcd
2
3 import (
4 "sync"
5 "time"
6
7 etcd "go.etcd.io/etcd/client/v2"
8
9 "github.com/go-kit/log"
10 )
11
12 const minHeartBeatTime = 500 * time.Millisecond
13
14
15 type Registrar struct {
16 client Client
17 service Service
18 logger log.Logger
19
20 quitmtx sync.Mutex
21 quit chan struct{}
22 }
23
24
25
26
27 type Service struct {
28 Key string
29 Value string
30 TTL *TTLOption
31 DeleteOptions *etcd.DeleteOptions
32 }
33
34
35
36 type TTLOption struct {
37 heartbeat time.Duration
38 ttl time.Duration
39 }
40
41
42
43
44
45
46
47 func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption {
48 if heartbeat <= minHeartBeatTime {
49 heartbeat = minHeartBeatTime
50 }
51 if ttl <= heartbeat {
52 ttl = 3 * heartbeat
53 }
54 return &TTLOption{
55 heartbeat: heartbeat,
56 ttl: ttl,
57 }
58 }
59
60
61
62 func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar {
63 return &Registrar{
64 client: client,
65 service: service,
66 logger: log.With(logger, "key", service.Key, "value", service.Value),
67 }
68 }
69
70
71
72 func (r *Registrar) Register() {
73 if err := r.client.Register(r.service); err != nil {
74 r.logger.Log("err", err)
75 } else {
76 r.logger.Log("action", "register")
77 }
78 if r.service.TTL != nil {
79 go r.loop()
80 }
81 }
82
83 func (r *Registrar) loop() {
84 r.quitmtx.Lock()
85 if r.quit != nil {
86 return
87 }
88 r.quit = make(chan struct{})
89 r.quitmtx.Unlock()
90
91 tick := time.NewTicker(r.service.TTL.heartbeat)
92 defer tick.Stop()
93 for {
94 select {
95 case <-tick.C:
96 if err := r.client.Register(r.service); err != nil {
97 r.logger.Log("err", err)
98 }
99 case <-r.quit:
100 return
101 }
102 }
103 }
104
105
106
107 func (r *Registrar) Deregister() {
108 if err := r.client.Deregister(r.service); err != nil {
109 r.logger.Log("err", err)
110 } else {
111 r.logger.Log("action", "deregister")
112 }
113
114 r.quitmtx.Lock()
115 defer r.quitmtx.Unlock()
116 if r.quit != nil {
117 close(r.quit)
118 r.quit = nil
119 }
120 }
121
View as plain text