...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pluginwatcher
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"net"
    24  	"reflect"
    25  	"sync"
    26  	"time"
    27  
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/credentials/insecure"
    30  	"k8s.io/klog/v2"
    31  
    32  	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    33  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
    34  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
    35  )
    36  
// exampleHandler is the example plugin handler implemented in this file. Its
// ValidatePlugin, RegisterPlugin and DeRegisterPlugin methods each publish an
// examplePluginEvent on the channel stored for the plugin before doing any
// other work.
type exampleHandler struct {
	// SupportedVersions is the version list that ValidatePlugin compares,
	// with reflect.DeepEqual, against what a plugin advertises.
	// ExpectedNames maps a plugin name to a count; DecreasePluginCount reads
	// it while holding m.
	SupportedVersions []string
	ExpectedNames     map[string]int

	// eventChans holds the channel SendEvent writes to for each plugin name.
	eventChans map[string]chan examplePluginEvent // map[pluginName]eventChan

	// m is held by DecreasePluginCount around its read of ExpectedNames.
	m sync.Mutex

	// permitDeprecatedDir is stored by NewExampleHandler.
	// NOTE(review): nothing in this file reads it — confirm where it is used.
	permitDeprecatedDir bool
}
    47  
    48  type examplePluginEvent int
    49  
    50  const (
    51  	exampleEventValidate   examplePluginEvent = 0
    52  	exampleEventRegister   examplePluginEvent = 1
    53  	exampleEventDeRegister examplePluginEvent = 2
    54  )
    55  
    56  // NewExampleHandler provide a example handler
    57  func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *exampleHandler {
    58  	return &exampleHandler{
    59  		SupportedVersions: supportedVersions,
    60  		ExpectedNames:     make(map[string]int),
    61  
    62  		eventChans:          make(map[string]chan examplePluginEvent),
    63  		permitDeprecatedDir: permitDeprecatedDir,
    64  	}
    65  }
    66  
    67  func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
    68  	p.SendEvent(pluginName, exampleEventValidate)
    69  
    70  	n, ok := p.DecreasePluginCount(pluginName)
    71  	if !ok && n > 0 {
    72  		return fmt.Errorf("pluginName('%s') wasn't expected (count is %d)", pluginName, n)
    73  	}
    74  
    75  	if !reflect.DeepEqual(versions, p.SupportedVersions) {
    76  		return fmt.Errorf("versions('%v') != supported versions('%v')", versions, p.SupportedVersions)
    77  	}
    78  
    79  	// this handler expects non-empty endpoint as an example
    80  	if len(endpoint) == 0 {
    81  		return errors.New("expecting non empty endpoint")
    82  	}
    83  
    84  	return nil
    85  }
    86  
    87  func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
    88  	p.SendEvent(pluginName, exampleEventRegister)
    89  
    90  	// Verifies the grpcServer is ready to serve services.
    91  	_, conn, err := dial(endpoint, time.Second)
    92  	if err != nil {
    93  		return fmt.Errorf("failed dialing endpoint (%s): %v", endpoint, err)
    94  	}
    95  	defer conn.Close()
    96  
    97  	// The plugin handler should be able to use any listed service API version.
    98  	v1beta1Client := v1beta1.NewExampleClient(conn)
    99  	v1beta2Client := v1beta2.NewExampleClient(conn)
   100  
   101  	// Tests v1beta1 GetExampleInfo
   102  	_, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{})
   103  	if err != nil {
   104  		return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
   105  	}
   106  
   107  	// Tests v1beta1 GetExampleInfo
   108  	_, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{})
   109  	if err != nil {
   110  		return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
   111  	}
   112  
   113  	return nil
   114  }
   115  
// DeRegisterPlugin publishes exampleEventDeRegister for pluginName via
// SendEvent; it does nothing else.
func (p *exampleHandler) DeRegisterPlugin(pluginName string) {
	p.SendEvent(pluginName, exampleEventDeRegister)
}
   119  
   120  func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) {
   121  	klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName])
   122  	p.eventChans[pluginName] <- event
   123  }
   124  
   125  func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok bool) {
   126  	p.m.Lock()
   127  	defer p.m.Unlock()
   128  
   129  	v, ok := p.ExpectedNames[pluginName]
   130  	if !ok {
   131  		v = -1
   132  	}
   133  
   134  	return v, ok
   135  }
   136  
   137  // Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
   138  func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
   139  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   140  	defer cancel()
   141  
   142  	c, err := grpc.DialContext(ctx, unixSocketPath,
   143  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   144  		grpc.WithBlock(),
   145  		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
   146  			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
   147  		}),
   148  	)
   149  
   150  	if err != nil {
   151  		return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
   152  	}
   153  
   154  	return registerapi.NewRegistrationClient(c), c, nil
   155  }
   156  

View as plain text