...
1
18
19
20
21
22
23
24
25
26
27
28 package service
29
30 import (
31 "context"
32 "errors"
33 "sync"
34
35 "google.golang.org/grpc"
36 "google.golang.org/grpc/grpclog"
37 "google.golang.org/grpc/internal/profiling"
38 ppb "google.golang.org/grpc/profiling/proto"
39 )
40
// logger is the package-level grpclog component logger, created with the
// component name "profiling". The service methods in this file log through it.
var logger = grpclog.Component("profiling")
42
43
// ProfilingConfig carries the options consumed by Init.
type ProfilingConfig struct {
	// Enabled is the initial on/off state of profiling; Init forwards it to
	// profiling.Enable as its final step.
	Enabled bool

	// StreamStatsSize is handed to profiling.InitStats by Init.
	// NOTE(review): presumably the capacity of the stream stats buffer, i.e.
	// how many stream records are retained at once — confirm against the
	// internal/profiling package.
	StreamStatsSize uint32

	// Server is the *grpc.Server on which Init registers the profiling
	// service. It must be non-nil; Init returns an error otherwise.
	Server *grpc.Server
}
60
// errorNilServer is the error Init returns when ProfilingConfig.Server is nil.
var errorNilServer = errors.New("profiling: no grpc.Server provided")
62
63
64
65
66 func Init(pc *ProfilingConfig) error {
67 if pc.Server == nil {
68 return errorNilServer
69 }
70
71 if err := profiling.InitStats(pc.StreamStatsSize); err != nil {
72 return err
73 }
74
75 ppb.RegisterProfilingServer(pc.Server, getProfilingServerInstance())
76
77
78 profiling.Enable(pc.Enabled)
79
80 return nil
81 }
82
// profilingServer is this package's implementation of the profiling gRPC
// service. It embeds ppb.UnimplementedProfilingServer and defines the Enable
// and GetStreamStats handlers itself.
type profilingServer struct {
	ppb.UnimplementedProfilingServer
	// drainMutex is held around the profiling.StreamStats.Drain call in
	// GetStreamStats so that concurrent requests drain one at a time.
	drainMutex sync.Mutex
}
87
88 var profilingServerInstance *profilingServer
89 var profilingServerOnce sync.Once
90
91
92
93
94 func getProfilingServerInstance() *profilingServer {
95 profilingServerOnce.Do(func() {
96 profilingServerInstance = &profilingServer{}
97 })
98
99 return profilingServerInstance
100 }
101
102 func (s *profilingServer) Enable(ctx context.Context, req *ppb.EnableRequest) (*ppb.EnableResponse, error) {
103 if req.Enabled {
104 logger.Infof("profilingServer: Enable: enabling profiling")
105 } else {
106 logger.Infof("profilingServer: Enable: disabling profiling")
107 }
108 profiling.Enable(req.Enabled)
109
110 return &ppb.EnableResponse{}, nil
111 }
112
113 func timerToProtoTimer(timer *profiling.Timer) *ppb.Timer {
114 return &ppb.Timer{
115 Tags: timer.Tags,
116 BeginSec: timer.Begin.Unix(),
117 BeginNsec: int32(timer.Begin.Nanosecond()),
118 EndSec: timer.End.Unix(),
119 EndNsec: int32(timer.End.Nanosecond()),
120 GoId: timer.GoID,
121 }
122 }
123
124 func statToProtoStat(stat *profiling.Stat) *ppb.Stat {
125 protoStat := &ppb.Stat{
126 Tags: stat.Tags,
127 Timers: make([]*ppb.Timer, 0, len(stat.Timers)),
128 Metadata: stat.Metadata,
129 }
130 for _, t := range stat.Timers {
131 protoStat.Timers = append(protoStat.Timers, timerToProtoTimer(t))
132 }
133 return protoStat
134 }
135
136 func (s *profilingServer) GetStreamStats(ctx context.Context, req *ppb.GetStreamStatsRequest) (*ppb.GetStreamStatsResponse, error) {
137
138
139 logger.Infof("profilingServer: GetStreamStats: processing request")
140 s.drainMutex.Lock()
141 results := profiling.StreamStats.Drain()
142 s.drainMutex.Unlock()
143
144 logger.Infof("profilingServer: GetStreamStats: returning %v records", len(results))
145 streamStats := make([]*ppb.Stat, 0)
146 for _, stat := range results {
147 streamStats = append(streamStats, statToProtoStat(stat.(*profiling.Stat)))
148 }
149 return &ppb.GetStreamStatsResponse{StreamStats: streamStats}, nil
150 }
151
View as plain text