1
16
17 package pluginwatcher
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "reflect"
25 "sync"
26 "time"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/credentials/insecure"
30 "k8s.io/klog/v2"
31
32 registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
33 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
34 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
35 )
36
37 type exampleHandler struct {
38 SupportedVersions []string
39 ExpectedNames map[string]int
40
41 eventChans map[string]chan examplePluginEvent
42
43 m sync.Mutex
44
45 permitDeprecatedDir bool
46 }
47
// examplePluginEvent identifies which handler callback produced an event.
type examplePluginEvent int

// The event values are part of the observable contract (they are logged and
// sent over channels), so the numbering must stay 0, 1, 2 in this order.
const (
	// exampleEventValidate is sent by ValidatePlugin.
	exampleEventValidate examplePluginEvent = iota
	// exampleEventRegister is sent by RegisterPlugin.
	exampleEventRegister
	// exampleEventDeRegister is sent by DeRegisterPlugin.
	exampleEventDeRegister
)
55
56
57 func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *exampleHandler {
58 return &exampleHandler{
59 SupportedVersions: supportedVersions,
60 ExpectedNames: make(map[string]int),
61
62 eventChans: make(map[string]chan examplePluginEvent),
63 permitDeprecatedDir: permitDeprecatedDir,
64 }
65 }
66
67 func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
68 p.SendEvent(pluginName, exampleEventValidate)
69
70 n, ok := p.DecreasePluginCount(pluginName)
71 if !ok && n > 0 {
72 return fmt.Errorf("pluginName('%s') wasn't expected (count is %d)", pluginName, n)
73 }
74
75 if !reflect.DeepEqual(versions, p.SupportedVersions) {
76 return fmt.Errorf("versions('%v') != supported versions('%v')", versions, p.SupportedVersions)
77 }
78
79
80 if len(endpoint) == 0 {
81 return errors.New("expecting non empty endpoint")
82 }
83
84 return nil
85 }
86
87 func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
88 p.SendEvent(pluginName, exampleEventRegister)
89
90
91 _, conn, err := dial(endpoint, time.Second)
92 if err != nil {
93 return fmt.Errorf("failed dialing endpoint (%s): %v", endpoint, err)
94 }
95 defer conn.Close()
96
97
98 v1beta1Client := v1beta1.NewExampleClient(conn)
99 v1beta2Client := v1beta2.NewExampleClient(conn)
100
101
102 _, err = v1beta1Client.GetExampleInfo(context.Background(), &v1beta1.ExampleRequest{})
103 if err != nil {
104 return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
105 }
106
107
108 _, err = v1beta2Client.GetExampleInfo(context.Background(), &v1beta2.ExampleRequest{})
109 if err != nil {
110 return fmt.Errorf("failed GetExampleInfo for v1beta2Client(%s): %v", endpoint, err)
111 }
112
113 return nil
114 }
115
116 func (p *exampleHandler) DeRegisterPlugin(pluginName string) {
117 p.SendEvent(pluginName, exampleEventDeRegister)
118 }
119
120 func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) {
121 klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName])
122 p.eventChans[pluginName] <- event
123 }
124
125 func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok bool) {
126 p.m.Lock()
127 defer p.m.Unlock()
128
129 v, ok := p.ExpectedNames[pluginName]
130 if !ok {
131 v = -1
132 }
133
134 return v, ok
135 }
136
137
138 func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
139 ctx, cancel := context.WithTimeout(context.Background(), timeout)
140 defer cancel()
141
142 c, err := grpc.DialContext(ctx, unixSocketPath,
143 grpc.WithTransportCredentials(insecure.NewCredentials()),
144 grpc.WithBlock(),
145 grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
146 return (&net.Dialer{}).DialContext(ctx, "unix", addr)
147 }),
148 )
149
150 if err != nil {
151 return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
152 }
153
154 return registerapi.NewRegistrationClient(c), c, nil
155 }
156
View as plain text