...

Source file src/k8s.io/kubernetes/pkg/kubelet/runtime.go

Documentation: k8s.io/kubernetes/pkg/kubelet

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    26  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    27  )
    28  
    29  type runtimeState struct {
    30  	sync.RWMutex
    31  	lastBaseRuntimeSync      time.Time
    32  	baseRuntimeSyncThreshold time.Duration
    33  	networkError             error
    34  	runtimeError             error
    35  	storageError             error
    36  	cidr                     string
    37  	healthChecks             []*healthCheck
    38  	rtHandlers               []kubecontainer.RuntimeHandler
    39  }
    40  
    41  // A health check function should be efficient and not rely on external
    42  // components (e.g., container runtime).
    43  type healthCheckFnType func() (bool, error)
    44  
    45  type healthCheck struct {
    46  	name string
    47  	fn   healthCheckFnType
    48  }
    49  
    50  func (s *runtimeState) addHealthCheck(name string, f healthCheckFnType) {
    51  	s.Lock()
    52  	defer s.Unlock()
    53  	s.healthChecks = append(s.healthChecks, &healthCheck{name: name, fn: f})
    54  }
    55  
    56  func (s *runtimeState) setRuntimeSync(t time.Time) {
    57  	s.Lock()
    58  	defer s.Unlock()
    59  	s.lastBaseRuntimeSync = t
    60  }
    61  
    62  func (s *runtimeState) setNetworkState(err error) {
    63  	s.Lock()
    64  	defer s.Unlock()
    65  	s.networkError = err
    66  }
    67  
    68  func (s *runtimeState) setRuntimeState(err error) {
    69  	s.Lock()
    70  	defer s.Unlock()
    71  	s.runtimeError = err
    72  }
    73  
    74  func (s *runtimeState) setRuntimeHandlers(rtHandlers []kubecontainer.RuntimeHandler) {
    75  	s.Lock()
    76  	defer s.Unlock()
    77  	s.rtHandlers = rtHandlers
    78  }
    79  
    80  func (s *runtimeState) runtimeHandlers() []kubecontainer.RuntimeHandler {
    81  	s.RLock()
    82  	defer s.RUnlock()
    83  	return s.rtHandlers
    84  }
    85  
    86  func (s *runtimeState) setStorageState(err error) {
    87  	s.Lock()
    88  	defer s.Unlock()
    89  	s.storageError = err
    90  }
    91  
    92  func (s *runtimeState) setPodCIDR(cidr string) {
    93  	s.Lock()
    94  	defer s.Unlock()
    95  	s.cidr = cidr
    96  }
    97  
    98  func (s *runtimeState) podCIDR() string {
    99  	s.RLock()
   100  	defer s.RUnlock()
   101  	return s.cidr
   102  }
   103  
   104  func (s *runtimeState) runtimeErrors() error {
   105  	s.RLock()
   106  	defer s.RUnlock()
   107  	errs := []error{}
   108  	if s.lastBaseRuntimeSync.IsZero() {
   109  		errs = append(errs, errors.New("container runtime status check may not have completed yet"))
   110  	} else if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
   111  		errs = append(errs, errors.New("container runtime is down"))
   112  	}
   113  	for _, hc := range s.healthChecks {
   114  		if ok, err := hc.fn(); !ok {
   115  			errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
   116  		}
   117  	}
   118  	if s.runtimeError != nil {
   119  		errs = append(errs, s.runtimeError)
   120  	}
   121  
   122  	return utilerrors.NewAggregate(errs)
   123  }
   124  
   125  func (s *runtimeState) networkErrors() error {
   126  	s.RLock()
   127  	defer s.RUnlock()
   128  	errs := []error{}
   129  	if s.networkError != nil {
   130  		errs = append(errs, s.networkError)
   131  	}
   132  	return utilerrors.NewAggregate(errs)
   133  }
   134  
   135  func (s *runtimeState) storageErrors() error {
   136  	s.RLock()
   137  	defer s.RUnlock()
   138  	errs := []error{}
   139  	if s.storageError != nil {
   140  		errs = append(errs, s.storageError)
   141  	}
   142  	return utilerrors.NewAggregate(errs)
   143  }
   144  
   145  func newRuntimeState(runtimeSyncThreshold time.Duration) *runtimeState {
   146  	return &runtimeState{
   147  		lastBaseRuntimeSync:      time.Time{},
   148  		baseRuntimeSyncThreshold: runtimeSyncThreshold,
   149  		networkError:             ErrNetworkUnknown,
   150  	}
   151  }
   152  

View as plain text