...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package operationexecutor implements interfaces that enable execution of
    18  // register and unregister operations with a
    19  // goroutinemap so that more than one operation is never triggered
    20  // on the same plugin.
    21  package operationexecutor
    22  
    23  import (
    24  	"context"
    25  	"errors"
    26  	"fmt"
    27  	"net"
    28  	"time"
    29  
    30  	"k8s.io/klog/v2"
    31  
    32  	"google.golang.org/grpc"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"k8s.io/client-go/tools/record"
    35  	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    36  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    37  )
    38  
    39  const (
    40  	dialTimeoutDuration   = 10 * time.Second
    41  	notifyTimeoutDuration = 5 * time.Second
    42  )
    43  
    44  var _ OperationGenerator = &operationGenerator{}
    45  
    46  type operationGenerator struct {
    47  
    48  	// recorder is used to record events in the API server
    49  	recorder record.EventRecorder
    50  }
    51  
    52  // NewOperationGenerator is returns instance of operationGenerator
    53  func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
    54  
    55  	return &operationGenerator{
    56  		recorder: recorder,
    57  	}
    58  }
    59  
    60  // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
    61  type OperationGenerator interface {
    62  	// Generates the RegisterPlugin function needed to perform the registration of a plugin
    63  	GenerateRegisterPluginFunc(
    64  		socketPath string,
    65  		timestamp time.Time,
    66  		pluginHandlers map[string]cache.PluginHandler,
    67  		actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
    68  
    69  	// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
    70  	GenerateUnregisterPluginFunc(
    71  		pluginInfo cache.PluginInfo,
    72  		actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
    73  }
    74  
    75  func (og *operationGenerator) GenerateRegisterPluginFunc(
    76  	socketPath string,
    77  	timestamp time.Time,
    78  	pluginHandlers map[string]cache.PluginHandler,
    79  	actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
    80  
    81  	registerPluginFunc := func() error {
    82  		client, conn, err := dial(socketPath, dialTimeoutDuration)
    83  		if err != nil {
    84  			return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
    85  		}
    86  		defer conn.Close()
    87  
    88  		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    89  		defer cancel()
    90  
    91  		infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
    92  		if err != nil {
    93  			return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
    94  		}
    95  
    96  		handler, ok := pluginHandlers[infoResp.Type]
    97  		if !ok {
    98  			if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
    99  				return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
   100  			}
   101  			return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
   102  		}
   103  
   104  		if infoResp.Endpoint == "" {
   105  			infoResp.Endpoint = socketPath
   106  		}
   107  		if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
   108  			if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
   109  				return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
   110  			}
   111  			return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
   112  		}
   113  		// We add the plugin to the actual state of world cache before calling a plugin consumer's Register handle
   114  		// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
   115  		err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
   116  			SocketPath: socketPath,
   117  			Timestamp:  timestamp,
   118  			Handler:    handler,
   119  			Name:       infoResp.Name,
   120  		})
   121  		if err != nil {
   122  			klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
   123  		}
   124  		if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil {
   125  			return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
   126  		}
   127  
   128  		// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
   129  		if err := og.notifyPlugin(client, true, ""); err != nil {
   130  			return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
   131  		}
   132  		return nil
   133  	}
   134  	return registerPluginFunc
   135  }
   136  
   137  func (og *operationGenerator) GenerateUnregisterPluginFunc(
   138  	pluginInfo cache.PluginInfo,
   139  	actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
   140  
   141  	unregisterPluginFunc := func() error {
   142  		if pluginInfo.Handler == nil {
   143  			return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
   144  		}
   145  		// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
   146  		// so that if we receive a register event during Register Plugin, we can process it as a Register call.
   147  		actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
   148  
   149  		pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
   150  
   151  		klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
   152  		return nil
   153  	}
   154  	return unregisterPluginFunc
   155  }
   156  
   157  func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error {
   158  	ctx, cancel := context.WithTimeout(context.Background(), notifyTimeoutDuration)
   159  	defer cancel()
   160  
   161  	status := &registerapi.RegistrationStatus{
   162  		PluginRegistered: registered,
   163  		Error:            errStr,
   164  	}
   165  
   166  	if _, err := client.NotifyRegistrationStatus(ctx, status); err != nil {
   167  		return fmt.Errorf("%s: %w", errStr, err)
   168  	}
   169  
   170  	if errStr != "" {
   171  		return errors.New(errStr)
   172  	}
   173  
   174  	return nil
   175  }
   176  
   177  // Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
   178  func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
   179  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   180  	defer cancel()
   181  
   182  	c, err := grpc.DialContext(ctx, unixSocketPath,
   183  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   184  		grpc.WithBlock(),
   185  		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
   186  			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
   187  		}),
   188  	)
   189  
   190  	if err != nil {
   191  		return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
   192  	}
   193  
   194  	return registerapi.NewRegistrationClient(c), c, nil
   195  }
   196  

View as plain text