1
16
17 package service
18
19 import (
20 "fmt"
21 "os"
22 "strings"
23 "sync"
24 "sync/atomic"
25
26 "github.com/container-storage-interface/spec/lib/go/csi"
27 "golang.org/x/net/context"
28 "google.golang.org/grpc/codes"
29 "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/cache"
30
31 "google.golang.org/protobuf/types/known/timestamppb"
32 )
33
// Identity and topology constants of the mock driver.
const (
	// Name is the identifier string of the mock driver. It also serves as
	// the prefix of TopologyKey.
	Name = "io.kubernetes.storage.mock"

	// VendorVersion is the version string of the mock driver.
	VendorVersion = "0.3.0"

	// TopologyKey is the segment key written into a volume's accessible
	// topology by setTopology; it is Name with "/node" appended.
	TopologyKey = Name + "/node"

	// TopologyValue is the segment value stored under TopologyKey.
	TopologyValue = "some-mock-node"
)
47
48
// Manifest is a string-to-string map of descriptive driver metadata. It
// currently holds a single "url" entry pointing at the mock driver sources.
var Manifest = map[string]string{
	"url": "https://github.com/kubernetes/kubernetes/tree/master/test/e2e/storage/drivers/csi-test/mock",
}
52
53 type Config struct {
54 DisableAttach bool
55 DriverName string
56 AttachLimit int64
57 NodeExpansionRequired bool
58 VolumeMountGroupRequired bool
59 DisableControllerExpansion bool
60 DisableOnlineExpansion bool
61 PermissiveTargetPath bool
62 EnableTopology bool
63 IO DirIO
64 }
65
66
// DirIO abstracts the directory operations used by the mock service so that
// they can be swapped out (see Config.IO). OSDirIO is the implementation
// backed by the os package.
type DirIO interface {
	// DirExists reports whether path exists as a directory. The OSDirIO
	// implementation returns (false, nil) for a missing path and an error
	// when the path exists but is not a directory.
	DirExists(path string) (bool, error)

	// Mkdir creates the directory named path.
	Mkdir(path string) error

	// RemoveAll removes path.
	RemoveAll(path string) error

	// Rename moves oldPath to newPath.
	Rename(oldPath, newPath string) error
}
78
// OSDirIO implements the directory operations directly on top of the os
// package. It carries no state, so the zero value is ready to use.
type OSDirIO struct{}

// DirExists stats path and classifies the outcome:
//   - (true, nil) when path exists and is a directory;
//   - (false, nil) when path does not exist;
//   - (false, err) when path exists but is not a directory, or when the stat
//     call fails for any other reason.
func (o OSDirIO) DirExists(path string) (bool, error) {
	info, err := os.Stat(path)
	if err != nil {
		if os.IsNotExist(err) {
			// A missing path is a normal "no" answer, not a failure.
			return false, nil
		}
		return false, err
	}
	if !info.IsDir() {
		return false, fmt.Errorf("%s: not a directory", path)
	}
	return true, nil
}

// Mkdir creates the single directory path with permission bits 0755.
func (o OSDirIO) Mkdir(path string) error {
	const dirPerm os.FileMode = 0755
	return os.Mkdir(path, dirPerm)
}

// RemoveAll deletes path via os.RemoveAll.
func (o OSDirIO) RemoveAll(path string) error {
	return os.RemoveAll(path)
}

