// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package clusterreader

import (
	"context"
	"errors"
	"fmt"
	"sync"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/pager"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
	"sigs.k8s.io/cli-utils/pkg/object"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// This map is hard-coded knowledge that a Deployment contains and
// ReplicaSet, and that a ReplicaSet in turn contains Pods, etc., and the
// approach to finding status being used here requires hardcoding that
// knowledge in the status client library.
// TODO: These should probably be defined in the statusreaders rather than here.
var genGroupKinds = map[schema.GroupKind][]schema.GroupKind{
	schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
		{
			Group: "apps",
			Kind:  "ReplicaSet",
		},
	},
	schema.GroupKind{Group: "apps", Kind: "ReplicaSet"}: { //nolint:gofmt
		{
			Group: "",
			Kind:  "Pod",
		},
	},
	schema.GroupKind{Group: "apps", Kind: "StatefulSet"}: { //nolint:gofmt
		{
			Group: "",
			Kind:  "Pod",
		},
	},
}

// NewCachingClusterReader returns a new instance of the ClusterReader. The
// ClusterReader needs will use the clusterreader to fetch resources from the cluster,
// while the mapper is used to resolve the version for GroupKinds. The set of
// identifiers is needed so the ClusterReader can figure out which GroupKind
// and namespace combinations it needs to cache when the Sync function is called.
// We only want to fetch the resources that are actually needed.
func NewCachingClusterReader(reader client.Reader, mapper meta.RESTMapper, identifiers object.ObjMetadataSet) (engine.ClusterReader, error) {
	gvkNamespaceSet := newGnSet()
	for _, id := range identifiers {
		// For every identifier, add the GroupVersionKind and namespace combination to the gvkNamespaceSet and
		// check the genGroupKinds map for any generated resources that also should be included.
		err := buildGvkNamespaceSet([]schema.GroupKind{id.GroupKind}, id.Namespace, gvkNamespaceSet)
		if err != nil {
			return nil, err
		}
	}

	return &CachingClusterReader{
		reader: reader,
		mapper: mapper,
		gns:    gvkNamespaceSet.gvkNamespaces,
	}, nil
}

