...
1
16
17 package fakeclient
18
19 import (
20 "context"
21 "testing"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/apimachinery/pkg/watch"
28 "k8s.io/client-go/informers"
29 "k8s.io/client-go/kubernetes/fake"
30 clienttesting "k8s.io/client-go/testing"
31 "k8s.io/client-go/tools/cache"
32 )
33
34
35 func TestFakeClient(t *testing.T) {
36 ctx, cancel := context.WithCancel(context.Background())
37 defer cancel()
38
39 watcherStarted := make(chan struct{})
40
41 client := fake.NewSimpleClientset()
42
43 client.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
44 gvr := action.GetResource()
45 ns := action.GetNamespace()
46 watch, err := client.Tracker().Watch(gvr, ns)
47 if err != nil {
48 return false, nil, err
49 }
50 close(watcherStarted)
51 return true, watch, nil
52 })
53
54
55 pods := make(chan *v1.Pod, 1)
56 informers := informers.NewSharedInformerFactory(client, 0)
57 podInformer := informers.Core().V1().Pods().Informer()
58 podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
59 AddFunc: func(obj interface{}) {
60 pod := obj.(*v1.Pod)
61 t.Logf("pod added: %s/%s", pod.Namespace, pod.Name)
62 pods <- pod
63 },
64 })
65
66
67 informers.Start(ctx.Done())
68
69
70
71
72 cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
73
74
75
76
77
78
79
80
81
82 <-watcherStarted
83
84 p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod"}}
85 _, err := client.CoreV1().Pods("test-ns").Create(context.TODO(), p, metav1.CreateOptions{})
86 if err != nil {
87 t.Fatalf("error injecting pod add: %v", err)
88 }
89
90 select {
91 case pod := <-pods:
92 t.Logf("Got pod from channel: %s/%s", pod.Namespace, pod.Name)
93 case <-time.After(wait.ForeverTestTimeout):
94 t.Error("Informer did not get the added pod")
95 }
96 }
97
View as plain text