1
2
3
4
5
6
7
8
9
10
11
12
13
14 package cluster
15
16 import (
17 "context"
18 "fmt"
19 "math/rand"
20 "net"
21 "sort"
22 "strconv"
23 "strings"
24 "sync"
25 "time"
26
27 "github.com/go-kit/log"
28 "github.com/go-kit/log/level"
29 "github.com/hashicorp/memberlist"
30 "github.com/oklog/ulid"
31 "github.com/pkg/errors"
32 "github.com/prometheus/client_golang/prometheus"
33 )
34
35
// ClusterPeer is the view of the local cluster node exposed to consumers
// that only need its identity, readiness and membership list.
type ClusterPeer interface {
	// Name returns the name of this peer.
	Name() string
	// Status returns a string describing the state of this peer.
	Status() string
	// Peers returns the members of the cluster.
	Peers() []ClusterMember
}
44
45
// ClusterMember describes a single member of the cluster.
type ClusterMember interface {
	// Name returns the name of the member.
	Name() string
	// Address returns the address of the member as a string.
	Address() string
}
52
53
// ClusterChannel is a handle used to send a message to the cluster.
type ClusterChannel interface {
	// Broadcast sends the given bytes to the cluster.
	Broadcast([]byte)
}
57
58
// Peer is the local node of a memberlist-based gossip cluster.
type Peer struct {
	mlist    *memberlist.Memberlist
	delegate *delegate

	// resolvedPeers holds the addresses resolved from knownPeers at creation
	// time; Join passes them to memberlist.
	resolvedPeers []string

	// mtx guards states.
	mtx    sync.RWMutex
	states map[string]State
	// stopc is closed by Leave to stop the periodic background tasks.
	stopc chan struct{}
	// readyc is closed by Settle; Ready and WaitReady observe it.
	readyc chan struct{}

	// peerLock guards peers and failedPeers.
	peerLock sync.RWMutex
	// peers is keyed by peer address ("host:port").
	peers       map[string]peer
	failedPeers []peer

	// knownPeers are the peer addresses as configured (unresolved); refresh
	// re-resolves them periodically.
	knownPeers []string
	// advertiseAddr is passed to resolvePeers by refresh as this node's own address.
	advertiseAddr string

	failedReconnectionsCounter prometheus.Counter
	reconnectionsCounter       prometheus.Counter
	failedRefreshCounter       prometheus.Counter
	refreshCounter             prometheus.Counter
	peerLeaveCounter           prometheus.Counter
	peerUpdateCounter          prometheus.Counter
	peerJoinCounter            prometheus.Counter

	logger log.Logger
}
87
88
89
// peer is the internal bookkeeping record for a cluster member: the
// memberlist node plus its last observed status.
type peer struct {
	status PeerStatus
	// leaveTime is set when the peer is marked failed and reset to the zero
	// time when it rejoins.
	leaveTime time.Time

	*memberlist.Node
}
96
97
// PeerStatus is the state of a peer as tracked by the local node.
type PeerStatus int

// Known peer states.
const (
	StatusNone PeerStatus = iota
	StatusAlive
	StatusFailed
)

