1
16
17 package protobuf
18
19 import (
20 "bytes"
21 "fmt"
22 "io"
23 "net/http"
24 "reflect"
25
26 "github.com/gogo/protobuf/proto"
27
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/runtime/schema"
31 "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
32 "k8s.io/apimachinery/pkg/util/framer"
33 "k8s.io/klog/v2"
34 )
35
// protoEncodingPrefix is the four-byte marker "k8s" followed by a NUL byte.
// NewSerializer installs it as the Serializer prefix, so it is written ahead
// of every message that Serializer encodes and is required at the start of
// any data that Serializer decodes.
var protoEncodingPrefix = []byte{'k', '8', 's', 0x00}
47
// errNotMarshalable is returned when an object cannot be handled by the
// protobuf serializers because it lacks the required protobuf interface.
type errNotMarshalable struct {
	// t is the dynamic type of the offending object; it is only used to
	// build the error message.
	t reflect.Type
}

// Error implements the error interface and names the offending type.
func (e errNotMarshalable) Error() string {
	const format = "object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message"
	return fmt.Sprintf(format, e.t)
}
55
56 func (e errNotMarshalable) Status() metav1.Status {
57 return metav1.Status{
58 Status: metav1.StatusFailure,
59 Code: http.StatusNotAcceptable,
60 Reason: metav1.StatusReason("NotAcceptable"),
61 Message: e.Error(),
62 }
63 }
64
65
66 func IsNotMarshalable(err error) bool {
67 _, ok := err.(errNotMarshalable)
68 return err != nil && ok
69 }
70
71
72
73
74 func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
75 return &Serializer{
76 prefix: protoEncodingPrefix,
77 creater: creater,
78 typer: typer,
79 }
80 }
81
82
83 type Serializer struct {
84 prefix []byte
85 creater runtime.ObjectCreater
86 typer runtime.ObjectTyper
87 }
88
89 var _ runtime.Serializer = &Serializer{}
90 var _ runtime.EncoderWithAllocator = &Serializer{}
91 var _ recognizer.RecognizingDecoder = &Serializer{}
92
93 const serializerIdentifier runtime.Identifier = "protobuf"
94
95
96
97
98
99
100
101 func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
102 prefixLen := len(s.prefix)
103 switch {
104 case len(originalData) == 0:
105
106 return nil, nil, fmt.Errorf("empty data")
107 case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
108 return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
109 case len(originalData) == prefixLen:
110
111 return nil, nil, fmt.Errorf("empty body")
112 }
113
114 data := originalData[prefixLen:]
115 unk := runtime.Unknown{}
116 if err := unk.Unmarshal(data); err != nil {
117 return nil, nil, err
118 }
119
120 actual := unk.GroupVersionKind()
121 copyKindDefaults(&actual, gvk)
122
123 if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
124 *intoUnknown = unk
125 if ok, _, _ := s.RecognizesData(unk.Raw); ok {
126 intoUnknown.ContentType = runtime.ContentTypeProtobuf
127 }
128 return intoUnknown, &actual, nil
129 }
130
131 if into != nil {
132 types, _, err := s.typer.ObjectKinds(into)
133 switch {
134 case runtime.IsNotRegisteredError(err):
135 pb, ok := into.(proto.Message)
136 if !ok {
137 return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
138 }
139 if err := proto.Unmarshal(unk.Raw, pb); err != nil {
140 return nil, &actual, err
141 }
142 return into, &actual, nil
143 case err != nil:
144 return nil, &actual, err
145 default:
146 copyKindDefaults(&actual, &types[0])
147
148
149
150 if len(actual.Version) == 0 && len(actual.Group) == 0 {
151 actual.Group = types[0].Group
152 }
153 }
154 }
155
156 if len(actual.Kind) == 0 {
157 return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
158 }
159 if len(actual.Version) == 0 {
160 return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
161 }
162
163 return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
164 }
165
166
167
// EncodeWithAllocator writes obj to w, delegating to s.encode with the
// caller-supplied memAlloc as the source of the output buffer.
168 func (s *Serializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
169 return s.encode(obj, w, memAlloc)
170 }
171
172
173 func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
174 return s.encode(obj, w, &runtime.SimpleAllocator{})
175 }
176
177 func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
178 if co, ok := obj.(runtime.CacheableObject); ok {
179 return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
180 }
181 return s.doEncode(obj, w, memAlloc)
182 }
183
184 func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
185 if memAlloc == nil {
186 klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
187 memAlloc = &runtime.SimpleAllocator{}
188 }
189 prefixSize := uint64(len(s.prefix))
190
191 var unk runtime.Unknown
192 switch t := obj.(type) {
193 case *runtime.Unknown:
194 estimatedSize := prefixSize + uint64(t.Size())
195 data := memAlloc.Allocate(estimatedSize)
196 i, err := t.MarshalTo(data[prefixSize:])
197 if err != nil {
198 return err
199 }
200 copy(data, s.prefix)
201 _, err = w.Write(data[:prefixSize+uint64(i)])
202 return err
203 default:
204 kind := obj.GetObjectKind().GroupVersionKind()
205 unk = runtime.Unknown{
206 TypeMeta: runtime.TypeMeta{
207 Kind: kind.Kind,
208 APIVersion: kind.GroupVersion().String(),
209 },
210 }
211 }
212
213 switch t := obj.(type) {
214 case bufferedMarshaller:
215
216
217 encodedSize := uint64(t.Size())
218 estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
219 data := memAlloc.Allocate(estimatedSize)
220
221 i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
222 if err != nil {
223 return err
224 }
225
226 copy(data, s.prefix)
227
228 _, err = w.Write(data[:prefixSize+uint64(i)])
229 return err
230
231 case proto.Marshaler:
232
233 data, err := t.Marshal()
234 if err != nil {
235 return err
236 }
237 unk.Raw = data
238
239 estimatedSize := prefixSize + uint64(unk.Size())
240 data = memAlloc.Allocate(estimatedSize)
241
242 i, err := unk.MarshalTo(data[prefixSize:])
243 if err != nil {
244 return err
245 }
246
247 copy(data, s.prefix)
248
249 _, err = w.Write(data[:prefixSize+uint64(i)])
250 return err
251
252 default:
253
254 return errNotMarshalable{reflect.TypeOf(obj)}
255 }
256 }
257
258
// Identifier returns serializerIdentifier, the value that encode passes to
// CacheEncode for cacheable objects.
259 func (s *Serializer) Identifier() runtime.Identifier {
260 return serializerIdentifier
261 }
262
263
264 func (s *Serializer) RecognizesData(data []byte) (bool, bool, error) {
265 return bytes.HasPrefix(data, s.prefix), false, nil
266 }
267
268
269 func copyKindDefaults(dst, src *schema.GroupVersionKind) {
270 if src == nil {
271 return
272 }
273
274 if len(dst.Kind) == 0 {
275 dst.Kind = src.Kind
276 }
277 if len(dst.Version) == 0 && len(src.Version) > 0 {
278 dst.Group = src.Group
279 dst.Version = src.Version
280 }
281 }
282
283
284
// bufferedMarshaller is satisfied by objects that implement both proto.Sizer
// and runtime.ProtobufMarshaller. The doEncode methods use it to ask for the
// encoded size first and then marshal into a buffer of that size.
285 type bufferedMarshaller interface {
286 proto.Sizer
287 runtime.ProtobufMarshaller
288 }
289
290
// bufferedReverseMarshaller is satisfied by objects that implement both
// proto.Sizer and runtime.ProtobufReverseMarshaller. RawSerializer.doEncode
// prefers it and calls Size followed by MarshalToSizedBuffer.
291 type bufferedReverseMarshaller interface {
292 proto.Sizer
293 runtime.ProtobufReverseMarshaller
294 }
295
296
297
298
299 func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
300 size := uint64(unk.Size())
301
302
303 size += 1 + 8 + byteSize
304 return size
305 }
306
307
308
309
310
311
312 func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *RawSerializer {
313 return &RawSerializer{
314 creater: creater,
315 typer: typer,
316 }
317 }
318
319
320
321 type RawSerializer struct {
322 creater runtime.ObjectCreater
323 typer runtime.ObjectTyper
324 }
325
326 var _ runtime.Serializer = &RawSerializer{}
327
328 const rawSerializerIdentifier runtime.Identifier = "raw-protobuf"
329
330
331
332
333
334
335
336 func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
337 if into == nil {
338 return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
339 }
340
341 if len(originalData) == 0 {
342
343 return nil, nil, fmt.Errorf("empty data")
344 }
345 data := originalData
346
347 actual := &schema.GroupVersionKind{}
348 copyKindDefaults(actual, gvk)
349
350 if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
351 intoUnknown.Raw = data
352 intoUnknown.ContentEncoding = ""
353 intoUnknown.ContentType = runtime.ContentTypeProtobuf
354 intoUnknown.SetGroupVersionKind(*actual)
355 return intoUnknown, actual, nil
356 }
357
358 types, _, err := s.typer.ObjectKinds(into)
359 switch {
360 case runtime.IsNotRegisteredError(err):
361 pb, ok := into.(proto.Message)
362 if !ok {
363 return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
364 }
365 if err := proto.Unmarshal(data, pb); err != nil {
366 return nil, actual, err
367 }
368 return into, actual, nil
369 case err != nil:
370 return nil, actual, err
371 default:
372 copyKindDefaults(actual, &types[0])
373
374
375
376 if len(actual.Version) == 0 && len(actual.Group) == 0 {
377 actual.Group = types[0].Group
378 }
379 }
380
381 if len(actual.Kind) == 0 {
382 return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
383 }
384 if len(actual.Version) == 0 {
385 return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
386 }
387
388 return unmarshalToObject(s.typer, s.creater, actual, into, data)
389 }
390
391
392 func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
393
394 obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
395 if err != nil {
396 return nil, actual, err
397 }
398
399 pb, ok := obj.(proto.Message)
400 if !ok {
401 return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
402 }
403 if err := proto.Unmarshal(data, pb); err != nil {
404 return nil, actual, err
405 }
406 if actual != nil {
407 obj.GetObjectKind().SetGroupVersionKind(*actual)
408 }
409 return obj, actual, nil
410 }
411
412
413 func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
414 return s.encode(obj, w, &runtime.SimpleAllocator{})
415 }
416
417
418
// EncodeWithAllocator writes obj to w, delegating to s.encode with the
// caller-supplied memAlloc as the source of the output buffer.
419 func (s *RawSerializer) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
420 return s.encode(obj, w, memAlloc)
421 }
422
423 func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
424 if co, ok := obj.(runtime.CacheableObject); ok {
425 return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w)
426 }
427 return s.doEncode(obj, w, memAlloc)
428 }
429
430 func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
431 if memAlloc == nil {
432 klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
433 memAlloc = &runtime.SimpleAllocator{}
434 }
435 switch t := obj.(type) {
436 case bufferedReverseMarshaller:
437
438
439 encodedSize := uint64(t.Size())
440 data := memAlloc.Allocate(encodedSize)
441
442 n, err := t.MarshalToSizedBuffer(data)
443 if err != nil {
444 return err
445 }
446 _, err = w.Write(data[:n])
447 return err
448
449 case bufferedMarshaller:
450
451
452 encodedSize := uint64(t.Size())
453 data := memAlloc.Allocate(encodedSize)
454
455 n, err := t.MarshalTo(data)
456 if err != nil {
457 return err
458 }
459 _, err = w.Write(data[:n])
460 return err
461
462 case proto.Marshaler:
463
464 data, err := t.Marshal()
465 if err != nil {
466 return err
467 }
468 _, err = w.Write(data)
469 return err
470
471 default:
472 return errNotMarshalable{reflect.TypeOf(obj)}
473 }
474 }
475
476
// Identifier returns rawSerializerIdentifier, the value that encode passes to
// CacheEncode for cacheable objects.
477 func (s *RawSerializer) Identifier() runtime.Identifier {
478 return rawSerializerIdentifier
479 }
480
481
// LengthDelimitedFramer is the exported lengthDelimitedFramer value; its
// NewFrameWriter and NewFrameReader methods wrap streams using the framer package.
482 var LengthDelimitedFramer = lengthDelimitedFramer{}
483
484
// lengthDelimitedFramer is a field-less type whose methods create
// length-delimited frame writers and readers via the framer package.
485 type lengthDelimitedFramer struct{}
486
487
// NewFrameWriter wraps w with framer.NewLengthDelimitedFrameWriter and returns
// the resulting writer.
488 func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
489 return framer.NewLengthDelimitedFrameWriter(w)
490 }
491
492
// NewFrameReader wraps r with framer.NewLengthDelimitedFrameReader and returns
// the resulting reader.
493 func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
494 return framer.NewLengthDelimitedFrameReader(r)
495 }
496
View as plain text