1
16
17 package kubelet
18
19 import (
20 "fmt"
21 "net"
22 "runtime"
23
24 "k8s.io/klog/v2"
25 "k8s.io/mount-utils"
26 utilexec "k8s.io/utils/exec"
27
28 authenticationv1 "k8s.io/api/authentication/v1"
29 v1 "k8s.io/api/core/v1"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/types"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/client-go/informers"
34 clientset "k8s.io/client-go/kubernetes"
35 storagelisters "k8s.io/client-go/listers/storage/v1"
36 "k8s.io/client-go/tools/cache"
37 "k8s.io/client-go/tools/record"
38 cloudprovider "k8s.io/cloud-provider"
39 "k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
40 "k8s.io/kubernetes/pkg/kubelet/configmap"
41 "k8s.io/kubernetes/pkg/kubelet/secret"
42 "k8s.io/kubernetes/pkg/kubelet/token"
43 "k8s.io/kubernetes/pkg/volume"
44 "k8s.io/kubernetes/pkg/volume/util"
45 "k8s.io/kubernetes/pkg/volume/util/hostutil"
46 "k8s.io/kubernetes/pkg/volume/util/subpath"
47 )
48
49
50
51
52
53
54
55 func NewInitializedVolumePluginMgr(
56 kubelet *Kubelet,
57 secretManager secret.Manager,
58 configMapManager configmap.Manager,
59 tokenManager *token.Manager,
60 clusterTrustBundleManager clustertrustbundle.Manager,
61 plugins []volume.VolumePlugin,
62 prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
63
64
65 var informerFactory informers.SharedInformerFactory
66 var csiDriverLister storagelisters.CSIDriverLister
67 var csiDriversSynced cache.InformerSynced
68 const resyncPeriod = 0
69
70 if kubelet.kubeClient != nil {
71 informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
72 csiDriverInformer := informerFactory.Storage().V1().CSIDrivers()
73 csiDriverLister = csiDriverInformer.Lister()
74 csiDriversSynced = csiDriverInformer.Informer().HasSynced
75
76 } else {
77 klog.InfoS("KubeClient is nil. Skip initialization of CSIDriverLister")
78 }
79
80 kvh := &kubeletVolumeHost{
81 kubelet: kubelet,
82 volumePluginMgr: volume.VolumePluginMgr{},
83 secretManager: secretManager,
84 configMapManager: configMapManager,
85 tokenManager: tokenManager,
86 clusterTrustBundleManager: clusterTrustBundleManager,
87 informerFactory: informerFactory,
88 csiDriverLister: csiDriverLister,
89 csiDriversSynced: csiDriversSynced,
90 exec: utilexec.New(),
91 }
92
93 if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
94 return nil, fmt.Errorf(
95 "could not initialize volume plugins for KubeletVolumePluginMgr: %v",
96 err)
97 }
98
99 return &kvh.volumePluginMgr, nil
100 }
101
102
103 var _ volume.VolumeHost = &kubeletVolumeHost{}
104 var _ volume.KubeletVolumeHost = &kubeletVolumeHost{}
105
106 func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
107 return kvh.kubelet.getPluginDir(pluginName)
108 }
109
110 type kubeletVolumeHost struct {
111 kubelet *Kubelet
112 volumePluginMgr volume.VolumePluginMgr
113 secretManager secret.Manager
114 tokenManager *token.Manager
115 configMapManager configmap.Manager
116 clusterTrustBundleManager clustertrustbundle.Manager
117 informerFactory informers.SharedInformerFactory
118 csiDriverLister storagelisters.CSIDriverLister
119 csiDriversSynced cache.InformerSynced
120 exec utilexec.Interface
121 }
122
123 func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
124 kvh.kubelet.runtimeState.setStorageState(err)
125 }
126
127 func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
128 return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
129 }
130
131 func (kvh *kubeletVolumeHost) GetPodsDir() string {
132 return kvh.kubelet.getPodsDir()
133 }
134
135 func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
136 dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
137 if runtime.GOOS == "windows" {
138 dir = util.GetWindowsPath(dir)
139 }
140 return dir
141 }
142
143 func (kvh *kubeletVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
144 return kvh.kubelet.getPodVolumeDeviceDir(podUID, pluginName)
145 }
146
147 func (kvh *kubeletVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
148 return kvh.kubelet.getPodPluginDir(podUID, pluginName)
149 }
150
151 func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
152 return kvh.kubelet.kubeClient
153 }
154
155 func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
156 return kvh.kubelet.subpather
157 }
158
159 func (kvh *kubeletVolumeHost) GetHostUtil() hostutil.HostUtils {
160 return kvh.kubelet.hostutil
161 }
162
163 func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
164 return kvh.informerFactory
165 }
166
167 func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
168 return kvh.csiDriverLister
169 }
170
171 func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
172 return kvh.csiDriversSynced
173 }
174
175
176 func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
177 if kvh.csiDriversSynced == nil {
178 klog.ErrorS(nil, "CsiDriversSynced not found on KubeletVolumeHost")
179 return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
180 }
181
182 synced := []cache.InformerSynced{kvh.csiDriversSynced}
183 if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
184 klog.InfoS("Failed to wait for cache sync for CSIDriverLister")
185 return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
186 }
187
188 return nil
189 }
190
191 func (kvh *kubeletVolumeHost) NewWrapperMounter(
192 volName string,
193 spec volume.Spec,
194 pod *v1.Pod,
195 opts volume.VolumeOptions) (volume.Mounter, error) {
196
197 wrapperVolumeName := "wrapped_" + volName
198 if spec.Volume != nil {
199 spec.Volume.Name = wrapperVolumeName
200 }
201
202 return kvh.kubelet.newVolumeMounterFromPlugins(&spec, pod, opts)
203 }
204
205 func (kvh *kubeletVolumeHost) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
206
207 wrapperVolumeName := "wrapped_" + volName
208 if spec.Volume != nil {
209 spec.Volume.Name = wrapperVolumeName
210 }
211
212 plugin, err := kvh.kubelet.volumePluginMgr.FindPluginBySpec(&spec)
213 if err != nil {
214 return nil, err
215 }
216
217 return plugin.NewUnmounter(spec.Name(), podUID)
218 }
219
220 func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
221 return kvh.kubelet.cloud
222 }
223
224 func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
225 return kvh.kubelet.mounter
226 }
227
228 func (kvh *kubeletVolumeHost) GetHostName() string {
229 return kvh.kubelet.hostname
230 }
231
232 func (kvh *kubeletVolumeHost) GetHostIP() (net.IP, error) {
233 hostIPs, err := kvh.kubelet.GetHostIPs()
234 if err != nil {
235 return nil, err
236 }
237 return hostIPs[0], err
238 }
239
240 func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
241 node, err := kvh.kubelet.getNodeAnyWay()
242 if err != nil {
243 return nil, fmt.Errorf("error retrieving node: %v", err)
244 }
245 return node.Status.Allocatable, nil
246 }
247
248 func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
249 if kvh.secretManager != nil {
250 return kvh.secretManager.GetSecret
251 }
252 return func(namespace, name string) (*v1.Secret, error) {
253 return nil, fmt.Errorf("not supported due to running kubelet in standalone mode")
254 }
255 }
256
257 func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
258 if kvh.configMapManager != nil {
259 return kvh.configMapManager.GetConfigMap
260 }
261 return func(namespace, name string) (*v1.ConfigMap, error) {
262 return nil, fmt.Errorf("not supported due to running kubelet in standalone mode")
263 }
264 }
265
266 func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
267 return kvh.tokenManager.GetServiceAccountToken
268 }
269
270 func (kvh *kubeletVolumeHost) DeleteServiceAccountTokenFunc() func(podUID types.UID) {
271 return kvh.tokenManager.DeleteServiceAccountToken
272 }
273
274 func (kvh *kubeletVolumeHost) GetTrustAnchorsByName(name string, allowMissing bool) ([]byte, error) {
275 return kvh.clusterTrustBundleManager.GetTrustAnchorsByName(name, allowMissing)
276 }
277
278 func (kvh *kubeletVolumeHost) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
279 return kvh.clusterTrustBundleManager.GetTrustAnchorsBySigner(signerName, labelSelector, allowMissing)
280 }
281
282 func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
283 node, err := kvh.kubelet.GetNode()
284 if err != nil {
285 return nil, fmt.Errorf("error retrieving node: %v", err)
286 }
287 return node.Labels, nil
288 }
289
290 func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
291 node, err := kvh.kubelet.GetNode()
292 if err != nil {
293 return nil, fmt.Errorf("error retrieving node: %v", err)
294 }
295 attachedVolumes := node.Status.VolumesAttached
296 result := map[v1.UniqueVolumeName]string{}
297 for i := range attachedVolumes {
298 attachedVolume := attachedVolumes[i]
299 result[attachedVolume.Name] = attachedVolume.DevicePath
300 }
301 return result, nil
302 }
303
304 func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
305 return kvh.kubelet.nodeName
306 }
307
308 func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
309 return kvh.kubelet.recorder
310 }
311
312 func (kvh *kubeletVolumeHost) GetExec(pluginName string) utilexec.Interface {
313 return kvh.exec
314 }
315
View as plain text