1 package endpoints
2
3
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "strings"
10
11 clientv3 "go.etcd.io/etcd/client/v3"
12 "go.etcd.io/etcd/client/v3/naming/endpoints/internal"
13
14 "go.uber.org/zap"
15 "google.golang.org/grpc/codes"
16 "google.golang.org/grpc/status"
17 )
18
// endpointManager bundles the etcd client and the target string that the
// methods in this file use to build the "<target>/" key prefix.
type endpointManager struct {

	// client is the etcd v3 client used for the Get, Txn and Watch calls.
	client *clientv3.Client
	// target is the key prefix, without the trailing "/", that scopes this manager.
	target string
}
24
25
26 func NewManager(client *clientv3.Client, target string) (Manager, error) {
27 if client == nil {
28 return nil, errors.New("invalid etcd client")
29 }
30
31 if target == "" {
32 return nil, errors.New("invalid target")
33 }
34
35 em := &endpointManager{
36 client: client,
37 target: target,
38 }
39 return em, nil
40 }
41
42 func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) {
43 ops := make([]clientv3.Op, 0, len(updates))
44 for _, update := range updates {
45 if !strings.HasPrefix(update.Key, m.target+"/") {
46 return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
47 }
48
49 switch update.Op {
50 case Add:
51 internalUpdate := &internal.Update{
52 Op: internal.Add,
53 Addr: update.Endpoint.Addr,
54 Metadata: update.Endpoint.Metadata,
55 }
56
57 var v []byte
58 if v, err = json.Marshal(internalUpdate); err != nil {
59 return status.Error(codes.InvalidArgument, err.Error())
60 }
61 ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...))
62 case Delete:
63 ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...))
64 default:
65 return status.Error(codes.InvalidArgument, "endpoints: bad update op")
66 }
67 }
68 _, err = m.client.KV.Txn(ctx).Then(ops...).Commit()
69 return err
70 }
71
72 func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
73 return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
74 }
75
76 func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
77 return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
78 }
79
80 func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
81 key := m.target + "/"
82 resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
83 if err != nil {
84 return nil, err
85 }
86
87 lg := m.client.GetLogger()
88 initUpdates := make([]*Update, 0, len(resp.Kvs))
89 for _, kv := range resp.Kvs {
90 var iup internal.Update
91 if err := json.Unmarshal(kv.Value, &iup); err != nil {
92 lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
93 continue
94 }
95 up := &Update{
96 Op: Add,
97 Key: string(kv.Key),
98 Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
99 }
100 initUpdates = append(initUpdates, up)
101 }
102
103 upch := make(chan []*Update, 1)
104 if len(initUpdates) > 0 {
105 upch <- initUpdates
106 }
107 go m.watch(ctx, resp.Header.Revision+1, upch)
108 return upch, nil
109 }
110
111 func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
112 defer close(upch)
113
114 lg := m.client.GetLogger()
115 opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
116 key := m.target + "/"
117 wch := m.client.Watch(ctx, key, opts...)
118 for {
119 select {
120 case <-ctx.Done():
121 return
122 case wresp, ok := <-wch:
123 if !ok {
124 lg.Warn("watch closed", zap.String("target", m.target))
125 return
126 }
127 if wresp.Err() != nil {
128 lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
129 return
130 }
131
132 deltaUps := make([]*Update, 0, len(wresp.Events))
133 for _, e := range wresp.Events {
134 var iup internal.Update
135 var err error
136 var op Operation
137 switch e.Type {
138 case clientv3.EventTypePut:
139 err = json.Unmarshal(e.Kv.Value, &iup)
140 op = Add
141 if err != nil {
142 lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
143 continue
144 }
145 case clientv3.EventTypeDelete:
146 iup = internal.Update{Op: internal.Delete}
147 op = Delete
148 default:
149 continue
150 }
151 up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
152 deltaUps = append(deltaUps, up)
153 }
154 if len(deltaUps) > 0 {
155 upch <- deltaUps
156 }
157 }
158 }
159 }
160
161 func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
162 key := m.target + "/"
163 resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
164 if err != nil {
165 return nil, err
166 }
167
168 eps := make(Key2EndpointMap)
169 for _, kv := range resp.Kvs {
170 var iup internal.Update
171 if err := json.Unmarshal(kv.Value, &iup); err != nil {
172 continue
173 }
174
175 eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}
176 }
177 return eps, nil
178 }
179
View as plain text