...
1
16
17 package cloudresource
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "time"
24
25 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/apimachinery/pkg/util/wait"
28 cloudprovider "k8s.io/cloud-provider"
29
30 "k8s.io/klog/v2"
31 )
32
33
34 type SyncManager interface {
35 Run(stopCh <-chan struct{})
36 NodeAddresses() ([]v1.NodeAddress, error)
37 }
38
39 var _ SyncManager = &cloudResourceSyncManager{}
40
41 type cloudResourceSyncManager struct {
42
43 cloud cloudprovider.Interface
44
45 syncPeriod time.Duration
46
47 nodeAddressesMonitor *sync.Cond
48 nodeAddressesErr error
49 nodeAddresses []v1.NodeAddress
50
51 nodeName types.NodeName
52 }
53
54
55
56 func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
57 return &cloudResourceSyncManager{
58 cloud: cloud,
59 syncPeriod: syncPeriod,
60 nodeName: nodeName,
61
62
63
64
65
66
67
68
69 nodeAddressesMonitor: sync.NewCond(&sync.Mutex{}),
70 }
71 }
72
73
74
75
76
77 func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
78 m.nodeAddressesMonitor.L.Lock()
79 defer m.nodeAddressesMonitor.L.Unlock()
80
81 for {
82 if addrs, err := m.nodeAddresses, m.nodeAddressesErr; len(addrs) > 0 || err != nil {
83 return addrs, err
84 }
85 klog.V(5).InfoS("Waiting for cloud provider to provide node addresses")
86 m.nodeAddressesMonitor.Wait()
87 }
88 }
89
90
91 func (m *cloudResourceSyncManager) getNodeAddresses() ([]v1.NodeAddress, error) {
92
93
94
95
96
97
98 instances, ok := m.cloud.Instances()
99 if !ok {
100 return nil, fmt.Errorf("failed to get instances from cloud provider")
101 }
102 return instances.NodeAddresses(context.TODO(), m.nodeName)
103 }
104
105 func (m *cloudResourceSyncManager) syncNodeAddresses() {
106 klog.V(5).InfoS("Requesting node addresses from cloud provider for node", "nodeName", m.nodeName)
107
108 addrs, err := m.getNodeAddresses()
109
110 m.nodeAddressesMonitor.L.Lock()
111 defer m.nodeAddressesMonitor.L.Unlock()
112 defer m.nodeAddressesMonitor.Broadcast()
113
114 if err != nil {
115 klog.V(2).InfoS("Node addresses from cloud provider for node not collected", "nodeName", m.nodeName, "err", err)
116
117 if len(m.nodeAddresses) > 0 {
118
119
120 return
121 }
122
123 m.nodeAddressesErr = fmt.Errorf("failed to get node address from cloud provider: %v", err)
124 return
125 }
126
127 klog.V(5).InfoS("Node addresses from cloud provider for node collected", "nodeName", m.nodeName)
128 m.nodeAddressesErr = nil
129 m.nodeAddresses = addrs
130 }
131
132
133 func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
134 wait.Until(m.syncNodeAddresses, m.syncPeriod, stopCh)
135 }
136
View as plain text