
Source file src/go.etcd.io/etcd/server/v3/proxy/grpcproxy/leader.go

// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcproxy

import (
	"context"
	"math"
	"sync"

	clientv3 "go.etcd.io/etcd/client/v3"

	"golang.org/x/time/rate"
)

const (
	lostLeaderKey  = "__lostleader" // watched to detect leader loss
	retryPerSecond = 10
)

type leader struct {
	ctx context.Context
	w   clientv3.Watcher
	// mu protects leaderc updates.
	mu       sync.RWMutex
	leaderc  chan struct{}
	disconnc chan struct{}
	donec    chan struct{}
}

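// newLeader starts tracking leadership by watching a sentinel key under a
// require-leader context; the returned leader reports loss, reacquire,
// disconnect, and shutdown through its notify channels.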
func newLeader(ctx context.Context, w clientv3.Watcher) *leader {
	l := &leader{
		ctx:      clientv3.WithRequireLeader(ctx),
		w:        w,
		leaderc:  make(chan struct{}),
		disconnc: make(chan struct{}),
		donec:    make(chan struct{}),
	}
	// begin assuming leader is lost
	close(l.leaderc)
	go l.recvLoop()
	return l
}

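// recvLoop repeatedly opens a watch on the sentinel key and translates the
// watch stream's lifecycle into leadership transitions, retrying at most
// retryPerSecond times per second.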
func (l *leader) recvLoop() {
	defer close(l.donec)

	limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond)
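	// Watch from a revision far in the future so no key events are ever
	// delivered; the stream exists only so that its error/close behavior
	// (under the require-leader context) reveals leader loss.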
	rev := int64(math.MaxInt64 - 2)
	for limiter.Wait(l.ctx) == nil {
		wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
		cresp, ok := <-wch
		if !ok {
			l.loseLeader()
			continue
		}
		if cresp.Err() != nil {
			l.loseLeader()
			if clientv3.IsConnCanceled(cresp.Err()) {
				close(l.disconnc)
				return
			}
			continue
		}
		l.gotLeader()
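		// No events can arrive at the future revision, so this receive
		// returns only when the stream errors or closes, i.e. the leader
		// was lost or the connection dropped.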
		<-wch
		l.loseLeader()
	}
}

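// loseLeader marks the leader as lost by closing leaderc, unless it is
// already closed.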
func (l *leader) loseLeader() {
	l.mu.RLock()
	defer l.mu.RUnlock()
	select {
	case <-l.leaderc:
	default:
		close(l.leaderc)
	}
}

// gotLeader will force update the leadership status to having a leader.
func (l *leader) gotLeader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	select {
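	// leaderc is closed only while the leader is lost; swap in a fresh
	// open channel so lostNotify callers block until the next loss.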
	case <-l.leaderc:
		l.leaderc = make(chan struct{})
	default:
	}
}

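// disconnectNotify returns a channel that is closed when the watch
// connection to the cluster is cancelled.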
func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }

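// stopNotify returns a channel that is closed once recvLoop exits.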
func (l *leader) stopNotify() <-chan struct{} { return l.donec }

// lostNotify returns a channel that is closed if there has been
// a leader loss not yet followed by a leader reacquire.
func (l *leader) lostNotify() <-chan struct{} {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.leaderc
}
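The three notify channels compose naturally in a select. As a minimal, hypothetical sketch (not part of leader.go), in-package code could wait for a leadership transition like this; the helper name waitForLoss is invented for illustration:

// waitForLoss blocks until the tracker observes leader loss, reporting
// false if the watch client disconnected or the tracker stopped instead.
// Note that leaderc starts out closed, so this returns true immediately
// until the first leader is observed.
func waitForLoss(l *leader) bool {
	select {
	case <-l.lostNotify():
		return true // leader lost (or not yet acquired)
	case <-l.disconnectNotify():
		return false // watch connection cancelled
	case <-l.stopNotify():
		return false // recvLoop exited
	}
}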
