1
18
19
20
21
22
23 package converter
24
25 import (
26 "encoding/json"
27 "fmt"
28 "strings"
29
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/balancer/leastrequest"
33 "google.golang.org/grpc/balancer/roundrobin"
34 "google.golang.org/grpc/balancer/weightedroundrobin"
35 "google.golang.org/grpc/internal/envconfig"
36 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
37 "google.golang.org/grpc/xds/internal/balancer/ringhash"
38 "google.golang.org/grpc/xds/internal/balancer/wrrlocality"
39 "google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry"
40 "google.golang.org/protobuf/proto"
41 "google.golang.org/protobuf/types/known/structpb"
42
43 v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
44 v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
45 v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
46 v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
47 v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
48 v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
49 v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
50 )
51
52 func init() {
53 xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin", convertWeightedRoundRobinProtoToServiceConfig)
54 xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash", convertRingHashProtoToServiceConfig)
55 xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst", convertPickFirstProtoToServiceConfig)
56 xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin", convertRoundRobinProtoToServiceConfig)
57 xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality", convertWRRLocalityProtoToServiceConfig)
58 xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest", convertLeastRequestProtoToServiceConfig)
59 xdslbregistry.Register("type.googleapis.com/udpa.type.v1.TypedStruct", convertV1TypedStructToServiceConfig)
60 xdslbregistry.Register("type.googleapis.com/xds.type.v3.TypedStruct", convertV3TypedStructToServiceConfig)
61 }
62
// Defaults applied by the converters in this file when the corresponding
// proto field is not set.
const (
	// defaultRingHashMinSize is the ring_hash minimum ring size used when the
	// proto carries no minimum_ring_size (1024).
	defaultRingHashMinSize = 1 << 10
	// defaultRingHashMaxSize is the ring_hash maximum ring size used when the
	// proto carries no maximum_ring_size (8 * 1024 * 1024).
	defaultRingHashMaxSize = 8 << 20
	// defaultLeastRequestChoiceCount is the least_request choice count used
	// when the proto carries no choice_count.
	defaultLeastRequestChoiceCount = 2
)
68
69 func convertRingHashProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
70 rhProto := &v3ringhashpb.RingHash{}
71 if err := proto.Unmarshal(rawProto, rhProto); err != nil {
72 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
73 }
74 if rhProto.GetHashFunction() != v3ringhashpb.RingHash_XX_HASH {
75 return nil, fmt.Errorf("unsupported ring_hash hash function %v", rhProto.GetHashFunction())
76 }
77
78 var minSize, maxSize uint64 = defaultRingHashMinSize, defaultRingHashMaxSize
79 if min := rhProto.GetMinimumRingSize(); min != nil {
80 minSize = min.GetValue()
81 }
82 if max := rhProto.GetMaximumRingSize(); max != nil {
83 maxSize = max.GetValue()
84 }
85
86 rhCfg := &ringhash.LBConfig{
87 MinRingSize: minSize,
88 MaxRingSize: maxSize,
89 }
90
91 rhCfgJSON, err := json.Marshal(rhCfg)
92 if err != nil {
93 return nil, fmt.Errorf("error marshaling JSON for type %T: %v", rhCfg, err)
94 }
95 return makeBalancerConfigJSON(ringhash.Name, rhCfgJSON), nil
96 }
97
// pfConfig is the JSON shape emitted for the pick_first LB policy
// configuration by convertPickFirstProtoToServiceConfig.
type pfConfig struct {
	// ShuffleAddressList mirrors the shuffle_address_list field of the
	// PickFirst proto. It is always serialized (no omitempty).
	ShuffleAddressList bool `json:"shuffleAddressList"`
}
101
102 func convertPickFirstProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
103 pfProto := &v3pickfirstpb.PickFirst{}
104 if err := proto.Unmarshal(rawProto, pfProto); err != nil {
105 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
106 }
107
108 pfCfg := &pfConfig{ShuffleAddressList: pfProto.GetShuffleAddressList()}
109 js, err := json.Marshal(pfCfg)
110 if err != nil {
111 return nil, fmt.Errorf("error marshaling JSON for type %T: %v", pfCfg, err)
112 }
113 return makeBalancerConfigJSON(grpc.PickFirstBalancerName, js), nil
114 }
115
116 func convertRoundRobinProtoToServiceConfig([]byte, int) (json.RawMessage, error) {
117 return makeBalancerConfigJSON(roundrobin.Name, json.RawMessage("{}")), nil
118 }
119
// wrrLocalityLBConfig is the JSON shape emitted for the wrr_locality LB
// policy configuration by convertWRRLocalityProtoToServiceConfig.
type wrrLocalityLBConfig struct {
	// ChildPolicy holds the already-converted JSON of the endpoint picking
	// policy. It is omitted from the output when empty.
	ChildPolicy json.RawMessage `json:"childPolicy,omitempty"`
}
123
124 func convertWRRLocalityProtoToServiceConfig(rawProto []byte, depth int) (json.RawMessage, error) {
125 wrrlProto := &v3wrrlocalitypb.WrrLocality{}
126 if err := proto.Unmarshal(rawProto, wrrlProto); err != nil {
127 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
128 }
129 epJSON, err := xdslbregistry.ConvertToServiceConfig(wrrlProto.GetEndpointPickingPolicy(), depth+1)
130 if err != nil {
131 return nil, fmt.Errorf("error converting endpoint picking policy: %v for %+v", err, wrrlProto)
132 }
133 wrrLCfg := wrrLocalityLBConfig{
134 ChildPolicy: epJSON,
135 }
136
137 lbCfgJSON, err := json.Marshal(wrrLCfg)
138 if err != nil {
139 return nil, fmt.Errorf("error marshaling JSON for type %T: %v", wrrLCfg, err)
140 }
141 return makeBalancerConfigJSON(wrrlocality.Name, lbCfgJSON), nil
142 }
143
144 func convertWeightedRoundRobinProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
145 cswrrProto := &v3clientsideweightedroundrobinpb.ClientSideWeightedRoundRobin{}
146 if err := proto.Unmarshal(rawProto, cswrrProto); err != nil {
147 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
148 }
149 wrrLBCfg := &wrrLBConfig{}
150
151
152 if enableOOBLoadReportCfg := cswrrProto.GetEnableOobLoadReport(); enableOOBLoadReportCfg != nil {
153 wrrLBCfg.EnableOOBLoadReport = enableOOBLoadReportCfg.GetValue()
154 }
155 if oobReportingPeriodCfg := cswrrProto.GetOobReportingPeriod(); oobReportingPeriodCfg != nil {
156 wrrLBCfg.OOBReportingPeriod = internalserviceconfig.Duration(oobReportingPeriodCfg.AsDuration())
157 }
158 if blackoutPeriodCfg := cswrrProto.GetBlackoutPeriod(); blackoutPeriodCfg != nil {
159 wrrLBCfg.BlackoutPeriod = internalserviceconfig.Duration(blackoutPeriodCfg.AsDuration())
160 }
161 if weightExpirationPeriodCfg := cswrrProto.GetBlackoutPeriod(); weightExpirationPeriodCfg != nil {
162 wrrLBCfg.WeightExpirationPeriod = internalserviceconfig.Duration(weightExpirationPeriodCfg.AsDuration())
163 }
164 if weightUpdatePeriodCfg := cswrrProto.GetWeightUpdatePeriod(); weightUpdatePeriodCfg != nil {
165 wrrLBCfg.WeightUpdatePeriod = internalserviceconfig.Duration(weightUpdatePeriodCfg.AsDuration())
166 }
167 if errorUtilizationPenaltyCfg := cswrrProto.GetErrorUtilizationPenalty(); errorUtilizationPenaltyCfg != nil {
168 wrrLBCfg.ErrorUtilizationPenalty = float64(errorUtilizationPenaltyCfg.GetValue())
169 }
170
171 lbCfgJSON, err := json.Marshal(wrrLBCfg)
172 if err != nil {
173 return nil, fmt.Errorf("error marshaling JSON for type %T: %v", wrrLBCfg, err)
174 }
175 return makeBalancerConfigJSON(weightedroundrobin.Name, lbCfgJSON), nil
176 }
177
178 func convertLeastRequestProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
179 if !envconfig.LeastRequestLB {
180 return nil, nil
181 }
182 lrProto := &v3leastrequestpb.LeastRequest{}
183 if err := proto.Unmarshal(rawProto, lrProto); err != nil {
184 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
185 }
186
187
188
189 choiceCount := uint32(defaultLeastRequestChoiceCount)
190 if cc := lrProto.GetChoiceCount(); cc != nil {
191 choiceCount = cc.GetValue()
192 }
193 lrCfg := &leastrequest.LBConfig{ChoiceCount: choiceCount}
194 js, err := json.Marshal(lrCfg)
195 if err != nil {
196 return nil, fmt.Errorf("error marshaling JSON for type %T: %v", lrCfg, err)
197 }
198 return makeBalancerConfigJSON(leastrequest.Name, js), nil
199 }
200
201 func convertV1TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
202 tsProto := &v1xdsudpatypepb.TypedStruct{}
203 if err := proto.Unmarshal(rawProto, tsProto); err != nil {
204 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
205 }
206 return convertCustomPolicy(tsProto.GetTypeUrl(), tsProto.GetValue())
207 }
208
209 func convertV3TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
210 tsProto := &v3xdsxdstypepb.TypedStruct{}
211 if err := proto.Unmarshal(rawProto, tsProto); err != nil {
212 return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
213 }
214 return convertCustomPolicy(tsProto.GetTypeUrl(), tsProto.GetValue())
215 }
216
217
218
219
220
221
222
223 func convertCustomPolicy(typeURL string, s *structpb.Struct) (json.RawMessage, error) {
224
225
226
227 pos := strings.LastIndex(typeURL, "/")
228 name := typeURL[pos+1:]
229
230 if balancer.Get(name) == nil {
231 return nil, nil
232 }
233
234 rawJSON, err := json.Marshal(s)
235 if err != nil {
236 return nil, fmt.Errorf("error converting custom lb policy %v: %v for %+v", err, typeURL, s)
237 }
238
239
240
241 return makeBalancerConfigJSON(name, rawJSON), nil
242 }
243
244 type wrrLBConfig struct {
245 EnableOOBLoadReport bool `json:"enableOobLoadReport,omitempty"`
246 OOBReportingPeriod internalserviceconfig.Duration `json:"oobReportingPeriod,omitempty"`
247 BlackoutPeriod internalserviceconfig.Duration `json:"blackoutPeriod,omitempty"`
248 WeightExpirationPeriod internalserviceconfig.Duration `json:"weightExpirationPeriod,omitempty"`
249 WeightUpdatePeriod internalserviceconfig.Duration `json:"weightUpdatePeriod,omitempty"`
250 ErrorUtilizationPenalty float64 `json:"errorUtilizationPenalty,omitempty"`
251 }
252
// makeBalancerConfigJSON wraps a policy's JSON configuration in the
// single-element list form `[{"<name>": <value>}]`. The name is emitted as a
// Go-quoted string (%q) and value is inserted verbatim, without validation.
func makeBalancerConfigJSON(name string, value json.RawMessage) []byte {
	const entryFormat = `[{%q: %s}]`
	rendered := fmt.Sprintf(entryFormat, name, value)
	return []byte(rendered)
}
256
View as plain text