...
1 package events
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/containerd/containerd"
10 eventtypes "github.com/containerd/containerd/api/events"
11 "github.com/containerd/containerd/containers"
12 ctrevents "github.com/containerd/containerd/events"
13 typeurl "github.com/containerd/typeurl/v2"
14
15 criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
16
17 devicecontainers "edge-infra.dev/pkg/sds/devices/agent/containers"
18
19 "edge-infra.dev/pkg/sds/devices/class"
20 "edge-infra.dev/pkg/sds/devices/logger"
21 )
22
23
24 func ContainerEventConstructor(containers map[string]*containers.Container, ctrClient *containerd.Client, runtimeClient criruntime.RuntimeServiceClient) func(ctx context.Context, ctrEvent *ctrevents.Envelope) (DeviceEvent, error) {
25 return func(ctx context.Context, ctrEvent *ctrevents.Envelope) (DeviceEvent, error) {
26 event, err := typeurl.UnmarshalAny(ctrEvent.Event)
27 if err != nil {
28 return nil, fmt.Errorf("error reading container event: %w", err)
29 }
30
31 log := logger.FromContext(ctx)
32 if createEvent, ok := event.(*eventtypes.ContainerCreate); ok {
33 log.Debug("container was created", "containerID", createEvent.ID)
34 ctr, err := devicecontainers.FetchContainer(ctx, ctrClient, runtimeClient, createEvent.ID)
35 if err != nil {
36 log.Error("could not fetch container", "containerID", createEvent.ID, "error", err)
37 return nil, nil
38 }
39 if ctr == nil {
40 return nil, nil
41 }
42
43 containers[ctr.ID] = ctr
44 event, err := NewContainerEvent(ctr)
45 if err != nil {
46 return nil, fmt.Errorf("error generating container event: %w", err)
47 }
48 return event, nil
49 }
50
51 if deleteEvent, ok := event.(*eventtypes.ContainerDelete); ok {
52 log.Debug("container was deleted", "containerID", deleteEvent.ID)
53 delete(containers, deleteEvent.ID)
54 }
55 return nil, nil
56 }
57 }
58
59
60 func NewContainerEvent(container *containers.Container) (DeviceEvent, error) {
61 if container == nil {
62 return nil, errors.New("container cannot be nil")
63 }
64 container.Labels[class.DefaultClass] = requested
65 return &containerEvent{
66 event: &event{
67 containers: map[string]*containers.Container{
68 container.ID: container,
69 },
70 postHookFn: func(context.Context) {},
71 timestamp: time.Now(),
72 },
73 }, nil
74 }
75
View as plain text