...
1 package octtrpc
2
3 import (
4 "context"
5 "encoding/base64"
6 "strings"
7
8 "github.com/containerd/ttrpc"
9 "go.opencensus.io/trace"
10 "go.opencensus.io/trace/propagation"
11 "google.golang.org/grpc/codes"
12 "google.golang.org/grpc/status"
13
14 "github.com/Microsoft/hcsshim/internal/oc"
15 )
16
17 type options struct {
18 sampler trace.Sampler
19 }
20
21
22
23 type Option func(*options)
24
25
26
27 func WithSampler(sampler trace.Sampler) Option {
28 return func(opts *options) {
29 opts.sampler = sampler
30 }
31 }
32
// metadataTraceContextKey is the ttrpc metadata key under which the client
// interceptor stores the base64-encoded binary span context, and from which
// the server side reads the parent span context.
const metadataTraceContextKey = "octtrpc.tracecontext"
34
// convertMethodName turns a slash-separated full method name into a
// dot-separated span name: a single leading "/" is dropped and every
// remaining "/" becomes ".".
func convertMethodName(name string) string {
	return strings.ReplaceAll(strings.TrimPrefix(name, "/"), "/", ".")
}
40
41 func getParentSpanFromContext(ctx context.Context) (trace.SpanContext, bool) {
42 md, _ := ttrpc.GetMetadata(ctx)
43 traceContext := md[metadataTraceContextKey]
44 if len(traceContext) > 0 {
45 traceContextBinary, _ := base64.StdEncoding.DecodeString(traceContext[0])
46 return propagation.FromBinary(traceContextBinary)
47 }
48 return trace.SpanContext{}, false
49 }
50
51 func setSpanStatus(span *trace.Span, err error) {
52
53 if err != nil {
54 s, ok := status.FromError(err)
55 if ok {
56 span.SetStatus(trace.Status{Code: int32(s.Code()), Message: s.Message()})
57 } else {
58 span.SetStatus(trace.Status{Code: int32(codes.Internal), Message: err.Error()})
59 }
60 }
61 }
62
63
64
65
66 func ClientInterceptor(opts ...Option) ttrpc.UnaryClientInterceptor {
67 o := options{
68 sampler: oc.DefaultSampler,
69 }
70 for _, opt := range opts {
71 opt(&o)
72 }
73 return func(ctx context.Context, req *ttrpc.Request, resp *ttrpc.Response, info *ttrpc.UnaryClientInfo, inv ttrpc.Invoker) (err error) {
74 ctx, span := oc.StartSpan(
75 ctx,
76 convertMethodName(info.FullMethod),
77 trace.WithSampler(o.sampler),
78 oc.WithClientSpanKind)
79 defer span.End()
80 defer setSpanStatus(span, err)
81
82 spanContextBinary := propagation.Binary(span.SpanContext())
83 b64 := base64.StdEncoding.EncodeToString(spanContextBinary)
84 kvp := &ttrpc.KeyValue{Key: metadataTraceContextKey, Value: b64}
85 req.Metadata = append(req.Metadata, kvp)
86
87 return inv(ctx, req, resp)
88 }
89 }
90
91
92
93
94 func ServerInterceptor(opts ...Option) ttrpc.UnaryServerInterceptor {
95 o := options{
96 sampler: oc.DefaultSampler,
97 }
98 for _, opt := range opts {
99 opt(&o)
100 }
101 return func(ctx context.Context, unmarshal ttrpc.Unmarshaler, info *ttrpc.UnaryServerInfo, method ttrpc.Method) (_ interface{}, err error) {
102 name := convertMethodName(info.FullMethod)
103
104 var span *trace.Span
105 opts := []trace.StartOption{trace.WithSampler(o.sampler), oc.WithServerSpanKind}
106 parent, ok := getParentSpanFromContext(ctx)
107 if ok {
108 ctx, span = oc.StartSpanWithRemoteParent(ctx, name, parent, opts...)
109 } else {
110 ctx, span = oc.StartSpan(ctx, name, opts...)
111 }
112 defer span.End()
113 defer setSpanStatus(span, err)
114
115 return method(ctx, unmarshal)
116 }
117 }
118
View as plain text