...
1
18
19 package priority
20
21 import (
22 "time"
23
24 "google.golang.org/grpc/balancer"
25 "google.golang.org/grpc/balancer/base"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/resolver"
28 "google.golang.org/grpc/serviceconfig"
29 )
30
31 type childBalancer struct {
32 name string
33 parent *priorityBalancer
34 parentCC balancer.ClientConn
35 balancerName string
36 cc *ignoreResolveNowClientConn
37
38 ignoreReresolutionRequests bool
39 config serviceconfig.LoadBalancingConfig
40 rState resolver.State
41
42 started bool
43
44
45
46
47
48 reportedTF bool
49
50 state balancer.State
51
52
53 initTimer *timerWrapper
54 }
55
56
57
58 func newChildBalancer(name string, parent *priorityBalancer, balancerName string, cc balancer.ClientConn) *childBalancer {
59 return &childBalancer{
60 name: name,
61 parent: parent,
62 parentCC: cc,
63 balancerName: balancerName,
64 cc: newIgnoreResolveNowClientConn(cc, false),
65 started: false,
66
67
68
69 state: balancer.State{
70 ConnectivityState: connectivity.Connecting,
71 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
72 },
73 }
74 }
75
76
77
78
79
80 func (cb *childBalancer) updateBalancerName(balancerName string) {
81 cb.balancerName = balancerName
82 cb.cc = newIgnoreResolveNowClientConn(cb.parentCC, cb.ignoreReresolutionRequests)
83 }
84
85
86
87 func (cb *childBalancer) updateConfig(child *Child, rState resolver.State) {
88 cb.ignoreReresolutionRequests = child.IgnoreReresolutionRequests
89 cb.config = child.Config.Config
90 cb.rState = rState
91 if cb.started {
92 cb.sendUpdate()
93 }
94 }
95
96
97
98
99 func (cb *childBalancer) start() {
100 if cb.started {
101 return
102 }
103 cb.started = true
104 cb.parent.bg.AddWithClientConn(cb.name, cb.balancerName, cb.cc)
105 cb.startInitTimer()
106 cb.sendUpdate()
107 }
108
109
110 func (cb *childBalancer) sendUpdate() {
111 cb.cc.updateIgnoreResolveNow(cb.ignoreReresolutionRequests)
112
113 err := cb.parent.bg.UpdateClientConnState(cb.name, balancer.ClientConnState{
114 ResolverState: cb.rState,
115 BalancerConfig: cb.config,
116 })
117 if err != nil {
118 cb.parent.logger.Warningf("Failed to update state for child policy %q: %v", cb.name, err)
119 }
120 }
121
122
123
124
125
126
127 func (cb *childBalancer) stop() {
128 if !cb.started {
129 return
130 }
131 cb.stopInitTimer()
132 cb.parent.bg.Remove(cb.name)
133 cb.started = false
134 cb.state = balancer.State{
135 ConnectivityState: connectivity.Connecting,
136 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
137 }
138
139
140 cb.reportedTF = false
141 }
142
143 func (cb *childBalancer) startInitTimer() {
144 if cb.initTimer != nil {
145 return
146 }
147
148
149 timerW := &timerWrapper{}
150 cb.initTimer = timerW
151 timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
152 cb.parent.mu.Lock()
153 defer cb.parent.mu.Unlock()
154 if timerW.stopped {
155 return
156 }
157 cb.initTimer = nil
158
159
160
161 cb.parent.syncPriority("")
162 })
163 }
164
165 func (cb *childBalancer) stopInitTimer() {
166 timerW := cb.initTimer
167 if timerW == nil {
168 return
169 }
170 cb.initTimer = nil
171 timerW.stopped = true
172 timerW.timer.Stop()
173 }
174
View as plain text