...
1
18
19
20 package xds
21
22 import (
23 "encoding/json"
24 "fmt"
25 "sync"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/balancer/roundrobin"
29 "google.golang.org/grpc/internal/pretty"
30 "google.golang.org/grpc/metadata"
31 "google.golang.org/grpc/serviceconfig"
32 )
33
34 func init() {
35 balancer.Register(rpcBehaviorBB{})
36 }
37
38 const name = "test.RpcBehaviorLoadBalancer"
39
40 type rpcBehaviorBB struct{}
41
42 func (rpcBehaviorBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
43 b := &rpcBehaviorLB{
44 ClientConn: cc,
45 }
46
47
48 builder := balancer.Get(roundrobin.Name)
49 if builder == nil {
50
51
52 return nil
53 }
54 rr := builder.Build(b, bOpts)
55 if rr == nil {
56
57 return nil
58 }
59 b.Balancer = rr
60 return b
61 }
62
63 func (rpcBehaviorBB) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
64 lbCfg := &lbConfig{}
65 if err := json.Unmarshal(s, lbCfg); err != nil {
66 return nil, fmt.Errorf("rpc-behavior-lb: unable to marshal lbConfig: %s, error: %v", string(s), err)
67 }
68 return lbCfg, nil
69
70 }
71
72 func (rpcBehaviorBB) Name() string {
73 return name
74 }
75
76 type lbConfig struct {
77 serviceconfig.LoadBalancingConfig `json:"-"`
78 RPCBehavior string `json:"rpcBehavior,omitempty"`
79 }
80
81
82
83
84 type rpcBehaviorLB struct {
85
86 balancer.ClientConn
87
88 balancer.Balancer
89
90 mu sync.Mutex
91 cfg *lbConfig
92 }
93
94 func (b *rpcBehaviorLB) UpdateClientConnState(s balancer.ClientConnState) error {
95 lbCfg, ok := s.BalancerConfig.(*lbConfig)
96 if !ok {
97 return fmt.Errorf("test.RpcBehaviorLoadBalancer:received config with unexpected type %T: %s", s.BalancerConfig, pretty.ToJSON(s.BalancerConfig))
98 }
99 b.mu.Lock()
100 b.cfg = lbCfg
101 b.mu.Unlock()
102 return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
103 ResolverState: s.ResolverState,
104 })
105 }
106
107 func (b *rpcBehaviorLB) UpdateState(state balancer.State) {
108 b.mu.Lock()
109 rpcBehavior := b.cfg.RPCBehavior
110 b.mu.Unlock()
111
112 b.ClientConn.UpdateState(balancer.State{
113 ConnectivityState: state.ConnectivityState,
114 Picker: newRPCBehaviorPicker(state.Picker, rpcBehavior),
115 })
116 }
117
118
119
120 type rpcBehaviorPicker struct {
121 childPicker balancer.Picker
122 rpcBehavior string
123 }
124
125
126 func (p *rpcBehaviorPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
127 pr, err := p.childPicker.Pick(info)
128 if err != nil {
129 return balancer.PickResult{}, err
130 }
131 pr.Metadata = metadata.Join(pr.Metadata, metadata.Pairs("rpc-behavior", p.rpcBehavior))
132 return pr, nil
133 }
134
135 func newRPCBehaviorPicker(childPicker balancer.Picker, rpcBehavior string) *rpcBehaviorPicker {
136 return &rpcBehaviorPicker{
137 childPicker: childPicker,
138 rpcBehavior: rpcBehavior,
139 }
140 }
141
View as plain text