package server import ( "context" "fmt" "net/http" "time" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/lib/runtime/healthz" "edge-infra.dev/pkg/lib/runtime/manager" ) type PSQLInjector struct { cfg *Config sql *DBHandle } func New(cfg *Config) (*PSQLInjector, error) { return &PSQLInjector{cfg: cfg}, cfg.Validate() } // Run starts psqlinjector. // // This function returns nil when either the passed-in context was canceled, or the banners table changed. func (p *PSQLInjector) Run(ctx context.Context) error { log := fog.FromContext(ctx) ctx, cancel := context.WithCancel(ctx) defer cancel() if err := p.initSQLConn(ctx); err != nil { log.Error(err, "failed to initialize database connection") return err } defer p.sql.Close() // garbage collect outdated watched_field_objects before processing messages. if p.cfg.GarbageCollectDeletedWatchedFieldObjects { deletedCount, err := p.sql.GarbageCollectDeletedWatchedFieldObjects(ctx) if err != nil { // log the error and move on log.Error(err, "failed to garbage collect deleted watched field objects at startup") } else { log.Info("garbage collected deleted watched field objects", "deleted_count", deletedCount) } } mgr, err := p.createManager(ctx) if err != nil { log.Error(err, "failed to run manager") return err } log.Info("manager starting") go func() { defer cancel() if err := mgr.Start(ctx); err != nil { log.Error(err, "manager failed") return } log.Info("manager stopped") }() rm, err := p.receiverMux() if err != nil { log.Error(err, "failed to create receiver mux") return err } go func() { // TODO remove when using prometheus metrics for { select { case <-time.After(2*time.Minute + 13*time.Second): log.Info("health check", "health_check", rm.HealthCheck()) case <-ctx.Done(): return } } }() if err := rm.Run(ctx); err != nil { log.Error(err, "receiver mux quit") return err } log.Info("receiver mux stopped") return nil } func (p *PSQLInjector) createManager(ctx context.Context) (manager.Manager, error) { log := fog.FromContext(ctx) var mgrOpts = manager.Options{ MetricsBindAddress: p.cfg.MetricsAddr, HealthProbeBindAddress: p.cfg.HealthzAddr, BaseContext: func() context.Context { return ctx }, } mgr, err := manager.New(mgrOpts) if err != nil { log.Error(err, "failed to create new manager") return nil, fmt.Errorf("error creating new manager: %w", err) } if err := mgr.AddLivezCheck("alive", healthz.Ping); err != nil { log.Error(err, "failed to add livez check") return nil, fmt.Errorf("error adding livez check: %w", err) } if err := mgr.AddReadyzCheck("db", p.dbReadyz); err != nil { log.Error(err, "failed to add readyz check") return nil, fmt.Errorf("error adding readyz check: %w", err) } return mgr, nil } func (p *PSQLInjector) initSQLConn(ctx context.Context) error { var log = fog.FromContext(ctx) log.Info("initializing database connection") db, err := p.cfg.ConnectToDB() if err != nil { log.Error(err, "failed to create sql db connection") return err } p.sql = &DBHandle{DB: db} return db.PingContext(ctx) } func (p *PSQLInjector) dbReadyz(_ *http.Request) error { // readyz checks should take less than 5 seconds per k8s docs. var ctx, cancel = context.WithTimeout(context.Background(), 4*time.Second) defer cancel() return p.sql.PingContext(ctx) }