...

Source file src/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go

Documentation: k8s.io/kubernetes/pkg/volume/util/recyclerclient

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package recyclerclient
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  
    24  	"k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/fields"
    28  	"k8s.io/apimachinery/pkg/watch"
    29  	clientset "k8s.io/client-go/kubernetes"
    30  	"k8s.io/klog/v2"
    31  )
    32  
    33  // RecycleEventRecorder is a func that defines how to record RecycleEvent.
    34  type RecycleEventRecorder func(eventtype, message string)
    35  
    36  // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
    37  // Recyclers. This function will save the given Pod to the API and watch it
    38  // until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded,
    39  // whichever comes first. An attempt to delete a recycler pod is always
    40  // attempted before returning.
    41  //
    42  // In case there is a pod with the same namespace+name already running, this
    43  // function deletes it as it is not able to judge if it is an old recycler
    44  // or user has forged a fake recycler to block Kubernetes from recycling.//
    45  //
    46  //	 pod - the pod designed by a volume plugin to recycle the volume. pod.Name
    47  //	       will be overwritten with unique name based on PV.Name.
    48  //		client - kube client for API operations.
    49  func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error {
    50  	return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder))
    51  }
    52  
    53  // same as above func comments, except 'recyclerClient' is a narrower pod API
    54  // interface to ease testing
    55  func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error {
    56  	klog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name)
    57  
    58  	// Generate unique name for the recycler pod - we need to get "already
    59  	// exists" error when a previous controller has already started recycling
    60  	// the volume. Here we assume that pv.Name is already unique.
    61  	pod.Name = "recycler-for-" + pvName
    62  	pod.GenerateName = ""
    63  
    64  	stopChannel := make(chan struct{})
    65  	defer close(stopChannel)
    66  	podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
    67  	if err != nil {
    68  		klog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err)
    69  		return err
    70  	}
    71  
    72  	// Start the pod
    73  	_, err = recyclerClient.CreatePod(pod)
    74  	if err != nil {
    75  		if errors.IsAlreadyExists(err) {
    76  			deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace)
    77  			if deleteErr != nil {
    78  				return fmt.Errorf("failed to delete old recycler pod %s/%s: %s", pod.Namespace, pod.Name, deleteErr)
    79  			}
    80  			// Recycler will try again and the old pod will be hopefully deleted
    81  			// at that time.
    82  			return fmt.Errorf("old recycler pod found, will retry later")
    83  		}
    84  		return fmt.Errorf("unexpected error creating recycler pod:  %+v", err)
    85  	}
    86  	err = waitForPod(pod, recyclerClient, podCh)
    87  
    88  	// In all cases delete the recycler pod and log its result.
    89  	klog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name)
    90  	deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace)
    91  	if deleteErr != nil {
    92  		klog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err)
    93  	}
    94  
    95  	// Returning recycler error is preferred, the pod will be deleted again on
    96  	// the next retry.
    97  	if err != nil {
    98  		return fmt.Errorf("failed to recycle volume: %s", err)
    99  	}
   100  
   101  	// Recycle succeeded but we failed to delete the recycler pod. Report it,
   102  	// the controller will re-try recycling the PV again shortly.
   103  	if deleteErr != nil {
   104  		return fmt.Errorf("failed to delete recycler pod: %s", deleteErr)
   105  	}
   106  
   107  	return nil
   108  }
   109  
   110  // waitForPod watches the pod it until it finishes and send all events on the
   111  // pod to the PV.
   112  func waitForPod(pod *v1.Pod, recyclerClient recyclerClient, podCh <-chan watch.Event) error {
   113  	for {
   114  		event, ok := <-podCh
   115  		if !ok {
   116  			return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name)
   117  		}
   118  		switch event.Object.(type) {
   119  		case *v1.Pod:
   120  			// POD changed
   121  			pod := event.Object.(*v1.Pod)
   122  			klog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
   123  			switch event.Type {
   124  			case watch.Added, watch.Modified:
   125  				if pod.Status.Phase == v1.PodSucceeded {
   126  					// Recycle succeeded.
   127  					return nil
   128  				}
   129  				if pod.Status.Phase == v1.PodFailed {
   130  					if pod.Status.Message != "" {
   131  						return fmt.Errorf(pod.Status.Message)
   132  					}
   133  					return fmt.Errorf("pod failed, pod.Status.Message unknown")
   134  				}
   135  
   136  			case watch.Deleted:
   137  				return fmt.Errorf("recycler pod was deleted")
   138  
   139  			case watch.Error:
   140  				return fmt.Errorf("recycler pod watcher failed")
   141  			}
   142  
   143  		case *v1.Event:
   144  			// Event received
   145  			podEvent := event.Object.(*v1.Event)
   146  			klog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message)
   147  			if event.Type == watch.Added {
   148  				recyclerClient.Event(podEvent.Type, podEvent.Message)
   149  			}
   150  		}
   151  	}
   152  }
   153  
   154  // recyclerClient abstracts access to a Pod by providing a narrower interface.
   155  // This makes it easier to mock a client for testing.
   156  type recyclerClient interface {
   157  	CreatePod(pod *v1.Pod) (*v1.Pod, error)
   158  	GetPod(name, namespace string) (*v1.Pod, error)
   159  	DeletePod(name, namespace string) error
   160  	// WatchPod returns a ListWatch for watching a pod.  The stopChannel is used
   161  	// to close the reflector backing the watch.  The caller is responsible for
   162  	// derring a close on the channel to stop the reflector.
   163  	WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error)
   164  	// Event sends an event to the volume that is being recycled.
   165  	Event(eventtype, message string)
   166  }
   167  
   168  func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient {
   169  	return &realRecyclerClient{
   170  		client,
   171  		recorder,
   172  	}
   173  }
   174  
   175  type realRecyclerClient struct {
   176  	client   clientset.Interface
   177  	recorder RecycleEventRecorder
   178  }
   179  
   180  func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
   181  	return c.client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
   182  }
   183  
   184  func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) {
   185  	return c.client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
   186  }
   187  
   188  func (c *realRecyclerClient) DeletePod(name, namespace string) error {
   189  	return c.client.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
   190  }
   191  
   192  func (c *realRecyclerClient) Event(eventtype, message string) {
   193  	c.recorder(eventtype, message)
   194  }
   195  
   196  // WatchPod watches a pod and events related to it. It sends pod updates and events over the returned channel
   197  // It will continue until stopChannel is closed
   198  func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
   199  	podSelector, err := fields.ParseSelector("metadata.name=" + name)
   200  	if err != nil {
   201  		return nil, err
   202  	}
   203  	options := metav1.ListOptions{
   204  		FieldSelector: podSelector.String(),
   205  		Watch:         true,
   206  	}
   207  
   208  	podWatch, err := c.client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
   209  	if err != nil {
   210  		return nil, err
   211  	}
   212  
   213  	eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name)
   214  	eventWatch, err := c.client.CoreV1().Events(namespace).Watch(context.TODO(), metav1.ListOptions{
   215  		FieldSelector: eventSelector.String(),
   216  		Watch:         true,
   217  	})
   218  	if err != nil {
   219  		podWatch.Stop()
   220  		return nil, err
   221  	}
   222  
   223  	eventCh := make(chan watch.Event, 30)
   224  	var wg sync.WaitGroup
   225  	wg.Add(2)
   226  
   227  	go func() {
   228  		defer close(eventCh)
   229  		wg.Wait()
   230  	}()
   231  
   232  	go func() {
   233  		defer eventWatch.Stop()
   234  		defer wg.Done()
   235  		for {
   236  			select {
   237  			case <-stopChannel:
   238  				return
   239  			case eventEvent, ok := <-eventWatch.ResultChan():
   240  				if !ok {
   241  					return
   242  				}
   243  				eventCh <- eventEvent
   244  			}
   245  		}
   246  	}()
   247  
   248  	go func() {
   249  		defer podWatch.Stop()
   250  		defer wg.Done()
   251  		for {
   252  			select {
   253  			case <-stopChannel:
   254  				return
   255  
   256  			case podEvent, ok := <-podWatch.ResultChan():
   257  				if !ok {
   258  					return
   259  				}
   260  				eventCh <- podEvent
   261  			}
   262  		}
   263  	}()
   264  
   265  	return eventCh, nil
   266  }
   267  

View as plain text