...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/worker/message-worker.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp/worker

     1  package worker
     2  
     3  import (
     4  	"fmt"
     5  	"time"
     6  
     7  	"github.com/go-logr/logr"
     8  
     9  	"edge-infra.dev/pkg/lib/fog"
    10  
    11  	"edge-infra.dev/pkg/edge/datasync/chirp/provider"
    12  	"edge-infra.dev/pkg/edge/datasync/chirp/sender"
    13  
    14  	"edge-infra.dev/pkg/edge/datasync/internal/config"
    15  )
    16  
    17  type messageWorker struct {
    18  	partition       int
    19  	messageProvider provider.MessageProvider
    20  	messageSender   sender.MessageSender
    21  	logger          logr.Logger
    22  	config          *config.Config
    23  }
    24  
    25  func NewMessageWorker(partition int, msgProvider provider.MessageProvider, msgSender sender.MessageSender, cfg *config.Config) Worker {
    26  	return &messageWorker{
    27  		partition:       partition,
    28  		messageProvider: msgProvider,
    29  		messageSender:   msgSender,
    30  		logger:          fog.New(),
    31  		config:          cfg,
    32  	}
    33  }
    34  
    35  func (w *messageWorker) DoWork() {
    36  	shouldRun := w.config.RunMessageWorker
    37  	delayDuration := time.Duration(w.config.WorkerPolling) * time.Second
    38  
    39  	for shouldRun {
    40  		messages := w.messageProvider.GetMessages()
    41  		if len(messages) > 0 {
    42  			sentMessages := w.messageSender.Send(messages)
    43  			if len(sentMessages) > 0 {
    44  				for _, msg := range sentMessages {
    45  					message := fmt.Sprintf("successfully sent message ID: %s", msg.ID)
    46  					w.logger.Info(message)
    47  				}
    48  				//  w.logger.WithFields(map[string]interface{}{
    49  				//	"source":    "message-worker.DoWork",
    50  				//	"sent":      len(sentMessages),
    51  				//	"worker-id": w.partition,
    52  				//  }).Info("successfully sent messages")
    53  			}
    54  		} else {
    55  			//w.logger.Info("outbox is empty")
    56  			// logger.WithFields(map[string]interface{}{
    57  			// 	"source":    "message-worker.DoWork",
    58  			// 	"worker-id": w.partition,
    59  			// }).Trace("outbox is empty")
    60  
    61  			time.Sleep(delayDuration)
    62  		}
    63  	}
    64  }
    65  

View as plain text