package info

import (
	"context"
	"reflect"

	"github.com/go-logr/logr"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	kwatch "k8s.io/apimachinery/pkg/watch"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// OnConfigMapUpdate starts a watch on the ConfigMap named EdgeConfigMapName in
// namespace EdgeConfigMapNS and invokes fn from a background goroutine when a
// relevant update is observed.
//
// Only Added and Modified events are considered. For each such event the
// ConfigMap is parsed with New; if parsing fails the error is logged and the
// event is skipped. The first ConfigMap that parses successfully always
// triggers fn. After that, fn is called only when both of the following hold
// relative to the ConfigMap that last triggered fn:
//   - the parsed EdgeInfo differs from FromConfigMap of that previous ConfigMap;
//   - filters is empty, or the Data value of at least one key listed in filters
//     has changed.
//
// The goroutine runs until the watch's result channel is closed. The returned
// watch must be stopped by the caller to end the watch, e.g.
// `defer watch.Stop()`. A non-nil error is returned only when the watch could
// not be established, in which case the returned watch is nil.
func OnConfigMapUpdate(ctx context.Context, cl client.WithWatch, logger logr.Logger, fn func(cfg *EdgeInfo), filters ...string) (kwatch.Interface, error) {
	watch, err := cl.Watch(ctx, &v1.ConfigMapList{}, &client.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", EdgeConfigMapName),
		Namespace:     EdgeConfigMapNS,
	})
	if err != nil {
		return nil, err
	}

	go func() {
		// old is the ConfigMap that most recently triggered fn; it is the
		// baseline every later event is compared against.
		var old *v1.ConfigMap

		for event := range watch.ResultChan() {
			if event.Type != kwatch.Added && event.Type != kwatch.Modified {
				continue
			}

			// Use the checked form of the type assertion: a panic here would
			// happen in a background goroutine and crash the whole process.
			updatedConfigmap, ok := event.Object.(*v1.ConfigMap)
			if !ok {
				logger.Info("unexpected object type in ConfigMap watch event, skipping", "eventType", string(event.Type))
				continue
			}

			edgeInfoUpdated, err := New(updatedConfigmap)
			if err != nil {
				// Skip invalid configuration and keep watching for a fix.
				logger.Error(err, "fail to parse edge info ConfigMap, will watch for update")
				continue
			}

			// First valid ConfigMap: nothing to compare against, always notify.
			if old == nil {
				old = updatedConfigmap
				fn(edgeInfoUpdated)
				continue
			}

			configMapChanged := !reflect.DeepEqual(FromConfigMap(old), edgeInfoUpdated)
			matchedOnFilters := cfgValuesMatches(old, updatedConfigmap, filters)
			if configMapChanged && matchedOnFilters {
				old = updatedConfigmap
				fn(edgeInfoUpdated)
			}
		}
	}()

	return watch, nil
}

// cfgValuesMatches reports whether an update from old to updated passes the
// given filters. With no filters every update passes. Otherwise the update
// passes only if the Data value of at least one filter key differs between the
// two ConfigMaps.
func cfgValuesMatches(old, updated *v1.ConfigMap, filters []string) bool {
	if len(filters) == 0 {
		return true
	}
	for _, key := range filters {
		if old.Data[key] != updated.Data[key] {
			return true
		}
	}
	return false
}