...
1
16
17
18
19 package streaming
20
21 import (
22 "bytes"
23 "fmt"
24 "io"
25
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/runtime/schema"
28 )
29
30
31 type Encoder interface {
32
33
34 Encode(obj runtime.Object) error
35 }
36
37
38 type Decoder interface {
39
40 Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error)
41
42 Close() error
43 }
44
45
46 type Serializer interface {
47 NewEncoder(w io.Writer) Encoder
48 NewDecoder(r io.ReadCloser) Decoder
49 }
50
51 type decoder struct {
52 reader io.ReadCloser
53 decoder runtime.Decoder
54 buf []byte
55 maxBytes int
56 resetRead bool
57 }
58
59
60
61
62 func NewDecoder(r io.ReadCloser, d runtime.Decoder) Decoder {
63 return &decoder{
64 reader: r,
65 decoder: d,
66 buf: make([]byte, 1024),
67 maxBytes: 16 * 1024 * 1024,
68 }
69 }
70
71 var ErrObjectTooLarge = fmt.Errorf("object to decode was longer than maximum allowed size")
72
73
74 func (d *decoder) Decode(defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
75 base := 0
76 for {
77 n, err := d.reader.Read(d.buf[base:])
78 if err == io.ErrShortBuffer {
79 if n == 0 {
80 return nil, nil, fmt.Errorf("got short buffer with n=0, base=%d, cap=%d", base, cap(d.buf))
81 }
82 if d.resetRead {
83 continue
84 }
85
86 if len(d.buf) < d.maxBytes {
87 base += n
88 d.buf = append(d.buf, make([]byte, len(d.buf))...)
89 continue
90 }
91
92 d.resetRead = true
93 return nil, nil, ErrObjectTooLarge
94 }
95 if err != nil {
96 return nil, nil, err
97 }
98 if d.resetRead {
99
100 d.resetRead = false
101 continue
102 }
103 base += n
104 break
105 }
106 return d.decoder.Decode(d.buf[:base], defaults, into)
107 }
108
109 func (d *decoder) Close() error {
110 return d.reader.Close()
111 }
112
113 type encoder struct {
114 writer io.Writer
115 encoder runtime.Encoder
116 buf *bytes.Buffer
117 }
118
119
120 func NewEncoder(w io.Writer, e runtime.Encoder) Encoder {
121 return &encoder{
122 writer: w,
123 encoder: e,
124 buf: &bytes.Buffer{},
125 }
126 }
127
128
129 func (e *encoder) Encode(obj runtime.Object) error {
130 if err := e.encoder.Encode(obj, e.buf); err != nil {
131 return err
132 }
133 _, err := e.writer.Write(e.buf.Bytes())
134 e.buf.Reset()
135 return err
136 }
137
View as plain text