// Rename moves oldPath to newPath via os.Rename.
func (o OSDirIO) Rename(oldPath, newPath string) error {
	return os.Rename(oldPath, newPath)
}
106
107
108 type Service interface {
109 csi.ControllerServer
110 csi.IdentityServer
111 csi.NodeServer
112 }
113
114 type service struct {
115 sync.Mutex
116 nodeID string
117 vols []csi.Volume
118 volsRWL sync.RWMutex
119 volsNID uint64
120 snapshots cache.SnapshotCache
121 snapshotsNID uint64
122 config Config
123 }
124
125 type Volume struct {
126 VolumeCSI csi.Volume
127 NodeID string
128 ISStaged bool
129 ISPublished bool
130 ISEphemeral bool
131 ISControllerPublished bool
132 StageTargetPath string
133 TargetPath string
134 }
135
136 var MockVolumes map[string]Volume
137
138
139 func New(config Config) Service {
140 s := &service{
141 nodeID: config.DriverName,
142 config: config,
143 }
144 if s.config.IO == nil {
145 s.config.IO = OSDirIO{}
146 }
147 s.snapshots = cache.NewSnapshotCache()
148 s.vols = []csi.Volume{
149 s.newVolume("Mock Volume 1", gib100),
150 s.newVolume("Mock Volume 2", gib100),
151 s.newVolume("Mock Volume 3", gib100),
152 }
153 MockVolumes = map[string]Volume{}
154
155 s.snapshots.Add(s.newSnapshot("Mock Snapshot 1", "1", map[string]string{"Description": "snapshot 1"}))
156 s.snapshots.Add(s.newSnapshot("Mock Snapshot 2", "2", map[string]string{"Description": "snapshot 2"}))
157 s.snapshots.Add(s.newSnapshot("Mock Snapshot 3", "3", map[string]string{"Description": "snapshot 3"}))
158
159 return s
160 }
161
// Binary size units in bytes, each 1024 times the previous one, plus the
// 100 GiB capacity used for the pre-created mock volumes.
const (
	kib    int64 = 1 << 10
	mib    int64 = kib << 10
	gib    int64 = mib << 10
	gib100 int64 = 100 * gib
	tib    int64 = gib << 10
)
169
170 func (s *service) newVolume(name string, capcity int64) csi.Volume {
171 vol := csi.Volume{
172 VolumeId: fmt.Sprintf("%d", atomic.AddUint64(&s.volsNID, 1)),
173 VolumeContext: map[string]string{"name": name},
174 CapacityBytes: capcity,
175 }
176 s.setTopology(&vol)
177 return vol
178 }
179
180 func (s *service) newVolumeFromSnapshot(name string, capacity int64, snapshotID int) csi.Volume {
181 vol := s.newVolume(name, capacity)
182 vol.ContentSource = &csi.VolumeContentSource{
183 Type: &csi.VolumeContentSource_Snapshot{
184 Snapshot: &csi.VolumeContentSource_SnapshotSource{
185 SnapshotId: fmt.Sprintf("%d", snapshotID),
186 },
187 },
188 }
189 s.setTopology(&vol)
190 return vol
191 }
192
193 func (s *service) newVolumeFromVolume(name string, capacity int64, volumeID int) csi.Volume {
194 vol := s.newVolume(name, capacity)
195 vol.ContentSource = &csi.VolumeContentSource{
196 Type: &csi.VolumeContentSource_Volume{
197 Volume: &csi.VolumeContentSource_VolumeSource{
198 VolumeId: fmt.Sprintf("%d", volumeID),
199 },
200 },
201 }
202 s.setTopology(&vol)
203 return vol
204 }
205
206 func (s *service) setTopology(vol *csi.Volume) {
207 if s.config.EnableTopology {
208 vol.AccessibleTopology = []*csi.Topology{
209 {
210 Segments: map[string]string{
211 TopologyKey: TopologyValue,
212 },
213 },
214 }
215 }
216 }
217
218 func (s *service) findVol(k, v string) (volIdx int, volInfo csi.Volume) {
219 s.volsRWL.RLock()
220 defer s.volsRWL.RUnlock()
221 return s.findVolNoLock(k, v)
222 }
223
224 func (s *service) findVolNoLock(k, v string) (volIdx int, volInfo csi.Volume) {
225 volIdx = -1
226
227 for i, vi := range s.vols {
228 switch k {
229 case "id":
230 if strings.EqualFold(v, vi.GetVolumeId()) {
231 return i, vi
232 }
233 case "name":
234 if n, ok := vi.VolumeContext["name"]; ok && strings.EqualFold(v, n) {
235 return i, vi
236 }
237 }
238 }
239
240 return
241 }
242
243 func (s *service) findVolByName(
244 ctx context.Context, name string) (int, csi.Volume) {
245
246 return s.findVol("name", name)
247 }
248
249 func (s *service) findVolByID(
250 ctx context.Context, id string) (int, csi.Volume) {
251
252 return s.findVol("id", id)
253 }
254
255 func (s *service) newSnapshot(name, sourceVolumeId string, parameters map[string]string) cache.Snapshot {
256
257 ptime := timestamppb.Now()
258 return cache.Snapshot{
259 Name: name,
260 Parameters: parameters,
261 SnapshotCSI: csi.Snapshot{
262 SnapshotId: fmt.Sprintf("%d", atomic.AddUint64(&s.snapshotsNID, 1)),
263 CreationTime: ptime,
264 SourceVolumeId: sourceVolumeId,
265 ReadyToUse: true,
266 },
267 }
268 }
269
270
271 func (s *service) getAttachCount(devPathKey string) int64 {
272 var count int64
273 for _, v := range s.vols {
274 if device := v.VolumeContext[devPathKey]; device != "" {
275 count++
276 }
277 }
278 return count
279 }
280
281 func (s *service) execHook(hookName string) (codes.Code, string) {
282 return codes.OK, ""
283 }
284
View as plain text