
Commit 40df300

fix: index queue issue
1 parent 22b3c17 commit 40df300

13 files changed, +203 -63 lines changed


internal/alert/evaluator.go

Lines changed: 4 additions & 4 deletions
@@ -108,7 +108,7 @@ func renderQueryTemplate(rule *config.AlertRule) (string, error) {
 	}
 
 	var buf bytes.Buffer
-	data := map[string]interface{}{
+	data := map[string]any{
 		"Threshold":  rule.Threshold,
 		"Conditions": fmt.Sprintf("ts >= now() - '%s'::INTERVAL", rule.EvaluationInterval),
 		"Severity":   rule.Severity,
@@ -169,16 +169,16 @@ func (e *AlertEvaluator) processQueryResults(rows *sql.Rows, rule *config.AlertR
 		return nil, fmt.Errorf("failed to get columns: %w", err)
 	}
 
-	values := make([]interface{}, len(columns))
-	valuePtrs := make([]interface{}, len(columns))
+	values := make([]any, len(columns))
+	valuePtrs := make([]any, len(columns))
 	for i := range values {
 		valuePtrs[i] = &values[i]
 	}
 	if err := rows.Scan(valuePtrs...); err != nil {
 		return nil, fmt.Errorf("failed to scan row: %w", err)
 	}
 
-	rowData := make(map[string]interface{})
+	rowData := make(map[string]any)
 	for i, col := range columns {
 		rowData[col] = values[i]
 	}
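Most of the smaller hunks in this commit are the same mechanical cleanup: interface{} rewritten as any. Since Go 1.18, any is a predeclared alias (type any = interface{}), so the change is behavior-preserving; a minimal standalone check:

// Compiles with Go 1.18+: any and interface{} are one type (an alias,
// not a distinct type), so values flow between them with no conversion.
package main

import "fmt"

func main() {
	var a any = 42
	var b interface{} = a // assignable directly: identical types
	fmt.Println(a == b)   // true: same dynamic type and value
}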

internal/config/rules.go

Lines changed: 3 additions & 3 deletions
@@ -60,7 +60,7 @@ func (r *AlertRule) String() string {
 		r.Name, r.Query, r.Threshold, r.EvaluationInterval, r.ConsecutiveCount, r.Severity)
 }
 
-func (r *AlertRule) AddFiringAlertAndCheckResolved(alertQueryResult map[string]interface{}) (*PostableAlert, bool, string) {
+func (r *AlertRule) AddFiringAlertAndCheckResolved(alertQueryResult map[string]any) (*PostableAlert, bool, string) {
 	if r.FiringAlerts == nil {
 		r.FiringAlerts = make(map[string]*FiringAlertCache)
 	}
@@ -122,7 +122,7 @@ func (r *AlertRule) IsTestMode() bool {
 	return r.TestMode
 }
 
-func (r *AlertRule) toPostableAlert(alertQueryResult map[string]interface{}, startsAt time.Time, isResolved bool) PostableAlert {
+func (r *AlertRule) toPostableAlert(alertQueryResult map[string]any, startsAt time.Time, isResolved bool) PostableAlert {
 	summary, description, instance, err := r.renderAlertContentTemplate(alertQueryResult)
 
 	if err != nil {
@@ -147,7 +147,7 @@ func (r *AlertRule) toPostableAlert(alertQueryResult map[string]interface{}, sta
 	return alert
 }
 
-func (rule *AlertRule) renderAlertContentTemplate(data interface{}) (string, string, string, error) {
+func (rule *AlertRule) renderAlertContentTemplate(data any) (string, string, string, error) {
 	if rule.summaryTmplParsed == nil {
 		summaryTmplParsed, err := template.New("summary").Parse(rule.Summary)
 		rule.summaryTmplParsed = summaryTmplParsed
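For context on how these map[string]any payloads are consumed: the rule templates are parsed once, cached (summaryTmplParsed above), and executed against the query-result map, with text/template resolving {{.key}} by map lookup. A minimal sketch; the keys here are illustrative, not taken from the commit:

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

func main() {
	// Parse once, reuse on every evaluation, as the commit does via its
	// cached summaryTmplParsed field.
	tmpl := template.Must(template.New("summary").Parse("GPU {{.instance}} exceeded {{.threshold}}"))
	var buf bytes.Buffer
	_ = tmpl.Execute(&buf, map[string]any{"instance": "node-1", "threshold": 0.9})
	fmt.Println(buf.String()) // GPU node-1 exceeded 0.9
}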

internal/controller/pod_controller.go

Lines changed: 6 additions & 0 deletions
@@ -74,6 +74,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		_ = r.Expander.RemovePreSchedulePod(req.Name, true)
 		r.Allocator.DeallocByPodIdentifier(ctx, req.NamespacedName)
 		metrics.RemoveWorkerMetrics(req.Name, time.Now())
+		r.IndexAllocator.RemoveNodeIndexQueueForPod(req.NamespacedName)
 		log.Info("Released GPU resources when pod deleted", "pod", req.NamespacedName)
 		return ctrl.Result{}, nil
 	}
@@ -113,7 +114,12 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		}
 	}
 
+	if utils.IsPodStopped(pod) {
+		r.Allocator.DeallocByPodIdentifier(ctx, req.NamespacedName)
+	}
+
 	if pod.Labels[constants.LabelComponent] == constants.ComponentWorker {
+		r.IndexAllocator.ReconcileLockState(pod)
 		if pod.DeletionTimestamp.IsZero() {
 			metrics.SetWorkerMetricsByWorkload(pod)
 		}
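The reconciler now keys its cleanup off the pod lifecycle: deleted pods drop their node index entry, stopped pods are deallocated, and every worker pod passes through ReconcileLockState. The utils phase helpers are not part of this diff; a sketch of the semantics they are assumed to have:

// Assumed semantics of the phase helpers referenced above; the real
// utils implementations are not shown in this commit.
package utils

import v1 "k8s.io/api/core/v1"

func IsPodPending(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodPending }

func IsPodRunning(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodRunning }

// Treated as stopped once the pod has terminated, whether it succeeded
// or failed.
func IsPodStopped(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
}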

internal/gpuallocator/gpuallocator.go

Lines changed: 4 additions & 0 deletions
@@ -1515,6 +1515,10 @@ func (s *GpuAllocator) reconcileAllocationState() {
 		s.uniqueAllocation[string(worker.UID)] = allocRequest
 		s.podNamespaceNsToPodUID[worker.Namespace+"/"+worker.Name] = string(worker.UID)
 		s.addAllocationMap(worker.Spec.NodeName, worker.ObjectMeta)
+
+		if utils.IsPodPending(&worker) {
+			s.indexAllocator.ReconcileLockState(&worker)
+		}
 	}
 	return scheduled && !deletedAndDeAllocated
 })
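This hook covers scheduler restarts: the node index queues live only in process memory, so when reconcileAllocationState rebuilds the allocator's view it re-runs ReconcileLockState for each still-pending worker, re-occupying any index the pod already holds before a fresh assignment can collide with it.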

internal/hypervisor/backend/kubernetes/external_dp/detector_test.go

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ func (m *MockAPIServer) UpdateGPUStatus(gpu *tfv1.GPU) error {
 // MockKubeletClient is a mock implementation of KubeletClientInterface
 type MockKubeletClient struct {
 	mock.Mock
-	pods map[string]interface{}
+	pods map[string]any
 }
 
 func (m *MockKubeletClient) GetAllPods() map[string]any {

internal/hypervisor/server/handlers/worker.go

Lines changed: 2 additions & 2 deletions
@@ -75,8 +75,8 @@ func (h *WorkerHandler) HandleGetWorker(c *gin.Context) {
 
 	metrics, exists := workerMetrics[workerID]
 	if !exists || metrics == nil {
-		c.JSON(http.StatusOK, api.DataResponse[map[string]interface{}]{
-			Data: map[string]interface{}{
+		c.JSON(http.StatusOK, api.DataResponse[map[string]any]{
+			Data: map[string]any{
 				"worker_uid": workerID,
 				"allocation": allocation,
 			},

internal/hypervisor/tui/client.go

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ func NewClient(host string, port int) *Client {
 // doRequest performs an HTTP request and decodes the JSON response
 //
 //nolint:unparam // method parameter is kept for API consistency, even though it's always "GET"
-func (c *Client) doRequest(ctx context.Context, method, path string, result interface{}) error {
+func (c *Client) doRequest(ctx context.Context, method, path string, result any) error {
 	url := fmt.Sprintf("%s/%s", c.baseURL, path)
 	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 	if err != nil {

internal/indexallocator/indexallocator.go

Lines changed: 129 additions & 26 deletions
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -40,6 +39,15 @@ type IndexAllocator struct {
 	// in use index from 0x01 -> 0xf8, indicates the pod using this index
 	// When pod completed CDI and started or pending image pulling, should be removed from the queue
 	nodeIndexQueue map[string]map[int]types.NamespacedName
+
+	podIndexMap map[types.NamespacedName]indexIdentifier
+
+	asyncCheckingMap map[types.NamespacedName]struct{}
+}
+
+type indexIdentifier struct {
+	nodeName string
+	index    int
 }
 
 func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocator, error) {
@@ -53,6 +61,10 @@ func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocat
 		currentIndex:  0, // Will start from 1 on first assignment
 		ctx:           ctx,
 		initializedCh: make(chan struct{}),
+
+		nodeIndexQueue: make(map[string]map[int]types.NamespacedName, 128),
+
+		podIndexMap: make(map[types.NamespacedName]indexIdentifier, 128),
 	}
 
 	return allocator, nil
@@ -85,66 +97,156 @@ func (s *IndexAllocator) AssignIndex(podName string) (int, error) {
 }
 
 // ReconcileLockState maintains memory state for node level index assign and release queue
-func (s *IndexAllocator) ReconcileLockState(pod *v1.Pod) bool {
+func (s *IndexAllocator) ReconcileLockState(pod *v1.Pod) {
 	if pod.Labels[constants.LabelComponent] != constants.ComponentWorker {
-		return false
+		return
 	}
 	// Check if it's TF indexed Pod by container resource limits
 	// If isIndex But PodIndex not set, check phase, if pending, should assign index, next check
 	if pod.Spec.NodeName == "" {
-		return false
+		return
 	}
 
-	index := pod.Annotations[constants.PodIndexAnnotation]
-	if index == "" {
-		return false
-	}
-	indexInt, err := strconv.Atoi(index)
+	index, err := utils.ParsePodIndexResourceClaim(pod)
 	if err != nil {
-		return false
+		log.FromContext(s.ctx).Error(err, "not TF indexed Pod, skip reconcile lock state", "pod", pod.Name)
+		return
+	}
+	_, indexAllocated := pod.Annotations[constants.PodIndexAnnotation]
+
+	// Only pending pods can occupy the node level index
+	if utils.IsPodPending(pod) {
+		s.storeMutex.Lock()
+		indexQueue := s.nodeIndexQueue[pod.Spec.NodeName]
+		if indexQueue == nil {
+			indexQueue = make(map[int]types.NamespacedName)
+			s.nodeIndexQueue[pod.Spec.NodeName] = indexQueue
+		}
+
+		// If just started and missing in memory, should complement the index queue and pod index map
+		if indexAllocated {
+			// occupy the index if missing (when scheduler restarted)
+			if _, exists := indexQueue[index]; !exists {
+				podMeta := types.NamespacedName{
+					Namespace: pod.Namespace,
+					Name:      pod.Name,
+				}
+				indexQueue[index] = podMeta
+				s.podIndexMap[podMeta] = indexIdentifier{
+					nodeName: pod.Spec.NodeName,
+					index:    index,
+				}
+			}
+			s.storeMutex.Unlock()
+			return
+		}
+
+		if podMeta, exists := indexQueue[index]; exists {
+			// If already occupied by other Pod, check if it's the same Pod
+			if podMeta.Namespace != pod.Namespace || podMeta.Name != pod.Name {
+				log.FromContext(s.ctx).Error(fmt.Errorf("pod index conflict"), "can not reconcile index lock, more than one pending pods occupy the same index", "pod", pod.Name, "index", index)
+				s.storeMutex.Unlock()
+				return
+			}
+		} else {
+			// new Pod occupy the index, add to index queue
+			indexQueue[index] = types.NamespacedName{
+				Namespace: pod.Namespace,
+				Name:      pod.Name,
+			}
+			s.podIndexMap[types.NamespacedName{
+				Namespace: pod.Namespace,
+				Name:      pod.Name,
+			}] = indexIdentifier{
+				nodeName: pod.Spec.NodeName,
+				index:    index,
			}
+			s.storeMutex.Unlock()
+			// Brand new pending pod, ensure the async checking loop for assigning index annotation
+			s.AsyncCheckNodeIndexAvailableAndAssign(pod, index)
+		}
+	} else if utils.IsPodRunning(pod) {
+		s.RemoveNodeIndexQueueForPod(types.NamespacedName{
+			Namespace: pod.Namespace,
+			Name:      pod.Name,
+		})
 	}
+}
 
+func (s *IndexAllocator) RemoveNodeIndexQueueForPod(namespacedName types.NamespacedName) {
 	s.storeMutex.Lock()
 	defer s.storeMutex.Unlock()
 
-	// Check Pod status
-	// TODO: call in Pod controller and gpu Allocator init stage
-
-	indexQueue := s.nodeIndexQueue[pod.Spec.NodeName]
-	if indexQueue == nil {
-		indexQueue = make(map[int]types.NamespacedName)
-		s.nodeIndexQueue[pod.Spec.NodeName] = indexQueue
+	indexIdentifier, exists := s.podIndexMap[namespacedName]
+	if !exists {
+		return
 	}
-	indexQueue[indexInt] = types.NamespacedName{
-		Namespace: pod.Namespace,
-		Name:      pod.Name,
+	if indexQueue, exists := s.nodeIndexQueue[indexIdentifier.nodeName]; exists {
+		if val, exists := indexQueue[indexIdentifier.index]; exists {
+			if val.Namespace == namespacedName.Namespace && val.Name == namespacedName.Name {
+				delete(indexQueue, indexIdentifier.index)
+				log.FromContext(s.ctx).Info("Removed pod from node index queue after pod running/stopped/deleted", "pod", namespacedName, "index", indexIdentifier.index)
+			}
+			delete(s.podIndexMap, namespacedName)
+		}
 	}
-	return true
 }
 
-func (s *IndexAllocator) CheckNodeIndexAvailableForPod(pod *v1.Pod, index int) bool {
+func (s *IndexAllocator) CheckNodeIndexAndTryOccupy(pod *v1.Pod, index int) bool {
 	<-s.initializedCh
 	nodeName := pod.Spec.NodeName
 	if nodeName == "" {
 		// should not happen, unscheduled pod
 		return false
 	}
 	s.storeMutex.RLock()
-	defer s.storeMutex.RUnlock()
 	indexQueue := s.nodeIndexQueue[nodeName]
 	if len(indexQueue) == 0 {
+		s.storeMutex.RUnlock()
 		return false
 	}
 	_, exists := indexQueue[index]
-	return !exists
+	s.storeMutex.RUnlock()
+	// Occupy index for node
+	if !exists {
+		s.storeMutex.Lock()
+		indexQueue[index] = types.NamespacedName{
+			Namespace: pod.Namespace,
+			Name:      pod.Name,
+		}
+		s.storeMutex.Unlock()
+		return true
+	}
+	return false
 }
 
 func (s *IndexAllocator) SetReady() {
 	close(s.initializedCh)
 }
 
-func (s *IndexAllocator) CheckNodeIndexAvailableAndAssign(pod *v1.Pod, index int) {
+func (s *IndexAllocator) AsyncCheckNodeIndexAvailableAndAssign(pod *v1.Pod, index int) {
+	s.storeMutex.Lock()
+	defer s.storeMutex.Unlock()
+	podMeta := types.NamespacedName{
+		Namespace: pod.Namespace,
+		Name:      pod.Name,
+	}
+	if _, exists := s.asyncCheckingMap[podMeta]; exists {
+		// already started checking loop, skip
+		return
+	}
+	s.asyncCheckingMap[podMeta] = struct{}{}
+
 	go func() {
+		defer func() {
+			s.storeMutex.Lock()
+			delete(s.asyncCheckingMap, types.NamespacedName{
+				Namespace: pod.Namespace,
+				Name:      pod.Name,
+			})
+			s.storeMutex.Unlock()
+		}()
+
 		// Infinity backoff retry until index is available, and also reconcile started
 		_ = retry.OnError(wait.Backoff{
 			Duration: 3 * time.Second,
@@ -173,9 +275,10 @@ func (s *IndexAllocator) CheckNodeIndexAvailableAndAssign(pod *v1.Pod, index int
 				"pod", pod.Name, "node", pod.Spec.NodeName)
 				return nil
 			}
+			// else do nothing, may caused by duplicated reconciling
 		}
 
-		if !s.CheckNodeIndexAvailableForPod(pod, index) {
+		if !s.CheckNodeIndexAndTryOccupy(pod, index) {
 			return fmt.Errorf("index is not available")
 		}
 		// Index available, patch annotation to transit Pod from Pending to DeviceAllocating in hypervisor
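The heart of the fix is this bookkeeping trio: nodeIndexQueue is the forward map (node, then index, to owning pod), podIndexMap is the reverse map that lets RemoveNodeIndexQueueForPod release an index without scanning every node queue, and asyncCheckingMap deduplicates the retry goroutines so repeated reconciles of one pending pod spawn at most one checking loop. A condensed, self-contained sketch of those invariants, assuming the commit's types; note that a nil Go map panics on write, so all three maps are made eagerly here, and that releasing storeMutex on every return path matters as much as taking it:

// Sketch only: the real IndexAllocator carries more state (client,
// initializedCh, the retry loop). All maps are initialized up front
// because writing to a nil map panics.
package indexsketch

import (
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

type indexIdentifier struct {
	nodeName string
	index    int
}

type allocator struct {
	mu               sync.RWMutex
	nodeIndexQueue   map[string]map[int]types.NamespacedName  // node -> index -> owner
	podIndexMap      map[types.NamespacedName]indexIdentifier // reverse map for O(1) release
	asyncCheckingMap map[types.NamespacedName]struct{}        // dedups retry goroutines
}

func newAllocator() *allocator {
	return &allocator{
		nodeIndexQueue:   make(map[string]map[int]types.NamespacedName),
		podIndexMap:      make(map[types.NamespacedName]indexIdentifier),
		asyncCheckingMap: make(map[types.NamespacedName]struct{}),
	}
}

// occupy records index ownership on a node; both maps change inside one
// critical section so the forward and reverse views never disagree, and
// defer guarantees the mutex is released on every return path.
func (a *allocator) occupy(node string, index int, pod types.NamespacedName) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	q := a.nodeIndexQueue[node]
	if q == nil {
		q = make(map[int]types.NamespacedName)
		a.nodeIndexQueue[node] = q
	}
	if owner, taken := q[index]; taken {
		return owner == pod // idempotent re-occupy for the same pod, conflict otherwise
	}
	q[index] = pod
	a.podIndexMap[pod] = indexIdentifier{nodeName: node, index: index}
	return true
}

// release looks the pod up in the reverse map, so no per-node scan is
// needed; this is what podIndexMap buys RemoveNodeIndexQueueForPod.
func (a *allocator) release(pod types.NamespacedName) {
	a.mu.Lock()
	defer a.mu.Unlock()
	id, ok := a.podIndexMap[pod]
	if !ok {
		return
	}
	if q := a.nodeIndexQueue[id.nodeName]; q[id.index] == pod {
		delete(q, id.index)
	}
	delete(a.podIndexMap, pod)
}

Updating both maps inside a single critical section is the property that keeps the forward and reverse views consistent under concurrent reconciles.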

internal/metrics/connect.go

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ func (t *TimeSeriesDB) SetTableTTL(ttl string) error {
 
 func (t *TimeSeriesDB) FindRecentNodeMetrics() ([]NodeResourceMetrics, error) {
 	var monitors []NodeResourceMetrics
-	err := t.DB.Find(&monitors, map[string]interface{}{
+	err := t.DB.Find(&monitors, map[string]any{
 		"ts": gorm.Expr("now() - interval 1 hour"),
 	}).Error
 	return monitors, err
