1
16
17 package plugin
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "google.golang.org/grpc"
25 grpccodes "google.golang.org/grpc/codes"
26 grpcstatus "google.golang.org/grpc/status"
27
28 "k8s.io/klog/v2"
29 drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
30 drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
31 )
32
// PluginClientTimeout is a 45-second duration.
//
// NOTE(review): presumably the default per-call timeout stored in
// plugin.clientTimeout when a DRA plugin is registered; the assignment is not
// in this file section, so confirm against the registration code.
const PluginClientTimeout time.Duration = 45 * time.Second
34
35 type (
36 nodeResourceManager interface {
37 Prepare(context.Context, *grpc.ClientConn, *plugin, *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error)
38 Unprepare(context.Context, *grpc.ClientConn, *plugin, *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error)
39 }
40
41 v1alpha2NodeResourceManager struct{}
42 v1alpha3NodeResourceManager struct{}
43 )
44
45 var nodeResourceManagers = map[string]nodeResourceManager{
46 v1alpha2Version: v1alpha2NodeResourceManager{},
47 v1alpha3Version: v1alpha3NodeResourceManager{},
48 }
49
50 func (v1alpha2rm v1alpha2NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
51 nodeClient := drapbv1alpha2.NewNodeClient(conn)
52 response := &drapb.NodePrepareResourcesResponse{
53 Claims: make(map[string]*drapb.NodePrepareResourceResponse),
54 }
55
56 for _, claim := range req.Claims {
57 req := &drapbv1alpha2.NodePrepareResourceRequest{
58 Namespace: claim.Namespace,
59 ClaimUid: claim.Uid,
60 ClaimName: claim.Name,
61 ResourceHandle: claim.ResourceHandle,
62 StructuredResourceHandle: claim.StructuredResourceHandle,
63 }
64 res, err := nodeClient.NodePrepareResource(ctx, req)
65 result := &drapb.NodePrepareResourceResponse{}
66 if err != nil {
67 result.Error = err.Error()
68 } else {
69 result.CDIDevices = res.CdiDevices
70 }
71 response.Claims[claim.Uid] = result
72 }
73
74 return response, nil
75 }
76
77 func (v1alpha2rm v1alpha2NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
78 nodeClient := drapbv1alpha2.NewNodeClient(conn)
79 response := &drapb.NodeUnprepareResourcesResponse{
80 Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
81 }
82
83 for _, claim := range req.Claims {
84 _, err := nodeClient.NodeUnprepareResource(ctx,
85 &drapbv1alpha2.NodeUnprepareResourceRequest{
86 Namespace: claim.Namespace,
87 ClaimUid: claim.Uid,
88 ClaimName: claim.Name,
89 ResourceHandle: claim.ResourceHandle,
90 })
91 result := &drapb.NodeUnprepareResourceResponse{}
92 if err != nil {
93 result.Error = err.Error()
94 }
95 response.Claims[claim.Uid] = result
96 }
97
98 return response, nil
99 }
100
101 func (v1alpha3rm v1alpha3NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
102 nodeClient := drapb.NewNodeClient(conn)
103 response, err := nodeClient.NodePrepareResources(ctx, req)
104 if err != nil {
105 status, _ := grpcstatus.FromError(err)
106 if status.Code() == grpccodes.Unimplemented {
107 p.setVersion(v1alpha2Version)
108 return nodeResourceManagers[v1alpha2Version].Prepare(ctx, conn, p, req)
109 }
110 return nil, err
111 }
112
113 return response, nil
114 }
115
116 func (v1alpha3rm v1alpha3NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
117 nodeClient := drapb.NewNodeClient(conn)
118 response, err := nodeClient.NodeUnprepareResources(ctx, req)
119 if err != nil {
120 status, _ := grpcstatus.FromError(err)
121 if status.Code() == grpccodes.Unimplemented {
122 p.setVersion(v1alpha2Version)
123 return nodeResourceManagers[v1alpha2Version].Unprepare(ctx, conn, p, req)
124 }
125 return nil, err
126 }
127
128 return response, nil
129 }
130
131 func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
132 if pluginName == "" {
133 return nil, fmt.Errorf("plugin name is empty")
134 }
135
136 existingPlugin := draPlugins.get(pluginName)
137 if existingPlugin == nil {
138 return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName)
139 }
140
141 return existingPlugin, nil
142 }
143
144 func (p *plugin) NodePrepareResources(
145 ctx context.Context,
146 req *drapb.NodePrepareResourcesRequest,
147 opts ...grpc.CallOption,
148 ) (*drapb.NodePrepareResourcesResponse, error) {
149 logger := klog.FromContext(ctx)
150 logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
151
152 conn, err := p.getOrCreateGRPCConn()
153 if err != nil {
154 return nil, err
155 }
156
157 ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
158 defer cancel()
159
160 version := p.getVersion()
161 resourceManager, exists := nodeResourceManagers[version]
162 if !exists {
163 err := fmt.Errorf("unsupported plugin version: %s", version)
164 logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", nil, "err", err)
165 return nil, err
166 }
167
168 response, err := resourceManager.Prepare(ctx, conn, p, req)
169 logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", response, "err", err)
170 return response, err
171 }
172
173 func (p *plugin) NodeUnprepareResources(
174 ctx context.Context,
175 req *drapb.NodeUnprepareResourcesRequest,
176 opts ...grpc.CallOption,
177 ) (*drapb.NodeUnprepareResourcesResponse, error) {
178 logger := klog.FromContext(ctx)
179 logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
180
181 conn, err := p.getOrCreateGRPCConn()
182 if err != nil {
183 return nil, err
184 }
185
186 ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
187 defer cancel()
188
189 version := p.getVersion()
190 resourceManager, exists := nodeResourceManagers[version]
191 if !exists {
192 err := fmt.Errorf("unsupported plugin version: %s", version)
193 logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", nil, "err", err)
194 return nil, err
195 }
196
197 response, err := resourceManager.Unprepare(ctx, conn, p, req)
198 logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
199 return response, err
200 }
201
202 func (p *plugin) NodeListAndWatchResources(
203 ctx context.Context,
204 req *drapb.NodeListAndWatchResourcesRequest,
205 opts ...grpc.CallOption,
206 ) (drapb.Node_NodeListAndWatchResourcesClient, error) {
207 logger := klog.FromContext(ctx)
208 logger.V(4).Info(log("calling NodeListAndWatchResources rpc"), "request", req)
209
210 conn, err := p.getOrCreateGRPCConn()
211 if err != nil {
212 return nil, err
213 }
214
215 nodeClient := drapb.NewNodeClient(conn)
216 return nodeClient.NodeListAndWatchResources(ctx, req, opts...)
217 }
218
View as plain text