...

Source file src/k8s.io/kubernetes/pkg/kubelet/cloudresource/cloud_request_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cloudresource

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cloudresource
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/apimachinery/pkg/util/wait"
    28  	cloudprovider "k8s.io/cloud-provider"
    29  
    30  	"k8s.io/klog/v2"
    31  )
    32  
    33  // SyncManager is an interface for making requests to a cloud provider
    34  type SyncManager interface {
    35  	Run(stopCh <-chan struct{})
    36  	NodeAddresses() ([]v1.NodeAddress, error)
    37  }
    38  
    39  var _ SyncManager = &cloudResourceSyncManager{}
    40  
    41  type cloudResourceSyncManager struct {
    42  	// Cloud provider interface.
    43  	cloud cloudprovider.Interface
    44  	// Sync period
    45  	syncPeriod time.Duration
    46  
    47  	nodeAddressesMonitor *sync.Cond
    48  	nodeAddressesErr     error
    49  	nodeAddresses        []v1.NodeAddress
    50  
    51  	nodeName types.NodeName
    52  }
    53  
    54  // NewSyncManager creates a manager responsible for collecting resources from a
    55  // cloud provider through requests that are sensitive to timeouts and hanging
    56  func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
    57  	return &cloudResourceSyncManager{
    58  		cloud:      cloud,
    59  		syncPeriod: syncPeriod,
    60  		nodeName:   nodeName,
    61  		// nodeAddressesMonitor is a monitor that guards a result (nodeAddresses,
    62  		// nodeAddressesErr) of the sync loop under the condition that a result has
    63  		// been saved at least once. The semantics here are:
    64  		//
    65  		// * Readers of the result will wait on the monitor until the first result
    66  		//   has been saved.
    67  		// * The sync loop (i.e. the only writer), will signal all waiters every
    68  		//   time it updates the result.
    69  		nodeAddressesMonitor: sync.NewCond(&sync.Mutex{}),
    70  	}
    71  }
    72  
    73  // NodeAddresses waits for the first sync loop to run. If no successful syncs
    74  // have run, it will return the most recent error. If node addresses have been
    75  // synced successfully, it will return the list of node addresses from the most
    76  // recent successful sync.
    77  func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
    78  	m.nodeAddressesMonitor.L.Lock()
    79  	defer m.nodeAddressesMonitor.L.Unlock()
    80  	// wait until there is something
    81  	for {
    82  		if addrs, err := m.nodeAddresses, m.nodeAddressesErr; len(addrs) > 0 || err != nil {
    83  			return addrs, err
    84  		}
    85  		klog.V(5).InfoS("Waiting for cloud provider to provide node addresses")
    86  		m.nodeAddressesMonitor.Wait()
    87  	}
    88  }
    89  
    90  // getNodeAddresses calls the cloud provider to get a current list of node addresses.
    91  func (m *cloudResourceSyncManager) getNodeAddresses() ([]v1.NodeAddress, error) {
    92  	// TODO(roberthbailey): Can we do this without having credentials to talk to
    93  	// the cloud provider?
    94  	// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and
    95  	// returned an interface.
    96  	// TODO: If IP addresses couldn't be fetched from the cloud provider, should
    97  	// kubelet fallback on the other methods for getting the IP below?
    98  	instances, ok := m.cloud.Instances()
    99  	if !ok {
   100  		return nil, fmt.Errorf("failed to get instances from cloud provider")
   101  	}
   102  	return instances.NodeAddresses(context.TODO(), m.nodeName)
   103  }
   104  
   105  func (m *cloudResourceSyncManager) syncNodeAddresses() {
   106  	klog.V(5).InfoS("Requesting node addresses from cloud provider for node", "nodeName", m.nodeName)
   107  
   108  	addrs, err := m.getNodeAddresses()
   109  
   110  	m.nodeAddressesMonitor.L.Lock()
   111  	defer m.nodeAddressesMonitor.L.Unlock()
   112  	defer m.nodeAddressesMonitor.Broadcast()
   113  
   114  	if err != nil {
   115  		klog.V(2).InfoS("Node addresses from cloud provider for node not collected", "nodeName", m.nodeName, "err", err)
   116  
   117  		if len(m.nodeAddresses) > 0 {
   118  			// in the event that a sync loop fails when a previous sync had
   119  			// succeeded, continue to use the old addresses.
   120  			return
   121  		}
   122  
   123  		m.nodeAddressesErr = fmt.Errorf("failed to get node address from cloud provider: %v", err)
   124  		return
   125  	}
   126  
   127  	klog.V(5).InfoS("Node addresses from cloud provider for node collected", "nodeName", m.nodeName)
   128  	m.nodeAddressesErr = nil
   129  	m.nodeAddresses = addrs
   130  }
   131  
   132  // Run starts the cloud resource sync manager's sync loop.
   133  func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
   134  	wait.Until(m.syncNodeAddresses, m.syncPeriod, stopCh)
   135  }
   136  

View as plain text