...
1
16
17 package checkpointmanager
18
19 import (
20 "fmt"
21 "sync"
22
23 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
24 utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
25 utilfs "k8s.io/kubernetes/pkg/util/filesystem"
26 )
27
28
29 type Checkpoint interface {
30 MarshalCheckpoint() ([]byte, error)
31 UnmarshalCheckpoint(blob []byte) error
32 VerifyChecksum() error
33 }
34
35
36 type CheckpointManager interface {
37
38
39 CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error
40
41 GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error
42
43 RemoveCheckpoint(checkpointKey string) error
44
45 ListCheckpoints() ([]string, error)
46 }
47
48
49 type impl struct {
50 path string
51 store utilstore.Store
52 mutex sync.Mutex
53 }
54
55
56 func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
57 fstore, err := utilstore.NewFileStore(checkpointDir, &utilfs.DefaultFs{})
58 if err != nil {
59 return nil, err
60 }
61
62 return &impl{path: checkpointDir, store: fstore}, nil
63 }
64
65
66 func (manager *impl) CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
67 manager.mutex.Lock()
68 defer manager.mutex.Unlock()
69 blob, err := checkpoint.MarshalCheckpoint()
70 if err != nil {
71 return err
72 }
73 return manager.store.Write(checkpointKey, blob)
74 }
75
76
77 func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
78 manager.mutex.Lock()
79 defer manager.mutex.Unlock()
80 blob, err := manager.store.Read(checkpointKey)
81 if err != nil {
82 if err == utilstore.ErrKeyNotFound {
83 return errors.ErrCheckpointNotFound
84 }
85 return err
86 }
87 err = checkpoint.UnmarshalCheckpoint(blob)
88 if err == nil {
89 err = checkpoint.VerifyChecksum()
90 }
91 return err
92 }
93
94
95 func (manager *impl) RemoveCheckpoint(checkpointKey string) error {
96 manager.mutex.Lock()
97 defer manager.mutex.Unlock()
98 return manager.store.Delete(checkpointKey)
99 }
100
101
102 func (manager *impl) ListCheckpoints() ([]string, error) {
103 manager.mutex.Lock()
104 defer manager.mutex.Unlock()
105 keys, err := manager.store.List()
106 if err != nil {
107 return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err)
108 }
109 return keys, nil
110 }
111
View as plain text