func buildGvkNamespaceSet(gks []schema.GroupKind, namespace string, gvkNamespaceSet *gvkNamespaceSet) error {
	for _, gk := range gks {
		gvkNamespaceSet.add(gkNamespace{
			GroupKind: gk,
			Namespace: namespace,
		})
		genGKs, found := genGroupKinds[gk]
		if found {
			err := buildGvkNamespaceSet(genGKs, namespace, gvkNamespaceSet)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

type gvkNamespaceSet struct {
	gvkNamespaces []gkNamespace
	seen          map[gkNamespace]struct{}
}

func newGnSet() *gvkNamespaceSet {
	return &gvkNamespaceSet{
		seen: make(map[gkNamespace]struct{}),
	}
}

func (g *gvkNamespaceSet) add(gn gkNamespace) {
	if _, found := g.seen[gn]; !found {
		g.gvkNamespaces = append(g.gvkNamespaces, gn)
		g.seen[gn] = struct{}{}
	}
}

// CachingClusterReader is an implementation of the ObserverReader interface that will
// pre-fetch all resources needed before every sync loop. The resources needed are decided by
// finding all combinations of GroupVersionKind and namespace referenced by the provided
// identifiers. This list is then expanded to include any known generated resource types.
type CachingClusterReader struct {
	mx sync.RWMutex

	// clusterreader provides functions to read and list resources from the
	// cluster.
	reader client.Reader

	// mapper is the client-side representation of the server-side scheme. It is used
	// to resolve GroupVersionKind from GroupKind.
	mapper meta.RESTMapper

	// gns contains the slice of all the GVK and namespace combinations that
	// should be included in the cache. This is computed based the resource identifiers
	// passed in when the CachingClusterReader is created and augmented with other
	// resource types needed to compute status (see genGroupKinds).
	gns []gkNamespace

	// cache contains the resources found in the cluster for the given combination
	// of GVK and namespace. Before each polling cycle, the framework will call the
	// Sync function, which is responsible for repopulating the cache.
	cache map[gkNamespace]cacheEntry
}

type cacheEntry struct {
	resources unstructured.UnstructuredList
	err       error
}

// gkNamespace contains information about a GroupVersionKind and a namespace.
type gkNamespace struct {
	GroupKind schema.GroupKind
	Namespace string
}

// Get looks up the resource identified by the key and the object GVK in the cache. If the needed combination
// of GVK and namespace is not part of the cache, that is considered an error.
func (c *CachingClusterReader) Get(_ context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
	c.mx.RLock()
	defer c.mx.RUnlock()
	gvk := obj.GetObjectKind().GroupVersionKind()
	mapping, err := c.mapper.RESTMapping(gvk.GroupKind())
	if err != nil {
		return err
	}
	gn := gkNamespace{
		GroupKind: gvk.GroupKind(),
		Namespace: key.Namespace,
	}
	cacheEntry, found := c.cache[gn]
	if !found {
		return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
	}

	if cacheEntry.err != nil {
		return cacheEntry.err
	}
	for _, u := range cacheEntry.resources.Items {
		if u.GetName() == key.Name {
			obj.Object = u.Object
			return nil
		}
	}
	return apierrors.NewNotFound(mapping.Resource.GroupResource(), key.Name)
}

// ListNamespaceScoped lists all resource identifier by the GVK of the list, the namespace and the selector
// from the cache. If the needed combination of GVK and namespace is not part of the cache, that is considered an error.
func (c *CachingClusterReader) ListNamespaceScoped(_ context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error {
	c.mx.RLock()
	defer c.mx.RUnlock()
	gvk := list.GroupVersionKind()
	gn := gkNamespace{
		GroupKind: gvk.GroupKind(),
		Namespace: namespace,
	}

	cacheEntry, found := c.cache[gn]
	if !found {
		return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
	}

	if cacheEntry.err != nil {
		return cacheEntry.err
	}

	var items []unstructured.Unstructured
	for _, u := range cacheEntry.resources.Items {
		if selector.Matches(labels.Set(u.GetLabels())) {
			items = append(items, u)
		}
	}
	list.Items = items
	return nil
}

// ListClusterScoped lists all resource identifier by the GVK of the list and selector
// from the cache. If the needed combination of GVK and namespace (which for clusterscoped resources
// will always be the empty string) is not part of the cache, that is considered an error.
func (c *CachingClusterReader) ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error {
	return c.ListNamespaceScoped(ctx, list, "", selector)
}

// Sync loops over the list of gkNamespace we know of, and uses list calls to fetch the resources.
// This information populates the cache.
func (c *CachingClusterReader) Sync(ctx context.Context) error {
	c.mx.Lock()
	defer c.mx.Unlock()
	cache := make(map[gkNamespace]cacheEntry)
	for _, gn := range c.gns {
		mapping, err := c.mapper.RESTMapping(gn.GroupKind)
		if err != nil {
			if meta.IsNoMatchError(err) {
				// If we get a NoMatchError, it means we are checking for
				// a type that doesn't exist. Presumably the CRD is being
				// applied, so it will be added. Reset the RESTMapper to
				// make sure we pick up any new resource types on the
				// APIServer.
				cache[gn] = cacheEntry{
					err: err,
				}
				continue
			}
			return err
		}
		ns := ""
		if mapping.Scope == meta.RESTScopeNamespace {
			ns = gn.Namespace
		}
		list, err := c.listUnstructured(ctx, mapping.GroupVersionKind, ns)
		if err != nil {
			// If the context was cancelled, we just stop the work and return
			// the error.
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return err
			}
			// For other errors, we just keep it the error. Whenever any pollers
			// request a resource covered by this gns, we just return the
			// error.
			cache[gn] = cacheEntry{
				err: err,
			}
			continue
		}
		cache[gn] = cacheEntry{
			resources: *list,
		}
	}
	c.cache = cache
	return nil
}

// listUnstructured performs one or more LIST calls, paginating the requests
// and aggregating the results.  If aggregated, only the ResourceVersion,
// SelfLink, and Items will be populated. The default page size is 500.
func (c *CachingClusterReader) listUnstructured(
	ctx context.Context,
	gvk schema.GroupVersionKind,
	namespace string,
) (*unstructured.UnstructuredList, error) {
	mOpts := metav1.ListOptions{}
	mOpts.SetGroupVersionKind(gvk)
	obj, _, err := pager.New(c.listPageFunc(namespace)).List(ctx, mOpts)
	if err != nil {
		return nil, err
	}

	switch t := obj.(type) {
	case *unstructured.UnstructuredList:
		// all in one
		return t, nil
	case *metainternalversion.List:
		// aggregated result
		u := &unstructured.UnstructuredList{}
		u.SetGroupVersionKind(gvk)
		// Only ResourceVersion & SelfLink are copied into the aggregated result
		// by ListPager.
		if t.ResourceVersion != "" {
			u.SetResourceVersion(t.ResourceVersion)
		}
		if t.SelfLink != "" { // nolint:staticcheck
			u.SetSelfLink(t.SelfLink) // nolint:staticcheck
		}
		u.Items = make([]unstructured.Unstructured, len(t.Items))
		for i, item := range t.Items {
			ui, ok := item.(*unstructured.Unstructured)
			if !ok {
				return nil, fmt.Errorf("unexpected list item type: %t", item)
			}
			u.Items[i] = *ui
		}
		return u, nil
	default:
		return nil, fmt.Errorf("unexpected list type: %t", t)
	}
}

func (c *CachingClusterReader) listPageFunc(namespace string) pager.ListPageFunc {
	return func(ctx context.Context, mOpts metav1.ListOptions) (runtime.Object, error) {
		mOptsCopy := mOpts
		labelSelector, err := labels.Parse(mOpts.LabelSelector)
		if err != nil {
			return nil, fmt.Errorf("failed to parse label selector: %w", err)
		}
		fieldSelector, err := fields.ParseSelector(mOpts.FieldSelector)
		if err != nil {
			return nil, fmt.Errorf("failed to parse field selector: %w", err)
		}
		cOpts := &client.ListOptions{
			LabelSelector: labelSelector,
			FieldSelector: fieldSelector,
			Namespace:     namespace,
			Limit:         mOpts.Limit,
			Continue:      mOpts.Continue,
			Raw:           &mOptsCopy,
		}
		var list unstructured.UnstructuredList
		list.SetGroupVersionKind(mOpts.GroupVersionKind())
		// Note: client.ListOptions only supports Exact ResourceVersion matching.
		// So leave ResourceVersion blank to get Any ResourceVersion.
		err = c.reader.List(ctx, &list, cOpts)
		return &list, err
	}
}