...

Source file src/edge-infra.dev/pkg/edge/lighthouse/listener.go

Documentation: edge-infra.dev/pkg/edge/lighthouse

     1  package lighthouse
     2  
     3  import (
     4  	"context"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/go-logr/logr"
     9  	"github.com/lib/pq"
    10  
    11  	"edge-infra.dev/pkg/edge/lighthouse/event"
    12  	"edge-infra.dev/pkg/lib/gcp/pubsub"
    13  )
    14  
    15  const (
    16  	// minReconnectInterval is the minimum default
    17  	// reconnect interval for the PG listener.
    18  	minReconnectInterval = time.Second * 5
    19  	// maxReconnectInterval is the maximum default
    20  	// reconnect interval for the PG listener.
    21  	maxReconnectInterval = time.Minute * 5
    22  	// DefaultLivelinessProbe is the default time to wait
    23  	// for a message before pinging the connection for liveliness.
    24  	DefaultLivelinessProbe = time.Minute * 20
    25  	// DefaultMaxConcurrent the maximum number of goroutines
    26  	// that can be spawn by the daemon to process the PG notify events.
    27  	DefaultMaxConcurrent = 1000
    28  )
    29  
// WatchTower listens for PostgreSQL NOTIFY events on a set of registered
// channels and publishes them to a GCP Pub/Sub topic.
//
// The embedded Mutex guards the channels map (RegisterChannel,
// ChannelExists, UnregisterChannel and UnregisterAllChannels take it). The
// other fields are written by the builder-style setters without locking, so
// configure them before calling Stream.
type WatchTower struct {
	sync.Mutex
	// channels is the set of PG channels currently registered on listener.
	channels map[string]struct{}
	// listener is the pq listener that delivers PG notifications; it stays nil
	// until AddListener is called.
	listener *pq.Listener
	// livelinessProbe is how long Stream waits for a notification before it
	// pings the connection to check its liveliness.
	livelinessProbe time.Duration
	// maxConcurrent is the capacity of the semaphore Stream uses to bound the
	// goroutines that handle PG events.
	maxConcurrent int64
	// projectID is the GCP project ID.
	projectID string
	// pubsubClient is the GCP Pub/Sub client used to publish events.
	pubsubClient pubsub.Publisher
	// topicID is the Pub/Sub topic that events are published to.
	topicID string
	// ticker is created by NewWatchTower with a DefaultLivelinessProbe period.
	// NOTE(review): it is neither read nor stopped in this file; presumably it
	// paces a periodic query for unacknowledged events elsewhere in the
	// package — confirm, and make sure something calls Stop on it.
	ticker *time.Ticker
}
    50  
    51  // NotificationEvent represents the PG notification stored in the database.
    52  type NotificationEvent struct {
    53  	ID           string
    54  	Channel      string
    55  	Operation    string
    56  	Record       interface{}
    57  	Acknowledged bool
    58  }
    59  
    60  // NewWatchTower returns a new listener.
    61  func NewWatchTower() *WatchTower {
    62  	return &WatchTower{
    63  		channels:        make(map[string]struct{}),
    64  		livelinessProbe: DefaultLivelinessProbe,
    65  		maxConcurrent:   DefaultMaxConcurrent,
    66  		ticker:          time.NewTicker(DefaultLivelinessProbe),
    67  	}
    68  }
    69  
    70  // SetLivelinessProbe sets the listener livelinessprobe duration value.
    71  func (w *WatchTower) SetLivelinessProbe(liveliness time.Duration) *WatchTower {
    72  	w.livelinessProbe = liveliness
    73  	return w
    74  }
    75  
    76  // GetLivelinessProbe gets the listener livelinessprobe duration value.
    77  func (w *WatchTower) GetLivelinessProbe() time.Duration {
    78  	return w.livelinessProbe
    79  }
    80  
    81  // SetMaxConcurrent sets the listener maximum concurrent goroutine bounds.
    82  func (w *WatchTower) SetMaxConcurrent(max int64) *WatchTower {
    83  	w.maxConcurrent = max
    84  	return w
    85  }
    86  
    87  // GetMaxConcurrent gets the listener maximum concurrent goroutine bounds.
    88  func (w *WatchTower) GetMaxConcurrent() int64 {
    89  	return w.maxConcurrent
    90  }
    91  
    92  // RegisterChannel registers a new channel to be watched for PG events.
    93  func (w *WatchTower) RegisterChannel(channel string) error {
    94  	w.Mutex.Lock()
    95  	defer w.Mutex.Unlock()
    96  	if _, exists := w.channels[channel]; !exists {
    97  		if err := w.listener.Listen(channel); err != nil {
    98  			return err
    99  		}
   100  		w.channels[channel] = struct{}{}
   101  	}
   102  	return nil
   103  }
   104  
   105  // ChannelExists checks if a channel exists.
   106  func (w *WatchTower) ChannelExists(channel string) bool {
   107  	w.Mutex.Lock()
   108  	defer w.Mutex.Unlock()
   109  	_, exists := w.channels[channel]
   110  	return exists
   111  }
   112  
   113  // UnregisterChannel unregisters a channel from the listener if the channel exists
   114  // after this is called, no events will be received from that channel.
   115  func (w *WatchTower) UnregisterChannel(channel string) error {
   116  	w.Mutex.Lock()
   117  	defer w.Mutex.Unlock()
   118  	if _, exists := w.channels[channel]; exists {
   119  		if err := w.listener.Unlisten(channel); err != nil {
   120  			return err
   121  		}
   122  		delete(w.channels, channel)
   123  	}
   124  	return nil
   125  }
   126  
   127  // UnregisterAllChannels unregisters all channels from the listener and no events
   128  // will be received after this is called until a new channel is registered.
   129  func (w *WatchTower) UnregisterAllChannels() error {
   130  	w.Mutex.Lock()
   131  	defer w.Mutex.Unlock()
   132  	if err := w.listener.UnlistenAll(); err != nil {
   133  		return err
   134  	}
   135  	w.channels = make(map[string]struct{})
   136  	return nil
   137  }
   138  
   139  // AddListener adds a new listener by initializing a gcp cloudsql dialer.
   140  func (w *WatchTower) AddListener(dbcfg pq.Dialer, connectionString string, minInterval, maxInterval time.Duration, log logr.Logger) *WatchTower {
   141  	if minInterval == 0 {
   142  		minInterval = minReconnectInterval
   143  	}
   144  	if maxInterval == 0 {
   145  		maxInterval = maxReconnectInterval
   146  	}
   147  	dbListener := pq.NewDialListener(dbcfg, connectionString, minInterval, maxInterval, func(_ pq.ListenerEventType, err error) {
   148  		if err != nil {
   149  			log.Error(err, "An error occurred creating new dialer listener")
   150  		}
   151  	})
   152  	w.listener = dbListener
   153  	return w
   154  }
   155  
   156  // SetProjectID sets the pubsub projectID.
   157  func (w *WatchTower) SetProjectID(projectID string) *WatchTower {
   158  	w.projectID = projectID
   159  	return w
   160  }
   161  
   162  // GetProjectID gets the pubsub projectID.
   163  func (w *WatchTower) GetProjectID() string {
   164  	return w.projectID
   165  }
   166  
   167  // SetTopicID sets the pubsub topicID.
   168  func (w *WatchTower) SetTopicID(topicID string) *WatchTower {
   169  	w.topicID = topicID
   170  	return w
   171  }
   172  
   173  // GetTopicID gets the pubsub topicID.
   174  func (w *WatchTower) GetTopicID() string {
   175  	return w.topicID
   176  }
   177  
   178  // AddPubsubClient sets the pubsub client.
   179  func (w *WatchTower) AddPubsubClient(pubsub pubsub.Publisher) *WatchTower {
   180  	w.pubsubClient = pubsub
   181  	return w
   182  }
   183  
   184  // Stream listens to the notification channel and spawns a new goroutine to handle each event.
   185  func (w *WatchTower) Stream(ctx context.Context, log logr.Logger) {
   186  	wg := &sync.WaitGroup{}
   187  	semaphore := make(chan struct{}, w.maxConcurrent)
   188  	defer close(semaphore)
   189  	for {
   190  		select {
   191  		case <-ctx.Done():
   192  			return
   193  		case n := <-w.listener.Notify:
   194  			wg.Add(1)
   195  			go func(n *pq.Notification, log logr.Logger) {
   196  				defer wg.Done()
   197  				semaphore <- struct{}{}
   198  				defer func() { <-semaphore }()
   199  				if n == nil || n.Extra == "" {
   200  					log.Info("ignoring empty message", "channel", n.Channel)
   201  					return
   202  				}
   203  				ev, err := event.New().Unmarshal(n.Channel, n.Extra)
   204  				if err != nil {
   205  					log.Error(err, "An error occurred unmarshaling postgres event")
   206  					return
   207  				}
   208  				payload, err := ev.ToByte()
   209  				if err != nil {
   210  					log.Error(err, "An error occurred converting postgres event to byte")
   211  					return
   212  				}
   213  				bannerEdgeID, ok := ev.BannerEdgeID.(string)
   214  				if !ok {
   215  					bannerEdgeID = ""
   216  				}
   217  				ClusterInfraclusterEdgeID, ok := ev.ClusterInfraClusterEdgeID.(string)
   218  				if !ok {
   219  					ClusterInfraclusterEdgeID = ""
   220  				}
   221  				if err := w.pubsubClient.Send(ctx, w.topicID, payload, map[string]string{
   222  					"banner_edge_id":                bannerEdgeID,
   223  					"cluster_infra_cluster_edge_id": ClusterInfraclusterEdgeID,
   224  				}); err != nil {
   225  					log.Error(err, "An error occurred publishing postgres event to pubsub")
   226  					return
   227  				}
   228  				log.Info("successfully published postgres event to pubsub", "operation", ev.Operation, "postgres_channel", n.Channel, "pubsub_topic", w.topicID)
   229  			}(n, log)
   230  			wg.Wait()
   231  		case <-time.After(w.livelinessProbe):
   232  			log.Info("no messages received within the liveliness probe limit. checking the postgres notification connection", "liveliness_probe_duration_in_minutes", w.livelinessProbe/60)
   233  			go func(log logr.Logger) {
   234  				switch err := w.listener.Ping(); err == nil {
   235  				case true:
   236  					log.Info("connection ping was successful, postgres notification connection is healthy")
   237  				default:
   238  					log.Error(err, "An error occurred pinging the postgres notification connection")
   239  				}
   240  			}(log)
   241  		}
   242  	}
   243  }
   244  

View as plain text