...

Source file src/go.etcd.io/etcd/client/v3/concurrency/election.go

Documentation: go.etcd.io/etcd/client/v3/concurrency

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package concurrency
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  
    22  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    23  	"go.etcd.io/etcd/api/v3/mvccpb"
    24  	v3 "go.etcd.io/etcd/client/v3"
    25  )
    26  
var (
	// ErrElectionNotLeader is returned by Proclaim when this Election does
	// not hold leadership, or when the leader key no longer matches the
	// recorded creation revision.
	ErrElectionNotLeader = errors.New("election: not leader")
	// ErrElectionNoLeader is returned by Leader when no key exists under
	// the election prefix.
	ErrElectionNoLeader  = errors.New("election: no leader")
)
    31  
// Election is a handle on a leader election carried out over keys that
// share a common prefix.
type Election struct {
	// session supplies the client used for all requests and the lease
	// attached to this candidate's key.
	session *Session

	// keyPrefix is the prefix under which candidate keys are created;
	// Campaign appends the hex-encoded lease ID to it.
	keyPrefix string

	// leaderKey is the key owned by this candidate; Resign resets it to "".
	leaderKey     string
	// leaderRev is the creation revision recorded for leaderKey.
	leaderRev     int64
	// leaderSession is the session backing leaderKey. Proclaim and Resign
	// treat a nil value as "not leader".
	leaderSession *Session
	// hdr is the response header saved by the last successful Campaign,
	// Proclaim, or Resign.
	hdr           *pb.ResponseHeader
}
    42  
    43  // NewElection returns a new election on a given key prefix.
    44  func NewElection(s *Session, pfx string) *Election {
    45  	return &Election{session: s, keyPrefix: pfx + "/"}
    46  }
    47  
    48  // ResumeElection initializes an election with a known leader.
    49  func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
    50  	return &Election{
    51  		keyPrefix:     pfx,
    52  		session:       s,
    53  		leaderKey:     leaderKey,
    54  		leaderRev:     leaderRev,
    55  		leaderSession: s,
    56  	}
    57  }
    58  
    59  // Campaign puts a value as eligible for the election on the prefix
    60  // key.
    61  // Multiple sessions can participate in the election for the
    62  // same prefix, but only one can be the leader at a time.
    63  //
    64  // If the context is 'context.TODO()/context.Background()', the Campaign
    65  // will continue to be blocked for other keys to be deleted, unless server
    66  // returns a non-recoverable error (e.g. ErrCompacted).
    67  // Otherwise, until the context is not cancelled or timed-out, Campaign will
    68  // continue to be blocked until it becomes the leader.
    69  func (e *Election) Campaign(ctx context.Context, val string) error {
    70  	s := e.session
    71  	client := e.session.Client()
    72  
    73  	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
    74  	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
    75  	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
    76  	txn = txn.Else(v3.OpGet(k))
    77  	resp, err := txn.Commit()
    78  	if err != nil {
    79  		return err
    80  	}
    81  	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
    82  	if !resp.Succeeded {
    83  		kv := resp.Responses[0].GetResponseRange().Kvs[0]
    84  		e.leaderRev = kv.CreateRevision
    85  		if string(kv.Value) != val {
    86  			if err = e.Proclaim(ctx, val); err != nil {
    87  				e.Resign(ctx)
    88  				return err
    89  			}
    90  		}
    91  	}
    92  
    93  	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
    94  	if err != nil {
    95  		// clean up in case of context cancel
    96  		select {
    97  		case <-ctx.Done():
    98  			e.Resign(client.Ctx())
    99  		default:
   100  			e.leaderSession = nil
   101  		}
   102  		return err
   103  	}
   104  	e.hdr = resp.Header
   105  
   106  	return nil
   107  }
   108  
   109  // Proclaim lets the leader announce a new value without another election.
   110  func (e *Election) Proclaim(ctx context.Context, val string) error {
   111  	if e.leaderSession == nil {
   112  		return ErrElectionNotLeader
   113  	}
   114  	client := e.session.Client()
   115  	cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
   116  	txn := client.Txn(ctx).If(cmp)
   117  	txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
   118  	tresp, terr := txn.Commit()
   119  	if terr != nil {
   120  		return terr
   121  	}
   122  	if !tresp.Succeeded {
   123  		e.leaderKey = ""
   124  		return ErrElectionNotLeader
   125  	}
   126  
   127  	e.hdr = tresp.Header
   128  	return nil
   129  }
   130  
   131  // Resign lets a leader start a new election.
   132  func (e *Election) Resign(ctx context.Context) (err error) {
   133  	if e.leaderSession == nil {
   134  		return nil
   135  	}
   136  	client := e.session.Client()
   137  	cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
   138  	resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
   139  	if err == nil {
   140  		e.hdr = resp.Header
   141  	}
   142  	e.leaderKey = ""
   143  	e.leaderSession = nil
   144  	return err
   145  }
   146  
   147  // Leader returns the leader value for the current election.
   148  func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
   149  	client := e.session.Client()
   150  	resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
   151  	if err != nil {
   152  		return nil, err
   153  	} else if len(resp.Kvs) == 0 {
   154  		// no leader currently elected
   155  		return nil, ErrElectionNoLeader
   156  	}
   157  	return resp, nil
   158  }
   159  
   160  // Observe returns a channel that reliably observes ordered leader proposals
   161  // as GetResponse values on every current elected leader key. It will not
   162  // necessarily fetch all historical leader updates, but will always post the
   163  // most recent leader value.
   164  //
   165  // The channel closes when the context is canceled or the underlying watcher
   166  // is otherwise disrupted.
   167  func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
   168  	retc := make(chan v3.GetResponse)
   169  	go e.observe(ctx, retc)
   170  	return retc
   171  }
   172  
