1
16
17 package pluginwatcher
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "os"
25 "sync"
26 "time"
27
28 "google.golang.org/grpc"
29 "k8s.io/klog/v2"
30
31 registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
32 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
33 v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
34 v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
35 )
36
37
38 type examplePlugin struct {
39 grpcServer *grpc.Server
40 wg sync.WaitGroup
41 registrationStatus chan registerapi.RegistrationStatus
42 endpoint string
43 pluginName string
44 pluginType string
45 versions []string
46 }
47
48 type pluginServiceV1Beta1 struct {
49 server *examplePlugin
50 }
51
52 func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
53 klog.InfoS("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field)
54 return &v1beta1.ExampleResponse{}, nil
55 }
56
57 func (s *pluginServiceV1Beta1) RegisterService() {
58 v1beta1.RegisterExampleServer(s.server.grpcServer, s)
59 }
60
61 type pluginServiceV1Beta2 struct {
62 server *examplePlugin
63 }
64
65 func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
66 klog.InfoS("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field)
67 return &v1beta2.ExampleResponse{}, nil
68 }
69
70 func (s *pluginServiceV1Beta2) RegisterService() {
71 v1beta2.RegisterExampleServer(s.server.grpcServer, s)
72 }
73
74
75 func NewExamplePlugin() *examplePlugin {
76 return &examplePlugin{}
77 }
78
79
80 func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin {
81 return &examplePlugin{
82 pluginName: pluginName,
83 pluginType: pluginType,
84 endpoint: endpoint,
85 versions: advertisedVersions,
86 registrationStatus: make(chan registerapi.RegistrationStatus),
87 }
88 }
89
90
91 func GetPluginInfo(plugin *examplePlugin) cache.PluginInfo {
92 return cache.PluginInfo{
93 SocketPath: plugin.endpoint,
94 }
95 }
96
97
98 func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
99 return ®isterapi.PluginInfo{
100 Type: e.pluginType,
101 Name: e.pluginName,
102 Endpoint: e.endpoint,
103 SupportedVersions: e.versions,
104 }, nil
105 }
106
107 func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
108 klog.InfoS("Notify registration status", "status", status)
109
110 if e.registrationStatus != nil {
111 e.registrationStatus <- *status
112 }
113
114 return ®isterapi.RegistrationStatusResponse{}, nil
115 }
116
117
118 func (e *examplePlugin) Serve(services ...string) error {
119 klog.InfoS("Starting example server", "endpoint", e.endpoint)
120 lis, err := net.Listen("unix", e.endpoint)
121 if err != nil {
122 return err
123 }
124
125 klog.InfoS("Example server started", "endpoint", e.endpoint)
126 e.grpcServer = grpc.NewServer()
127
128
129 registerapi.RegisterRegistrationServer(e.grpcServer, e)
130
131 for _, service := range services {
132 switch service {
133 case "v1beta1":
134 v1beta1 := &pluginServiceV1Beta1{server: e}
135 v1beta1.RegisterService()
136 case "v1beta2":
137 v1beta2 := &pluginServiceV1Beta2{server: e}
138 v1beta2.RegisterService()
139 default:
140 return fmt.Errorf("unsupported service: '%s'", service)
141 }
142 }
143
144
145 e.wg.Add(1)
146 go func() {
147 defer e.wg.Done()
148
149 if err := e.grpcServer.Serve(lis); err != nil {
150 klog.ErrorS(err, "Example server stopped serving")
151 }
152 }()
153
154 return nil
155 }
156
157 func (e *examplePlugin) Stop() error {
158 klog.InfoS("Stopping example server", "endpoint", e.endpoint)
159
160 e.grpcServer.Stop()
161 c := make(chan struct{})
162 go func() {
163 defer close(c)
164 e.wg.Wait()
165 }()
166
167 select {
168 case <-c:
169 break
170 case <-time.After(time.Second):
171 return errors.New("timed out on waiting for stop completion")
172 }
173
174 if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
175 return err
176 }
177
178 return nil
179 }
180
View as plain text