...

Source file src/edge-infra.dev/pkg/sds/devices/agent/events/containers.go

Documentation: edge-infra.dev/pkg/sds/devices/agent/events

     1  package events
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/containerd/containerd"
    10  	eventtypes "github.com/containerd/containerd/api/events"
    11  	"github.com/containerd/containerd/containers"
    12  	ctrevents "github.com/containerd/containerd/events"
    13  	typeurl "github.com/containerd/typeurl/v2"
    14  
    15  	criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
    16  
    17  	devicecontainers "edge-infra.dev/pkg/sds/devices/agent/containers"
    18  
    19  	"edge-infra.dev/pkg/sds/devices/class"
    20  	"edge-infra.dev/pkg/sds/devices/logger"
    21  )
    22  
    23  // ContainerEventConstructor returns a constructor function that instantiates a new device container event for given container event
    24  func ContainerEventConstructor(containers map[string]*containers.Container, ctrClient *containerd.Client, runtimeClient criruntime.RuntimeServiceClient) func(ctx context.Context, ctrEvent *ctrevents.Envelope) (DeviceEvent, error) {
    25  	return func(ctx context.Context, ctrEvent *ctrevents.Envelope) (DeviceEvent, error) {
    26  		event, err := typeurl.UnmarshalAny(ctrEvent.Event)
    27  		if err != nil {
    28  			return nil, fmt.Errorf("error reading container event: %w", err)
    29  		}
    30  
    31  		log := logger.FromContext(ctx)
    32  		if createEvent, ok := event.(*eventtypes.ContainerCreate); ok {
    33  			log.Debug("container was created", "containerID", createEvent.ID)
    34  			ctr, err := devicecontainers.FetchContainer(ctx, ctrClient, runtimeClient, createEvent.ID)
    35  			if err != nil {
    36  				log.Error("could not fetch container", "containerID", createEvent.ID, "error", err)
    37  				return nil, nil
    38  			}
    39  			if ctr == nil {
    40  				return nil, nil
    41  			}
    42  
    43  			containers[ctr.ID] = ctr
    44  			event, err := NewContainerEvent(ctr)
    45  			if err != nil {
    46  				return nil, fmt.Errorf("error generating container event: %w", err)
    47  			}
    48  			return event, nil
    49  		}
    50  
    51  		if deleteEvent, ok := event.(*eventtypes.ContainerDelete); ok {
    52  			log.Debug("container was deleted", "containerID", deleteEvent.ID)
    53  			delete(containers, deleteEvent.ID)
    54  		}
    55  		return nil, nil
    56  	}
    57  }
    58  
    59  // NewContainerEvent returns a new device event from created container
    60  func NewContainerEvent(container *containers.Container) (DeviceEvent, error) {
    61  	if container == nil {
    62  		return nil, errors.New("container cannot be nil")
    63  	}
    64  	container.Labels[class.DefaultClass] = requested
    65  	return &containerEvent{
    66  		event: &event{
    67  			containers: map[string]*containers.Container{
    68  				container.ID: container,
    69  			},
    70  			postHookFn: func(context.Context) {},
    71  			timestamp:  time.Now(),
    72  		},
    73  	}, nil
    74  }
    75  

View as plain text