...
1
16
17 package versioned
18
19 import (
20 "fmt"
21
22 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23 "k8s.io/apimachinery/pkg/runtime"
24 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
25 "k8s.io/apimachinery/pkg/watch"
26 )
27
28
29
30
31
32 type Decoder struct {
33 decoder streaming.Decoder
34 embeddedDecoder runtime.Decoder
35 }
36
37
38 func NewDecoder(decoder streaming.Decoder, embeddedDecoder runtime.Decoder) *Decoder {
39 return &Decoder{
40 decoder: decoder,
41 embeddedDecoder: embeddedDecoder,
42 }
43 }
44
45
46
47 func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
48 var got metav1.WatchEvent
49 res, _, err := d.decoder.Decode(nil, &got)
50 if err != nil {
51 return "", nil, err
52 }
53 if res != &got {
54 return "", nil, fmt.Errorf("unable to decode to metav1.Event")
55 }
56 switch got.Type {
57 case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark):
58 default:
59 return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
60 }
61
62 obj, err := runtime.Decode(d.embeddedDecoder, got.Object.Raw)
63 if err != nil {
64 return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
65 }
66 return watch.EventType(got.Type), obj, nil
67 }
68
69
70 func (d *Decoder) Close() {
71 d.decoder.Close()
72 }
73
View as plain text