...
1 package lighthouse
2
3 import (
4 "context"
5 "fmt"
6
7 "github.com/go-logr/logr"
8 v1 "k8s.io/api/core/v1"
9 "k8s.io/apimachinery/pkg/fields"
10 "k8s.io/apimachinery/pkg/types"
11 kwatch "k8s.io/apimachinery/pkg/watch"
12
13 "edge-infra.dev/pkg/edge/lighthouse/config"
14
15 k8serrors "k8s.io/apimachinery/pkg/api/errors"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 )
18
const (
	// lighthouseConfigMapName is the metadata.name of the ConfigMap that
	// WatchCfg fetches and watches.
	lighthouseConfigMapName = "lighthouse"
	// lighthouseConfigMapNs is the namespace in which WatchCfg looks for that
	// ConfigMap.
	lighthouseConfigMapNs = "lighthouse"
	// lighthouseDataFieldName is the key in the ConfigMap's Data whose value is
	// handed to updateChannels as the channel list.
	lighthouseDataFieldName = "CHANNELS"
)
27
28
29
30 func WatchCfg(ctx context.Context, s *config.Config, lstner *WatchTower, log logr.Logger) (kwatch.Interface, error) {
31
32 lighthouseConfigmap := &v1.ConfigMap{}
33 if err := s.WatchClient.Get(ctx, types.NamespacedName{
34 Name: lighthouseConfigMapName,
35 Namespace: lighthouseConfigMapNs,
36 }, lighthouseConfigmap); err != nil {
37 if !k8serrors.IsNotFound(err) {
38 return nil, err
39 }
40 updateChannels(lstner, "", log)
41 }
42 list := &v1.ConfigMapList{}
43 watch, err := s.WatchClient.Watch(ctx, list, &client.ListOptions{
44 FieldSelector: fields.OneTermEqualSelector("metadata.name", lighthouseConfigMapName),
45 Namespace: lighthouseConfigMapNs,
46 })
47 if err != nil {
48 return nil, err
49 }
50 go func(log logr.Logger) {
51 for event := range watch.ResultChan() {
52 switch event.Type {
53 case kwatch.Deleted:
54
55
56 if err := lstner.UnregisterAllChannels(); err != nil {
57 log.Error(err, "An error occurred unregistering all Postgres channels")
58 panic(err)
59 }
60 updateChannels(lstner, "", log)
61 case kwatch.Added, kwatch.Modified:
62 updatedConfigmap := event.Object.(*v1.ConfigMap)
63 updateChannels(lstner, updatedConfigmap.Data[lighthouseDataFieldName], log)
64 }
65 }
66 }(log)
67 return watch, nil
68 }
69
70
71 func updateChannels(lstner *WatchTower, list string, log logr.Logger) {
72 channels := config.ParseChannelList(list)
73 for i := range channels {
74 if err := lstner.RegisterChannel(channels[i]); err != nil {
75 log.Error(err, fmt.Sprintf("An error occurred registering %s Postgres channels", channels[i]))
76 }
77 }
78 }
79
View as plain text