...

Source file src/go.etcd.io/etcd/raft/v3/tracker/inflights.go

Documentation: go.etcd.io/etcd/raft/v3/tracker

     1  // Copyright 2019 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package tracker
    16  
    17  // Inflights limits the number of MsgApp (represented by the largest index
    18  // contained within) sent to followers but not yet acknowledged by them. Callers
    19  // use Full() to check whether more messages can be sent, call Add() whenever
    20  // they are sending a new append, and release "quota" via FreeLE() whenever an
    21  // ack is received.
    22  type Inflights struct {
    23  	// the starting index in the buffer
    24  	start int
    25  	// number of inflights in the buffer
    26  	count int
    27  
    28  	// the size of the buffer
    29  	size int
    30  
    31  	// buffer contains the index of the last entry
    32  	// inside one message.
    33  	buffer []uint64
    34  }
    35  
    36  // NewInflights sets up an Inflights that allows up to 'size' inflight messages.
    37  func NewInflights(size int) *Inflights {
    38  	return &Inflights{
    39  		size: size,
    40  	}
    41  }
    42  
    43  // Clone returns an *Inflights that is identical to but shares no memory with
    44  // the receiver.
    45  func (in *Inflights) Clone() *Inflights {
    46  	ins := *in
    47  	ins.buffer = append([]uint64(nil), in.buffer...)
    48  	return &ins
    49  }
    50  
    51  // Add notifies the Inflights that a new message with the given index is being
    52  // dispatched. Full() must be called prior to Add() to verify that there is room
    53  // for one more message, and consecutive calls to add Add() must provide a
    54  // monotonic sequence of indexes.
    55  func (in *Inflights) Add(inflight uint64) {
    56  	if in.Full() {
    57  		panic("cannot add into a Full inflights")
    58  	}
    59  	next := in.start + in.count
    60  	size := in.size
    61  	if next >= size {
    62  		next -= size
    63  	}
    64  	if next >= len(in.buffer) {
    65  		in.grow()
    66  	}
    67  	in.buffer[next] = inflight
    68  	in.count++
    69  }
    70  
    71  // grow the inflight buffer by doubling up to inflights.size. We grow on demand
    72  // instead of preallocating to inflights.size to handle systems which have
    73  // thousands of Raft groups per process.
    74  func (in *Inflights) grow() {
    75  	newSize := len(in.buffer) * 2
    76  	if newSize == 0 {
    77  		newSize = 1
    78  	} else if newSize > in.size {
    79  		newSize = in.size
    80  	}
    81  	newBuffer := make([]uint64, newSize)
    82  	copy(newBuffer, in.buffer)
    83  	in.buffer = newBuffer
    84  }
    85  
    86  // FreeLE frees the inflights smaller or equal to the given `to` flight.
    87  func (in *Inflights) FreeLE(to uint64) {
    88  	if in.count == 0 || to < in.buffer[in.start] {
    89  		// out of the left side of the window
    90  		return
    91  	}
    92  
    93  	idx := in.start
    94  	var i int
    95  	for i = 0; i < in.count; i++ {
    96  		if to < in.buffer[idx] { // found the first large inflight
    97  			break
    98  		}
    99  
   100  		// increase index and maybe rotate
   101  		size := in.size
   102  		if idx++; idx >= size {
   103  			idx -= size
   104  		}
   105  	}
   106  	// free i inflights and set new start index
   107  	in.count -= i
   108  	in.start = idx
   109  	if in.count == 0 {
   110  		// inflights is empty, reset the start index so that we don't grow the
   111  		// buffer unnecessarily.
   112  		in.start = 0
   113  	}
   114  }
   115  
   116  // FreeFirstOne releases the first inflight. This is a no-op if nothing is
   117  // inflight.
   118  func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
   119  
   120  // Full returns true if no more messages can be sent at the moment.
   121  func (in *Inflights) Full() bool {
   122  	return in.count == in.size
   123  }
   124  
   125  // Count returns the number of inflight messages.
   126  func (in *Inflights) Count() int { return in.count }
   127  
   128  // reset frees all inflights.
   129  func (in *Inflights) reset() {
   130  	in.count = 0
   131  	in.start = 0
   132  }
   133  

View as plain text