1
16
17 package plugin
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "strings"
25 "sync"
26 "time"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/credentials/insecure"
31 v1 "k8s.io/api/core/v1"
32 utilversion "k8s.io/apimachinery/pkg/util/version"
33 "k8s.io/client-go/kubernetes"
34 "k8s.io/klog/v2"
35 )
36
37 const (
38
39 DRAPluginName = "kubernetes.io/dra"
40 v1alpha3Version = "v1alpha3"
41 v1alpha2Version = "v1alpha2"
42 )
43
44
45
46 type plugin struct {
47 sync.Mutex
48 conn *grpc.ClientConn
49 endpoint string
50 version string
51 highestSupportedVersion *utilversion.Version
52 clientTimeout time.Duration
53 }
54
55 func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
56 p.Lock()
57 defer p.Unlock()
58
59 if p.conn != nil {
60 return p.conn, nil
61 }
62
63 network := "unix"
64 klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", p.endpoint)
65 conn, err := grpc.Dial(
66 p.endpoint,
67 grpc.WithTransportCredentials(insecure.NewCredentials()),
68 grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
69 return (&net.Dialer{}).DialContext(ctx, network, target)
70 }),
71 )
72 if err != nil {
73 return nil, err
74 }
75
76 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
77 defer cancel()
78
79 if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
80 return nil, errors.New("timed out waiting for gRPC connection to be ready")
81 }
82
83 p.conn = conn
84 return p.conn, nil
85 }
86
87 func (p *plugin) getVersion() string {
88 p.Lock()
89 defer p.Unlock()
90 return p.version
91 }
92
93 func (p *plugin) setVersion(version string) {
94 p.Lock()
95 p.version = version
96 p.Unlock()
97 }
98
99
100 type RegistrationHandler struct {
101 controller *nodeResourcesController
102 }
103
104
105
106
107
108
109 func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
110 handler := &RegistrationHandler{}
111
112
113
114 handler.controller = startNodeResourcesController(context.TODO(), kubeClient, getNode)
115
116 return handler
117 }
118
119
120 func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
121 klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
122
123 highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
124 if err != nil {
125 return err
126 }
127
128 var timeout time.Duration
129 if pluginClientTimeout == nil {
130 timeout = PluginClientTimeout
131 } else {
132 timeout = *pluginClientTimeout
133 }
134
135 pluginInstance := &plugin{
136 conn: nil,
137 endpoint: endpoint,
138 version: v1alpha3Version,
139 highestSupportedVersion: highestSupportedVersion,
140 clientTimeout: timeout,
141 }
142
143
144
145
146 draPlugins.add(pluginName, pluginInstance)
147 h.controller.addPlugin(pluginName, pluginInstance)
148
149 return nil
150 }
151
152 func (h *RegistrationHandler) validateVersions(
153 callerName string,
154 pluginName string,
155 versions []string,
156 ) (*utilversion.Version, error) {
157 if len(versions) == 0 {
158 return nil, errors.New(
159 log(
160 "%s for DRA plugin %q failed. Plugin returned an empty list for supported versions",
161 callerName,
162 pluginName,
163 ),
164 )
165 }
166
167
168 newPluginHighestVersion, err := utilversion.HighestSupportedVersion(versions)
169 if err != nil {
170 return nil, errors.New(
171 log(
172 "%s for DRA plugin %q failed. None of the versions specified %q are supported. err=%v",
173 callerName,
174 pluginName,
175 versions,
176 err,
177 ),
178 )
179 }
180
181 existingPlugin := draPlugins.get(pluginName)
182 if existingPlugin == nil {
183 return newPluginHighestVersion, nil
184 }
185 if existingPlugin.highestSupportedVersion.LessThan(newPluginHighestVersion) {
186 return newPluginHighestVersion, nil
187 }
188 return nil, errors.New(
189 log(
190 "%s for DRA plugin %q failed. Another plugin with the same name is already registered with a higher supported version: %q",
191 callerName,
192 pluginName,
193 existingPlugin.highestSupportedVersion,
194 ),
195 )
196 }
197
198 func deregisterPlugin(pluginName string) {
199 draPlugins.delete(pluginName)
200 }
201
202
203
204 func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
205 klog.InfoS("DeRegister DRA plugin", "name", pluginName)
206 deregisterPlugin(pluginName)
207 h.controller.removePlugin(pluginName)
208 }
209
210
211
212 func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
213 klog.InfoS("Validate DRA plugin", "name", pluginName, "endpoint", endpoint, "versions", strings.Join(versions, ","))
214
215 _, err := h.validateVersions("ValidatePlugin", pluginName, versions)
216 if err != nil {
217 return fmt.Errorf("validation failed for DRA plugin %s at endpoint %s: %+v", pluginName, endpoint, err)
218 }
219
220 return err
221 }
222
223
224 func log(msg string, parts ...interface{}) string {
225 return fmt.Sprintf(fmt.Sprintf("%s: %s", DRAPluginName, msg), parts...)
226 }
227
View as plain text