1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "os"
22 "sync"
23
24 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
25 "go.etcd.io/etcd/client/v3"
26 "go.etcd.io/etcd/client/v3/naming/endpoints"
27 "golang.org/x/time/rate"
28
29 "go.uber.org/zap"
30 )
31
32
// resolveRetryRate is used as both the rate limit and the burst size of the
// limiter that paces attempts to (re)establish the endpoint watch.
const resolveRetryRate = 1
34
35 type clusterProxy struct {
36 lg *zap.Logger
37 clus clientv3.Cluster
38 ctx context.Context
39
40
41 advaddr string
42 prefix string
43
44 em endpoints.Manager
45
46 umu sync.RWMutex
47 umap map[string]endpoints.Endpoint
48 }
49
50
51
52
53
54 func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
55 if lg == nil {
56 lg = zap.NewNop()
57 }
58
59 var em endpoints.Manager
60 if advaddr != "" && prefix != "" {
61 var err error
62 if em, err = endpoints.NewManager(c, prefix); err != nil {
63 lg.Error("failed to provision endpointsManager", zap.String("prefix", prefix), zap.Error(err))
64 return nil, nil
65 }
66 }
67
68 cp := &clusterProxy{
69 lg: lg,
70 clus: c.Cluster,
71 ctx: c.Ctx(),
72
73 advaddr: advaddr,
74 prefix: prefix,
75 umap: make(map[string]endpoints.Endpoint),
76 em: em,
77 }
78
79 donec := make(chan struct{})
80 if em != nil {
81 go func() {
82 defer close(donec)
83 cp.establishEndpointWatch(prefix)
84 }()
85 return cp, donec
86 }
87
88 close(donec)
89 return cp, donec
90 }
91
92 func (cp *clusterProxy) establishEndpointWatch(prefix string) {
93 rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
94 for rm.Wait(cp.ctx) == nil {
95 wc, err := cp.em.NewWatchChannel(cp.ctx)
96 if err != nil {
97 cp.lg.Warn("failed to establish endpoint watch", zap.String("prefix", prefix), zap.Error(err))
98 continue
99 }
100 cp.monitor(wc)
101 }
102 }
103
104 func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
105 for {
106 select {
107 case <-cp.ctx.Done():
108 cp.lg.Info("watching endpoints interrupted", zap.Error(cp.ctx.Err()))
109 return
110 case updates := <-wa:
111 cp.umu.Lock()
112 for _, up := range updates {
113 switch up.Op {
114 case endpoints.Add:
115 cp.umap[up.Key] = up.Endpoint
116 case endpoints.Delete:
117 delete(cp.umap, up.Key)
118 }
119 }
120 cp.umu.Unlock()
121 }
122 }
123 }
124
125 func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
126 if r.IsLearner {
127 return cp.memberAddAsLearner(ctx, r.PeerURLs)
128 }
129 return cp.memberAdd(ctx, r.PeerURLs)
130 }
131
132 func (cp *clusterProxy) memberAdd(ctx context.Context, peerURLs []string) (*pb.MemberAddResponse, error) {
133 mresp, err := cp.clus.MemberAdd(ctx, peerURLs)
134 if err != nil {
135 return nil, err
136 }
137 resp := (pb.MemberAddResponse)(*mresp)
138 return &resp, err
139 }
140
141 func (cp *clusterProxy) memberAddAsLearner(ctx context.Context, peerURLs []string) (*pb.MemberAddResponse, error) {
142 mresp, err := cp.clus.MemberAddAsLearner(ctx, peerURLs)
143 if err != nil {
144 return nil, err
145 }
146 resp := (pb.MemberAddResponse)(*mresp)
147 return &resp, err
148 }
149
150 func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
151 mresp, err := cp.clus.MemberRemove(ctx, r.ID)
152 if err != nil {
153 return nil, err
154 }
155 resp := (pb.MemberRemoveResponse)(*mresp)
156 return &resp, err
157 }
158
159 func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
160 mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs)
161 if err != nil {
162 return nil, err
163 }
164 resp := (pb.MemberUpdateResponse)(*mresp)
165 return &resp, err
166 }
167
168 func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) {
169 cp.umu.RLock()
170 defer cp.umu.RUnlock()
171 mbs := make([]*pb.Member, 0, len(cp.umap))
172 for _, upt := range cp.umap {
173 m, err := decodeMeta(fmt.Sprint(upt.Metadata))
174 if err != nil {
175 return nil, err
176 }
177 mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{upt.Addr}})
178 }
179 return mbs, nil
180 }
181
182
183
184
185
186
187 func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
188 if cp.advaddr != "" {
189 if cp.prefix != "" {
190 mbs, err := cp.membersFromUpdates()
191 if err != nil {
192 return nil, err
193 }
194 if len(mbs) > 0 {
195 return &pb.MemberListResponse{Members: mbs}, nil
196 }
197 }
198
199 hostname, _ := os.Hostname()
200 return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil
201 }
202 mresp, err := cp.clus.MemberList(ctx)
203 if err != nil {
204 return nil, err
205 }
206 resp := (pb.MemberListResponse)(*mresp)
207 return &resp, err
208 }
209
210 func (cp *clusterProxy) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
211
212 return nil, errors.New("not implemented")
213 }
214
View as plain text