1 // Copyright 2017 The etcd Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package ordering 16 17 import ( 18 "errors" 19 "sync/atomic" 20 21 "go.etcd.io/etcd/client/v3" 22 ) 23 24 type OrderViolationFunc func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error 25 26 var ErrNoGreaterRev = errors.New("etcdclient: no cluster members have a revision higher than the previously received revision") 27 28 func NewOrderViolationSwitchEndpointClosure(c *clientv3.Client) OrderViolationFunc { 29 violationCount := int32(0) 30 return func(_ clientv3.Op, _ clientv3.OpResponse, _ int64) error { 31 // Each request is assigned by round-robin load-balancer's picker to a different 32 // endpoints. If we cycled them 5 times (even with some level of concurrency), 33 // with high probability no endpoint points on a member with fresh data. 34 // TODO: Ideally we should track members (resp.opp.Header) that returned 35 // stale result and explicitly temporarily disable them in 'picker'. 36 if atomic.LoadInt32(&violationCount) > int32(5*len(c.Endpoints())) { 37 return ErrNoGreaterRev 38 } 39 atomic.AddInt32(&violationCount, 1) 40 return nil 41 } 42 } 43