...
1 package grpc
2
3 import (
4 "fmt"
5 "net"
6 "os"
7 "os/signal"
8
9 "github.com/go-logr/logr"
10
11 "edge-infra.dev/pkg/lib/fog"
12
13 syscall "golang.org/x/sys/unix"
14
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/reflection"
17
18 "edge-infra.dev/pkg/edge/datasync/chirp/persister"
19 "edge-infra.dev/pkg/edge/datasync/chirp/server/http"
20 "edge-infra.dev/pkg/edge/datasync/internal/config"
21 protos "edge-infra.dev/pkg/edge/datasync/internal/protos/upload"
22 )
23
24 type Server struct {
25 server *grpc.Server
26 logger logr.Logger
27 config *config.Config
28 }
29
30 func NewServer(msgPersister persister.MessagePersister, cfg *config.Config) *Server {
31 server := grpc.NewServer()
32
33 messagingService := NewMessagingService(msgPersister, cfg)
34 protos.RegisterMessagingServiceServer(server, messagingService)
35
36 reflection.Register(server)
37
38 return &Server{server: server, logger: fog.New(), config: cfg}
39 }
40
41 func (s *Server) ListenAndServe() {
42 port := s.config.GrpcPort
43
44 lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
45 if err != nil {
46 s.logger.Error(err, "failed to listen on tcp address")
47 } else {
48 http.SetIsGRPCServerReady(true)
49 }
50
51 term := make(chan os.Signal, 1)
52 signal.Notify(term, syscall.SIGINT, syscall.SIGTERM)
53
54 go func() {
55 <-term
56 s.logger.Info("got termination signal. attempting graceful shutdown")
57 s.server.GracefulStop()
58 }()
59
60 s.logger.Info("starting gRPC server")
61
62 err = s.server.Serve(lis)
63
64 if err != nil {
65 s.logger.Error(err, "failed to serve gRPC")
66 }
67
68 s.logger.Info("gRPC server shutdown completed")
69 }
70
View as plain text