...
1 package embed
2
3 import (
4 "context"
5 "errors"
6 "strings"
7
8 "go.etcd.io/etcd/server/v3/embed"
9 "golang.org/x/sync/errgroup"
10
11 "edge-infra.dev/pkg/lib/fog"
12 )
13
14
15 type Cluster struct {
16 initialized bool
17 members []*Member
18 errCh chan error
19 }
20
21
22
23
24
25
26 func NewCluster(members ...*Member) (*Cluster, error) {
27 if len(members) == 0 {
28 return nil, errors.New("failed to initialize cluster, no members provided")
29 }
30
31 c := &Cluster{
32 members: []*Member{},
33 errCh: make(chan error),
34 }
35 c.members = append(c.members, members...)
36
37 return c, nil
38 }
39
40
41
42 func (c *Cluster) ClientURLs() []string {
43 for _, m := range c.members {
44 if m.etcd.Server == nil {
45 continue
46 }
47 return m.etcd.Server.Cluster().ClientURLs()
48 }
49 return []string{}
50 }
51
52
53
54 func (c *Cluster) CurrentCluster() string {
55 var initialCluster string
56 for _, m := range c.members {
57 initialCluster = strings.Join([]string{initialCluster, m.config.embed.InitialClusterFromName(m.config.Name)}, ",")
58 }
59 return strings.Trim(initialCluster, ",")
60 }
61
62
63 func (c *Cluster) Size() int {
64 for _, m := range c.members {
65 if m.etcd.Server == nil {
66 continue
67 }
68 return len(m.etcd.Server.Cluster().Members())
69 }
70 return 0
71 }
72
73
74
75
76
77
78
79
80
81
82 func (c *Cluster) AddMember(member *Member) (func() error, error) {
83 if err := member.prepare(); err != nil {
84 return nil, err
85 }
86 c.members = append(c.members, member)
87
88 var initialCluster string
89 for _, m := range c.members {
90 initialCluster = strings.Join([]string{initialCluster, m.config.embed.InitialClusterFromName(m.config.Name)}, ",")
91 }
92 initialCluster = strings.Trim(initialCluster, ",")
93 member.config.embed.InitialCluster = initialCluster
94
95 return func() error {
96 member.config.embed.ClusterState = embed.ClusterStateFlagExisting
97 return member.start(c.errCh)
98 }, nil
99 }
100
101
102
103
104
105
106
107 func (c *Cluster) prepare() error {
108 for _, member := range c.members {
109 if err := member.prepare(); err != nil {
110 return err
111 }
112 if c.initialized {
113 member.config.embed.ClusterState = embed.ClusterStateFlagExisting
114 }
115 }
116 return nil
117 }
118
119
120
121
122
123
124
125
126
127 func (c *Cluster) Start(ctx context.Context) error {
128 if err := c.prepare(); err != nil {
129 return err
130 }
131
132 if err := c.start(ctx); err != nil {
133 if err := c.Stop(ctx); err != nil {
134 fog.FromContext(ctx).Error(err, "failed to stop server")
135 }
136 return err
137 }
138
139 c.initialized = true
140 return nil
141 }
142
143
144
145
146 func (c *Cluster) start(ctx context.Context) error {
147 errGroup, _ := errgroup.WithContext(ctx)
148
149 for _, member := range c.members {
150 errGroup.Go(startFunc(c, member))
151 }
152 return errGroup.Wait()
153 }
154
155
156
157 func startFunc(cluster *Cluster, member *Member) func() error {
158 return func() error {
159 member.config.embed.InitialCluster = cluster.CurrentCluster()
160 return member.start(cluster.errCh)
161 }
162 }
163
164
165 func (c *Cluster) Stop(ctx context.Context) error {
166 errGroup, _ := errgroup.WithContext(ctx)
167
168 for _, member := range c.members {
169 errGroup.Go(stopFunc(member))
170 }
171
172 return errGroup.Wait()
173 }
174
175
176 func stopFunc(member *Member) func() error {
177 return func() error {
178 return member.stop()
179 }
180 }
181
182
183 func (c *Cluster) Close(ctx context.Context) error {
184 errGroup, _ := errgroup.WithContext(ctx)
185
186 for _, member := range c.members {
187 errGroup.Go(closeFunc(member))
188 }
189
190 return errGroup.Wait()
191 }
192
193
194 func closeFunc(member *Member) func() error {
195 return func() error {
196 return member.close()
197 }
198 }
199
200
201
202 func (c *Cluster) Err() <-chan error {
203 return c.errCh
204 }
205
View as plain text