...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package healthcheck
17
18 import (
19 "context"
20 "errors"
21 "net"
22 "net/http"
23 "sync"
24
25 "github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
26 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
27 )
28
// URL paths served by the health check HTTP server, listed in the order the
// handlers are registered in NewServer.
const (
	// startupPath is the endpoint that reports whether the proxy has been
	// marked as started.
	startupPath = "/startup"
	// readinessPath is the endpoint that reports the result of isReady.
	readinessPath = "/readiness"
	// livenessPath is the endpoint that reports the result of isLive.
	livenessPath = "/liveness"
)
34
35
// Server is the health check HTTP server together with the state its
// handlers consult.
type Server struct {
	// port is the TCP port the server was asked to listen on.
	port string
	// instances is the static list of instances dialed by the readiness
	// check. When nil, the readiness check asks the proxy client for its
	// instances instead.
	instances []string

	// srv is the underlying HTTP server; Close shuts it down.
	srv *http.Server

	// started is closed by NotifyStarted to signal that the proxy has
	// finished starting up.
	started chan struct{}
	// once guarantees that started is closed at most one time.
	once *sync.Once
}
50
51
52
53 func NewServer(c *proxy.Client, port string, staticInst []string) (*Server, error) {
54 mux := http.NewServeMux()
55
56 srv := &http.Server{
57 Addr: ":" + port,
58 Handler: mux,
59 }
60
61 hcServer := &Server{
62 started: make(chan struct{}),
63 once: &sync.Once{},
64 port: port,
65 srv: srv,
66 instances: staticInst,
67 }
68
69 mux.HandleFunc(startupPath, func(w http.ResponseWriter, _ *http.Request) {
70 if !hcServer.proxyStarted() {
71 w.WriteHeader(http.StatusServiceUnavailable)
72 w.Write([]byte("error"))
73 return
74 }
75 w.WriteHeader(http.StatusOK)
76 w.Write([]byte("ok"))
77 })
78
79 mux.HandleFunc(readinessPath, func(w http.ResponseWriter, _ *http.Request) {
80 ctx, cancel := context.WithCancel(context.Background())
81 defer cancel()
82 if !isReady(ctx, c, hcServer) {
83 w.WriteHeader(http.StatusServiceUnavailable)
84 w.Write([]byte("error"))
85 return
86 }
87 w.WriteHeader(http.StatusOK)
88 w.Write([]byte("ok"))
89 })
90
91 mux.HandleFunc(livenessPath, func(w http.ResponseWriter, _ *http.Request) {
92 if !isLive(c) {
93 w.WriteHeader(http.StatusServiceUnavailable)
94 w.Write([]byte("error"))
95 return
96 }
97 w.WriteHeader(http.StatusOK)
98 w.Write([]byte("ok"))
99 })
100
101 ln, err := net.Listen("tcp", srv.Addr)
102 if err != nil {
103 return nil, err
104 }
105
106 go func() {
107 if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
108 logging.Errorf("[Health Check] Failed to serve: %v", err)
109 }
110 }()
111
112 return hcServer, nil
113 }
114
115
116 func (s *Server) Close(ctx context.Context) error {
117 return s.srv.Shutdown(ctx)
118 }
119
120
121 func (s *Server) NotifyStarted() {
122 s.once.Do(func() { close(s.started) })
123 }
124
125
126 func (s *Server) proxyStarted() bool {
127 select {
128 case <-s.started:
129 return true
130 default:
131 return false
132 }
133 }
134
135
136 func isLive(c *proxy.Client) bool {
137 invalid := c.InvalidInstances()
138 alive := len(invalid) == 0
139 if !alive {
140 for _, err := range invalid {
141 logging.Errorf("[Health Check] Liveness failed: %v", err)
142 }
143 }
144 return alive
145 }
146
147
148
149
150
151 func isReady(ctx context.Context, c *proxy.Client, s *Server) bool {
152
153 if !s.proxyStarted() {
154 logging.Errorf("[Health Check] Readiness failed because proxy has not finished starting up.")
155 return false
156 }
157
158
159 if !c.AvailableConn() {
160 logging.Errorf("[Health Check] Readiness failed because proxy has reached the maximum connections limit (%v).", c.MaxConnections)
161 return false
162 }
163
164
165 instances := s.instances
166 if s.instances == nil {
167 instances = c.GetInstances()
168 }
169
170 canDial := true
171 var once sync.Once
172 var wg sync.WaitGroup
173
174 for _, inst := range instances {
175 wg.Add(1)
176 go func(inst string) {
177 defer wg.Done()
178 conn, err := c.DialContext(ctx, inst)
179 if err != nil {
180 logging.Errorf("[Health Check] Readiness failed because proxy couldn't connect to %q: %v", inst, err)
181 once.Do(func() { canDial = false })
182 return
183 }
184
185 err = conn.Close()
186 if err != nil {
187 logging.Errorf("[Health Check] Readiness: error while closing connection: %v", err)
188 }
189 }(inst)
190 }
191 wg.Wait()
192
193 return canDial
194 }
195
View as plain text