// observe is the worker behind Observe. It repeatedly determines the current
// leader key, sends it on ch, and then forwards every subsequent update of
// that key until the key is deleted, at which point it starts over.
//
// ch is closed when observe returns, which happens when a Get fails, a watch
// channel closes or reports an error, or ctx is done while sending.
func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
	client := e.session.Client()

	defer close(ch)
	for {
		// Look up the current leader entry (same query Leader uses).
		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
		if err != nil {
			return
		}

		var kv *mvccpb.KeyValue
		var hdr *pb.ResponseHeader

		if len(resp.Kvs) == 0 {
			// No leader yet: watch the prefix, starting at the revision of
			// the Get, until some key is put.
			cctx, cancel := context.WithCancel(ctx)
			// wait for first key put on prefix
			opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
			wch := client.Watch(cctx, e.keyPrefix, opts...)
			for kv == nil {
				wr, ok := <-wch
				if !ok || wr.Err() != nil {
					cancel()
					return
				}
				// only accept puts; a delete will make observe() spin
				for _, ev := range wr.Events {
					if ev.Type == mvccpb.PUT {
						hdr, kv = &wr.Header, ev.Kv
						// may have multiple revs; hdr.rev = the last rev
						// set to kv's rev in case batch has multiple Puts
						hdr.Revision = kv.ModRevision
						break
					}
				}
			}
			cancel()
		} else {
			hdr, kv = resp.Header, resp.Kvs[0]
		}

		// Publish the leader entry found above, unless ctx ends first.
		select {
		case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
		case <-ctx.Done():
			return
		}

		// Follow the leader key from the revision after the one just
		// published, forwarding each update until the key is deleted.
		cctx, cancel := context.WithCancel(ctx)
		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
		keyDeleted := false
		for !keyDeleted {
			wr, ok := <-wch
			if !ok {
				cancel()
				return
			}
			for _, ev := range wr.Events {
				if ev.Type == mvccpb.DELETE {
					// Leader key is gone; leave the watch loop and look up
					// the next leader in the outer loop.
					keyDeleted = true
					break
				}
				// Reuse resp as the outgoing message for this update.
				resp.Header = &wr.Header
				resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
				select {
				case ch <- *resp:
				case <-cctx.Done():
					cancel()
					return
				}
			}
		}
		cancel()
	}
}
   246  
   247  // Key returns the leader key if elected, empty string otherwise.
   248  func (e *Election) Key() string { return e.leaderKey }
   249  
   250  // Rev returns the leader key's creation revision, if elected.
   251  func (e *Election) Rev() int64 { return e.leaderRev }
   252  
   253  // Header is the response header from the last successful election proposal.
   254  func (e *Election) Header() *pb.ResponseHeader { return e.hdr }
   255  

View as plain text