1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package ntpmonitor
17
18 import (
19 "errors"
20 "fmt"
21 "math/rand"
22 "sync/atomic"
23 "time"
24
25 "github.com/beevik/ntp"
26
27 pkgapi "github.com/sigstore/timestamp-authority/pkg/api"
28 "github.com/sigstore/timestamp-authority/pkg/log"
29 )
30
var (
	// ErrInvTime reports that the local time differs from the time observed
	// from the queried NTP servers.
	ErrInvTime = errors.New("local time differs from observed")

	// ErrTooFewServers is returned by NewFromConfigWithClient when no servers
	// are configured, when fewer servers are configured than NumServers, or
	// when ServerThreshold exceeds NumServers.
	ErrTooFewServers = errors.New("too few ntp servers configured")

	// ErrNoResponse reports that no NTP response was received.
	ErrNoResponse = errors.New("no ntp response")

	// ErrThreshold is returned by NewFromConfigWithClient when
	// ServerThreshold is less than 1.
	ErrThreshold = errors.New("no valid server threshold set")

	// ErrDeltaTooSmall is returned by NewFromConfigWithClient when
	// RequestTimeout is less than 1 or MaxTimeDelta is smaller than
	// RequestTimeout.
	ErrDeltaTooSmall = errors.New("delta is too small")
)
47
// serverResponses summarizes one round of NTP queries. tooFewServerResponses
// is set when the number of servers that answered (NumServers minus the ones
// that did not) is below ServerThreshold; tooManyInvalidResponses is set when
// the number of answers whose clock offset stayed within the allowed delta is
// below ServerThreshold.
type serverResponses struct {
	tooFewServerResponses, tooManyInvalidResponses bool
}
52
53 type NTPClient interface {
54 QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error)
55 }
56
57 type LiveNTPClient struct{}
58
59 func (c LiveNTPClient) QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error) {
60 return ntp.QueryWithOptions(srv, opts)
61 }
62
63
64 type NTPMonitor struct {
65 cfg *Config
66 run atomic.Bool
67 ntpClient NTPClient
68 }
69
70
71
72 func New(configFile string) (*NTPMonitor, error) {
73 cfg, err := LoadConfig(configFile)
74 if err != nil {
75 return nil, err
76 }
77 return NewFromConfig(cfg)
78 }
79
80
81 func NewFromConfig(cfg *Config) (*NTPMonitor, error) {
82
83 liveNTPClient := LiveNTPClient{}
84 return NewFromConfigWithClient(cfg, liveNTPClient)
85 }
86
87 func NewFromConfigWithClient(cfg *Config, client NTPClient) (*NTPMonitor, error) {
88 if len(cfg.Servers) == 0 || len(cfg.Servers) < cfg.NumServers {
89 return nil, ErrTooFewServers
90 }
91
92 if cfg.ServerThreshold < 1 {
93 return nil, ErrThreshold
94 }
95
96 if cfg.ServerThreshold > cfg.NumServers {
97 return nil, ErrTooFewServers
98 }
99
100 if cfg.RequestTimeout < 1 || cfg.MaxTimeDelta < cfg.RequestTimeout {
101 return nil, ErrDeltaTooSmall
102 }
103
104 return &NTPMonitor{cfg: cfg, ntpClient: client}, nil
105 }
106
107 func (n *NTPMonitor) queryServers(delta time.Duration, servers []string) serverResponses {
108 validResponses := 0
109 noResponse := 0
110 for _, srv := range servers {
111
112
113
114
115 resp, err := n.queryNTPServer(srv)
116 if err != nil {
117 log.Logger.Errorf("ntp response timeout from %s",
118 srv)
119 noResponse++
120 continue
121 }
122
123
124
125
126
127
128
129 if resp.ClockOffset.Abs() > delta {
130 log.Logger.Warnf("local time is different from %s: %s",
131 srv, resp.Time)
132 } else {
133 validResponses++
134 }
135 }
136
137
138 return serverResponses{
139 tooFewServerResponses: n.cfg.ServerThreshold > n.cfg.NumServers-noResponse,
140 tooManyInvalidResponses: n.cfg.ServerThreshold > validResponses,
141 }
142 }
143
144
145 func (n *NTPMonitor) Start() {
146 n.run.Store(true)
147
148 if n.cfg.RequestTimeout < 1 {
149 log.Logger.Warnf("NTP request timeout not set, default to 1s")
150 n.cfg.RequestTimeout = 1
151 }
152
153 delta := time.Duration(n.cfg.MaxTimeDelta) * time.Second
154 log.Logger.Info("ntp monitoring starting")
155
156
157 r := rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
158
159 for n.run.Load() {
160 servers := RandomChoice(n.cfg.Servers, n.cfg.NumServers, r)
161 responses := n.queryServers(delta, servers)
162
163
164 if responses.tooFewServerResponses {
165 pkgapi.MetricNTPErrorCount.With(map[string]string{
166 "reason": "err_too_few",
167 }).Inc()
168 }
169 if responses.tooManyInvalidResponses {
170 pkgapi.MetricNTPErrorCount.With(map[string]string{
171 "reason": "err_inv_time",
172 }).Inc()
173 }
174
175
176 time.Sleep(time.Duration(n.cfg.Period) * time.Second)
177 }
178 log.Logger.Info("ntp monitoring stopped")
179 }
180
181
182 func (n *NTPMonitor) Stop() {
183 log.Logger.Info("stopping ntp monitoring")
184 n.run.Store(false)
185 }
186
187
188
189 func (n *NTPMonitor) queryNTPServer(srv string) (*ntp.Response, error) {
190 var i = 1
191 for {
192 log.Logger.Debugf("querying ntp server %s", srv)
193
194 start := time.Now()
195 opts := ntp.QueryOptions{
196 Timeout: time.Duration(n.cfg.RequestTimeout) * time.Second,
197 }
198 resp, err := n.ntpClient.QueryWithOptions(srv, opts)
199 pkgapi.MetricNTPLatency.With(map[string]string{
200 "host": srv,
201 }).Observe(float64(time.Since(start)))
202 if err == nil {
203 pkgapi.MetricNTPSyncCount.With(map[string]string{
204 "failed": "false",
205 "host": srv,
206 }).Inc()
207
208 return resp, nil
209 }
210 pkgapi.MetricNTPSyncCount.With(map[string]string{
211 "failed": "true",
212 "host": srv,
213 }).Inc()
214
215 log.Logger.Infof("ntp timeout from %s, attempt %d/%d",
216 srv, i, n.cfg.RequestAttempts)
217 if i == n.cfg.RequestAttempts {
218 break
219 }
220 i++
221 time.Sleep(time.Second)
222 }
223 return nil, fmt.Errorf("ntp timeout: %s", srv)
224 }
225
View as plain text