...

Source file src/k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service/controller.go

Documentation: k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package service
    18  
    19  import (
    20  	"fmt"
    21  	"path"
    22  	"reflect"
    23  	"strconv"
    24  
    25  	"github.com/container-storage-interface/spec/lib/go/csi"
    26  	"golang.org/x/net/context"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/status"
    29  
    30  	"k8s.io/klog/v2"
    31  )
    32  
    33  const (
    34  	MaxStorageCapacity = tib
    35  	ReadOnlyKey        = "readonly"
    36  )
    37  
    38  func (s *service) CreateVolume(
    39  	ctx context.Context,
    40  	req *csi.CreateVolumeRequest) (
    41  	*csi.CreateVolumeResponse, error) {
    42  
    43  	if len(req.Name) == 0 {
    44  		return nil, status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
    45  	}
    46  	if req.VolumeCapabilities == nil {
    47  		return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
    48  	}
    49  
    50  	// Check to see if the volume already exists.
    51  	if i, v := s.findVolByName(ctx, req.Name); i >= 0 {
    52  		// Requested volume name already exists, need to check if the existing volume's
    53  		// capacity is more or equal to new request's capacity.
    54  		if v.GetCapacityBytes() < req.GetCapacityRange().GetRequiredBytes() {
    55  			return nil, status.Error(codes.AlreadyExists,
    56  				fmt.Sprintf("Volume with name %s already exists", req.GetName()))
    57  		}
    58  		return &csi.CreateVolumeResponse{Volume: &v}, nil
    59  	}
    60  
    61  	// If no capacity is specified then use 100GiB
    62  	capacity := gib100
    63  	if cr := req.CapacityRange; cr != nil {
    64  		if rb := cr.RequiredBytes; rb > 0 {
    65  			capacity = rb
    66  		}
    67  		if lb := cr.LimitBytes; lb > 0 {
    68  			capacity = lb
    69  		}
    70  	}
    71  	// Check for maximum available capacity
    72  	if capacity >= MaxStorageCapacity {
    73  		return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, MaxStorageCapacity)
    74  	}
    75  
    76  	var v csi.Volume
    77  	// Create volume from content source if provided.
    78  	if req.GetVolumeContentSource() != nil {
    79  		switch req.GetVolumeContentSource().GetType().(type) {
    80  		case *csi.VolumeContentSource_Snapshot:
    81  			sid := req.GetVolumeContentSource().GetSnapshot().GetSnapshotId()
    82  			// Check if the source snapshot exists.
    83  			if snapID, _ := s.snapshots.FindSnapshot("id", sid); snapID >= 0 {
    84  				v = s.newVolumeFromSnapshot(req.Name, capacity, snapID)
    85  			} else {
    86  				return nil, status.Errorf(codes.NotFound, "Requested source snapshot %s not found", sid)
    87  			}
    88  		case *csi.VolumeContentSource_Volume:
    89  			vid := req.GetVolumeContentSource().GetVolume().GetVolumeId()
    90  			// Check if the source volume exists.
    91  			if volID, _ := s.findVolNoLock("id", vid); volID >= 0 {
    92  				v = s.newVolumeFromVolume(req.Name, capacity, volID)
    93  			} else {
    94  				return nil, status.Errorf(codes.NotFound, "Requested source volume %s not found", vid)
    95  			}
    96  		}
    97  	} else {
    98  		v = s.newVolume(req.Name, capacity)
    99  	}
   100  
   101  	// Add the created volume to the service's in-mem volume slice.
   102  	s.volsRWL.Lock()
   103  	defer s.volsRWL.Unlock()
   104  	s.vols = append(s.vols, v)
   105  	MockVolumes[v.GetVolumeId()] = Volume{
   106  		VolumeCSI:       v,
   107  		NodeID:          "",
   108  		ISStaged:        false,
   109  		ISPublished:     false,
   110  		StageTargetPath: "",
   111  		TargetPath:      "",
   112  	}
   113  
   114  	if hookVal, hookMsg := s.execHook("CreateVolumeEnd"); hookVal != codes.OK {
   115  		return nil, status.Errorf(hookVal, hookMsg)
   116  	}
   117  
   118  	return &csi.CreateVolumeResponse{Volume: &v}, nil
   119  }
   120  
   121  func (s *service) DeleteVolume(
   122  	ctx context.Context,
   123  	req *csi.DeleteVolumeRequest) (
   124  	*csi.DeleteVolumeResponse, error) {
   125  
   126  	s.volsRWL.Lock()
   127  	defer s.volsRWL.Unlock()
   128  
   129  	//  If the volume is not specified, return error
   130  	if len(req.VolumeId) == 0 {
   131  		return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
   132  	}
   133  
   134  	if hookVal, hookMsg := s.execHook("DeleteVolumeStart"); hookVal != codes.OK {
   135  		return nil, status.Errorf(hookVal, hookMsg)
   136  	}
   137  
   138  	// If the volume does not exist then return an idempotent response.
   139  	i, _ := s.findVolNoLock("id", req.VolumeId)
   140  	if i < 0 {
   141  		return &csi.DeleteVolumeResponse{}, nil
   142  	}
   143  
   144  	// This delete logic preserves order and prevents potential memory
   145  	// leaks. The slice's elements may not be pointers, but the structs
   146  	// themselves have fields that are.
   147  	copy(s.vols[i:], s.vols[i+1:])
   148  	s.vols[len(s.vols)-1] = csi.Volume{}
   149  	s.vols = s.vols[:len(s.vols)-1]
   150  	klog.V(5).InfoS("mock delete volume", "volumeID", req.VolumeId)
   151  
   152  	if hookVal, hookMsg := s.execHook("DeleteVolumeEnd"); hookVal != codes.OK {
   153  		return nil, status.Errorf(hookVal, hookMsg)
   154  	}
   155  	return &csi.DeleteVolumeResponse{}, nil
   156  }
   157  
   158  func (s *service) ControllerPublishVolume(
   159  	ctx context.Context,
   160  	req *csi.ControllerPublishVolumeRequest) (
   161  	*csi.ControllerPublishVolumeResponse, error) {
   162  
   163  	if s.config.DisableAttach {
   164  		return nil, status.Error(codes.Unimplemented, "ControllerPublish is not supported")
   165  	}
   166  
   167  	if len(req.VolumeId) == 0 {
   168  		return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
   169  	}
   170  	if len(req.NodeId) == 0 {
   171  		return nil, status.Error(codes.InvalidArgument, "Node ID cannot be empty")
   172  	}
   173  	if req.VolumeCapability == nil {
   174  		return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
   175  	}
   176  
   177  	if req.NodeId != s.nodeID {
   178  		return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to Mock Node ID %s", req.NodeId, s.nodeID)
   179  	}
   180  
   181  	if hookVal, hookMsg := s.execHook("ControllerPublishVolumeStart"); hookVal != codes.OK {
   182  		return nil, status.Errorf(hookVal, hookMsg)
   183  	}
   184  
   185  	s.volsRWL.Lock()
   186  	defer s.volsRWL.Unlock()
   187  
   188  	i, v := s.findVolNoLock("id", req.VolumeId)
   189  	if i < 0 {
   190  		return nil, status.Error(codes.NotFound, req.VolumeId)
   191  	}
   192  
   193  	// devPathKey is the key in the volume's attributes that is set to a
   194  	// mock device path if the volume has been published by the controller
   195  	// to the specified node.
   196  	devPathKey := path.Join(req.NodeId, "dev")
   197  
   198  	// Check to see if the volume is already published.
   199  	if device := v.VolumeContext[devPathKey]; device != "" {
   200  		var volRo bool
   201  		var roVal string
   202  		if ro, ok := v.VolumeContext[ReadOnlyKey]; ok {
   203  			roVal = ro
   204  		}
   205  
   206  		if roVal == "true" {
   207  			volRo = true
   208  		} else {
   209  			volRo = false
   210  		}
   211  
   212  		// Check if readonly flag is compatible with the publish request.
   213  		if req.GetReadonly() != volRo {
   214  			return nil, status.Error(codes.AlreadyExists, "Volume published but has incompatible readonly flag")
   215  		}
   216  
   217  		return &csi.ControllerPublishVolumeResponse{
   218  			PublishContext: map[string]string{
   219  				"device":   device,
   220  				"readonly": roVal,
   221  			},
   222  		}, nil
   223  	}
   224  
   225  	// Check attach limit before publishing only if attach limit is set.
   226  	if s.config.AttachLimit > 0 && s.getAttachCount(devPathKey) >= s.config.AttachLimit {
   227  		return nil, status.Errorf(codes.ResourceExhausted, "Cannot attach any more volumes to this node")
   228  	}
   229  
   230  	var roVal string
   231  	if req.GetReadonly() {
   232  		roVal = "true"
   233  	} else {
   234  		roVal = "false"
   235  	}
   236  
   237  	// Publish the volume.
   238  	device := "/dev/mock"
   239  	v.VolumeContext[devPathKey] = device
   240  	v.VolumeContext[ReadOnlyKey] = roVal
   241  	s.vols[i] = v
   242  
   243  	if volInfo, ok := MockVolumes[req.VolumeId]; ok {
   244  		volInfo.ISControllerPublished = true
   245  		MockVolumes[req.VolumeId] = volInfo
   246  	}
   247  
   248  	if hookVal, hookMsg := s.execHook("ControllerPublishVolumeEnd"); hookVal != codes.OK {
   249  		return nil, status.Errorf(hookVal, hookMsg)
   250  	}
   251  
   252  	return &csi.ControllerPublishVolumeResponse{
   253  		PublishContext: map[string]string{
   254  			"device":   device,
   255  			"readonly": roVal,
   256  		},
   257  	}, nil
   258  }
   259  
   260  func (s *service) ControllerUnpublishVolume(
   261  	ctx context.Context,
   262  	req *csi.ControllerUnpublishVolumeRequest) (
   263  	*csi.ControllerUnpublishVolumeResponse, error) {
   264  
   265  	if s.config.DisableAttach {
   266  		return nil, status.Error(codes.Unimplemented, "ControllerPublish is not supported")
   267  	}
   268  
   269  	if len(req.VolumeId) == 0 {
   270  		return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
   271  	}
   272  	nodeID := req.NodeId
   273  	if len(nodeID) == 0 {
   274  		// If node id is empty, no failure as per Spec
   275  		nodeID = s.nodeID
   276  	}
   277  
   278  	if req.NodeId != s.nodeID {
   279  		return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, s.nodeID)
   280  	}
   281  
   282  	if hookVal, hookMsg := s.execHook("ControllerUnpublishVolumeStart"); hookVal != codes.OK {
   283  		return nil, status.Errorf(hookVal, hookMsg)
   284  	}
   285  
   286  	s.volsRWL.Lock()
   287  	defer s.volsRWL.Unlock()
   288  
   289  	i, v := s.findVolNoLock("id", req.VolumeId)
   290  	if i < 0 {
   291  		// Not an error: a non-existent volume is not published.
   292  		// See also https://github.com/kubernetes-csi/external-attacher/pull/165
   293  		return &csi.ControllerUnpublishVolumeResponse{}, nil
   294  	}
   295  
   296  	// devPathKey is the key in the volume's attributes that is set to a
   297  	// mock device path if the volume has been published by the controller
   298  	// to the specified node.
   299  	devPathKey := path.Join(nodeID, "dev")
   300  
   301  	// Check to see if the volume is already unpublished.
   302  	if v.VolumeContext[devPathKey] == "" {
   303  		return &csi.ControllerUnpublishVolumeResponse{}, nil
   304  	}
   305  
   306  	// Unpublish the volume.
   307  	delete(v.VolumeContext, devPathKey)
   308  	delete(v.VolumeContext, ReadOnlyKey)
   309  	s.vols[i] = v
   310  
   311  	if hookVal, hookMsg := s.execHook("ControllerUnpublishVolumeEnd"); hookVal != codes.OK {
   312  		return nil, status.Errorf(hookVal, hookMsg)
   313  	}
   314  
   315  	return &csi.ControllerUnpublishVolumeResponse{}, nil
   316  }
   317  
   318  func (s *service) ValidateVolumeCapabilities(
   319  	ctx context.Context,
   320  	req *csi.ValidateVolumeCapabilitiesRequest) (
   321  	*csi.ValidateVolumeCapabilitiesResponse, error) {
   322  
   323  	if len(req.GetVolumeId()) == 0 {
   324  		return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
   325  	}
   326  	if len(req.VolumeCapabilities) == 0 {
   327  		return nil, status.Error(codes.InvalidArgument, req.VolumeId)
   328  	}
   329  	i, _ := s.findVolNoLock("id", req.VolumeId)
   330  	if i < 0 {
   331  		return nil, status.Error(codes.NotFound, req.VolumeId)
   332  	}
   333  
   334  	if hookVal, hookMsg := s.execHook("ValidateVolumeCapabilities"); hookVal != codes.OK {
   335  		return nil, status.Errorf(hookVal, hookMsg)
   336  	}
   337  
   338  	return &csi.ValidateVolumeCapabilitiesResponse{
   339  		Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
   340  			VolumeContext:      req.GetVolumeContext(),
   341  			VolumeCapabilities: req.GetVolumeCapabilities(),
   342  			Parameters:         req.GetParameters(),
   343  		},
   344  	}, nil
   345  }
   346  
   347  func (s *service) ControllerGetVolume(
   348  	ctx context.Context,
   349  	req *csi.ControllerGetVolumeRequest) (
   350  	*csi.ControllerGetVolumeResponse, error) {
   351  
   352  	if hookVal, hookMsg := s.execHook("GetVolumeStart"); hookVal != codes.OK {
   353  		return nil, status.Errorf(hookVal, hookMsg)
   354  	}
   355  
   356  	resp := &csi.ControllerGetVolumeResponse{
   357  		Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
   358  			VolumeCondition: &csi.VolumeCondition{},
   359  		},
   360  	}
   361  	i, v := s.findVolByID(ctx, req.VolumeId)
   362  	if i < 0 {
   363  		resp.Status.VolumeCondition.Abnormal = true
   364  		resp.Status.VolumeCondition.Message = "volume not found"
   365  		return resp, status.Error(codes.NotFound, req.VolumeId)
   366  	}
   367  
   368  	resp.Volume = &v
   369  	if !s.config.DisableAttach {
   370  		resp.Status.PublishedNodeIds = []string{
   371  			s.nodeID,
   372  		}
   373  	}
   374  
   375  	if hookVal, hookMsg := s.execHook("GetVolumeEnd"); hookVal != codes.OK {
   376  		return nil, status.Errorf(hookVal, hookMsg)
   377  	}
   378  
   379  	return resp, nil
   380  }
   381  
   382  func (s *service) ListVolumes(
   383  	ctx context.Context,
   384  	req *csi.ListVolumesRequest) (
   385  	*csi.ListVolumesResponse, error) {
   386  
   387  	if hookVal, hookMsg := s.execHook("ListVolumesStart"); hookVal != codes.OK {
   388  		return nil, status.Errorf(hookVal, hookMsg)
   389  	}
   390  
   391  	// Copy the mock volumes into a new slice in order to avoid
   392  	// locking the service's volume slice for the duration of the
   393  	// ListVolumes RPC.
   394  	var vols []csi.Volume
   395  	func() {
   396  		s.volsRWL.RLock()
   397  		defer s.volsRWL.RUnlock()
   398  		vols = make([]csi.Volume, len(s.vols))
   399  		copy(vols, s.vols)
   400  	}()
   401  
   402  	var (
   403  		ulenVols      = int32(len(vols))
   404  		maxEntries    = req.MaxEntries
   405  		startingToken int32
   406  	)
   407  
   408  	if v := req.StartingToken; v != "" {
   409  		i, err := strconv.ParseUint(v, 10, 32)
   410  		if err != nil {
   411  			return nil, status.Errorf(
   412  				codes.Aborted,
   413  				"startingToken=%s: %v",
   414  				v, err)
   415  		}
   416  		startingToken = int32(i)
   417  	}
   418  
   419  	if startingToken > ulenVols {
   420  		return nil, status.Errorf(
   421  			codes.Aborted,
   422  			"startingToken=%d > len(vols)=%d",
   423  			startingToken, ulenVols)
   424  	}
   425  
   426  	// Discern the number of remaining entries.
   427  	rem := ulenVols - startingToken
   428  
   429  	// If maxEntries is 0 or greater than the number of remaining entries then
   430  	// set maxEntries to the number of remaining entries.
   431  	if maxEntries == 0 || maxEntries > rem {
   432  		maxEntries = rem
   433  	}
   434  
   435  	var (
   436  		i       int
   437  		j       = startingToken
   438  		entries = make(
   439  			[]*csi.ListVolumesResponse_Entry,
   440  			maxEntries)
   441  	)
   442  
   443  	for i = 0; i < len(entries); i++ {
   444  		volumeStatus := &csi.ListVolumesResponse_VolumeStatus{
   445  			VolumeCondition: &csi.VolumeCondition{},
   446  		}
   447  
   448  		if !s.config.DisableAttach {
   449  			volumeStatus.PublishedNodeIds = []string{
   450  				s.nodeID,
   451  			}
   452  		}
   453  
   454  		entries[i] = &csi.ListVolumesResponse_Entry{
   455  			Volume: &vols[j],
   456  			Status: volumeStatus,
   457  		}
   458  		j++
   459  	}
   460  
   461  	var nextToken string
   462  	if n := startingToken + int32(i); n < ulenVols {
   463  		nextToken = fmt.Sprintf("%d", n)
   464  	}
   465  
   466  	if hookVal, hookMsg := s.execHook("ListVolumesEnd"); hookVal != codes.OK {
   467  		return nil, status.Errorf(hookVal, hookMsg)
   468  	}
   469  
   470  	return &csi.ListVolumesResponse{
   471  		Entries:   entries,
   472  		NextToken: nextToken,
   473  	}, nil
   474  }
   475  
   476  func (s *service) GetCapacity(
   477  	ctx context.Context,
   478  	req *csi.GetCapacityRequest) (
   479  	*csi.GetCapacityResponse, error) {
   480  
   481  	if hookVal, hookMsg := s.execHook("GetCapacity"); hookVal != codes.OK {
   482  		return nil, status.Errorf(hookVal, hookMsg)
   483  	}
   484  
   485  	return &csi.GetCapacityResponse{
   486  		AvailableCapacity: MaxStorageCapacity,
   487  	}, nil
   488  }
   489  
   490  func (s *service) ControllerGetCapabilities(
   491  	ctx context.Context,
   492  	req *csi.ControllerGetCapabilitiesRequest) (
   493  	*csi.ControllerGetCapabilitiesResponse, error) {
   494  
   495  	if hookVal, hookMsg := s.execHook("ControllerGetCapabilitiesStart"); hookVal != codes.OK {
   496  		return nil, status.Errorf(hookVal, hookMsg)
   497  	}
   498  
   499  	caps := []*csi.ControllerServiceCapability{
   500  		{
   501  			Type: &csi.ControllerServiceCapability_Rpc{
   502  				Rpc: &csi.ControllerServiceCapability_RPC{
   503  					Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
   504  				},
   505  			},
   506  		},
   507  		{
   508  			Type: &csi.ControllerServiceCapability_Rpc{
   509  				Rpc: &csi.ControllerServiceCapability_RPC{
   510  					Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
   511  				},
   512  			},
   513  		},
   514  		{
   515  			Type: &csi.ControllerServiceCapability_Rpc{
   516  				Rpc: &csi.ControllerServiceCapability_RPC{
   517  					Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
   518  				},
   519  			},
   520  		},
   521  		{
   522  			Type: &csi.ControllerServiceCapability_Rpc{
   523  				Rpc: &csi.ControllerServiceCapability_RPC{
   524  					Type: csi.ControllerServiceCapability_RPC_GET_CAPACITY,
   525  				},
   526  			},
   527  		},
   528  		{
   529  			Type: &csi.ControllerServiceCapability_Rpc{
   530  				Rpc: &csi.ControllerServiceCapability_RPC{
   531  					Type: csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
   532  				},
   533  			},
   534  		},
   535  		{
   536  			Type: &csi.ControllerServiceCapability_Rpc{
   537  				Rpc: &csi.ControllerServiceCapability_RPC{
   538  					Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
   539  				},
   540  			},
   541  		},
   542  		{
   543  			Type: &csi.ControllerServiceCapability_Rpc{
   544  				Rpc: &csi.ControllerServiceCapability_RPC{
   545  					Type: csi.ControllerServiceCapability_RPC_PUBLISH_READONLY,
   546  				},
   547  			},
   548  		},
   549  		{
   550  			Type: &csi.ControllerServiceCapability_Rpc{
   551  				Rpc: &csi.ControllerServiceCapability_RPC{
   552  					Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
   553  				},
   554  			},
   555  		},
   556  		{
   557  			Type: &csi.ControllerServiceCapability_Rpc{
   558  				Rpc: &csi.ControllerServiceCapability_RPC{
   559  					Type: csi.ControllerServiceCapability_RPC_GET_VOLUME,
   560  				},
   561  			},
   562  		},
   563  		{
   564  			Type: &csi.ControllerServiceCapability_Rpc{
   565  				Rpc: &csi.ControllerServiceCapability_RPC{
   566  					Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
   567  				},
   568  			},
   569  		},
   570  	}
   571  
   572  	if !s.config.DisableAttach {
   573  		caps = append(caps, &csi.ControllerServiceCapability{
   574  			Type: &csi.ControllerServiceCapability_Rpc{
   575  				Rpc: &csi.ControllerServiceCapability_RPC{
   576  					Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
   577  				},
   578  			},
   579  		})
   580  	}
   581  
   582  	if !s.config.DisableControllerExpansion {
   583  		caps = append(caps, &csi.ControllerServiceCapability{
   584  			Type: &csi.ControllerServiceCapability_Rpc{
   585  				Rpc: &csi.ControllerServiceCapability_RPC{
   586  					Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
   587  				},
   588  			},
   589  		})
   590  	}
   591  
   592  	if hookVal, hookMsg := s.execHook("ControllerGetCapabilitiesEnd"); hookVal != codes.OK {
   593  		return nil, status.Errorf(hookVal, hookMsg)
   594  	}
   595  
   596  	return &csi.ControllerGetCapabilitiesResponse{
   597  		Capabilities: caps,
   598  	}, nil
   599  }
   600  
   601  func (s *service) CreateSnapshot(ctx context.Context,
   602  	req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
   603  	// Check arguments
   604  	if len(req.GetName()) == 0 {
   605  		return nil, status.Error(codes.InvalidArgument, "Snapshot Name cannot be empty")
   606  	}
   607  	if len(req.GetSourceVolumeId()) == 0 {
   608  		return nil, status.Error(codes.InvalidArgument, "Snapshot SourceVolumeId cannot be empty")
   609  	}
   610  
   611  	// Check to see if the snapshot already exists.
   612  	if i, v := s.snapshots.FindSnapshot("name", req.GetName()); i >= 0 {
   613  		// Requested snapshot name already exists
   614  		if v.SnapshotCSI.GetSourceVolumeId() != req.GetSourceVolumeId() || !reflect.DeepEqual(v.Parameters, req.GetParameters()) {
   615  			return nil, status.Error(codes.AlreadyExists,
   616  				fmt.Sprintf("Snapshot with name %s already exists", req.GetName()))
   617  		}
   618  		return &csi.CreateSnapshotResponse{Snapshot: &v.SnapshotCSI}, nil
   619  	}
   620  
   621  	// Create the snapshot and add it to the service's in-mem snapshot slice.
   622  	snapshot := s.newSnapshot(req.GetName(), req.GetSourceVolumeId(), req.GetParameters())
   623  	s.snapshots.Add(snapshot)
   624  
   625  	if hookVal, hookMsg := s.execHook("CreateSnapshotEnd"); hookVal != codes.OK {
   626  		return nil, status.Errorf(hookVal, hookMsg)
   627  	}
   628  
   629  	return &csi.CreateSnapshotResponse{Snapshot: &snapshot.SnapshotCSI}, nil
   630  }
   631  
   632  func (s *service) DeleteSnapshot(ctx context.Context,
   633  	req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
   634  
   635  	//  If the snapshot is not specified, return error
   636  	if len(req.SnapshotId) == 0 {
   637  		return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
   638  	}
   639  
   640  	if hookVal, hookMsg := s.execHook("DeleteSnapshotStart"); hookVal != codes.OK {
   641  		return nil, status.Errorf(hookVal, hookMsg)
   642  	}
   643  
   644  	// If the snapshot does not exist then return an idempotent response.
   645  	i, _ := s.snapshots.FindSnapshot("id", req.SnapshotId)
   646  	if i < 0 {
   647  		return &csi.DeleteSnapshotResponse{}, nil
   648  	}
   649  
   650  	// This delete logic preserves order and prevents potential memory
   651  	// leaks. The slice's elements may not be pointers, but the structs
   652  	// themselves have fields that are.
   653  	s.snapshots.Delete(i)
   654  	klog.V(5).InfoS("mock delete snapshot", "snapshotId", req.SnapshotId)
   655  
   656  	if hookVal, hookMsg := s.execHook("DeleteSnapshotEnd"); hookVal != codes.OK {
   657  		return nil, status.Errorf(hookVal, hookMsg)
   658  	}
   659  
   660  	return &csi.DeleteSnapshotResponse{}, nil
   661  }
   662  
   663  func (s *service) ListSnapshots(ctx context.Context,
   664  	req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
   665  
   666  	if hookVal, hookMsg := s.execHook("ListSnapshots"); hookVal != codes.OK {
   667  		return nil, status.Errorf(hookVal, hookMsg)
   668  	}
   669  
   670  	// case 1: SnapshotId is not empty, return snapshots that match the snapshot id.
   671  	if len(req.GetSnapshotId()) != 0 {
   672  		return getSnapshotById(s, req)
   673  	}
   674  
   675  	// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id.
   676  	if len(req.GetSourceVolumeId()) != 0 {
   677  		return getSnapshotByVolumeId(s, req)
   678  	}
   679  
   680  	// case 3: no parameter is set, so we return all the snapshots.
   681  	return getAllSnapshots(s, req)
   682  }
   683  
   684  func (s *service) ControllerExpandVolume(
   685  	ctx context.Context,
   686  	req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
   687  	if len(req.VolumeId) == 0 {
   688  		return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
   689  	}
   690  
   691  	if req.CapacityRange == nil {
   692  		return nil, status.Error(codes.InvalidArgument, "Request capacity cannot be empty")
   693  	}
   694  
   695  	if hookVal, hookMsg := s.execHook("ControllerExpandVolumeStart"); hookVal != codes.OK {
   696  		return nil, status.Errorf(hookVal, hookMsg)
   697  	}
   698  
   699  	s.volsRWL.Lock()
   700  	defer s.volsRWL.Unlock()
   701  
   702  	i, v := s.findVolNoLock("id", req.VolumeId)
   703  	if i < 0 {
   704  		return nil, status.Error(codes.NotFound, req.VolumeId)
   705  	}
   706  
   707  	if s.config.DisableOnlineExpansion && MockVolumes[v.GetVolumeId()].ISControllerPublished {
   708  		return nil, status.Error(codes.FailedPrecondition, "volume is published and online volume expansion is not supported")
   709  	}
   710  
   711  	requestBytes := req.CapacityRange.RequiredBytes
   712  
   713  	if v.CapacityBytes > requestBytes {
   714  		return nil, status.Error(codes.InvalidArgument, "cannot change volume capacity to a smaller size")
   715  	}
   716  
   717  	resp := &csi.ControllerExpandVolumeResponse{
   718  		CapacityBytes:         requestBytes,
   719  		NodeExpansionRequired: s.config.NodeExpansionRequired,
   720  	}
   721  
   722  	// Check to see if the volume already satisfied request size.
   723  	if v.CapacityBytes == requestBytes {
   724  		klog.V(5).InfoS("volume capacity sufficient, no need to expand", "requested", requestBytes, "current", v.CapacityBytes, "volumeID", v.VolumeId)
   725  		return resp, nil
   726  	}
   727  
   728  	// Update volume's capacity to the requested size.
   729  	v.CapacityBytes = requestBytes
   730  	s.vols[i] = v
   731  
   732  	if hookVal, hookMsg := s.execHook("ControllerExpandVolumeEnd"); hookVal != codes.OK {
   733  		return nil, status.Errorf(hookVal, hookMsg)
   734  	}
   735  
   736  	return resp, nil
   737  }
   738  
   739  func getSnapshotById(s *service, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
   740  	if len(req.GetSnapshotId()) != 0 {
   741  		i, snapshot := s.snapshots.FindSnapshot("id", req.GetSnapshotId())
   742  		if i < 0 {
   743  			return &csi.ListSnapshotsResponse{}, nil
   744  		}
   745  
   746  		if len(req.GetSourceVolumeId()) != 0 {
   747  			if snapshot.SnapshotCSI.GetSourceVolumeId() != req.GetSourceVolumeId() {
   748  				return &csi.ListSnapshotsResponse{}, nil
   749  			}
   750  		}
   751  
   752  		return &csi.ListSnapshotsResponse{
   753  			Entries: []*csi.ListSnapshotsResponse_Entry{
   754  				{
   755  					Snapshot: &snapshot.SnapshotCSI,
   756  				},
   757  			},
   758  		}, nil
   759  	}
   760  	return nil, nil
   761  }
   762  
   763  func getSnapshotByVolumeId(s *service, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
   764  	if len(req.GetSourceVolumeId()) != 0 {
   765  		i, snapshot := s.snapshots.FindSnapshot("sourceVolumeId", req.SourceVolumeId)
   766  		if i < 0 {
   767  			return &csi.ListSnapshotsResponse{}, nil
   768  		}
   769  		return &csi.ListSnapshotsResponse{
   770  			Entries: []*csi.ListSnapshotsResponse_Entry{
   771  				{
   772  					Snapshot: &snapshot.SnapshotCSI,
   773  				},
   774  			},
   775  		}, nil
   776  	}
   777  	return nil, nil
   778  }
   779  
   780  func getAllSnapshots(s *service, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
   781  	// Copy the mock snapshots into a new slice in order to avoid
   782  	// locking the service's snapshot slice for the duration of the
   783  	// ListSnapshots RPC.
   784  	readyToUse := true
   785  	snapshots := s.snapshots.List(readyToUse)
   786  
   787  	var (
   788  		ulenSnapshots = int32(len(snapshots))
   789  		maxEntries    = req.MaxEntries
   790  		startingToken int32
   791  	)
   792  
   793  	if v := req.StartingToken; v != "" {
   794  		i, err := strconv.ParseUint(v, 10, 32)
   795  		if err != nil {
   796  			return nil, status.Errorf(
   797  				codes.Aborted,
   798  				"startingToken=%s: %v",
   799  				v, err)
   800  		}
   801  		startingToken = int32(i)
   802  	}
   803  
   804  	if startingToken > ulenSnapshots {
   805  		return nil, status.Errorf(
   806  			codes.Aborted,
   807  			"startingToken=%d > len(snapshots)=%d",
   808  			startingToken, ulenSnapshots)
   809  	}
   810  
   811  	// Discern the number of remaining entries.
   812  	rem := ulenSnapshots - startingToken
   813  
   814  	// If maxEntries is 0 or greater than the number of remaining entries then
   815  	// set maxEntries to the number of remaining entries.
   816  	if maxEntries == 0 || maxEntries > rem {
   817  		maxEntries = rem
   818  	}
   819  
   820  	var (
   821  		i       int
   822  		j       = startingToken
   823  		entries = make(
   824  			[]*csi.ListSnapshotsResponse_Entry,
   825  			maxEntries)
   826  	)
   827  
   828  	for i = 0; i < len(entries); i++ {
   829  		entries[i] = &csi.ListSnapshotsResponse_Entry{
   830  			Snapshot: &snapshots[j],
   831  		}
   832  		j++
   833  	}
   834  
   835  	var nextToken string
   836  	if n := startingToken + int32(i); n < ulenSnapshots {
   837  		nextToken = fmt.Sprintf("%d", n)
   838  	}
   839  
   840  	return &csi.ListSnapshotsResponse{
   841  		Entries:   entries,
   842  		NextToken: nextToken,
   843  	}, nil
   844  }
   845  

View as plain text