...
1
18
19
20
21
22 package weightedtarget
23
24 import (
25 "encoding/json"
26 "fmt"
27 "time"
28
29 "google.golang.org/grpc/balancer"
30 "google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
31 "google.golang.org/grpc/internal/balancergroup"
32 "google.golang.org/grpc/internal/grpclog"
33 "google.golang.org/grpc/internal/hierarchy"
34 "google.golang.org/grpc/internal/pretty"
35 "google.golang.org/grpc/internal/wrr"
36 "google.golang.org/grpc/resolver"
37 "google.golang.org/grpc/serviceconfig"
38 )
39
40
41 const Name = "weighted_target_experimental"
42
43
44
45 var NewRandomWRR = wrr.NewRandom
46
47 func init() {
48 balancer.Register(bb{})
49 }
50
51 type bb struct{}
52
53 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
54 b := &weightedTargetBalancer{}
55 b.logger = prefixLogger(b)
56 b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
57 b.stateAggregator.Start()
58 b.bg = balancergroup.New(balancergroup.Options{
59 CC: cc,
60 BuildOpts: bOpts,
61 StateAggregator: b.stateAggregator,
62 Logger: b.logger,
63 SubBalancerCloseTimeout: time.Duration(0),
64 })
65 b.bg.Start()
66 b.logger.Infof("Created")
67 return b
68 }
69
70 func (bb) Name() string {
71 return Name
72 }
73
74 func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
75 return parseConfig(c)
76 }
77
78 type weightedTargetBalancer struct {
79 logger *grpclog.PrefixLogger
80
81 bg *balancergroup.BalancerGroup
82 stateAggregator *weightedaggregator.Aggregator
83
84 targets map[string]Target
85 }
86
87
88
89
90 func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
91 b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
92 newConfig, ok := s.BalancerConfig.(*LBConfig)
93 if !ok {
94 return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
95 }
96 addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
97
98 b.stateAggregator.PauseStateUpdates()
99 defer b.stateAggregator.ResumeStateUpdates()
100
101
102 for name := range b.targets {
103 if _, ok := newConfig.Targets[name]; !ok {
104 b.stateAggregator.Remove(name)
105 b.bg.Remove(name)
106 }
107 }
108
109
110
111
112
113
114 for name, newT := range newConfig.Targets {
115 oldT, ok := b.targets[name]
116 if !ok {
117
118 b.stateAggregator.Add(name, newT.Weight)
119
120 b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
121
122
123 } else if newT.ChildPolicy.Name != oldT.ChildPolicy.Name {
124
125
126 b.stateAggregator.Remove(name)
127 b.bg.Remove(name)
128 b.stateAggregator.Add(name, newT.Weight)
129 b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
130 } else if newT.Weight != oldT.Weight {
131
132 b.stateAggregator.UpdateWeight(name, newT.Weight)
133 }
134
135
136
137
138
139
140
141 _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
142 ResolverState: resolver.State{
143 Addresses: addressesSplit[name],
144 ServiceConfig: s.ResolverState.ServiceConfig,
145 Attributes: s.ResolverState.Attributes,
146 },
147 BalancerConfig: newT.ChildPolicy.Config,
148 })
149 }
150
151 b.targets = newConfig.Targets
152
153
154
155
156
157
158
159
160
161 if len(b.targets) == 0 {
162 b.stateAggregator.NeedUpdateStateOnResume()
163 }
164
165 return nil
166 }
167
168 func (b *weightedTargetBalancer) ResolverError(err error) {
169 b.bg.ResolverError(err)
170 }
171
172 func (b *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
173 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
174 }
175
176 func (b *weightedTargetBalancer) Close() {
177 b.stateAggregator.Stop()
178 b.bg.Close()
179 }
180
181 func (b *weightedTargetBalancer) ExitIdle() {
182 b.bg.ExitIdle()
183 }
184
View as plain text