...
1
18
19
20
21 package health
22
23 import (
24 "context"
25 "sync"
26
27 "google.golang.org/grpc/codes"
28 healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
29 healthpb "google.golang.org/grpc/health/grpc_health_v1"
30 "google.golang.org/grpc/status"
31 )
32
33
34 type Server struct {
35 healthgrpc.UnimplementedHealthServer
36 mu sync.RWMutex
37
38
39 shutdown bool
40
41 statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
42 updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
43 }
44
45
46 func NewServer() *Server {
47 return &Server{
48 statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
49 updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
50 }
51 }
52
53
54 func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
55 s.mu.RLock()
56 defer s.mu.RUnlock()
57 if servingStatus, ok := s.statusMap[in.Service]; ok {
58 return &healthpb.HealthCheckResponse{
59 Status: servingStatus,
60 }, nil
61 }
62 return nil, status.Error(codes.NotFound, "unknown service")
63 }
64
65
66 func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
67 service := in.Service
68
69 update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
70 s.mu.Lock()
71
72 if servingStatus, ok := s.statusMap[service]; ok {
73 update <- servingStatus
74 } else {
75 update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
76 }
77
78
79 if _, ok := s.updates[service]; !ok {
80 s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
81 }
82 s.updates[service][stream] = update
83 defer func() {
84 s.mu.Lock()
85 delete(s.updates[service], stream)
86 s.mu.Unlock()
87 }()
88 s.mu.Unlock()
89
90 var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
91 for {
92 select {
93
94 case servingStatus := <-update:
95 if lastSentStatus == servingStatus {
96 continue
97 }
98 lastSentStatus = servingStatus
99 err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
100 if err != nil {
101 return status.Error(codes.Canceled, "Stream has ended.")
102 }
103
104 case <-stream.Context().Done():
105 return status.Error(codes.Canceled, "Stream has ended.")
106 }
107 }
108 }
109
110
111
112 func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
113 s.mu.Lock()
114 defer s.mu.Unlock()
115 if s.shutdown {
116 logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
117 return
118 }
119
120 s.setServingStatusLocked(service, servingStatus)
121 }
122
123 func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
124 s.statusMap[service] = servingStatus
125 for _, update := range s.updates[service] {
126
127
128 select {
129 case <-update:
130 default:
131 }
132
133 update <- servingStatus
134 }
135 }
136
137
138
139
140
141
142 func (s *Server) Shutdown() {
143 s.mu.Lock()
144 defer s.mu.Unlock()
145 s.shutdown = true
146 for service := range s.statusMap {
147 s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
148 }
149 }
150
151
152
153
154
155
156 func (s *Server) Resume() {
157 s.mu.Lock()
158 defer s.mu.Unlock()
159 s.shutdown = false
160 for service := range s.statusMap {
161 s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
162 }
163 }
164
View as plain text