...

Source file src/github.com/datawire/ambassador/v2/cmd/kat-server/services/grpc-echo.go

Documentation: github.com/datawire/ambassador/v2/cmd/kat-server/services

     1  package services
     2  
     3  import (
     4  	// stdlib
     5  	"bytes"
     6  	"context"
     7  	"crypto/tls"
     8  	"encoding/json"
     9  	"fmt"
    10  	"strconv"
    11  	"strings"
    12  	"time"
    13  
    14  	// third party
    15  	grpc "google.golang.org/grpc"
    16  	codes "google.golang.org/grpc/codes"
    17  	metadata "google.golang.org/grpc/metadata"
    18  	status "google.golang.org/grpc/status"
    19  
    20  	// first party (protobuf)
    21  	pb "github.com/datawire/ambassador/v2/pkg/api/kat"
    22  
    23  	// first party
    24  	"github.com/datawire/dlib/dgroup"
    25  	"github.com/datawire/dlib/dhttp"
    26  	"github.com/datawire/dlib/dlog"
    27  )
    28  
    29  // GRPC server object (all fields are required).
    30  type GRPC struct {
    31  	Port          int16
    32  	Backend       string
    33  	SecurePort    int16
    34  	SecureBackend string
    35  	Cert          string
    36  	Key           string
    37  
    38  	pb.UnsafeEchoServiceServer
    39  }
    40  
    41  // DefaultOpts sets gRPC service options.
    42  func DefaultOpts() []grpc.ServerOption {
    43  	return []grpc.ServerOption{
    44  		grpc.MaxRecvMsgSize(1024 * 1024 * 5),
    45  		grpc.MaxSendMsgSize(1024 * 1024 * 5),
    46  	}
    47  }
    48  
    49  // Start initializes the gRPC server.
    50  func (g *GRPC) Start(ctx context.Context) <-chan bool {
    51  	dlog.Printf(ctx, "GRPC: %s listening on %d/%d", g.Backend, g.Port, g.SecurePort)
    52  
    53  	grpcHandler := grpc.NewServer(DefaultOpts()...)
    54  	pb.RegisterEchoServiceServer(grpcHandler, g)
    55  
    56  	cer, err := tls.LoadX509KeyPair(g.Cert, g.Key)
    57  	if err != nil {
    58  		dlog.Error(ctx, err)
    59  		panic(err) // TODO: do something better
    60  	}
    61  
    62  	sc := &dhttp.ServerConfig{
    63  		Handler: grpcHandler,
    64  		TLSConfig: &tls.Config{
    65  			Certificates: []tls.Certificate{cer},
    66  		},
    67  	}
    68  
    69  	grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
    70  	grp.Go("cleartext", func(ctx context.Context) error {
    71  		return sc.ListenAndServe(ctx, fmt.Sprintf(":%v", g.Port))
    72  	})
    73  	grp.Go("tls", func(ctx context.Context) error {
    74  		return sc.ListenAndServeTLS(ctx, fmt.Sprintf(":%v", g.SecurePort), "", "")
    75  	})
    76  
    77  	dlog.Print(ctx, "starting gRPC echo service")
    78  
    79  	exited := make(chan bool)
    80  	go func() {
    81  		if err := grp.Wait(); err != nil {
    82  			dlog.Error(ctx, err)
    83  			panic(err) // TODO: do something better
    84  		}
    85  		close(exited)
    86  	}()
    87  	return exited
    88  }
    89  
    90  // Echo returns the an object with the HTTP context of the request.
    91  func (g *GRPC) Echo(ctx context.Context, r *pb.EchoRequest) (*pb.EchoResponse, error) {
    92  	md, ok := metadata.FromIncomingContext(ctx)
    93  	if !ok {
    94  		return nil, status.Error(codes.Code(13), "request has not valid context metadata")
    95  	}
    96  
    97  	buf := bytes.Buffer{}
    98  	buf.WriteString("metadata received: \n")
    99  	for k, v := range md {
   100  		buf.WriteString(fmt.Sprintf("%v : %s\n", k, strings.Join(v, ",")))
   101  	}
   102  	dlog.Println(ctx, buf.String())
   103  
   104  	request := &pb.Request{
   105  		Headers: make(map[string]string),
   106  	}
   107  
   108  	response := &pb.Response{
   109  		Headers: make(map[string]string),
   110  	}
   111  
   112  	// Sets request headers.
   113  	for k, v := range md {
   114  		request.Headers[k] = strings.Join(v, ",")
   115  		response.Headers[k] = strings.Join(v, ",")
   116  	}
   117  
   118  	// Set default backend and assume we're the clear side of the world.
   119  	backend := g.Backend
   120  
   121  	// Checks scheme and set TLS info.
   122  	if len(md["x-forwarded-proto"]) > 0 && md["x-forwarded-proto"][0] == "https" {
   123  		// We're the secure side of the world, I guess.
   124  		backend = g.SecureBackend
   125  		request.Tls = &pb.TLS{
   126  			Enabled: true,
   127  		}
   128  	}
   129  
   130  	// Check header and delay response.
   131  	if h, ok := md["Requested-Backend-Delay"]; ok {
   132  		if v, err := strconv.Atoi(h[0]); err == nil {
   133  			dlog.Printf(ctx, "Delaying response by %v ms", v)
   134  			time.Sleep(time.Duration(v) * time.Millisecond)
   135  		}
   136  	}
   137  
   138  	// Set response date header.
   139  	response.Headers["date"] = time.Now().Format(time.RFC1123)
   140  
   141  	// Sets client requested metadata.
   142  	if len(md["requested-headers"]) > 0 {
   143  		for _, v := range md["requested-headers"] {
   144  			if len(md[v]) > 0 {
   145  				s := strings.Join(md[v], ",")
   146  				response.Headers[v] = s
   147  				p := metadata.Pairs(v, s)
   148  				if err := grpc.SendHeader(ctx, p); err != nil {
   149  					return nil, err
   150  				}
   151  			}
   152  		}
   153  	}
   154  
   155  	// Sets grpc response.
   156  	echoRES := &pb.EchoResponse{
   157  		Backend:  backend,
   158  		Request:  request,
   159  		Response: response,
   160  	}
   161  
   162  	// Set a log message.
   163  	if data, err := json.MarshalIndent(echoRES, "", "  "); err == nil {
   164  		dlog.Printf(ctx, "setting response: %s\n", string(data))
   165  	}
   166  
   167  	// Checks if requested-status is a valid and not OK gRPC status.
   168  	if len(md["requested-status"]) > 0 {
   169  		val, err := strconv.Atoi(md["requested-status"][0])
   170  		if err == nil {
   171  			if val < 18 || val > 0 {
   172  				// Return response and the not OK status.
   173  				return echoRES, status.Error(codes.Code(val), "requested-error")
   174  			}
   175  		}
   176  	}
   177  
   178  	// Returns response and the OK status.
   179  	return echoRES, nil
   180  }
   181  

View as plain text