/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"google.golang.org/grpc"
	grpchealth "google.golang.org/grpc/health/grpc_health_v1"

	"k8s.io/kubernetes/pkg/probe"
)

func TestNew(t *testing.T) {
	t.Run("Should: implement Probe interface", func(t *testing.T) {
		s := New()
		assert.Implements(t, (*Prober)(nil), s)
	})
}

// successServerMock is a health service stub whose Check and Watch handlers
// always report the SERVING status.
type successServerMock struct{}

// Check replies to every health check request with a SERVING status and a nil error.
func (successServerMock) Check(context.Context, *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
	resp := &grpchealth.HealthCheckResponse{Status: grpchealth.HealthCheckResponse_SERVING}
	return resp, nil
}

// Watch sends a single SERVING status on the stream and returns the result of that send.
func (successServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream grpchealth.Health_WatchServer) error {
	resp := &grpchealth.HealthCheckResponse{Status: grpchealth.HealthCheckResponse_SERVING}
	return stream.Send(resp)
}

// errorTimeoutServerMock is a health service stub that waits four seconds
// before answering SERVING, so callers with a shorter deadline time out first.
type errorTimeoutServerMock struct{}

// Check blocks for four seconds and then replies with a SERVING status and a nil error.
func (errorTimeoutServerMock) Check(context.Context, *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
	time.Sleep(4 * time.Second)
	resp := &grpchealth.HealthCheckResponse{Status: grpchealth.HealthCheckResponse_SERVING}
	return resp, nil
}

// Watch blocks for four seconds, then sends a single SERVING status on the
// stream and returns the result of that send.
func (errorTimeoutServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream grpchealth.Health_WatchServer) error {
	time.Sleep(4 * time.Second)
	resp := &grpchealth.HealthCheckResponse{Status: grpchealth.HealthCheckResponse_SERVING}
	return stream.Send(resp)
}

// errorNotServeServerMock is a health service stub whose Check and Watch
// handlers always report the NOT_SERVING status.
type errorNotServeServerMock struct{}

// Check replies to every health check request with a NOT_SERVING status and a nil error.
func (errorNotServeServerMock) Check(context.Context, *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
	resp := &grpchealth.HealthCheckResponse{Status: grpchealth.HealthCheckResponse_NOT_SERVING}
	return resp, nil
}

// Watch sends a single NOT_SERVING status on the stream and returns the result of that send.
func (errorNotServeServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream grpchealth.Health_WatchServer) error {
	resp := &grpchealth.HealthCheckResponse{Status: grpchealth.HealthCheckResponse_NOT_SERVING}
	return stream.Send(resp)
}

func TestGrpcProber_Probe(t *testing.T) {
	t.Run("Should: failed but return nil error because cant find host", func(t *testing.T) {
		s := New()
		p, o, err := s.Probe("", "", 32, time.Second)
		assert.Equal(t, probe.Failure, p)
		assert.Equal(t, nil, err)
		assert.Equal(t, "timeout: failed to connect service \":32\" within 1s: context deadline exceeded", o)
	})
	t.Run("Should: return nil error because connection closed", func(t *testing.T) {
		s := New()
		server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "res")
		}))
		u := strings.Split(server.URL, ":")
		assert.Equal(t, 3, len(u))

		port, err := strconv.Atoi(u[2])
		assert.Equal(t, nil, err)

		// take some time to wait server boot
		time.Sleep(2 * time.Second)
		p, _, err := s.Probe("127.0.0.1", "", port, time.Second)
		assert.Equal(t, probe.Failure, p)
		assert.Equal(t, nil, err)
	})
	t.Run("Should: return nil error because server response not served", func(t *testing.T) {
		s := New()
		lis, _ := net.Listen("tcp", ":0")
		port := lis.Addr().(*net.TCPAddr).Port
		grpcServer := grpc.NewServer()
		defer grpcServer.Stop()
		grpchealth.RegisterHealthServer(grpcServer, &errorNotServeServerMock{})
		go func() {
			_ = grpcServer.Serve(lis)
		}()
		// take some time to wait server boot
		time.Sleep(2 * time.Second)
		p, o, err := s.Probe("0.0.0.0", "", port, time.Second)
		assert.Equal(t, probe.Failure, p)
		assert.Equal(t, nil, err)
		assert.Equal(t, "service unhealthy (responded with \"NOT_SERVING\")", o)
	})
	t.Run("Should: return nil-error because server not response in time", func(t *testing.T) {
		s := New()
		lis, _ := net.Listen("tcp", ":0")
		port := lis.Addr().(*net.TCPAddr).Port

		grpcServer := grpc.NewServer()
		defer grpcServer.Stop()
		grpchealth.RegisterHealthServer(grpcServer, &errorTimeoutServerMock{})
		go func() {
			_ = grpcServer.Serve(lis)
		}()
		// take some time to wait server boot
		time.Sleep(2 * time.Second)
		p, o, err := s.Probe("0.0.0.0", "", port, time.Second*2)
		assert.Equal(t, probe.Failure, p)
		assert.Equal(t, nil, err)
		assert.Equal(t, "timeout: health rpc did not complete within 2s", o)

	})
	t.Run("Should: not return error because check was success", func(t *testing.T) {
		s := New()
		lis, _ := net.Listen("tcp", ":0")
		port := lis.Addr().(*net.TCPAddr).Port

		grpcServer := grpc.NewServer()
		defer grpcServer.Stop()
		grpchealth.RegisterHealthServer(grpcServer, &successServerMock{})
		go func() {
			_ = grpcServer.Serve(lis)
		}()
		// take some time to wait server boot
		time.Sleep(2 * time.Second)
		p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2)
		assert.Equal(t, probe.Success, p)
		assert.Equal(t, nil, err)
	})
	t.Run("Should: not return error because check was success, when listen port is 0", func(t *testing.T) {
		s := New()
		lis, _ := net.Listen("tcp", ":0")
		port := lis.Addr().(*net.TCPAddr).Port

		grpcServer := grpc.NewServer()
		defer grpcServer.Stop()
		grpchealth.RegisterHealthServer(grpcServer, &successServerMock{})
		go func() {
			_ = grpcServer.Serve(lis)
		}()
		// take some time to wait server boot
		time.Sleep(2 * time.Second)
		p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2)
		assert.Equal(t, probe.Success, p)
		assert.Equal(t, nil, err)
	})
}