...

Source file src/github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go

Documentation: github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck

     1  // Copyright 2021 Google LLC All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package healthcheck tests and communicates the health of the Cloud SQL Auth proxy.
    16  package healthcheck
    17  
    18  import (
    19  	"context"
    20  	"errors"
    21  	"net"
    22  	"net/http"
    23  	"sync"
    24  
    25  	"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
    26  	"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
    27  )
    28  
    29  const (
    30  	startupPath   = "/startup"
    31  	livenessPath  = "/liveness"
    32  	readinessPath = "/readiness"
    33  )
    34  
    35  // Server is a type used to implement health checks for the proxy.
    36  type Server struct {
    37  	// started is used to indicate whether the proxy has finished starting up.
    38  	// If started is open, startup has not finished. If started is closed,
    39  	// startup is complete.
    40  	started chan struct{}
    41  	// once ensures that started can only be closed once.
    42  	once *sync.Once
    43  	// port designates the port number on which Server listens and serves.
    44  	port string
    45  	// srv is a pointer to the HTTP server used to communicate proxy health.
    46  	srv *http.Server
    47  	// instances is a list of all instances specified statically (e.g. as flags to the binary)
    48  	instances []string
    49  }
    50  
    51  // NewServer initializes a Server and exposes HTTP endpoints used to
    52  // communicate proxy health.
    53  func NewServer(c *proxy.Client, port string, staticInst []string) (*Server, error) {
    54  	mux := http.NewServeMux()
    55  
    56  	srv := &http.Server{
    57  		Addr:    ":" + port,
    58  		Handler: mux,
    59  	}
    60  
    61  	hcServer := &Server{
    62  		started:   make(chan struct{}),
    63  		once:      &sync.Once{},
    64  		port:      port,
    65  		srv:       srv,
    66  		instances: staticInst,
    67  	}
    68  
    69  	mux.HandleFunc(startupPath, func(w http.ResponseWriter, _ *http.Request) {
    70  		if !hcServer.proxyStarted() {
    71  			w.WriteHeader(http.StatusServiceUnavailable)
    72  			w.Write([]byte("error"))
    73  			return
    74  		}
    75  		w.WriteHeader(http.StatusOK)
    76  		w.Write([]byte("ok"))
    77  	})
    78  
    79  	mux.HandleFunc(readinessPath, func(w http.ResponseWriter, _ *http.Request) {
    80  		ctx, cancel := context.WithCancel(context.Background())
    81  		defer cancel()
    82  		if !isReady(ctx, c, hcServer) {
    83  			w.WriteHeader(http.StatusServiceUnavailable)
    84  			w.Write([]byte("error"))
    85  			return
    86  		}
    87  		w.WriteHeader(http.StatusOK)
    88  		w.Write([]byte("ok"))
    89  	})
    90  
    91  	mux.HandleFunc(livenessPath, func(w http.ResponseWriter, _ *http.Request) {
    92  		if !isLive(c) {
    93  			w.WriteHeader(http.StatusServiceUnavailable)
    94  			w.Write([]byte("error"))
    95  			return
    96  		}
    97  		w.WriteHeader(http.StatusOK)
    98  		w.Write([]byte("ok"))
    99  	})
   100  
   101  	ln, err := net.Listen("tcp", srv.Addr)
   102  	if err != nil {
   103  		return nil, err
   104  	}
   105  
   106  	go func() {
   107  		if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
   108  			logging.Errorf("[Health Check] Failed to serve: %v", err)
   109  		}
   110  	}()
   111  
   112  	return hcServer, nil
   113  }
   114  
   115  // Close gracefully shuts down the HTTP server belonging to the Server.
   116  func (s *Server) Close(ctx context.Context) error {
   117  	return s.srv.Shutdown(ctx)
   118  }
   119  
   120  // NotifyStarted tells the Server that the proxy has finished startup.
   121  func (s *Server) NotifyStarted() {
   122  	s.once.Do(func() { close(s.started) })
   123  }
   124  
   125  // proxyStarted returns true if started is closed, false otherwise.
   126  func (s *Server) proxyStarted() bool {
   127  	select {
   128  	case <-s.started:
   129  		return true
   130  	default:
   131  		return false
   132  	}
   133  }
   134  
   135  // isLive returns true as long as the proxy Client has all valid connections.
   136  func isLive(c *proxy.Client) bool {
   137  	invalid := c.InvalidInstances()
   138  	alive := len(invalid) == 0
   139  	if !alive {
   140  		for _, err := range invalid {
   141  			logging.Errorf("[Health Check] Liveness failed: %v", err)
   142  		}
   143  	}
   144  	return alive
   145  }
   146  
   147  // isReady will check the following criteria:
   148  // 1. Finished starting up / been sent the 'Ready for Connections' log.
   149  // 2. Not yet hit the MaxConnections limit, if set.
   150  // 3. Able to dial all specified instances without error.
   151  func isReady(ctx context.Context, c *proxy.Client, s *Server) bool {
   152  	// Not ready until we reach the 'Ready for Connections' log.
   153  	if !s.proxyStarted() {
   154  		logging.Errorf("[Health Check] Readiness failed because proxy has not finished starting up.")
   155  		return false
   156  	}
   157  
   158  	// Not ready if the proxy is at the optional MaxConnections limit.
   159  	if !c.AvailableConn() {
   160  		logging.Errorf("[Health Check] Readiness failed because proxy has reached the maximum connections limit (%v).", c.MaxConnections)
   161  		return false
   162  	}
   163  
   164  	// Not ready if one or more instances cannot be dialed.
   165  	instances := s.instances
   166  	if s.instances == nil { // Proxy is in fuse mode.
   167  		instances = c.GetInstances()
   168  	}
   169  
   170  	canDial := true
   171  	var once sync.Once
   172  	var wg sync.WaitGroup
   173  
   174  	for _, inst := range instances {
   175  		wg.Add(1)
   176  		go func(inst string) {
   177  			defer wg.Done()
   178  			conn, err := c.DialContext(ctx, inst)
   179  			if err != nil {
   180  				logging.Errorf("[Health Check] Readiness failed because proxy couldn't connect to %q: %v", inst, err)
   181  				once.Do(func() { canDial = false })
   182  				return
   183  			}
   184  
   185  			err = conn.Close()
   186  			if err != nil {
   187  				logging.Errorf("[Health Check] Readiness: error while closing connection: %v", err)
   188  			}
   189  		}(inst)
   190  	}
   191  	wg.Wait()
   192  
   193  	return canDial
   194  }
   195  

View as plain text