package integration

import (
	"net"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	v "edge-infra.dev/pkg/sds/remoteaccess/wireguard/vpn"
	"edge-infra.dev/test/f2"
)

// TestIPAddressIsInSubnet exercises VPN.IPAddressIsInSubnet with a nil IP,
// with subnet CIDR strings that are missing either the prefix length or the
// address, and with addresses outside and inside a valid subnet.
//
// All steps share the single VPN created in Setup. Every step that depends on
// a particular subnet assigns vpn.SubnetCIDR itself before calling the method.
func TestIPAddressIsInSubnet(t *testing.T) {
	var vpn *v.VPN

	feature := f2.NewFeature("IPAddressIsInSubnet").
		Setup("create VPN", func(ctx f2.Context, t *testing.T) f2.Context {
			// err is scoped to Setup; the Test steps declare their own.
			var err error
			vpn, err = v.New()
			require.NoError(t, err)
			return ctx
		}).
		Test("error when IP is nil", func(ctx f2.Context, t *testing.T) f2.Context {
			// Pass a genuinely nil net.IP. net.ParseIP("0.0.0.0") returns a
			// valid, non-nil address, so it does not exercise the case this
			// step is named for.
			// NOTE(review): this test has not assigned SubnetCIDR yet at this
			// point, so the error may also originate from subnet parsing -
			// confirm against the implementation whether the nil check is
			// what fires here.
			var nilIP net.IP
			in, err := vpn.IPAddressIsInSubnet(nilIP)
			assert.Error(t, err)
			assert.False(t, in)
			return ctx
		}).
		Test("error when unable to parse subnet prefix", func(ctx f2.Context, t *testing.T) f2.Context {
			// Address without a "/<bits>" suffix.
			vpn.SubnetCIDR = "1.2.3.0"
			in, err := vpn.IPAddressIsInSubnet(net.ParseIP("0.0.0.0"))
			assert.Error(t, err)
			assert.False(t, in)
			return ctx
		}).
		Test("error when unable to parse subnet address", func(ctx f2.Context, t *testing.T) f2.Context {
			// Prefix length without an address in front of it.
			vpn.SubnetCIDR = "/24"
			in, err := vpn.IPAddressIsInSubnet(net.ParseIP("0.0.0.0"))
			assert.Error(t, err)
			assert.False(t, in)
			return ctx
		}).
		Test("return false if IP is not in subnet", func(ctx f2.Context, t *testing.T) f2.Context {
			vpn.SubnetCIDR = "1.2.3.0/24"
			in, err := vpn.IPAddressIsInSubnet(net.ParseIP("0.0.0.0"))
			assert.NoError(t, err)
			assert.False(t, in)
			return ctx
		}).
		Test("return true if IP is in subnet", func(ctx f2.Context, t *testing.T) f2.Context {
			vpn.SubnetCIDR = "1.2.3.0/24"
			in, err := vpn.IPAddressIsInSubnet(net.ParseIP("1.2.3.4"))
			assert.NoError(t, err)
			assert.True(t, in)
			return ctx
		}).Feature()

	// NOTE(review): f is not declared in this file; presumably a package-level
	// f2 framework instance set up elsewhere in the package - confirm.
	f.Test(t, feature)
}

// TestRequestAvailableIPAddress exercises VPN.RequestAvailableIPAddress with
// the address pool left as created by v.New, set to an empty pool, set to a
// pool whose only entry already has a non-empty value, and set to a pool whose
// only entry has an empty value.
//
// All steps share the single VPN created in Setup. Steps after the first
// replace vpn.AvailableIPAddressPool before calling the method.
func TestRequestAvailableIPAddress(t *testing.T) {
	var vpn *v.VPN

	feature := f2.NewFeature("RequestAvailableIPAddress").
		Setup("create VPN", func(ctx f2.Context, t *testing.T) f2.Context {
			// err is scoped to Setup; the Test steps declare their own.
			var err error
			vpn, err = v.New()
			require.NoError(t, err)
			return ctx
		}).
		Test("error and do not return IP when IP address pool is nil", func(ctx f2.Context, t *testing.T) f2.Context {
			// The pool has not been assigned by this test yet; the step relies
			// on whatever state v.New leaves it in.
			ip, err := vpn.RequestAvailableIPAddress("test-clusterEdgeID")
			assert.Nil(t, ip)
			assert.Error(t, err)
			assert.ErrorIs(t, err, v.ErrSubnetNotConfigured)
			return ctx
		}).
		Test("error and do not return IP when IP address pool is empty", func(ctx f2.Context, t *testing.T) f2.Context {
			vpn.AvailableIPAddressPool = v.IPAddressPool{}
			ip, err := vpn.RequestAvailableIPAddress("test-clusterEdgeID")
			assert.Nil(t, ip)
			assert.Error(t, err)
			assert.ErrorIs(t, err, v.ErrNoIPAddressesAvailable)
			return ctx
		}).
		Test("error and do not return IP when all IP addresses are unavailable", func(ctx f2.Context, t *testing.T) f2.Context {
			// The single entry carries a non-empty value (presumably the ID of
			// the cluster that holds the address - verify against IPAddressPool).
			vpn.AvailableIPAddressPool = v.IPAddressPool{"1.2.3.4": "another-cluster"}
			ip, err := vpn.RequestAvailableIPAddress("test-clusterEdgeID")
			assert.Nil(t, ip)
			assert.Error(t, err)
			assert.ErrorIs(t, err, v.ErrNoIPAddressesAvailable)
			return ctx
		}).
		Test("successfully return IP when IPs address are available", func(ctx f2.Context, t *testing.T) f2.Context {
			// The single entry carries an empty value, which this step expects
			// to be handed out.
			vpn.AvailableIPAddressPool = v.IPAddressPool{"1.2.3.4": ""}
			ip, err := vpn.RequestAvailableIPAddress("test-clusterEdgeID")
			assert.NoError(t, err)
			assert.Equal(t, net.ParseIP("1.2.3.4"), ip)
			return ctx
		}).Feature()

	// NOTE(review): f is not declared in this file; presumably a package-level
	// f2 framework instance set up elsewhere in the package - confirm.
	f.Test(t, feature)
}