...
1
18
19
20
21
22
23 package wrrlocality
24
25 import (
26 "encoding/json"
27 "errors"
28 "fmt"
29
30 "google.golang.org/grpc/balancer"
31 "google.golang.org/grpc/balancer/weightedtarget"
32 "google.golang.org/grpc/internal/grpclog"
33 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
34 "google.golang.org/grpc/resolver"
35 "google.golang.org/grpc/serviceconfig"
36 "google.golang.org/grpc/xds/internal"
37 )
38
39
40 const Name = "xds_wrr_locality_experimental"
41
42 func init() {
43 balancer.Register(bb{})
44 }
45
46 type bb struct{}
47
48 func (bb) Name() string {
49 return Name
50 }
51
52
53 type LBConfig struct {
54 serviceconfig.LoadBalancingConfig `json:"-"`
55
56 ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
57 }
58
59
60 var weightedTargetName = weightedtarget.Name
61
62 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
63 builder := balancer.Get(weightedTargetName)
64 if builder == nil {
65
66
67 return nil
68 }
69
70
71
72 wtb := builder.Build(cc, bOpts)
73 if wtb == nil {
74
75 return nil
76 }
77 wtbCfgParser, ok := builder.(balancer.ConfigParser)
78 if !ok {
79
80 return nil
81 }
82 wrrL := &wrrLocalityBalancer{
83 child: wtb,
84 childParser: wtbCfgParser,
85 }
86
87 wrrL.logger = prefixLogger(wrrL)
88 wrrL.logger.Infof("Created")
89 return wrrL
90 }
91
92 func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
93 var lbCfg *LBConfig
94 if err := json.Unmarshal(s, &lbCfg); err != nil {
95 return nil, fmt.Errorf("xds_wrr_locality: invalid LBConfig: %s, error: %v", string(s), err)
96 }
97 if lbCfg == nil || lbCfg.ChildPolicy == nil {
98 return nil, errors.New("xds_wrr_locality: invalid LBConfig: child policy field must be set")
99 }
100 return lbCfg, nil
101 }
102
103 type attributeKey struct{}
104
105
106 func (a AddrInfo) Equal(o any) bool {
107 oa, ok := o.(AddrInfo)
108 return ok && oa.LocalityWeight == a.LocalityWeight
109 }
110
111
112 type AddrInfo struct {
113 LocalityWeight uint32
114 }
115
116
117
118 func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
119 addr.BalancerAttributes = addr.BalancerAttributes.WithValue(attributeKey{}, addrInfo)
120 return addr
121 }
122
123 func (a AddrInfo) String() string {
124 return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight)
125 }
126
127
128
129 func getAddrInfo(addr resolver.Address) (AddrInfo, bool) {
130 v := addr.BalancerAttributes.Value(attributeKey{})
131 ai, ok := v.(AddrInfo)
132 return ai, ok
133 }
134
135
136
137
138
139 type wrrLocalityBalancer struct {
140
141
142
143 child balancer.Balancer
144
145 childParser balancer.ConfigParser
146
147 logger *grpclog.PrefixLogger
148 }
149
150 func (b *wrrLocalityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
151 lbCfg, ok := s.BalancerConfig.(*LBConfig)
152 if !ok {
153 b.logger.Errorf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
154 return balancer.ErrBadResolverState
155 }
156
157 weightedTargets := make(map[string]weightedtarget.Target)
158 for _, addr := range s.ResolverState.Addresses {
159
160
161
162
163 locality, err := internal.GetLocalityID(addr).ToString()
164 if err != nil {
165
166 logger.Errorf("Failed to marshal LocalityID: %v, skipping this locality in weighted target")
167 }
168 ai, ok := getAddrInfo(addr)
169 if !ok {
170 return fmt.Errorf("xds_wrr_locality: missing locality weight information in address %q", addr)
171 }
172 weightedTargets[locality] = weightedtarget.Target{Weight: ai.LocalityWeight, ChildPolicy: lbCfg.ChildPolicy}
173 }
174 wtCfg := &weightedtarget.LBConfig{Targets: weightedTargets}
175 wtCfgJSON, err := json.Marshal(wtCfg)
176 if err != nil {
177
178 return fmt.Errorf("xds_wrr_locality: error marshalling prepared config: %v", wtCfg)
179 }
180 var sc serviceconfig.LoadBalancingConfig
181 if sc, err = b.childParser.ParseConfig(wtCfgJSON); err != nil {
182 return fmt.Errorf("xds_wrr_locality: config generated %v is invalid: %v", wtCfgJSON, err)
183 }
184
185 return b.child.UpdateClientConnState(balancer.ClientConnState{
186 ResolverState: s.ResolverState,
187 BalancerConfig: sc,
188 })
189 }
190
191 func (b *wrrLocalityBalancer) ResolverError(err error) {
192 b.child.ResolverError(err)
193 }
194
195 func (b *wrrLocalityBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
196 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
197 }
198
199 func (b *wrrLocalityBalancer) Close() {
200 b.child.Close()
201 }
202
View as plain text