...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/runtime/waiting_pods_map.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/runtime

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package runtime
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/kubernetes/pkg/scheduler/framework"
    27  )
    28  
    29  // waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase.
    30  type waitingPodsMap struct {
    31  	pods map[types.UID]*waitingPod
    32  	mu   sync.RWMutex
    33  }
    34  
    35  // newWaitingPodsMap returns a new waitingPodsMap.
    36  func newWaitingPodsMap() *waitingPodsMap {
    37  	return &waitingPodsMap{
    38  		pods: make(map[types.UID]*waitingPod),
    39  	}
    40  }
    41  
    42  // add a new WaitingPod to the map.
    43  func (m *waitingPodsMap) add(wp *waitingPod) {
    44  	m.mu.Lock()
    45  	defer m.mu.Unlock()
    46  	m.pods[wp.GetPod().UID] = wp
    47  }
    48  
    49  // remove a WaitingPod from the map.
    50  func (m *waitingPodsMap) remove(uid types.UID) {
    51  	m.mu.Lock()
    52  	defer m.mu.Unlock()
    53  	delete(m.pods, uid)
    54  }
    55  
    56  // get a WaitingPod from the map.
    57  func (m *waitingPodsMap) get(uid types.UID) *waitingPod {
    58  	m.mu.RLock()
    59  	defer m.mu.RUnlock()
    60  	return m.pods[uid]
    61  }
    62  
    63  // iterate acquires a read lock and iterates over the WaitingPods map.
    64  func (m *waitingPodsMap) iterate(callback func(framework.WaitingPod)) {
    65  	m.mu.RLock()
    66  	defer m.mu.RUnlock()
    67  	for _, v := range m.pods {
    68  		callback(v)
    69  	}
    70  }
    71  
    72  // waitingPod represents a pod waiting in the permit phase.
    73  type waitingPod struct {
    74  	pod            *v1.Pod
    75  	pendingPlugins map[string]*time.Timer
    76  	s              chan *framework.Status
    77  	mu             sync.RWMutex
    78  }
    79  
    80  var _ framework.WaitingPod = &waitingPod{}
    81  
    82  // newWaitingPod returns a new waitingPod instance.
    83  func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    84  	wp := &waitingPod{
    85  		pod: pod,
    86  		// Allow() and Reject() calls are non-blocking. This property is guaranteed
    87  		// by using non-blocking send to this channel. This channel has a buffer of size 1
    88  		// to ensure that non-blocking send will not be ignored - possible situation when
    89  		// receiving from this channel happens after non-blocking send.
    90  		s: make(chan *framework.Status, 1),
    91  	}
    92  
    93  	wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
    94  	// The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
    95  	// lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
    96  	wp.mu.Lock()
    97  	defer wp.mu.Unlock()
    98  	for k, v := range pluginsMaxWaitTime {
    99  		plugin, waitTime := k, v
   100  		wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
   101  			msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
   102  				waitTime, plugin)
   103  			wp.Reject(plugin, msg)
   104  		})
   105  	}
   106  
   107  	return wp
   108  }
   109  
// GetPod returns a reference to the waiting pod.
// The stored pointer is returned as-is; no copy of the pod is made and no
// lock is taken.
func (w *waitingPod) GetPod() *v1.Pod {
	return w.pod
}
   114  
   115  // GetPendingPlugins returns a list of pending permit plugin's name.
   116  func (w *waitingPod) GetPendingPlugins() []string {
   117  	w.mu.RLock()
   118  	defer w.mu.RUnlock()
   119  	plugins := make([]string, 0, len(w.pendingPlugins))
   120  	for p := range w.pendingPlugins {
   121  		plugins = append(plugins, p)
   122  	}
   123  
   124  	return plugins
   125  }
   126  
   127  // Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
   128  // If this is the last remaining plugin to allow, then a success signal is delivered
   129  // to unblock the pod.
   130  func (w *waitingPod) Allow(pluginName string) {
   131  	w.mu.Lock()
   132  	defer w.mu.Unlock()
   133  	if timer, exist := w.pendingPlugins[pluginName]; exist {
   134  		timer.Stop()
   135  		delete(w.pendingPlugins, pluginName)
   136  	}
   137  
   138  	// Only signal success status after all plugins have allowed
   139  	if len(w.pendingPlugins) != 0 {
   140  		return
   141  	}
   142  
   143  	// The select clause works as a non-blocking send.
   144  	// If there is no receiver, it's a no-op (default case).
   145  	select {
   146  	case w.s <- framework.NewStatus(framework.Success, ""):
   147  	default:
   148  	}
   149  }
   150  
   151  // Reject declares the waiting pod unschedulable.
   152  func (w *waitingPod) Reject(pluginName, msg string) {
   153  	w.mu.RLock()
   154  	defer w.mu.RUnlock()
   155  	for _, timer := range w.pendingPlugins {
   156  		timer.Stop()
   157  	}
   158  
   159  	// The select clause works as a non-blocking send.
   160  	// If there is no receiver, it's a no-op (default case).
   161  	select {
   162  	case w.s <- framework.NewStatus(framework.Unschedulable, msg).WithPlugin(pluginName):
   163  	default:
   164  	}
   165  }
   166  

View as plain text