1
16
17
18
19
20
21 package operationexecutor
22
23 import (
24 "context"
25 "errors"
26 "fmt"
27 "net"
28 "time"
29
30 "k8s.io/klog/v2"
31
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/credentials/insecure"
34 "k8s.io/client-go/tools/record"
35 registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
36 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
37 )
38
39 const (
40 dialTimeoutDuration = 10 * time.Second
41 notifyTimeoutDuration = 5 * time.Second
42 )
43
44 var _ OperationGenerator = &operationGenerator{}
45
46 type operationGenerator struct {
47
48
49 recorder record.EventRecorder
50 }
51
52
53 func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
54
55 return &operationGenerator{
56 recorder: recorder,
57 }
58 }
59
60
61 type OperationGenerator interface {
62
63 GenerateRegisterPluginFunc(
64 socketPath string,
65 timestamp time.Time,
66 pluginHandlers map[string]cache.PluginHandler,
67 actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
68
69
70 GenerateUnregisterPluginFunc(
71 pluginInfo cache.PluginInfo,
72 actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
73 }
74
75 func (og *operationGenerator) GenerateRegisterPluginFunc(
76 socketPath string,
77 timestamp time.Time,
78 pluginHandlers map[string]cache.PluginHandler,
79 actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
80
81 registerPluginFunc := func() error {
82 client, conn, err := dial(socketPath, dialTimeoutDuration)
83 if err != nil {
84 return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
85 }
86 defer conn.Close()
87
88 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
89 defer cancel()
90
91 infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{})
92 if err != nil {
93 return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
94 }
95
96 handler, ok := pluginHandlers[infoResp.Type]
97 if !ok {
98 if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
99 return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
100 }
101 return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
102 }
103
104 if infoResp.Endpoint == "" {
105 infoResp.Endpoint = socketPath
106 }
107 if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
108 if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
109 return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
110 }
111 return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
112 }
113
114
115 err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
116 SocketPath: socketPath,
117 Timestamp: timestamp,
118 Handler: handler,
119 Name: infoResp.Name,
120 })
121 if err != nil {
122 klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
123 }
124 if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil {
125 return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
126 }
127
128
129 if err := og.notifyPlugin(client, true, ""); err != nil {
130 return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
131 }
132 return nil
133 }
134 return registerPluginFunc
135 }
136
137 func (og *operationGenerator) GenerateUnregisterPluginFunc(
138 pluginInfo cache.PluginInfo,
139 actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
140
141 unregisterPluginFunc := func() error {
142 if pluginInfo.Handler == nil {
143 return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
144 }
145
146
147 actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
148
149 pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
150
151 klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
152 return nil
153 }
154 return unregisterPluginFunc
155 }
156
157 func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error {
158 ctx, cancel := context.WithTimeout(context.Background(), notifyTimeoutDuration)
159 defer cancel()
160
161 status := ®isterapi.RegistrationStatus{
162 PluginRegistered: registered,
163 Error: errStr,
164 }
165
166 if _, err := client.NotifyRegistrationStatus(ctx, status); err != nil {
167 return fmt.Errorf("%s: %w", errStr, err)
168 }
169
170 if errStr != "" {
171 return errors.New(errStr)
172 }
173
174 return nil
175 }
176
177
178 func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
179 ctx, cancel := context.WithTimeout(context.Background(), timeout)
180 defer cancel()
181
182 c, err := grpc.DialContext(ctx, unixSocketPath,
183 grpc.WithTransportCredentials(insecure.NewCredentials()),
184 grpc.WithBlock(),
185 grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
186 return (&net.Dialer{}).DialContext(ctx, "unix", addr)
187 }),
188 )
189
190 if err != nil {
191 return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
192 }
193
194 return registerapi.NewRegistrationClient(c), c, nil
195 }
196
View as plain text