...
1
16
17 package runtime
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/kubernetes/pkg/scheduler/framework"
27 )
28
29
30 type waitingPodsMap struct {
31 pods map[types.UID]*waitingPod
32 mu sync.RWMutex
33 }
34
35
36 func newWaitingPodsMap() *waitingPodsMap {
37 return &waitingPodsMap{
38 pods: make(map[types.UID]*waitingPod),
39 }
40 }
41
42
43 func (m *waitingPodsMap) add(wp *waitingPod) {
44 m.mu.Lock()
45 defer m.mu.Unlock()
46 m.pods[wp.GetPod().UID] = wp
47 }
48
49
50 func (m *waitingPodsMap) remove(uid types.UID) {
51 m.mu.Lock()
52 defer m.mu.Unlock()
53 delete(m.pods, uid)
54 }
55
56
57 func (m *waitingPodsMap) get(uid types.UID) *waitingPod {
58 m.mu.RLock()
59 defer m.mu.RUnlock()
60 return m.pods[uid]
61 }
62
63
64 func (m *waitingPodsMap) iterate(callback func(framework.WaitingPod)) {
65 m.mu.RLock()
66 defer m.mu.RUnlock()
67 for _, v := range m.pods {
68 callback(v)
69 }
70 }
71
72
73 type waitingPod struct {
74 pod *v1.Pod
75 pendingPlugins map[string]*time.Timer
76 s chan *framework.Status
77 mu sync.RWMutex
78 }
79
80 var _ framework.WaitingPod = &waitingPod{}
81
82
83 func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
84 wp := &waitingPod{
85 pod: pod,
86
87
88
89
90 s: make(chan *framework.Status, 1),
91 }
92
93 wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
94
95
96 wp.mu.Lock()
97 defer wp.mu.Unlock()
98 for k, v := range pluginsMaxWaitTime {
99 plugin, waitTime := k, v
100 wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
101 msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
102 waitTime, plugin)
103 wp.Reject(plugin, msg)
104 })
105 }
106
107 return wp
108 }
109
110
111 func (w *waitingPod) GetPod() *v1.Pod {
112 return w.pod
113 }
114
115
116 func (w *waitingPod) GetPendingPlugins() []string {
117 w.mu.RLock()
118 defer w.mu.RUnlock()
119 plugins := make([]string, 0, len(w.pendingPlugins))
120 for p := range w.pendingPlugins {
121 plugins = append(plugins, p)
122 }
123
124 return plugins
125 }
126
127
128
129
130 func (w *waitingPod) Allow(pluginName string) {
131 w.mu.Lock()
132 defer w.mu.Unlock()
133 if timer, exist := w.pendingPlugins[pluginName]; exist {
134 timer.Stop()
135 delete(w.pendingPlugins, pluginName)
136 }
137
138
139 if len(w.pendingPlugins) != 0 {
140 return
141 }
142
143
144
145 select {
146 case w.s <- framework.NewStatus(framework.Success, ""):
147 default:
148 }
149 }
150
151
152 func (w *waitingPod) Reject(pluginName, msg string) {
153 w.mu.RLock()
154 defer w.mu.RUnlock()
155 for _, timer := range w.pendingPlugins {
156 timer.Stop()
157 }
158
159
160
161 select {
162 case w.s <- framework.NewStatus(framework.Unschedulable, msg).WithPlugin(pluginName):
163 default:
164 }
165 }
166
View as plain text