// String returns the lowercase name of the status. It panics when called on
// a value that is not one of the declared constants.
func (s PeerStatus) String() string {
	labels := [...]string{
		StatusNone:   "none",
		StatusAlive:  "alive",
		StatusFailed: "failed",
	}
	if s < 0 || int(s) >= len(labels) {
		panic(fmt.Sprintf("unknown PeerStatus: %d", s))
	}
	return labels[s]
}
118
// Default timing and size settings for the cluster.
const (
	DefaultPushPullInterval  = 60 * time.Second
	DefaultGossipInterval    = 200 * time.Millisecond
	DefaultTCPTimeout        = 10 * time.Second
	DefaultProbeTimeout      = 500 * time.Millisecond
	DefaultProbeInterval     = 1 * time.Second
	DefaultReconnectInterval = 10 * time.Second
	DefaultReconnectTimeout  = 6 * time.Hour
	// DefaultRefreshInterval is the period of the refresh task started by Join.
	DefaultRefreshInterval = 15 * time.Second
	// MaxGossipPacketSize is used as the memberlist UDP buffer size in Create.
	MaxGossipPacketSize = 1400
)
130
131 func Create(
132 l log.Logger,
133 reg prometheus.Registerer,
134 bindAddr string,
135 advertiseAddr string,
136 knownPeers []string,
137 waitIfEmpty bool,
138 pushPullInterval time.Duration,
139 gossipInterval time.Duration,
140 tcpTimeout time.Duration,
141 probeTimeout time.Duration,
142 probeInterval time.Duration,
143 tlsTransportConfig *TLSTransportConfig,
144 allowInsecureAdvertise bool,
145 ) (*Peer, error) {
146 bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
147 if err != nil {
148 return nil, errors.Wrap(err, "invalid listen address")
149 }
150 bindPort, err := strconv.Atoi(bindPortStr)
151 if err != nil {
152 return nil, errors.Wrapf(err, "address %s: invalid port", bindAddr)
153 }
154
155 var advertiseHost string
156 var advertisePort int
157 if advertiseAddr != "" {
158 var advertisePortStr string
159 advertiseHost, advertisePortStr, err = net.SplitHostPort(advertiseAddr)
160 if err != nil {
161 return nil, errors.Wrap(err, "invalid advertise address")
162 }
163 advertisePort, err = strconv.Atoi(advertisePortStr)
164 if err != nil {
165 return nil, errors.Wrapf(err, "address %s: invalid port", advertiseAddr)
166 }
167 }
168
169 resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, &net.Resolver{}, waitIfEmpty)
170 if err != nil {
171 return nil, errors.Wrap(err, "resolve peers")
172 }
173 level.Debug(l).Log("msg", "resolved peers to following addresses", "peers", strings.Join(resolvedPeers, ","))
174
175
176 addr, err := calculateAdvertiseAddress(bindHost, advertiseHost, allowInsecureAdvertise)
177 if err != nil {
178 level.Warn(l).Log("err", "couldn't deduce an advertise address: "+err.Error())
179 } else if hasNonlocal(resolvedPeers) && isUnroutable(addr.String()) {
180 level.Warn(l).Log("err", "this node advertises itself on an unroutable address", "addr", addr.String())
181 level.Warn(l).Log("err", "this node will be unreachable in the cluster")
182 level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
183 } else if isAny(bindAddr) && advertiseHost == "" {
184
185 level.Info(l).Log("msg", "setting advertise address explicitly", "addr", addr.String(), "port", bindPort)
186 advertiseHost = addr.String()
187 advertisePort = bindPort
188 }
189
190
191 name, err := ulid.New(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
192 if err != nil {
193 return nil, err
194 }
195
196 p := &Peer{
197 states: map[string]State{},
198 stopc: make(chan struct{}),
199 readyc: make(chan struct{}),
200 logger: l,
201 peers: map[string]peer{},
202 resolvedPeers: resolvedPeers,
203 knownPeers: knownPeers,
204 }
205
206 p.register(reg, name.String())
207
208 retransmit := len(knownPeers) / 2
209 if retransmit < 3 {
210 retransmit = 3
211 }
212 p.delegate = newDelegate(l, reg, p, retransmit)
213
214 cfg := memberlist.DefaultLANConfig()
215 cfg.Name = name.String()
216 cfg.BindAddr = bindHost
217 cfg.BindPort = bindPort
218 cfg.Delegate = p.delegate
219 cfg.Ping = p.delegate
220 cfg.Alive = p.delegate
221 cfg.Events = p.delegate
222 cfg.GossipInterval = gossipInterval
223 cfg.PushPullInterval = pushPullInterval
224 cfg.TCPTimeout = tcpTimeout
225 cfg.ProbeTimeout = probeTimeout
226 cfg.ProbeInterval = probeInterval
227 cfg.LogOutput = &logWriter{l: l}
228 cfg.GossipNodes = retransmit
229 cfg.UDPBufferSize = MaxGossipPacketSize
230
231 if advertiseHost != "" {
232 cfg.AdvertiseAddr = advertiseHost
233 cfg.AdvertisePort = advertisePort
234 p.setInitialFailed(resolvedPeers, fmt.Sprintf("%s:%d", advertiseHost, advertisePort))
235 } else {
236 p.setInitialFailed(resolvedPeers, bindAddr)
237 }
238
239 if tlsTransportConfig != nil {
240 level.Info(l).Log("msg", "using TLS for gossip")
241 cfg.Transport, err = NewTLSTransport(context.Background(), l, reg, cfg.BindAddr, cfg.BindPort, tlsTransportConfig)
242 if err != nil {
243 return nil, errors.Wrap(err, "tls transport")
244 }
245 }
246
247 ml, err := memberlist.Create(cfg)
248 if err != nil {
249 return nil, errors.Wrap(err, "create memberlist")
250 }
251 p.mlist = ml
252 return p, nil
253 }
254
255 func (p *Peer) Join(
256 reconnectInterval time.Duration,
257 reconnectTimeout time.Duration,
258 ) error {
259 n, err := p.mlist.Join(p.resolvedPeers)
260 if err != nil {
261 level.Warn(p.logger).Log("msg", "failed to join cluster", "err", err)
262 if reconnectInterval != 0 {
263 level.Info(p.logger).Log("msg", fmt.Sprintf("will retry joining cluster every %v", reconnectInterval.String()))
264 }
265 } else {
266 level.Debug(p.logger).Log("msg", "joined cluster", "peers", n)
267 }
268
269 if reconnectInterval != 0 {
270 go p.runPeriodicTask(
271 reconnectInterval,
272 p.reconnect,
273 )
274 }
275 if reconnectTimeout != 0 {
276 go p.runPeriodicTask(
277 5*time.Minute,
278 func() { p.removeFailedPeers(reconnectTimeout) },
279 )
280 }
281 go p.runPeriodicTask(
282 DefaultRefreshInterval,
283 p.refresh,
284 )
285
286 return err
287 }
288
289
290
291 func (p *Peer) setInitialFailed(peers []string, myAddr string) {
292 if len(peers) == 0 {
293 return
294 }
295
296 p.peerLock.Lock()
297 defer p.peerLock.Unlock()
298
299 now := time.Now()
300 for _, peerAddr := range peers {
301 if peerAddr == myAddr {
302
303
304 continue
305 }
306 host, port, err := net.SplitHostPort(peerAddr)
307 if err != nil {
308 continue
309 }
310 ip := net.ParseIP(host)
311 if ip == nil {
312
313
314 continue
315 }
316 portUint, err := strconv.ParseUint(port, 10, 16)
317 if err != nil {
318 continue
319 }
320
321 pr := peer{
322 status: StatusFailed,
323 leaveTime: now,
324 Node: &memberlist.Node{
325 Addr: ip,
326 Port: uint16(portUint),
327 },
328 }
329 p.failedPeers = append(p.failedPeers, pr)
330 p.peers[peerAddr] = pr
331 }
332 }
333
334 type logWriter struct {
335 l log.Logger
336 }
337
338 func (l *logWriter) Write(b []byte) (int, error) {
339 return len(b), level.Debug(l.l).Log("memberlist", string(b))
340 }
341
342 func (p *Peer) register(reg prometheus.Registerer, name string) {
343 peerInfo := prometheus.NewGauge(
344 prometheus.GaugeOpts{
345 Name: "alertmanager_cluster_peer_info",
346 Help: "A metric with a constant '1' value labeled by peer name.",
347 ConstLabels: prometheus.Labels{"peer": name},
348 },
349 )
350 peerInfo.Set(1)
351 clusterFailedPeers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
352 Name: "alertmanager_cluster_failed_peers",
353 Help: "Number indicating the current number of failed peers in the cluster.",
354 }, func() float64 {
355 p.peerLock.RLock()
356 defer p.peerLock.RUnlock()
357
358 return float64(len(p.failedPeers))
359 })
360 p.failedReconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
361 Name: "alertmanager_cluster_reconnections_failed_total",
362 Help: "A counter of the number of failed cluster peer reconnection attempts.",
363 })
364
365 p.reconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
366 Name: "alertmanager_cluster_reconnections_total",
367 Help: "A counter of the number of cluster peer reconnections.",
368 })
369
370 p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
371 Name: "alertmanager_cluster_refresh_join_failed_total",
372 Help: "A counter of the number of failed cluster peer joined attempts via refresh.",
373 })
374 p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
375 Name: "alertmanager_cluster_refresh_join_total",
376 Help: "A counter of the number of cluster peer joined via refresh.",
377 })
378
379 p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{
380 Name: "alertmanager_cluster_peers_left_total",
381 Help: "A counter of the number of peers that have left.",
382 })
383 p.peerUpdateCounter = prometheus.NewCounter(prometheus.CounterOpts{
384 Name: "alertmanager_cluster_peers_update_total",
385 Help: "A counter of the number of peers that have updated metadata.",
386 })
387 p.peerJoinCounter = prometheus.NewCounter(prometheus.CounterOpts{
388 Name: "alertmanager_cluster_peers_joined_total",
389 Help: "A counter of the number of peers that have joined.",
390 })
391
392 reg.MustRegister(peerInfo, clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter,
393 p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter)
394 }
395
396 func (p *Peer) runPeriodicTask(d time.Duration, f func()) {
397 tick := time.NewTicker(d)
398 defer tick.Stop()
399
400 for {
401 select {
402 case <-p.stopc:
403 return
404 case <-tick.C:
405 f()
406 }
407 }
408 }
409
410 func (p *Peer) removeFailedPeers(timeout time.Duration) {
411 p.peerLock.Lock()
412 defer p.peerLock.Unlock()
413
414 now := time.Now()
415
416 keep := make([]peer, 0, len(p.failedPeers))
417 for _, pr := range p.failedPeers {
418 if pr.leaveTime.Add(timeout).After(now) {
419 keep = append(keep, pr)
420 } else {
421 level.Debug(p.logger).Log("msg", "failed peer has timed out", "peer", pr.Node, "addr", pr.Address())
422 delete(p.peers, pr.Name)
423 }
424 }
425
426 p.failedPeers = keep
427 }
428
429 func (p *Peer) reconnect() {
430 p.peerLock.RLock()
431 failedPeers := p.failedPeers
432 p.peerLock.RUnlock()
433
434 logger := log.With(p.logger, "msg", "reconnect")
435 for _, pr := range failedPeers {
436
437
438
439 if _, err := p.mlist.Join([]string{pr.Address()}); err != nil {
440 p.failedReconnectionsCounter.Inc()
441 level.Debug(logger).Log("result", "failure", "peer", pr.Node, "addr", pr.Address(), "err", err)
442 } else {
443 p.reconnectionsCounter.Inc()
444 level.Debug(logger).Log("result", "success", "peer", pr.Node, "addr", pr.Address())
445 }
446 }
447 }
448
449 func (p *Peer) refresh() {
450 logger := log.With(p.logger, "msg", "refresh")
451
452 resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, &net.Resolver{}, false)
453 if err != nil {
454 level.Debug(logger).Log("peers", p.knownPeers, "err", err)
455 return
456 }
457
458 members := p.mlist.Members()
459 for _, peer := range resolvedPeers {
460 var isPeerFound bool
461 for _, member := range members {
462 if member.Address() == peer {
463 isPeerFound = true
464 break
465 }
466 }
467
468 if !isPeerFound {
469 if _, err := p.mlist.Join([]string{peer}); err != nil {
470 p.failedRefreshCounter.Inc()
471 level.Warn(logger).Log("result", "failure", "addr", peer, "err", err)
472 } else {
473 p.refreshCounter.Inc()
474 level.Debug(logger).Log("result", "success", "addr", peer)
475 }
476 }
477 }
478 }
479
480 func (p *Peer) peerJoin(n *memberlist.Node) {
481 p.peerLock.Lock()
482 defer p.peerLock.Unlock()
483
484 var oldStatus PeerStatus
485 pr, ok := p.peers[n.Address()]
486 if !ok {
487 oldStatus = StatusNone
488 pr = peer{
489 status: StatusAlive,
490 Node: n,
491 }
492 } else {
493 oldStatus = pr.status
494 pr.Node = n
495 pr.status = StatusAlive
496 pr.leaveTime = time.Time{}
497 }
498
499 p.peers[n.Address()] = pr
500 p.peerJoinCounter.Inc()
501
502 if oldStatus == StatusFailed {
503 level.Debug(p.logger).Log("msg", "peer rejoined", "peer", pr.Node)
504 p.failedPeers = removeOldPeer(p.failedPeers, pr.Address())
505 }
506 }
507
508 func (p *Peer) peerLeave(n *memberlist.Node) {
509 p.peerLock.Lock()
510 defer p.peerLock.Unlock()
511
512 pr, ok := p.peers[n.Address()]
513 if !ok {
514
515
516 return
517 }
518
519 pr.status = StatusFailed
520 pr.leaveTime = time.Now()
521 p.failedPeers = append(p.failedPeers, pr)
522 p.peers[n.Address()] = pr
523
524 p.peerLeaveCounter.Inc()
525 level.Debug(p.logger).Log("msg", "peer left", "peer", pr.Node)
526 }
527
528 func (p *Peer) peerUpdate(n *memberlist.Node) {
529 p.peerLock.Lock()
530 defer p.peerLock.Unlock()
531
532 pr, ok := p.peers[n.Address()]
533 if !ok {
534
535
536 return
537 }
538
539 pr.Node = n
540 p.peers[n.Address()] = pr
541
542 p.peerUpdateCounter.Inc()
543 level.Debug(p.logger).Log("msg", "peer updated", "peer", pr.Node)
544 }
545
546
547
548 func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel {
549 p.mtx.Lock()
550 p.states[key] = s
551 p.mtx.Unlock()
552
553 send := func(b []byte) {
554 p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
555 }
556 peers := func() []*memberlist.Node {
557 nodes := p.mlist.Members()
558 for i, n := range nodes {
559 if n.String() == p.Self().Name {
560 nodes = append(nodes[:i], nodes[i+1:]...)
561 break
562 }
563 }
564 return nodes
565 }
566 sendOversize := func(n *memberlist.Node, b []byte) error {
567 return p.mlist.SendReliable(n, b)
568 }
569 return NewChannel(key, send, peers, sendOversize, p.logger, p.stopc, reg)
570 }
571
572
573 func (p *Peer) Leave(timeout time.Duration) error {
574 close(p.stopc)
575 level.Debug(p.logger).Log("msg", "leaving cluster")
576 return p.mlist.Leave(timeout)
577 }
578
579
580 func (p *Peer) Name() string {
581 return p.mlist.LocalNode().Name
582 }
583
584
585 func (p *Peer) ClusterSize() int {
586 return p.mlist.NumMembers()
587 }
588
589
590 func (p *Peer) Ready() bool {
591 select {
592 case <-p.readyc:
593 return true
594 default:
595 }
596 return false
597 }
598
599
600 func (p *Peer) WaitReady(ctx context.Context) error {
601 select {
602 case <-ctx.Done():
603 return ctx.Err()
604 case <-p.readyc:
605 return nil
606 }
607 }
608
609
610 func (p *Peer) Status() string {
611 if p.Ready() {
612 return "ready"
613 }
614
615 return "settling"
616 }
617
618
619
620 func (p *Peer) Info() map[string]interface{} {
621 p.mtx.RLock()
622 defer p.mtx.RUnlock()
623
624 return map[string]interface{}{
625 "self": p.mlist.LocalNode(),
626 "members": p.mlist.Members(),
627 }
628 }
629
630
631 func (p *Peer) Self() *memberlist.Node {
632 return p.mlist.LocalNode()
633 }
634
635
636 type Member struct {
637 node *memberlist.Node
638 }
639
640
641 func (m Member) Name() string { return m.node.Name }
642
643
644 func (m Member) Address() string { return m.node.Address() }
645
646
647 func (p *Peer) Peers() []ClusterMember {
648 peers := make([]ClusterMember, 0, len(p.mlist.Members()))
649 for _, member := range p.mlist.Members() {
650 peers = append(peers, Member{
651 node: member,
652 })
653 }
654 return peers
655 }
656
657
658 func (p *Peer) Position() int {
659 all := p.mlist.Members()
660 sort.Slice(all, func(i, j int) bool {
661 return all[i].Name < all[j].Name
662 })
663
664 k := 0
665 for _, n := range all {
666 if n.Name == p.Self().Name {
667 break
668 }
669 k++
670 }
671 return k
672 }
673
674
675
676
677
678
679 func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
680 const NumOkayRequired = 3
681 level.Info(p.logger).Log("msg", "Waiting for gossip to settle...", "interval", interval)
682 start := time.Now()
683 nPeers := 0
684 nOkay := 0
685 totalPolls := 0
686 for {
687 select {
688 case <-ctx.Done():
689 elapsed := time.Since(start)
690 level.Info(p.logger).Log("msg", "gossip not settled but continuing anyway", "polls", totalPolls, "elapsed", elapsed)
691 close(p.readyc)
692 return
693 case <-time.After(interval):
694 }
695 elapsed := time.Since(start)
696 n := len(p.Peers())
697 if nOkay >= NumOkayRequired {
698 level.Info(p.logger).Log("msg", "gossip settled; proceeding", "elapsed", elapsed)
699 break
700 }
701 if n == nPeers {
702 nOkay++
703 level.Debug(p.logger).Log("msg", "gossip looks settled", "elapsed", elapsed)
704 } else {
705 nOkay = 0
706 level.Info(p.logger).Log("msg", "gossip not settled", "polls", totalPolls, "before", nPeers, "now", n, "elapsed", elapsed)
707 }
708 nPeers = n
709 totalPolls++
710 }
711 close(p.readyc)
712 }
713
714
715
// State is a piece of state that can be serialized to bytes and merged with
// serialized state.
type State interface {
	// MarshalBinary serializes the state to bytes.
	MarshalBinary() ([]byte, error)

	// Merge merges the serialized state b into this state.
	Merge(b []byte) error
}
723
724
725 type simpleBroadcast []byte
726
727 func (b simpleBroadcast) Message() []byte { return []byte(b) }
728 func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
729 func (b simpleBroadcast) Finished() {}
730
731 func resolvePeers(ctx context.Context, peers []string, myAddress string, res *net.Resolver, waitIfEmpty bool) ([]string, error) {
732 var resolvedPeers []string
733
734 for _, peer := range peers {
735 host, port, err := net.SplitHostPort(peer)
736 if err != nil {
737 return nil, errors.Wrapf(err, "split host/port for peer %s", peer)
738 }
739
740 retryCtx, cancel := context.WithCancel(ctx)
741 defer cancel()
742
743 ips, err := res.LookupIPAddr(ctx, host)
744 if err != nil {
745
746 resolvedPeers = append(resolvedPeers, peer)
747 continue
748 }
749
750 if len(ips) == 0 {
751 var lookupErrSpotted bool
752
753 err := retry(2*time.Second, retryCtx.Done(), func() error {
754 if lookupErrSpotted {
755
756 cancel()
757 }
758
759 ips, err = res.LookupIPAddr(retryCtx, host)
760 if err != nil {
761 lookupErrSpotted = true
762 return errors.Wrapf(err, "IP Addr lookup for peer %s", peer)
763 }
764
765 ips = removeMyAddr(ips, port, myAddress)
766 if len(ips) == 0 {
767 if !waitIfEmpty {
768 return nil
769 }
770 return errors.New("empty IPAddr result. Retrying")
771 }
772
773 return nil
774 })
775 if err != nil {
776 return nil, err
777 }
778 }
779
780 for _, ip := range ips {
781 resolvedPeers = append(resolvedPeers, net.JoinHostPort(ip.String(), port))
782 }
783 }
784
785 return resolvedPeers, nil
786 }
787
// removeMyAddr returns the entries of ips whose "ip:targetPort" form differs
// from myAddr, in their original order. The result is nil when nothing is
// kept.
func removeMyAddr(ips []net.IPAddr, targetPort, myAddr string) []net.IPAddr {
	var kept []net.IPAddr

	for i := range ips {
		candidate := net.JoinHostPort(ips[i].String(), targetPort)
		if candidate != myAddr {
			kept = append(kept, ips[i])
		}
	}

	return kept
}
800
// hasNonlocal reports whether at least one of clusterPeers is neither a
// loopback IP nor the name "localhost" (compared case-insensitively). A port
// suffix, if present, is ignored.
func hasNonlocal(clusterPeers []string) bool {
	for _, entry := range clusterPeers {
		name := entry
		if h, _, err := net.SplitHostPort(entry); err == nil {
			name = h
		}

		parsed := net.ParseIP(name)
		switch {
		case parsed == nil:
			// Not an IP literal: any hostname other than localhost counts.
			if strings.ToLower(name) != "localhost" {
				return true
			}
		case !parsed.IsLoopback():
			return true
		}
	}
	return false
}
814
// isUnroutable reports whether addr is an unspecified or loopback IP, or the
// name "localhost" (compared case-insensitively). A port suffix, if present,
// is ignored.
func isUnroutable(addr string) bool {
	host := addr
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}

	parsed := net.ParseIP(host)
	if parsed == nil {
		return strings.ToLower(host) == "localhost"
	}
	return parsed.IsUnspecified() || parsed.IsLoopback()
}
826
// isAny reports whether the host part of addr is empty or an unspecified IP
// address. A port suffix, if present, is ignored.
func isAny(addr string) bool {
	host := addr
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}
	if host == "" {
		return true
	}
	// ParseIP yields nil for non-IP hosts, for which IsUnspecified is false.
	return net.ParseIP(host).IsUnspecified()
}
833
834
// retry calls f immediately and then once per interval until f returns nil,
// in which case nil is returned, or until stopc is closed, in which case the
// error of the most recent call is returned.
func retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		lastErr := f()
		if lastErr == nil {
			return nil
		}

		select {
		case <-ticker.C:
			// Time for the next attempt.
		case <-stopc:
			return lastErr
		}
	}
}
851
852 func removeOldPeer(old []peer, addr string) []peer {
853 new := make([]peer, 0, len(old))
854 for _, p := range old {
855 if p.Address() != addr {
856 new = append(new, p)
857 }
858 }
859
860 return new
861 }
862
View as plain text