1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3election
16
17 import (
18 "context"
19 "errors"
20
21 "go.etcd.io/etcd/client/v3"
22 "go.etcd.io/etcd/client/v3/concurrency"
23 epb "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
24 )
25
26
27
var (
	// ErrMissingLeaderKey is the error Proclaim and Resign return when the
	// request's Leader field is nil.
	ErrMissingLeaderKey = errors.New(`"leader" field must be provided`)
)
29
30 type electionServer struct {
31 c *clientv3.Client
32 }
33
34 func NewElectionServer(c *clientv3.Client) epb.ElectionServer {
35 return &electionServer{c}
36 }
37
38 func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest) (*epb.CampaignResponse, error) {
39 s, err := es.session(ctx, req.Lease)
40 if err != nil {
41 return nil, err
42 }
43 e := concurrency.NewElection(s, string(req.Name))
44 if err = e.Campaign(ctx, string(req.Value)); err != nil {
45 return nil, err
46 }
47 return &epb.CampaignResponse{
48 Header: e.Header(),
49 Leader: &epb.LeaderKey{
50 Name: req.Name,
51 Key: []byte(e.Key()),
52 Rev: e.Rev(),
53 Lease: int64(s.Lease()),
54 },
55 }, nil
56 }
57
58 func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
59 if req.Leader == nil {
60 return nil, ErrMissingLeaderKey
61 }
62 s, err := es.session(ctx, req.Leader.Lease)
63 if err != nil {
64 return nil, err
65 }
66 e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
67 if err := e.Proclaim(ctx, string(req.Value)); err != nil {
68 return nil, err
69 }
70 return &epb.ProclaimResponse{Header: e.Header()}, nil
71 }
72
73 func (es *electionServer) Observe(req *epb.LeaderRequest, stream epb.Election_ObserveServer) error {
74 s, err := es.session(stream.Context(), -1)
75 if err != nil {
76 return err
77 }
78 e := concurrency.NewElection(s, string(req.Name))
79 ch := e.Observe(stream.Context())
80 for stream.Context().Err() == nil {
81 select {
82 case <-stream.Context().Done():
83 case resp, ok := <-ch:
84 if !ok {
85 return nil
86 }
87 lresp := &epb.LeaderResponse{Header: resp.Header, Kv: resp.Kvs[0]}
88 if err := stream.Send(lresp); err != nil {
89 return err
90 }
91 }
92 }
93 return stream.Context().Err()
94 }
95
96 func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*epb.LeaderResponse, error) {
97 s, err := es.session(ctx, -1)
98 if err != nil {
99 return nil, err
100 }
101 l, lerr := concurrency.NewElection(s, string(req.Name)).Leader(ctx)
102 if lerr != nil {
103 return nil, lerr
104 }
105 return &epb.LeaderResponse{Header: l.Header, Kv: l.Kvs[0]}, nil
106 }
107
108 func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
109 if req.Leader == nil {
110 return nil, ErrMissingLeaderKey
111 }
112 s, err := es.session(ctx, req.Leader.Lease)
113 if err != nil {
114 return nil, err
115 }
116 e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
117 if err := e.Resign(ctx); err != nil {
118 return nil, err
119 }
120 return &epb.ResignResponse{Header: e.Header()}, nil
121 }
122
123 func (es *electionServer) session(ctx context.Context, lease int64) (*concurrency.Session, error) {
124 s, err := concurrency.NewSession(
125 es.c,
126 concurrency.WithLease(clientv3.LeaseID(lease)),
127 concurrency.WithContext(ctx),
128 )
129 if err != nil {
130 return nil, err
131 }
132 s.Orphan()
133 return s, nil
134 }
135
View as plain text