...

Source file src/github.com/xiang90/probing/prober.go

Documentation: github.com/xiang90/probing

     1  package probing
     2  
     3  import (
     4  	"encoding/json"
     5  	"errors"
     6  	"fmt"
     7  	"net/http"
     8  	"sync"
     9  	"time"
    10  )
    11  
    12  var (
    13  	ErrNotFound = errors.New("probing: id not found")
    14  	ErrExist    = errors.New("probing: id exists")
    15  )
    16  
    17  type Prober interface {
    18  	AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
    19  	Remove(id string) error
    20  	RemoveAll()
    21  	Reset(id string) error
    22  	Status(id string) (Status, error)
    23  }
    24  
    25  type prober struct {
    26  	mu      sync.Mutex
    27  	targets map[string]*status
    28  	tr      http.RoundTripper
    29  }
    30  
    31  func NewProber(tr http.RoundTripper) Prober {
    32  	p := &prober{targets: make(map[string]*status)}
    33  	if tr == nil {
    34  		p.tr = http.DefaultTransport
    35  	} else {
    36  		p.tr = tr
    37  	}
    38  	return p
    39  }
    40  
    41  func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
    42  	p.mu.Lock()
    43  	defer p.mu.Unlock()
    44  	if _, ok := p.targets[id]; ok {
    45  		return ErrExist
    46  	}
    47  
    48  	s := &status{stopC: make(chan struct{})}
    49  	p.targets[id] = s
    50  
    51  	ticker := time.NewTicker(probingInterval)
    52  
    53  	go func() {
    54  		pinned := 0
    55  		for {
    56  			select {
    57  			case <-ticker.C:
    58  				start := time.Now()
    59  				req, err := http.NewRequest("GET", endpoints[pinned], nil)
    60  				if err != nil {
    61  					panic(err)
    62  				}
    63  				resp, err := p.tr.RoundTrip(req)
    64  				if err == nil && resp.StatusCode != http.StatusOK {
    65  					err = fmt.Errorf("got unexpected HTTP status code %s from %s", resp.Status, endpoints[pinned])
    66  					resp.Body.Close()
    67  				}
    68  				if err != nil {
    69  					s.recordFailure(err)
    70  					pinned = (pinned + 1) % len(endpoints)
    71  					continue
    72  				}
    73  
    74  				var hh Health
    75  				d := json.NewDecoder(resp.Body)
    76  				err = d.Decode(&hh)
    77  				resp.Body.Close()
    78  				if err != nil || !hh.OK {
    79  					s.recordFailure(err)
    80  					pinned = (pinned + 1) % len(endpoints)
    81  					continue
    82  				}
    83  
    84  				s.record(time.Since(start), hh.Now)
    85  			case <-s.stopC:
    86  				ticker.Stop()
    87  				return
    88  			}
    89  		}
    90  	}()
    91  
    92  	return nil
    93  }
    94  
    95  func (p *prober) Remove(id string) error {
    96  	p.mu.Lock()
    97  	defer p.mu.Unlock()
    98  
    99  	s, ok := p.targets[id]
   100  	if !ok {
   101  		return ErrNotFound
   102  	}
   103  	close(s.stopC)
   104  	delete(p.targets, id)
   105  	return nil
   106  }
   107  
   108  func (p *prober) RemoveAll() {
   109  	p.mu.Lock()
   110  	defer p.mu.Unlock()
   111  
   112  	for _, s := range p.targets {
   113  		close(s.stopC)
   114  	}
   115  	p.targets = make(map[string]*status)
   116  }
   117  
   118  func (p *prober) Reset(id string) error {
   119  	p.mu.Lock()
   120  	defer p.mu.Unlock()
   121  
   122  	s, ok := p.targets[id]
   123  	if !ok {
   124  		return ErrNotFound
   125  	}
   126  	s.reset()
   127  	return nil
   128  }
   129  
   130  func (p *prober) Status(id string) (Status, error) {
   131  	p.mu.Lock()
   132  	defer p.mu.Unlock()
   133  
   134  	s, ok := p.targets[id]
   135  	if !ok {
   136  		return nil, ErrNotFound
   137  	}
   138  	return s, nil
   139  }
   140  

View as plain text