1
16
17 package util
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/types"
29 utilerrors "k8s.io/apimachinery/pkg/util/errors"
30 "k8s.io/apimachinery/pkg/util/net"
31 "k8s.io/apimachinery/pkg/util/strategicpatch"
32 "k8s.io/client-go/kubernetes"
33 "k8s.io/client-go/tools/cache"
34 "k8s.io/client-go/util/retry"
35 corev1helpers "k8s.io/component-helpers/scheduling/corev1"
36 "k8s.io/klog/v2"
37 extenderv1 "k8s.io/kube-scheduler/extender/v1"
38 v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
39 )
40
41
42 func GetPodFullName(pod *v1.Pod) string {
43
44
45 return pod.Name + "_" + pod.Namespace
46 }
47
48
49
50 func GetPodStartTime(pod *v1.Pod) *metav1.Time {
51 if pod.Status.StartTime != nil {
52 return pod.Status.StartTime
53 }
54
55 return &metav1.Time{Time: time.Now()}
56 }
57
58
59
60 func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time {
61 if len(victims.Pods) == 0 {
62
63 klog.Background().Error(nil, "victims.Pods is empty. Should not reach here")
64 return nil
65 }
66
67 earliestPodStartTime := GetPodStartTime(victims.Pods[0])
68 maxPriority := corev1helpers.PodPriority(victims.Pods[0])
69
70 for _, pod := range victims.Pods {
71 if podPriority := corev1helpers.PodPriority(pod); podPriority == maxPriority {
72 if podStartTime := GetPodStartTime(pod); podStartTime.Before(earliestPodStartTime) {
73 earliestPodStartTime = podStartTime
74 }
75 } else if podPriority > maxPriority {
76 maxPriority = podPriority
77 earliestPodStartTime = GetPodStartTime(pod)
78 }
79 }
80
81 return earliestPodStartTime
82 }
83
84
85
86
87
88 func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
89 p1 := corev1helpers.PodPriority(pod1)
90 p2 := corev1helpers.PodPriority(pod2)
91 if p1 != p2 {
92 return p1 > p2
93 }
94 return GetPodStartTime(pod1).Before(GetPodStartTime(pod2))
95 }
96
97
98 func Retriable(err error) bool {
99 return apierrors.IsInternalError(err) || apierrors.IsServiceUnavailable(err) ||
100 net.IsConnectionRefused(err)
101 }
102
103
104
105 func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
106 if newStatus == nil {
107 return nil
108 }
109
110 oldData, err := json.Marshal(v1.Pod{Status: old.Status})
111 if err != nil {
112 return err
113 }
114
115 newData, err := json.Marshal(v1.Pod{Status: *newStatus})
116 if err != nil {
117 return err
118 }
119 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
120 if err != nil {
121 return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
122 }
123
124 if "{}" == string(patchBytes) {
125 return nil
126 }
127
128 patchFn := func() error {
129 _, err := cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
130 return err
131 }
132
133 return retry.OnError(retry.DefaultBackoff, Retriable, patchFn)
134 }
135
136
137 func DeletePod(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
138 return cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
139 }
140
141
142
143 func ClearNominatedNodeName(ctx context.Context, cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
144 var errs []error
145 for _, p := range pods {
146 if len(p.Status.NominatedNodeName) == 0 {
147 continue
148 }
149 podStatusCopy := p.Status.DeepCopy()
150 podStatusCopy.NominatedNodeName = ""
151 if err := PatchPodStatus(ctx, cs, p, podStatusCopy); err != nil {
152 errs = append(errs, err)
153 }
154 }
155 return utilerrors.NewAggregate(errs)
156 }
157
158
159 func IsScalarResourceName(name v1.ResourceName) bool {
160 return v1helper.IsExtendedResourceName(name) || v1helper.IsHugePageResourceName(name) ||
161 v1helper.IsPrefixedNativeResource(name) || v1helper.IsAttachableVolumeResourceName(name)
162 }
163
164
165
166
167
168
169 func As[T any](oldObj, newobj interface{}) (T, T, error) {
170 var oldTyped T
171 var newTyped T
172 var ok bool
173 if newobj != nil {
174 newTyped, ok = newobj.(T)
175 if !ok {
176 return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", newTyped, newobj)
177 }
178 }
179
180 if oldObj != nil {
181 if realOldObj, ok := oldObj.(cache.DeletedFinalStateUnknown); ok {
182 oldObj = realOldObj.Obj
183 }
184 oldTyped, ok = oldObj.(T)
185 if !ok {
186 return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", oldTyped, oldObj)
187 }
188 }
189 return oldTyped, newTyped, nil
190 }
191
View as plain text