...
1
16
17 package versioned_test
18
19 import (
20 "encoding/json"
21 "fmt"
22 "io"
23 "testing"
24 "time"
25
26 "k8s.io/api/core/v1"
27 apiequality "k8s.io/apimachinery/pkg/api/equality"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json"
31 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apimachinery/pkg/watch"
34 "k8s.io/client-go/kubernetes/scheme"
35 restclientwatch "k8s.io/client-go/rest/watch"
36 )
37
38
39 func getDecoder() runtime.Decoder {
40 jsonSerializer := runtimejson.NewSerializer(runtimejson.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, false)
41 directCodecFactory := scheme.Codecs.WithoutConversion()
42 return directCodecFactory.DecoderToVersion(jsonSerializer, v1.SchemeGroupVersion)
43 }
44
45 func TestDecoder(t *testing.T) {
46 table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error, watch.Bookmark}
47
48 for _, eventType := range table {
49 out, in := io.Pipe()
50
51 decoder := restclientwatch.NewDecoder(streaming.NewDecoder(out, getDecoder()), getDecoder())
52 expect := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
53 encoder := json.NewEncoder(in)
54 eType := eventType
55 errc := make(chan error)
56
57 go func() {
58 data, err := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expect)
59 if err != nil {
60 errc <- fmt.Errorf("Unexpected error %v", err)
61 return
62 }
63 event := metav1.WatchEvent{
64 Type: string(eType),
65 Object: runtime.RawExtension{Raw: json.RawMessage(data)},
66 }
67 if err := encoder.Encode(&event); err != nil {
68 t.Errorf("Unexpected error %v", err)
69 }
70 in.Close()
71 }()
72
73 done := make(chan struct{})
74 go func() {
75 action, got, err := decoder.Decode()
76 if err != nil {
77 errc <- fmt.Errorf("Unexpected error %v", err)
78 return
79 }
80 if e, a := eType, action; e != a {
81 t.Errorf("Expected %v, got %v", e, a)
82 }
83 if e, a := expect, got; !apiequality.Semantic.DeepDerivative(e, a) {
84 t.Errorf("Expected %v, got %v", e, a)
85 }
86 t.Logf("Exited read")
87 close(done)
88 }()
89 select {
90 case err := <-errc:
91 t.Fatal(err)
92 case <-done:
93 }
94
95 done = make(chan struct{})
96 go func() {
97 _, _, err := decoder.Decode()
98 if err == nil {
99 t.Errorf("Unexpected nil error")
100 }
101 close(done)
102 }()
103 <-done
104
105 decoder.Close()
106 }
107 }
108
109 func TestDecoder_SourceClose(t *testing.T) {
110 out, in := io.Pipe()
111 decoder := restclientwatch.NewDecoder(streaming.NewDecoder(out, getDecoder()), getDecoder())
112
113 done := make(chan struct{})
114
115 go func() {
116 _, _, err := decoder.Decode()
117 if err == nil {
118 t.Errorf("Unexpected nil error")
119 }
120 close(done)
121 }()
122
123 in.Close()
124
125 select {
126 case <-done:
127 break
128 case <-time.After(wait.ForeverTestTimeout):
129 t.Error("Timeout")
130 }
131 }
132
View as plain text