...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pluginwatcher
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"net"
    24  	"os"
    25  	"sync"
    26  	"time"
    27  
    28  	"google.golang.org/grpc"
    29  	"k8s.io/klog/v2"
    30  
    31  	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    32  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    33  	v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
    34  	v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
    35  )
    36  
    37  // examplePlugin is a sample plugin to work with plugin watcher
    38  type examplePlugin struct {
    39  	grpcServer         *grpc.Server
    40  	wg                 sync.WaitGroup
    41  	registrationStatus chan registerapi.RegistrationStatus // for testing
    42  	endpoint           string                              // for testing
    43  	pluginName         string
    44  	pluginType         string
    45  	versions           []string
    46  }
    47  
    48  type pluginServiceV1Beta1 struct {
    49  	server *examplePlugin
    50  }
    51  
    52  func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
    53  	klog.InfoS("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field)
    54  	return &v1beta1.ExampleResponse{}, nil
    55  }
    56  
    57  func (s *pluginServiceV1Beta1) RegisterService() {
    58  	v1beta1.RegisterExampleServer(s.server.grpcServer, s)
    59  }
    60  
    61  type pluginServiceV1Beta2 struct {
    62  	server *examplePlugin
    63  }
    64  
    65  func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
    66  	klog.InfoS("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field)
    67  	return &v1beta2.ExampleResponse{}, nil
    68  }
    69  
    70  func (s *pluginServiceV1Beta2) RegisterService() {
    71  	v1beta2.RegisterExampleServer(s.server.grpcServer, s)
    72  }
    73  
    74  // NewExamplePlugin returns an initialized examplePlugin instance
    75  func NewExamplePlugin() *examplePlugin {
    76  	return &examplePlugin{}
    77  }
    78  
    79  // NewTestExamplePlugin returns an initialized examplePlugin instance for testing
    80  func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin {
    81  	return &examplePlugin{
    82  		pluginName:         pluginName,
    83  		pluginType:         pluginType,
    84  		endpoint:           endpoint,
    85  		versions:           advertisedVersions,
    86  		registrationStatus: make(chan registerapi.RegistrationStatus),
    87  	}
    88  }
    89  
    90  // GetPluginInfo returns a PluginInfo object
    91  func GetPluginInfo(plugin *examplePlugin) cache.PluginInfo {
    92  	return cache.PluginInfo{
    93  		SocketPath: plugin.endpoint,
    94  	}
    95  }
    96  
    97  // GetInfo is the RPC invoked by plugin watcher
    98  func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
    99  	return &registerapi.PluginInfo{
   100  		Type:              e.pluginType,
   101  		Name:              e.pluginName,
   102  		Endpoint:          e.endpoint,
   103  		SupportedVersions: e.versions,
   104  	}, nil
   105  }
   106  
   107  func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
   108  	klog.InfoS("Notify registration status", "status", status)
   109  
   110  	if e.registrationStatus != nil {
   111  		e.registrationStatus <- *status
   112  	}
   113  
   114  	return &registerapi.RegistrationStatusResponse{}, nil
   115  }
   116  
   117  // Serve starts a pluginwatcher server and one or more of the plugin services
   118  func (e *examplePlugin) Serve(services ...string) error {
   119  	klog.InfoS("Starting example server", "endpoint", e.endpoint)
   120  	lis, err := net.Listen("unix", e.endpoint)
   121  	if err != nil {
   122  		return err
   123  	}
   124  
   125  	klog.InfoS("Example server started", "endpoint", e.endpoint)
   126  	e.grpcServer = grpc.NewServer()
   127  
   128  	// Registers kubelet plugin watcher api.
   129  	registerapi.RegisterRegistrationServer(e.grpcServer, e)
   130  
   131  	for _, service := range services {
   132  		switch service {
   133  		case "v1beta1":
   134  			v1beta1 := &pluginServiceV1Beta1{server: e}
   135  			v1beta1.RegisterService()
   136  		case "v1beta2":
   137  			v1beta2 := &pluginServiceV1Beta2{server: e}
   138  			v1beta2.RegisterService()
   139  		default:
   140  			return fmt.Errorf("unsupported service: '%s'", service)
   141  		}
   142  	}
   143  
   144  	// Starts service
   145  	e.wg.Add(1)
   146  	go func() {
   147  		defer e.wg.Done()
   148  		// Blocking call to accept incoming connections.
   149  		if err := e.grpcServer.Serve(lis); err != nil {
   150  			klog.ErrorS(err, "Example server stopped serving")
   151  		}
   152  	}()
   153  
   154  	return nil
   155  }
   156  
   157  func (e *examplePlugin) Stop() error {
   158  	klog.InfoS("Stopping example server", "endpoint", e.endpoint)
   159  
   160  	e.grpcServer.Stop()
   161  	c := make(chan struct{})
   162  	go func() {
   163  		defer close(c)
   164  		e.wg.Wait()
   165  	}()
   166  
   167  	select {
   168  	case <-c:
   169  		break
   170  	case <-time.After(time.Second):
   171  		return errors.New("timed out on waiting for stop completion")
   172  	}
   173  
   174  	if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
   175  		return err
   176  	}
   177  
   178  	return nil
   179  }
   180  

View as plain text