...
1
18
19 package interop
20
21 import (
22 "context"
23 "fmt"
24 "sync"
25 "time"
26
27 v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/balancer/base"
30 "google.golang.org/grpc/connectivity"
31 "google.golang.org/grpc/orca"
32 )
33
34 func init() {
35 balancer.Register(orcabb{})
36 }
37
// orcabb is the stateless builder for the orcab balancer. It is registered
// with the balancer package in init.
type orcabb struct{}
39
40 func (orcabb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
41 return &orcab{cc: cc}
42 }
43
44 func (orcabb) Name() string {
45 return "test_backend_metrics_load_balancer"
46 }
47
// orcab is a test balancer that manages a single SubConn and records the
// most recent out-of-band ORCA load report received for it.
type orcab struct {
	// cc is the ClientConn supplied to Build; state updates are sent to it.
	cc balancer.ClientConn
	// sc is the single SubConn, created on the first resolver update that
	// carries addresses. It is nil until then.
	sc balancer.SubConn
	// cancelWatch is the function returned by orca.RegisterOOBListener. It is
	// only set once the SubConn has been created successfully.
	cancelWatch func()

	// reportMu guards report.
	reportMu sync.Mutex
	// report is the last load report passed to OnLoadReport.
	report *v3orcapb.OrcaLoadReport
}
56
57 func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
58 if o.sc != nil {
59 o.sc.UpdateAddresses(s.ResolverState.Addresses)
60 return nil
61 }
62
63 if len(s.ResolverState.Addresses) == 0 {
64 o.ResolverError(fmt.Errorf("produced no addresses"))
65 return fmt.Errorf("resolver produced no addresses")
66 }
67 var err error
68 o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{StateListener: o.updateSubConnState})
69 if err != nil {
70 o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
71 return nil
72 }
73 o.cancelWatch = orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
74 o.sc.Connect()
75 o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
76 return nil
77 }
78
79 func (o *orcab) ResolverError(err error) {
80 if o.sc == nil {
81 o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
82 }
83 }
84
85 func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
86 logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
87 }
88
89 func (o *orcab) updateSubConnState(state balancer.SubConnState) {
90 switch state.ConnectivityState {
91 case connectivity.Ready:
92 o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
93 case connectivity.TransientFailure:
94 o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
95 case connectivity.Connecting:
96
97 case connectivity.Idle:
98 o.sc.Connect()
99 o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
100 case connectivity.Shutdown:
101
102 }
103 }
104
105 func (o *orcab) Close() {
106 o.cancelWatch()
107 }
108
109 func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
110 o.reportMu.Lock()
111 defer o.reportMu.Unlock()
112 logger.Infof("received OOB load report: %v", r)
113 o.report = r
114 }
115
// orcaPicker is the picker installed while the balancer's SubConn is READY.
// It always picks that SubConn and records load report data for each call.
type orcaPicker struct {
	// o is the owning balancer, used for its SubConn and its stored
	// out-of-band load report.
	o *orcab
}
119
120 func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
121 doneCB := func(di balancer.DoneInfo) {
122 if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
123 (lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
124
125
126
127 setContextCMR(info.Ctx, lr)
128 } else {
129 p.o.reportMu.Lock()
130 defer p.o.reportMu.Unlock()
131 if lr := p.o.report; lr != nil {
132 setContextCMR(info.Ctx, lr)
133 }
134 }
135 }
136 return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil
137 }
138
139 func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
140 if r := orcaResultFromContext(ctx); r != nil {
141 *r = lr
142 }
143 }
144
// orcaKey is the private type of the context key under which the ORCA result
// pointer is stored, so the key cannot collide with keys from other packages.
type orcaKey string

// orcaCtxKey is the context key used by contextWithORCAResult and
// orcaResultFromContext.
var orcaCtxKey = orcaKey("orcaResult")
148
149
150
151
152
153
154
155
156 func contextWithORCAResult(ctx context.Context, result **v3orcapb.OrcaLoadReport) context.Context {
157 return context.WithValue(ctx, orcaCtxKey, result)
158 }
159
160
161
162
163 func orcaResultFromContext(ctx context.Context) **v3orcapb.OrcaLoadReport {
164 v := ctx.Value(orcaCtxKey)
165 if v == nil {
166 return nil
167 }
168 return v.(**v3orcapb.OrcaLoadReport)
169 }
170
View as plain text