...
1 package server
2
3 import (
4 "context"
5 "fmt"
6 "net/http"
7 "time"
8
9 "edge-infra.dev/pkg/lib/fog"
10 "edge-infra.dev/pkg/lib/runtime/healthz"
11 "edge-infra.dev/pkg/lib/runtime/manager"
12 )
13
14 type PSQLInjector struct {
15 cfg *Config
16 sql *DBHandle
17 }
18
19 func New(cfg *Config) (*PSQLInjector, error) {
20 return &PSQLInjector{cfg: cfg}, cfg.Validate()
21 }
22
23
24
25
26 func (p *PSQLInjector) Run(ctx context.Context) error {
27 log := fog.FromContext(ctx)
28 ctx, cancel := context.WithCancel(ctx)
29 defer cancel()
30
31 if err := p.initSQLConn(ctx); err != nil {
32 log.Error(err, "failed to initialize database connection")
33 return err
34 }
35 defer p.sql.Close()
36
37
38 if p.cfg.GarbageCollectDeletedWatchedFieldObjects {
39 deletedCount, err := p.sql.GarbageCollectDeletedWatchedFieldObjects(ctx)
40 if err != nil {
41
42 log.Error(err, "failed to garbage collect deleted watched field objects at startup")
43 } else {
44 log.Info("garbage collected deleted watched field objects", "deleted_count", deletedCount)
45 }
46 }
47
48 mgr, err := p.createManager(ctx)
49 if err != nil {
50 log.Error(err, "failed to run manager")
51 return err
52 }
53 log.Info("manager starting")
54 go func() {
55 defer cancel()
56 if err := mgr.Start(ctx); err != nil {
57 log.Error(err, "manager failed")
58 return
59 }
60 log.Info("manager stopped")
61 }()
62
63 rm, err := p.receiverMux()
64 if err != nil {
65 log.Error(err, "failed to create receiver mux")
66 return err
67 }
68
69 go func() {
70
71 for {
72 select {
73 case <-time.After(2*time.Minute + 13*time.Second):
74 log.Info("health check", "health_check", rm.HealthCheck())
75 case <-ctx.Done():
76 return
77 }
78 }
79 }()
80
81 if err := rm.Run(ctx); err != nil {
82 log.Error(err, "receiver mux quit")
83 return err
84 }
85 log.Info("receiver mux stopped")
86 return nil
87 }
88
89 func (p *PSQLInjector) createManager(ctx context.Context) (manager.Manager, error) {
90 log := fog.FromContext(ctx)
91 var mgrOpts = manager.Options{
92 MetricsBindAddress: p.cfg.MetricsAddr,
93 HealthProbeBindAddress: p.cfg.HealthzAddr,
94 BaseContext: func() context.Context { return ctx },
95 }
96
97 mgr, err := manager.New(mgrOpts)
98 if err != nil {
99 log.Error(err, "failed to create new manager")
100 return nil, fmt.Errorf("error creating new manager: %w", err)
101 }
102
103 if err := mgr.AddLivezCheck("alive", healthz.Ping); err != nil {
104 log.Error(err, "failed to add livez check")
105 return nil, fmt.Errorf("error adding livez check: %w", err)
106 }
107
108 if err := mgr.AddReadyzCheck("db", p.dbReadyz); err != nil {
109 log.Error(err, "failed to add readyz check")
110 return nil, fmt.Errorf("error adding readyz check: %w", err)
111 }
112
113 return mgr, nil
114 }
115
116 func (p *PSQLInjector) initSQLConn(ctx context.Context) error {
117 var log = fog.FromContext(ctx)
118 log.Info("initializing database connection")
119
120 db, err := p.cfg.ConnectToDB()
121 if err != nil {
122 log.Error(err, "failed to create sql db connection")
123 return err
124 }
125 p.sql = &DBHandle{DB: db}
126 return db.PingContext(ctx)
127 }
128
129 func (p *PSQLInjector) dbReadyz(_ *http.Request) error {
130
131 var ctx, cancel = context.WithTimeout(context.Background(), 4*time.Second)
132 defer cancel()
133
134 return p.sql.PingContext(ctx)
135 }
136
View as plain text