...

Source file src/go.etcd.io/etcd/raft/v3/rafttest/node.go

Documentation: go.etcd.io/etcd/raft/v3/rafttest

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafttest
    16  
    17  import (
    18  	"context"
    19  	"log"
    20  	"math/rand"
    21  	"sync"
    22  	"time"
    23  
    24  	"go.etcd.io/etcd/raft/v3"
    25  	"go.etcd.io/etcd/raft/v3/raftpb"
    26  )
    27  
    28  type node struct {
    29  	raft.Node
    30  	id     uint64
    31  	iface  iface
    32  	stopc  chan struct{}
    33  	pausec chan bool
    34  
    35  	// stable
    36  	storage *raft.MemoryStorage
    37  
    38  	mu    sync.Mutex // guards state
    39  	state raftpb.HardState
    40  }
    41  
    42  func startNode(id uint64, peers []raft.Peer, iface iface) *node {
    43  	st := raft.NewMemoryStorage()
    44  	c := &raft.Config{
    45  		ID:                        id,
    46  		ElectionTick:              10,
    47  		HeartbeatTick:             1,
    48  		Storage:                   st,
    49  		MaxSizePerMsg:             1024 * 1024,
    50  		MaxInflightMsgs:           256,
    51  		MaxUncommittedEntriesSize: 1 << 30,
    52  	}
    53  	rn := raft.StartNode(c, peers)
    54  	n := &node{
    55  		Node:    rn,
    56  		id:      id,
    57  		storage: st,
    58  		iface:   iface,
    59  		pausec:  make(chan bool),
    60  	}
    61  	n.start()
    62  	return n
    63  }
    64  
    65  func (n *node) start() {
    66  	n.stopc = make(chan struct{})
    67  	ticker := time.NewTicker(5 * time.Millisecond).C
    68  
    69  	go func() {
    70  		for {
    71  			select {
    72  			case <-ticker:
    73  				n.Tick()
    74  			case rd := <-n.Ready():
    75  				if !raft.IsEmptyHardState(rd.HardState) {
    76  					n.mu.Lock()
    77  					n.state = rd.HardState
    78  					n.mu.Unlock()
    79  					n.storage.SetHardState(n.state)
    80  				}
    81  				n.storage.Append(rd.Entries)
    82  				time.Sleep(time.Millisecond)
    83  
    84  				// simulate async send, more like real world...
    85  				for _, m := range rd.Messages {
    86  					mlocal := m
    87  					go func() {
    88  						time.Sleep(time.Duration(rand.Int63n(10)) * time.Millisecond)
    89  						n.iface.send(mlocal)
    90  					}()
    91  				}
    92  				n.Advance()
    93  			case m := <-n.iface.recv():
    94  				go n.Step(context.TODO(), m)
    95  			case <-n.stopc:
    96  				n.Stop()
    97  				log.Printf("raft.%d: stop", n.id)
    98  				n.Node = nil
    99  				close(n.stopc)
   100  				return
   101  			case p := <-n.pausec:
   102  				recvms := make([]raftpb.Message, 0)
   103  				for p {
   104  					select {
   105  					case m := <-n.iface.recv():
   106  						recvms = append(recvms, m)
   107  					case p = <-n.pausec:
   108  					}
   109  				}
   110  				// step all pending messages
   111  				for _, m := range recvms {
   112  					n.Step(context.TODO(), m)
   113  				}
   114  			}
   115  		}
   116  	}()
   117  }
   118  
   119  // stop stops the node. stop a stopped node might panic.
   120  // All in memory state of node is discarded.
   121  // All stable MUST be unchanged.
   122  func (n *node) stop() {
   123  	n.iface.disconnect()
   124  	n.stopc <- struct{}{}
   125  	// wait for the shutdown
   126  	<-n.stopc
   127  }
   128  
   129  // restart restarts the node. restart a started node
   130  // blocks and might affect the future stop operation.
   131  func (n *node) restart() {
   132  	// wait for the shutdown
   133  	<-n.stopc
   134  	c := &raft.Config{
   135  		ID:                        n.id,
   136  		ElectionTick:              10,
   137  		HeartbeatTick:             1,
   138  		Storage:                   n.storage,
   139  		MaxSizePerMsg:             1024 * 1024,
   140  		MaxInflightMsgs:           256,
   141  		MaxUncommittedEntriesSize: 1 << 30,
   142  	}
   143  	n.Node = raft.RestartNode(c)
   144  	n.start()
   145  	n.iface.connect()
   146  }
   147  
   148  // pause pauses the node.
   149  // The paused node buffers the received messages and replies
   150  // all of them when it resumes.
   151  func (n *node) pause() {
   152  	n.pausec <- true
   153  }
   154  
   155  // resume resumes the paused node.
   156  func (n *node) resume() {
   157  	n.pausec <- false
   158  }
   159  

View as plain text