1 // Copyright 2019 The etcd Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package tracker 16 17 // Inflights limits the number of MsgApp (represented by the largest index 18 // contained within) sent to followers but not yet acknowledged by them. Callers 19 // use Full() to check whether more messages can be sent, call Add() whenever 20 // they are sending a new append, and release "quota" via FreeLE() whenever an 21 // ack is received. 22 type Inflights struct { 23 // the starting index in the buffer 24 start int 25 // number of inflights in the buffer 26 count int 27 28 // the size of the buffer 29 size int 30 31 // buffer contains the index of the last entry 32 // inside one message. 33 buffer []uint64 34 } 35 36 // NewInflights sets up an Inflights that allows up to 'size' inflight messages. 37 func NewInflights(size int) *Inflights { 38 return &Inflights{ 39 size: size, 40 } 41 } 42 43 // Clone returns an *Inflights that is identical to but shares no memory with 44 // the receiver. 45 func (in *Inflights) Clone() *Inflights { 46 ins := *in 47 ins.buffer = append([]uint64(nil), in.buffer...) 48 return &ins 49 } 50 51 // Add notifies the Inflights that a new message with the given index is being 52 // dispatched. Full() must be called prior to Add() to verify that there is room 53 // for one more message, and consecutive calls to add Add() must provide a 54 // monotonic sequence of indexes. 55 func (in *Inflights) Add(inflight uint64) { 56 if in.Full() { 57 panic("cannot add into a Full inflights") 58 } 59 next := in.start + in.count 60 size := in.size 61 if next >= size { 62 next -= size 63 } 64 if next >= len(in.buffer) { 65 in.grow() 66 } 67 in.buffer[next] = inflight 68 in.count++ 69 } 70 71 // grow the inflight buffer by doubling up to inflights.size. We grow on demand 72 // instead of preallocating to inflights.size to handle systems which have 73 // thousands of Raft groups per process. 74 func (in *Inflights) grow() { 75 newSize := len(in.buffer) * 2 76 if newSize == 0 { 77 newSize = 1 78 } else if newSize > in.size { 79 newSize = in.size 80 } 81 newBuffer := make([]uint64, newSize) 82 copy(newBuffer, in.buffer) 83 in.buffer = newBuffer 84 } 85 86 // FreeLE frees the inflights smaller or equal to the given `to` flight. 87 func (in *Inflights) FreeLE(to uint64) { 88 if in.count == 0 || to < in.buffer[in.start] { 89 // out of the left side of the window 90 return 91 } 92 93 idx := in.start 94 var i int 95 for i = 0; i < in.count; i++ { 96 if to < in.buffer[idx] { // found the first large inflight 97 break 98 } 99 100 // increase index and maybe rotate 101 size := in.size 102 if idx++; idx >= size { 103 idx -= size 104 } 105 } 106 // free i inflights and set new start index 107 in.count -= i 108 in.start = idx 109 if in.count == 0 { 110 // inflights is empty, reset the start index so that we don't grow the 111 // buffer unnecessarily. 112 in.start = 0 113 } 114 } 115 116 // FreeFirstOne releases the first inflight. This is a no-op if nothing is 117 // inflight. 118 func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) } 119 120 // Full returns true if no more messages can be sent at the moment. 121 func (in *Inflights) Full() bool { 122 return in.count == in.size 123 } 124 125 // Count returns the number of inflight messages. 126 func (in *Inflights) Count() int { return in.count } 127 128 // reset frees all inflights. 129 func (in *Inflights) reset() { 130 in.count = 0 131 in.start = 0 132 } 133