...

Source file src/edge-infra.dev/pkg/edge/psqlinjector/ps_server.go

Documentation: edge-infra.dev/pkg/edge/psqlinjector

     1  package server
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net/http"
     7  	"time"
     8  
     9  	"edge-infra.dev/pkg/lib/fog"
    10  	"edge-infra.dev/pkg/lib/runtime/healthz"
    11  	"edge-infra.dev/pkg/lib/runtime/manager"
    12  )
    13  
// PSQLInjector bundles the configuration and the database handle used by the
// psqlinjector service.
type PSQLInjector struct {
	// cfg is the configuration supplied to New.
	cfg *Config
	// sql is the database handle. It is nil until initSQLConn assigns it, and
	// Run closes it when it returns.
	sql *DBHandle
}
    18  
    19  func New(cfg *Config) (*PSQLInjector, error) {
    20  	return &PSQLInjector{cfg: cfg}, cfg.Validate()
    21  }
    22  
    23  // Run starts psqlinjector.
    24  //
    25  // This function returns nil when either the passed-in context was canceled, or the banners table changed.
    26  func (p *PSQLInjector) Run(ctx context.Context) error {
    27  	log := fog.FromContext(ctx)
    28  	ctx, cancel := context.WithCancel(ctx)
    29  	defer cancel()
    30  
    31  	if err := p.initSQLConn(ctx); err != nil {
    32  		log.Error(err, "failed to initialize database connection")
    33  		return err
    34  	}
    35  	defer p.sql.Close()
    36  
    37  	// garbage collect outdated watched_field_objects before processing messages.
    38  	if p.cfg.GarbageCollectDeletedWatchedFieldObjects {
    39  		deletedCount, err := p.sql.GarbageCollectDeletedWatchedFieldObjects(ctx)
    40  		if err != nil {
    41  			// log the error and move on
    42  			log.Error(err, "failed to garbage collect deleted watched field objects at startup")
    43  		} else {
    44  			log.Info("garbage collected deleted watched field objects", "deleted_count", deletedCount)
    45  		}
    46  	}
    47  
    48  	mgr, err := p.createManager(ctx)
    49  	if err != nil {
    50  		log.Error(err, "failed to run manager")
    51  		return err
    52  	}
    53  	log.Info("manager starting")
    54  	go func() {
    55  		defer cancel()
    56  		if err := mgr.Start(ctx); err != nil {
    57  			log.Error(err, "manager failed")
    58  			return
    59  		}
    60  		log.Info("manager stopped")
    61  	}()
    62  
    63  	rm, err := p.receiverMux()
    64  	if err != nil {
    65  		log.Error(err, "failed to create receiver mux")
    66  		return err
    67  	}
    68  
    69  	go func() {
    70  		// TODO remove when using prometheus metrics
    71  		for {
    72  			select {
    73  			case <-time.After(2*time.Minute + 13*time.Second):
    74  				log.Info("health check", "health_check", rm.HealthCheck())
    75  			case <-ctx.Done():
    76  				return
    77  			}
    78  		}
    79  	}()
    80  
    81  	if err := rm.Run(ctx); err != nil {
    82  		log.Error(err, "receiver mux quit")
    83  		return err
    84  	}
    85  	log.Info("receiver mux stopped")
    86  	return nil
    87  }
    88  
    89  func (p *PSQLInjector) createManager(ctx context.Context) (manager.Manager, error) {
    90  	log := fog.FromContext(ctx)
    91  	var mgrOpts = manager.Options{
    92  		MetricsBindAddress:     p.cfg.MetricsAddr,
    93  		HealthProbeBindAddress: p.cfg.HealthzAddr,
    94  		BaseContext:            func() context.Context { return ctx },
    95  	}
    96  
    97  	mgr, err := manager.New(mgrOpts)
    98  	if err != nil {
    99  		log.Error(err, "failed to create new manager")
   100  		return nil, fmt.Errorf("error creating new manager: %w", err)
   101  	}
   102  
   103  	if err := mgr.AddLivezCheck("alive", healthz.Ping); err != nil {
   104  		log.Error(err, "failed to add livez check")
   105  		return nil, fmt.Errorf("error adding livez check: %w", err)
   106  	}
   107  
   108  	if err := mgr.AddReadyzCheck("db", p.dbReadyz); err != nil {
   109  		log.Error(err, "failed to add readyz check")
   110  		return nil, fmt.Errorf("error adding readyz check: %w", err)
   111  	}
   112  
   113  	return mgr, nil
   114  }
   115  
   116  func (p *PSQLInjector) initSQLConn(ctx context.Context) error {
   117  	var log = fog.FromContext(ctx)
   118  	log.Info("initializing database connection")
   119  
   120  	db, err := p.cfg.ConnectToDB()
   121  	if err != nil {
   122  		log.Error(err, "failed to create sql db connection")
   123  		return err
   124  	}
   125  	p.sql = &DBHandle{DB: db}
   126  	return db.PingContext(ctx)
   127  }
   128  
   129  func (p *PSQLInjector) dbReadyz(_ *http.Request) error {
   130  	// readyz checks should take less than 5 seconds per k8s docs.
   131  	var ctx, cancel = context.WithTimeout(context.Background(), 4*time.Second)
   132  	defer cancel()
   133  
   134  	return p.sql.PingContext(ctx)
   135  }
   136  

View as plain text