...

Source file src/github.com/sigstore/timestamp-authority/pkg/ntpmonitor/ntpmonitor.go

Documentation: github.com/sigstore/timestamp-authority/pkg/ntpmonitor

     1  //
     2  // Copyright 2022 The Sigstore Authors.
     3  //
     4  // Licensed under the Apache License, Version 2.0 (the "License");
     5  // you may not use this file except in compliance with the License.
     6  // You may obtain a copy of the License at
     7  //
     8  //     http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  // Unless required by applicable law or agreed to in writing, software
    11  // distributed under the License is distributed on an "AS IS" BASIS,
    12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  // See the License for the specific language governing permissions and
    14  // limitations under the License.
    15  
    16  package ntpmonitor
    17  
    18  import (
    19  	"errors"
    20  	"fmt"
    21  	"math/rand"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	"github.com/beevik/ntp"
    26  
    27  	pkgapi "github.com/sigstore/timestamp-authority/pkg/api"
    28  	"github.com/sigstore/timestamp-authority/pkg/log"
    29  )
    30  
    31  var (
    32  	// ErrInvTime indicates that the local time has drifted too much
    33  	// from the monitored NTP servers.
    34  	ErrInvTime = errors.New("local time differs from observed")
    35  	// ErrTooFewServers means that the number of trusted servers are
    36  	// smaller then the selected num servers to query.
    37  	ErrTooFewServers = errors.New("too few ntp servers configured")
    38  	// ErrNoResponse indicates that there is an error to communicate with
    39  	// the remote NTP servers
    40  	ErrNoResponse = errors.New("no ntp response")
    41  	// ErrThreshold means that there is no positive threshold value
    42  	ErrThreshold = errors.New("no valid server threshold set")
    43  	// ErrDeltaTooSmall is referring to when the max delta time is
    44  	// smaller than the request timeout which can give unstable behaviour.
    45  	ErrDeltaTooSmall = errors.New("delta is too small")
    46  )
    47  
    48  type serverResponses struct {
    49  	tooFewServerResponses   bool
    50  	tooManyInvalidResponses bool
    51  }
    52  
    53  type NTPClient interface {
    54  	QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error)
    55  }
    56  
    57  type LiveNTPClient struct{}
    58  
    59  func (c LiveNTPClient) QueryWithOptions(srv string, opts ntp.QueryOptions) (*ntp.Response, error) {
    60  	return ntp.QueryWithOptions(srv, opts)
    61  }
    62  
    63  // NTPMonitor compares the local time with a set of trusted NTP servers.
    64  type NTPMonitor struct {
    65  	cfg       *Config
    66  	run       atomic.Bool
    67  	ntpClient NTPClient
    68  }
    69  
    70  // New creates a NTPMonitor, reading the configuration from the provided
    71  // path.
    72  func New(configFile string) (*NTPMonitor, error) {
    73  	cfg, err := LoadConfig(configFile)
    74  	if err != nil {
    75  		return nil, err
    76  	}
    77  	return NewFromConfig(cfg)
    78  }
    79  
    80  // NewFromConfig creates a NTPMonitor from an instantiated configuration.
    81  func NewFromConfig(cfg *Config) (*NTPMonitor, error) {
    82  	// default to using a live NTP client
    83  	liveNTPClient := LiveNTPClient{}
    84  	return NewFromConfigWithClient(cfg, liveNTPClient)
    85  }
    86  
    87  func NewFromConfigWithClient(cfg *Config, client NTPClient) (*NTPMonitor, error) {
    88  	if len(cfg.Servers) == 0 || len(cfg.Servers) < cfg.NumServers {
    89  		return nil, ErrTooFewServers
    90  	}
    91  
    92  	if cfg.ServerThreshold < 1 {
    93  		return nil, ErrThreshold
    94  	}
    95  
    96  	if cfg.ServerThreshold > cfg.NumServers {
    97  		return nil, ErrTooFewServers
    98  	}
    99  
   100  	if cfg.RequestTimeout < 1 || cfg.MaxTimeDelta < cfg.RequestTimeout {
   101  		return nil, ErrDeltaTooSmall
   102  	}
   103  
   104  	return &NTPMonitor{cfg: cfg, ntpClient: client}, nil
   105  }
   106  
   107  func (n *NTPMonitor) queryServers(delta time.Duration, servers []string) serverResponses {
   108  	validResponses := 0
   109  	noResponse := 0
   110  	for _, srv := range servers {
   111  		// Create a time interval from 'now' with the max
   112  		// time delta added/removed
   113  		// Make sure the time from the remote NTP server lies
   114  		// within this interval.
   115  		resp, err := n.queryNTPServer(srv)
   116  		if err != nil {
   117  			log.Logger.Errorf("ntp response timeout from %s",
   118  				srv)
   119  			noResponse++
   120  			continue
   121  		}
   122  
   123  		// ClockOffset is the estimated difference from
   124  		// local time to NTP server's time.
   125  		// The estimate assumes latency is similar for both
   126  		// sending and receiving data.
   127  		// The estimated offset does not depend on the value
   128  		// of the latency.
   129  		if resp.ClockOffset.Abs() > delta {
   130  			log.Logger.Warnf("local time is different from %s: %s",
   131  				srv, resp.Time)
   132  		} else {
   133  			validResponses++
   134  		}
   135  	}
   136  
   137  	// Did enough NTP servers respond?
   138  	return serverResponses{
   139  		tooFewServerResponses:   n.cfg.ServerThreshold > n.cfg.NumServers-noResponse,
   140  		tooManyInvalidResponses: n.cfg.ServerThreshold > validResponses,
   141  	}
   142  }
   143  
   144  // Start the periodic monitor. Once started, it runs until Stop() is called,
   145  func (n *NTPMonitor) Start() {
   146  	n.run.Store(true)
   147  
   148  	if n.cfg.RequestTimeout < 1 {
   149  		log.Logger.Warnf("NTP request timeout not set, default to 1s")
   150  		n.cfg.RequestTimeout = 1
   151  	}
   152  
   153  	delta := time.Duration(n.cfg.MaxTimeDelta) * time.Second
   154  	log.Logger.Info("ntp monitoring starting")
   155  
   156  	//nolint:gosec
   157  	r := rand.New(rand.NewSource(time.Now().UTC().UnixNano())) // initialize local pseudorandom generator //nolint:gosec
   158  
   159  	for n.run.Load() {
   160  		servers := RandomChoice(n.cfg.Servers, n.cfg.NumServers, r)
   161  		responses := n.queryServers(delta, servers)
   162  
   163  		// Did enough NTP servers respond?
   164  		if responses.tooFewServerResponses {
   165  			pkgapi.MetricNTPErrorCount.With(map[string]string{
   166  				"reason": "err_too_few",
   167  			}).Inc()
   168  		}
   169  		if responses.tooManyInvalidResponses {
   170  			pkgapi.MetricNTPErrorCount.With(map[string]string{
   171  				"reason": "err_inv_time",
   172  			}).Inc()
   173  		}
   174  
   175  		// Local time is in sync. Wait for next poll.
   176  		time.Sleep(time.Duration(n.cfg.Period) * time.Second)
   177  	}
   178  	log.Logger.Info("ntp monitoring stopped")
   179  }
   180  
   181  // Stop the monitoring.
   182  func (n *NTPMonitor) Stop() {
   183  	log.Logger.Info("stopping ntp monitoring")
   184  	n.run.Store(false)
   185  }
   186  
   187  // queryNTPServer queries a provided ntp server, trying up to a configured
   188  // amount of times. There is one second sleep between each attempt.
   189  func (n *NTPMonitor) queryNTPServer(srv string) (*ntp.Response, error) {
   190  	var i = 1
   191  	for {
   192  		log.Logger.Debugf("querying ntp server %s", srv)
   193  
   194  		start := time.Now()
   195  		opts := ntp.QueryOptions{
   196  			Timeout: time.Duration(n.cfg.RequestTimeout) * time.Second,
   197  		}
   198  		resp, err := n.ntpClient.QueryWithOptions(srv, opts)
   199  		pkgapi.MetricNTPLatency.With(map[string]string{
   200  			"host": srv,
   201  		}).Observe(float64(time.Since(start)))
   202  		if err == nil {
   203  			pkgapi.MetricNTPSyncCount.With(map[string]string{
   204  				"failed": "false",
   205  				"host":   srv,
   206  			}).Inc()
   207  
   208  			return resp, nil
   209  		}
   210  		pkgapi.MetricNTPSyncCount.With(map[string]string{
   211  			"failed": "true",
   212  			"host":   srv,
   213  		}).Inc()
   214  
   215  		log.Logger.Infof("ntp timeout from %s, attempt %d/%d",
   216  			srv, i, n.cfg.RequestAttempts)
   217  		if i == n.cfg.RequestAttempts {
   218  			break
   219  		}
   220  		i++
   221  		time.Sleep(time.Second)
   222  	}
   223  	return nil, fmt.Errorf("ntp timeout: %s", srv)
   224  }
   225  

View as plain text