...
1
16
17 package devicemanager
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "time"
24
25 pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
26 plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
27 )
28
29
30
31
32 type endpoint interface {
33 getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
34 allocate(devs []string) (*pluginapi.AllocateResponse, error)
35 preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
36 setStopTime(t time.Time)
37 isStopped() bool
38 stopGracePeriodExpired() bool
39 }
40
41 type endpointImpl struct {
42 mutex sync.Mutex
43 resourceName string
44 api pluginapi.DevicePluginClient
45 stopTime time.Time
46 client plugin.Client
47 }
48
49
50
51 func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl {
52 return &endpointImpl{
53 api: p.API(),
54 resourceName: p.Resource(),
55 }
56 }
57
58
59
60 func newStoppedEndpointImpl(resourceName string) *endpointImpl {
61 return &endpointImpl{
62 resourceName: resourceName,
63 stopTime: time.Now(),
64 }
65 }
66
67 func (e *endpointImpl) isStopped() bool {
68 e.mutex.Lock()
69 defer e.mutex.Unlock()
70 return !e.stopTime.IsZero()
71 }
72
73 func (e *endpointImpl) stopGracePeriodExpired() bool {
74 e.mutex.Lock()
75 defer e.mutex.Unlock()
76 return !e.stopTime.IsZero() && time.Since(e.stopTime) > endpointStopGracePeriod
77 }
78
79 func (e *endpointImpl) setStopTime(t time.Time) {
80 e.mutex.Lock()
81 defer e.mutex.Unlock()
82 e.stopTime = t
83 }
84
85
86 func (e *endpointImpl) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
87 if e.isStopped() {
88 return nil, fmt.Errorf(errEndpointStopped, e)
89 }
90 return e.api.GetPreferredAllocation(context.Background(), &pluginapi.PreferredAllocationRequest{
91 ContainerRequests: []*pluginapi.ContainerPreferredAllocationRequest{
92 {
93 AvailableDeviceIDs: available,
94 MustIncludeDeviceIDs: mustInclude,
95 AllocationSize: int32(size),
96 },
97 },
98 })
99 }
100
101
102 func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
103 if e.isStopped() {
104 return nil, fmt.Errorf(errEndpointStopped, e)
105 }
106 return e.api.Allocate(context.Background(), &pluginapi.AllocateRequest{
107 ContainerRequests: []*pluginapi.ContainerAllocateRequest{
108 {DevicesIDs: devs},
109 },
110 })
111 }
112
113
114 func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
115 if e.isStopped() {
116 return nil, fmt.Errorf(errEndpointStopped, e)
117 }
118 ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second)
119 defer cancel()
120 return e.api.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
121 DevicesIDs: devs,
122 })
123 }
124
View as plain text