...
1
2
3 package main
4
5 import (
6 "context"
7 "fmt"
8
9 "github.com/Microsoft/hcsshim/internal/oc"
10 "github.com/containerd/containerd/namespaces"
11 shim "github.com/containerd/containerd/runtime/v2/shim"
12 "go.opencensus.io/trace"
13 )
14
// publisher is the event-publishing abstraction used within this package.
// Keeping it as an interface lets callers depend on the behavior rather
// than on the concrete eventPublisher type.
type publisher interface {
	// publishEvent publishes event under the given topic and returns any
	// error reported by the implementation.
	publishEvent(ctx context.Context, topic string, event interface{}) error
}
18
19 type eventPublisher struct {
20 namespace string
21 remotePublisher *shim.RemoteEventsPublisher
22 }
23
24 var _ publisher = &eventPublisher{}
25
26 func newEventPublisher(address, namespace string) (*eventPublisher, error) {
27 p, err := shim.NewPublisher(address)
28 if err != nil {
29 return nil, err
30 }
31 return &eventPublisher{
32 namespace: namespace,
33 remotePublisher: p,
34 }, nil
35 }
36
37 func (e *eventPublisher) close() error {
38 return e.remotePublisher.Close()
39 }
40
41 func (e *eventPublisher) publishEvent(ctx context.Context, topic string, event interface{}) (err error) {
42 ctx, span := oc.StartSpan(ctx, "publishEvent")
43 defer span.End()
44 defer func() { oc.SetSpanStatus(span, err) }()
45 span.AddAttributes(
46 trace.StringAttribute("topic", topic),
47 trace.StringAttribute("event", fmt.Sprintf("%+v", event)))
48
49 if e == nil {
50 return nil
51 }
52
53 return e.remotePublisher.Publish(namespaces.WithNamespace(ctx, e.namespace), topic, event)
54 }
55
View as plain text