...

Source file src/go.etcd.io/etcd/raft/v3/read_only.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import pb "go.etcd.io/etcd/raft/v3/raftpb"
    18  
    19  // ReadState provides state for read only query.
    20  // It's caller's responsibility to call ReadIndex first before getting
    21  // this state from ready, it's also caller's duty to differentiate if this
    22  // state is what it requests through RequestCtx, eg. given a unique id as
    23  // RequestCtx
    24  type ReadState struct {
    25  	Index      uint64
    26  	RequestCtx []byte
    27  }
    28  
    29  type readIndexStatus struct {
    30  	req   pb.Message
    31  	index uint64
    32  	// NB: this never records 'false', but it's more convenient to use this
    33  	// instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If
    34  	// this becomes performance sensitive enough (doubtful), quorum.VoteResult
    35  	// can change to an API that is closer to that of CommittedIndex.
    36  	acks map[uint64]bool
    37  }
    38  
    39  type readOnly struct {
    40  	option           ReadOnlyOption
    41  	pendingReadIndex map[string]*readIndexStatus
    42  	readIndexQueue   []string
    43  }
    44  
    45  func newReadOnly(option ReadOnlyOption) *readOnly {
    46  	return &readOnly{
    47  		option:           option,
    48  		pendingReadIndex: make(map[string]*readIndexStatus),
    49  	}
    50  }
    51  
    52  // addRequest adds a read only request into readonly struct.
    53  // `index` is the commit index of the raft state machine when it received
    54  // the read only request.
    55  // `m` is the original read only request message from the local or remote node.
    56  func (ro *readOnly) addRequest(index uint64, m pb.Message) {
    57  	s := string(m.Entries[0].Data)
    58  	if _, ok := ro.pendingReadIndex[s]; ok {
    59  		return
    60  	}
    61  	ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
    62  	ro.readIndexQueue = append(ro.readIndexQueue, s)
    63  }
    64  
    65  // recvAck notifies the readonly struct that the raft state machine received
    66  // an acknowledgment of the heartbeat that attached with the read only request
    67  // context.
    68  func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
    69  	rs, ok := ro.pendingReadIndex[string(context)]
    70  	if !ok {
    71  		return nil
    72  	}
    73  
    74  	rs.acks[id] = true
    75  	return rs.acks
    76  }
    77  
    78  // advance advances the read only request queue kept by the readonly struct.
    79  // It dequeues the requests until it finds the read only request that has
    80  // the same context as the given `m`.
    81  func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
    82  	var (
    83  		i     int
    84  		found bool
    85  	)
    86  
    87  	ctx := string(m.Context)
    88  	rss := []*readIndexStatus{}
    89  
    90  	for _, okctx := range ro.readIndexQueue {
    91  		i++
    92  		rs, ok := ro.pendingReadIndex[okctx]
    93  		if !ok {
    94  			panic("cannot find corresponding read state from pending map")
    95  		}
    96  		rss = append(rss, rs)
    97  		if okctx == ctx {
    98  			found = true
    99  			break
   100  		}
   101  	}
   102  
   103  	if found {
   104  		ro.readIndexQueue = ro.readIndexQueue[i:]
   105  		for _, rs := range rss {
   106  			delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
   107  		}
   108  		return rss
   109  	}
   110  
   111  	return nil
   112  }
   113  
   114  // lastPendingRequestCtx returns the context of the last pending read only
   115  // request in readonly struct.
   116  func (ro *readOnly) lastPendingRequestCtx() string {
   117  	if len(ro.readIndexQueue) == 0 {
   118  		return ""
   119  	}
   120  	return ro.readIndexQueue[len(ro.readIndexQueue)-1]
   121  }
   122  

View as plain text