...
1
16
17 package orca
18
19 import (
20 "context"
21 "sync"
22 "time"
23
24 "google.golang.org/grpc"
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/internal/backoff"
28 "google.golang.org/grpc/internal/grpcsync"
29 "google.golang.org/grpc/orca/internal"
30 "google.golang.org/grpc/status"
31
32 v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
33 v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
34 v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
35 "google.golang.org/protobuf/types/known/durationpb"
36 )
37
38 type producerBuilder struct{}
39
40
41 func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
42 p := &producer{
43 client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
44 intervals: make(map[time.Duration]int),
45 listeners: make(map[OOBListener]struct{}),
46 backoff: internal.DefaultBackoffFunc,
47 }
48 return p, func() {
49 <-p.stopped
50 }
51 }
52
53 var producerBuilderSingleton = &producerBuilder{}
54
55
56 type OOBListener interface {
57
58 OnLoadReport(*v3orcapb.OrcaLoadReport)
59 }
60
61
// OOBListenerOptions configures a call to RegisterOOBListener.
type OOBListenerOptions struct {
	// ReportInterval is how often the listener would like to receive load
	// reports. A single stream per SubConn is opened using the smallest
	// ReportInterval among all registered listeners, so reports may arrive
	// more often than requested here.
	ReportInterval time.Duration
}
69
70
71
72
73
74 func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
75 pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
76 p := pr.(*producer)
77
78 p.registerListener(l, opts.ReportInterval)
79
80
81
82
83
84
85 return grpcsync.OnceFunc(func() {
86 p.unregisterListener(l, opts.ReportInterval)
87 close()
88 })
89 }
90
91 type producer struct {
92 client v3orcaservicegrpc.OpenRcaServiceClient
93
94
95
96
97
98 backoff func(int) time.Duration
99
100 mu sync.Mutex
101 intervals map[time.Duration]int
102 listeners map[OOBListener]struct{}
103 minInterval time.Duration
104 stop func()
105 stopped chan struct{}
106 }
107
108
109
110 func (p *producer) registerListener(l OOBListener, interval time.Duration) {
111 p.mu.Lock()
112 defer p.mu.Unlock()
113
114 p.listeners[l] = struct{}{}
115 p.intervals[interval]++
116 if len(p.listeners) == 1 || interval < p.minInterval {
117 p.minInterval = interval
118 p.updateRunLocked()
119 }
120 }
121
122
123
124 func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
125 p.mu.Lock()
126 defer p.mu.Unlock()
127
128 delete(p.listeners, l)
129 p.intervals[interval]--
130 if p.intervals[interval] == 0 {
131 delete(p.intervals, interval)
132
133 if p.minInterval == interval {
134 p.recomputeMinInterval()
135 p.updateRunLocked()
136 }
137 }
138 }
139
140
141
142 func (p *producer) recomputeMinInterval() {
143 first := true
144 for interval := range p.intervals {
145 if first || interval < p.minInterval {
146 p.minInterval = interval
147 first = false
148 }
149 }
150 }
151
152
153
154
155
156 func (p *producer) updateRunLocked() {
157 if p.stop != nil {
158 p.stop()
159 p.stop = nil
160 }
161 if len(p.listeners) > 0 {
162 var ctx context.Context
163 ctx, p.stop = context.WithCancel(context.Background())
164 p.stopped = make(chan struct{})
165 go p.run(ctx, p.stopped, p.minInterval)
166 }
167 }
168
169
170 func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) {
171 defer close(done)
172
173 runStream := func() error {
174 resetBackoff, err := p.runStream(ctx, interval)
175 if status.Code(err) == codes.Unimplemented {
176
177 logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
178 return err
179 }
180
181 if code := status.Code(err); code != codes.Unavailable && code != codes.Canceled {
182
183
184
185
186
187
188 logger.Error("Received unexpected stream error:", err)
189 }
190 if resetBackoff {
191 return backoff.ErrResetBackoff
192 }
193 return nil
194 }
195 backoff.RunF(ctx, runStream, p.backoff)
196 }
197
198
199
200
201 func (p *producer) runStream(ctx context.Context, interval time.Duration) (resetBackoff bool, err error) {
202 streamCtx, cancel := context.WithCancel(ctx)
203 defer cancel()
204 stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{
205 ReportInterval: durationpb.New(interval),
206 })
207 if err != nil {
208 return false, err
209 }
210
211 for {
212 report, err := stream.Recv()
213 if err != nil {
214 return resetBackoff, err
215 }
216 resetBackoff = true
217 p.mu.Lock()
218 for l := range p.listeners {
219 l.OnLoadReport(report)
220 }
221 p.mu.Unlock()
222 }
223 }
224
View as plain text