...
1
18
19 package ringhash
20
21 import (
22 "fmt"
23
24 "google.golang.org/grpc/balancer"
25 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/internal/grpclog"
28 "google.golang.org/grpc/status"
29 )
30
31 type picker struct {
32 ring *ring
33 logger *grpclog.PrefixLogger
34 subConnStates map[*subConn]connectivity.State
35 }
36
37 func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
38 states := make(map[*subConn]connectivity.State)
39 for _, e := range ring.items {
40 states[e.sc] = e.sc.effectiveState()
41 }
42 return &picker{ring: ring, logger: logger, subConnStates: states}
43 }
44
45
46
47
48
49 type handleRICSResult struct {
50 pr balancer.PickResult
51 err error
52 }
53
54
55
56
57
58
59
60
61 func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
62 switch state := p.subConnStates[e.sc]; state {
63 case connectivity.Ready:
64 return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
65 case connectivity.Idle:
66
67 e.sc.queueConnect()
68 return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
69 case connectivity.Connecting:
70 return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
71 case connectivity.TransientFailure:
72
73 return handleRICSResult{}, false
74 case connectivity.Shutdown:
75
76
77 return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
78 default:
79
80
81 p.logger.Errorf("SubConn has undefined connectivity state: %v", state)
82 return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true
83 }
84 }
85
86 func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
87 e := p.ring.pick(getRequestHash(info.Ctx))
88 if hr, ok := p.handleRICS(e); ok {
89 return hr.pr, hr.err
90 }
91
92 return p.handleTransientFailure(e)
93 }
94
95 func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) {
96
97 e.sc.queueConnect()
98
99
100 e2 := nextSkippingDuplicates(p.ring, e)
101 if e2 == nil {
102
103 return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure")
104 }
105
106
107
108 if hr, ok := p.handleRICS(e2); ok {
109 return hr.pr, hr.err
110 }
111
112
113 e2.sc.queueConnect()
114
115
116
117
118
119
120
121
122
123
124 var firstNonFailedFound bool
125 for ee := nextSkippingDuplicates(p.ring, e2); ee != e; ee = nextSkippingDuplicates(p.ring, ee) {
126 scState := p.subConnStates[ee.sc]
127 if scState == connectivity.Ready {
128 return balancer.PickResult{SubConn: ee.sc.sc}, nil
129 }
130 if firstNonFailedFound {
131 continue
132 }
133 if scState == connectivity.TransientFailure {
134
135 ee.sc.queueConnect()
136 continue
137 }
138
139
140
141 firstNonFailedFound = true
142 if scState == connectivity.Idle {
143
144
145 ee.sc.queueConnect()
146 }
147 }
148 return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
149 }
150
151
152
153 func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
154 for next := ring.next(entry); next != entry; next = ring.next(next) {
155 if next.sc != entry.sc {
156 return next
157 }
158 }
159
160 return nil
161 }
162
163
164
165 func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
166 var entry *ringEntry
167 for _, it := range ring.items {
168 if it.sc == sc {
169 entry = it
170 break
171 }
172 }
173 if entry == nil {
174
175
176 if len(ring.items) > 0 {
177 return ring.items[0].sc
178 }
179 return nil
180 }
181 ee := nextSkippingDuplicates(ring, entry)
182 if ee == nil {
183 return nil
184 }
185 return ee.sc
186 }
187
View as plain text