...
1 package agent
2
3 import (
4 "context"
5 "io"
6 "net"
7
8 "google.golang.org/grpc"
9
10 envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
11 "github.com/datawire/dlib/dhttp"
12 "github.com/datawire/dlib/dlog"
13 )
14
15 type StreamHandler func(ctx context.Context, in *envoyMetrics.StreamMetricsMessage)
16
17 type metricsServer struct {
18 envoyMetrics.MetricsServiceServer
19 handler StreamHandler
20 }
21
22
23 func NewMetricsServer(handler StreamHandler) *metricsServer {
24 return &metricsServer{
25 handler: handler,
26 }
27 }
28
29
30
31 func (s *metricsServer) Serve(ctx context.Context, listener net.Listener) error {
32 grpcServer := grpc.NewServer()
33 envoyMetrics.RegisterMetricsServiceServer(grpcServer, s)
34
35 sc := &dhttp.ServerConfig{
36 Handler: grpcServer,
37 }
38
39 return sc.Serve(ctx, listener)
40 }
41
42
43
44 func (s *metricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error {
45 ctx := stream.Context()
46 dlog.Debug(ctx, "started stream")
47 for {
48 in, err := stream.Recv()
49 if err == io.EOF {
50 return nil
51 }
52 if err != nil {
53 return err
54 }
55 s.handler(ctx, in)
56 }
57 }
58
View as plain text