...
1
16
17 package kubelet
18
19 import (
20 "errors"
21 "fmt"
22 "sync"
23 "time"
24
25 utilerrors "k8s.io/apimachinery/pkg/util/errors"
26 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
27 )
28
29 type runtimeState struct {
30 sync.RWMutex
31 lastBaseRuntimeSync time.Time
32 baseRuntimeSyncThreshold time.Duration
33 networkError error
34 runtimeError error
35 storageError error
36 cidr string
37 healthChecks []*healthCheck
38 rtHandlers []kubecontainer.RuntimeHandler
39 }
40
41
42
43 type healthCheckFnType func() (bool, error)
44
45 type healthCheck struct {
46 name string
47 fn healthCheckFnType
48 }
49
50 func (s *runtimeState) addHealthCheck(name string, f healthCheckFnType) {
51 s.Lock()
52 defer s.Unlock()
53 s.healthChecks = append(s.healthChecks, &healthCheck{name: name, fn: f})
54 }
55
56 func (s *runtimeState) setRuntimeSync(t time.Time) {
57 s.Lock()
58 defer s.Unlock()
59 s.lastBaseRuntimeSync = t
60 }
61
62 func (s *runtimeState) setNetworkState(err error) {
63 s.Lock()
64 defer s.Unlock()
65 s.networkError = err
66 }
67
68 func (s *runtimeState) setRuntimeState(err error) {
69 s.Lock()
70 defer s.Unlock()
71 s.runtimeError = err
72 }
73
74 func (s *runtimeState) setRuntimeHandlers(rtHandlers []kubecontainer.RuntimeHandler) {
75 s.Lock()
76 defer s.Unlock()
77 s.rtHandlers = rtHandlers
78 }
79
80 func (s *runtimeState) runtimeHandlers() []kubecontainer.RuntimeHandler {
81 s.RLock()
82 defer s.RUnlock()
83 return s.rtHandlers
84 }
85
86 func (s *runtimeState) setStorageState(err error) {
87 s.Lock()
88 defer s.Unlock()
89 s.storageError = err
90 }
91
92 func (s *runtimeState) setPodCIDR(cidr string) {
93 s.Lock()
94 defer s.Unlock()
95 s.cidr = cidr
96 }
97
98 func (s *runtimeState) podCIDR() string {
99 s.RLock()
100 defer s.RUnlock()
101 return s.cidr
102 }
103
104 func (s *runtimeState) runtimeErrors() error {
105 s.RLock()
106 defer s.RUnlock()
107 errs := []error{}
108 if s.lastBaseRuntimeSync.IsZero() {
109 errs = append(errs, errors.New("container runtime status check may not have completed yet"))
110 } else if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
111 errs = append(errs, errors.New("container runtime is down"))
112 }
113 for _, hc := range s.healthChecks {
114 if ok, err := hc.fn(); !ok {
115 errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
116 }
117 }
118 if s.runtimeError != nil {
119 errs = append(errs, s.runtimeError)
120 }
121
122 return utilerrors.NewAggregate(errs)
123 }
124
125 func (s *runtimeState) networkErrors() error {
126 s.RLock()
127 defer s.RUnlock()
128 errs := []error{}
129 if s.networkError != nil {
130 errs = append(errs, s.networkError)
131 }
132 return utilerrors.NewAggregate(errs)
133 }
134
135 func (s *runtimeState) storageErrors() error {
136 s.RLock()
137 defer s.RUnlock()
138 errs := []error{}
139 if s.storageError != nil {
140 errs = append(errs, s.storageError)
141 }
142 return utilerrors.NewAggregate(errs)
143 }
144
145 func newRuntimeState(runtimeSyncThreshold time.Duration) *runtimeState {
146 return &runtimeState{
147 lastBaseRuntimeSync: time.Time{},
148 baseRuntimeSyncThreshold: runtimeSyncThreshold,
149 networkError: ErrNetworkUnknown,
150 }
151 }
152
View as plain text