
Commit 91c6339

fix: index queue issue
1 parent e54e50f commit 91c6339

File tree

13 files changed: +203 −63 lines changed

internal/alert/evaluator.go

Lines changed: 4 additions & 4 deletions

@@ -108,7 +108,7 @@ func renderQueryTemplate(rule *config.AlertRule) (string, error) {
     }
 
     var buf bytes.Buffer
-    data := map[string]interface{}{
+    data := map[string]any{
         "Threshold":  rule.Threshold,
         "Conditions": fmt.Sprintf("ts >= now() - '%s'::INTERVAL", rule.EvaluationInterval),
         "Severity":   rule.Severity,
@@ -169,16 +169,16 @@ func (e *AlertEvaluator) processQueryResults(rows *sql.Rows, rule *config.AlertR
         return nil, fmt.Errorf("failed to get columns: %w", err)
     }
 
-    values := make([]interface{}, len(columns))
-    valuePtrs := make([]interface{}, len(columns))
+    values := make([]any, len(columns))
+    valuePtrs := make([]any, len(columns))
     for i := range values {
         valuePtrs[i] = &values[i]
     }
     if err := rows.Scan(valuePtrs...); err != nil {
         return nil, fmt.Errorf("failed to scan row: %w", err)
     }
 
-    rowData := make(map[string]interface{})
+    rowData := make(map[string]any)
     for i, col := range columns {
         rowData[col] = values[i]
     }
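These hunks are mechanical: since Go 1.18, `any` is a predeclared alias for `interface{}`, so the rewrite is type-identical and behavior-neutral. For context, here is a self-contained sketch of the column-generic scan pattern the second hunk touches — `sql.Rows.Scan` needs one pointer per column, hence the parallel `valuePtrs` slice (function and package names are illustrative, not from this repo):

```go
package example

import (
	"database/sql"
	"fmt"
)

// rowsToMaps scans every row into a map keyed by column name, without
// knowing the schema ahead of time.
func rowsToMaps(rows *sql.Rows) ([]map[string]any, error) {
	columns, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("failed to get columns: %w", err)
	}
	var out []map[string]any
	for rows.Next() {
		values := make([]any, len(columns))
		valuePtrs := make([]any, len(columns))
		for i := range values {
			valuePtrs[i] = &values[i] // Scan writes through these pointers
		}
		if err := rows.Scan(valuePtrs...); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		rowData := make(map[string]any, len(columns))
		for i, col := range columns {
			rowData[col] = values[i]
		}
		out = append(out, rowData)
	}
	return out, rows.Err()
}
```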

internal/config/rules.go

Lines changed: 3 additions & 3 deletions

@@ -60,7 +60,7 @@ func (r *AlertRule) String() string {
         r.Name, r.Query, r.Threshold, r.EvaluationInterval, r.ConsecutiveCount, r.Severity)
 }
 
-func (r *AlertRule) AddFiringAlertAndCheckResolved(alertQueryResult map[string]interface{}) (*PostableAlert, bool, string) {
+func (r *AlertRule) AddFiringAlertAndCheckResolved(alertQueryResult map[string]any) (*PostableAlert, bool, string) {
     if r.FiringAlerts == nil {
         r.FiringAlerts = make(map[string]*FiringAlertCache)
     }
@@ -122,7 +122,7 @@ func (r *AlertRule) IsTestMode() bool {
     return r.TestMode
 }
 
-func (r *AlertRule) toPostableAlert(alertQueryResult map[string]interface{}, startsAt time.Time, isResolved bool) PostableAlert {
+func (r *AlertRule) toPostableAlert(alertQueryResult map[string]any, startsAt time.Time, isResolved bool) PostableAlert {
     summary, description, instance, err := r.renderAlertContentTemplate(alertQueryResult)
 
     if err != nil {
@@ -147,7 +147,7 @@ func (r *AlertRule) toPostableAlert(alertQueryResult map[string]interface{}, sta
     return alert
 }
 
-func (rule *AlertRule) renderAlertContentTemplate(data interface{}) (string, string, string, error) {
+func (rule *AlertRule) renderAlertContentTemplate(data any) (string, string, string, error) {
     if rule.summaryTmplParsed == nil {
         summaryTmplParsed, err := template.New("summary").Parse(rule.Summary)
         rule.summaryTmplParsed = summaryTmplParsed
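The last hunk feeds `data any` into a lazily parsed `text/template`. A minimal sketch of that parse-once, execute-many pattern, under illustrative names (the real method also renders description and instance templates):

```go
package example

import (
	"bytes"
	"text/template"
)

type summaryRenderer struct {
	tmpl *template.Template // parsed on first use, then cached
}

func (r *summaryRenderer) render(src string, data any) (string, error) {
	if r.tmpl == nil {
		t, err := template.New("summary").Parse(src)
		if err != nil {
			return "", err
		}
		r.tmpl = t
	}
	var buf bytes.Buffer
	if err := r.tmpl.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```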

internal/controller/pod_controller.go

Lines changed: 6 additions & 0 deletions

@@ -74,6 +74,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
         _ = r.Expander.RemovePreSchedulePod(req.Name, true)
         r.Allocator.DeallocByPodIdentifier(ctx, req.NamespacedName)
         metrics.RemoveWorkerMetrics(req.Name, time.Now())
+        r.IndexAllocator.RemoveNodeIndexQueueForPod(req.NamespacedName)
         log.Info("Released GPU resources when pod deleted", "pod", req.NamespacedName)
         return ctrl.Result{}, nil
     }
@@ -113,7 +114,12 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
         }
     }
 
+    if utils.IsPodStopped(pod) {
+        r.Allocator.DeallocByPodIdentifier(ctx, req.NamespacedName)
+    }
+
     if pod.Labels[constants.LabelComponent] == constants.ComponentWorker {
+        r.IndexAllocator.ReconcileLockState(pod)
         if pod.DeletionTimestamp.IsZero() {
             metrics.SetWorkerMetricsByWorkload(pod)
         }
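The `utils.IsPodStopped` / `utils.IsPodPending` / `utils.IsPodRunning` helpers this commit calls are not part of the diff; a plausible phase-based sketch of what they check (an assumption, not the repo's actual implementation):

```go
package example

import v1 "k8s.io/api/core/v1"

// Hypothetical equivalents of the utils helpers referenced in the diff.
func isPodPending(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodPending }
func isPodRunning(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodRunning }

// A stopped pod has run to completion or failed, but is not yet deleted.
func isPodStopped(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
}
```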

internal/gpuallocator/gpuallocator.go

Lines changed: 4 additions & 0 deletions

@@ -1515,6 +1515,10 @@ func (s *GpuAllocator) reconcileAllocationState() {
         s.uniqueAllocation[string(worker.UID)] = allocRequest
         s.podNamespaceNsToPodUID[worker.Namespace+"/"+worker.Name] = string(worker.UID)
         s.addAllocationMap(worker.Spec.NodeName, worker.ObjectMeta)
+
+        if utils.IsPodPending(&worker) {
+            s.indexAllocator.ReconcileLockState(&worker)
+        }
     }
     return scheduled && !deletedAndDeAllocated
 })

internal/hypervisor/backend/kubernetes/external_dp/detector_test.go

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ func (m *MockAPIServer) UpdateGPUStatus(gpu *tfv1.GPU) error {
 // MockKubeletClient is a mock implementation of KubeletClientInterface
 type MockKubeletClient struct {
     mock.Mock
-    pods map[string]interface{}
+    pods map[string]any
 }
 
 func (m *MockKubeletClient) GetAllPods() map[string]any {

internal/hypervisor/server/handlers/worker.go

Lines changed: 2 additions & 2 deletions

@@ -75,8 +75,8 @@ func (h *WorkerHandler) HandleGetWorker(c *gin.Context) {
 
     metrics, exists := workerMetrics[workerID]
     if !exists || metrics == nil {
-        c.JSON(http.StatusOK, api.DataResponse[map[string]interface{}]{
-            Data: map[string]interface{}{
+        c.JSON(http.StatusOK, api.DataResponse[map[string]any]{
+            Data: map[string]any{
                 "worker_uid": workerID,
                 "allocation": allocation,
             },
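`api.DataResponse` is a generic envelope defined elsewhere in the repo; its definition is not part of this commit. A plausible minimal shape, for illustration only:

```go
package api

// DataResponse wraps a payload of any type in a {"data": ...} JSON envelope.
// This is an assumed shape; the repo's actual definition may carry more fields.
type DataResponse[T any] struct {
	Data T `json:"data"`
}
```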

internal/hypervisor/tui/client.go

Lines changed: 1 addition & 1 deletion

@@ -46,7 +46,7 @@ func NewClient(host string, port int) *Client {
 // doRequest performs an HTTP request and decodes the JSON response
 //
 //nolint:unparam // method parameter is kept for API consistency, even though it's always "GET"
-func (c *Client) doRequest(ctx context.Context, method, path string, result interface{}) error {
+func (c *Client) doRequest(ctx context.Context, method, path string, result any) error {
     url := fmt.Sprintf("%s/%s", c.baseURL, path)
     req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
     if err != nil {
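Only the signature changes here; the rest of the body is outside the hunk. A typical continuation for such a helper — issue the request, check the status, decode JSON into `result` — might look like this (a sketch, not the repo's exact code):

```go
package example

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)

// doRequest issues a GET and decodes the JSON body into result, which must
// be a pointer (hence the `result any` parameter in the real signature).
func doRequest(ctx context.Context, client *http.Client, url string, result any) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return json.NewDecoder(resp.Body).Decode(result)
}
```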

internal/indexallocator/indexallocator.go

Lines changed: 129 additions & 26 deletions

@@ -5,7 +5,6 @@ import (
     "encoding/json"
     "fmt"
     "math"
-    "strconv"
     "sync"
     "sync/atomic"
     "time"
@@ -40,6 +39,15 @@
     // in use index from 0x01 -> 0xf8, indicates the pod using this index
     // When pod completed CDI and started or pending image pulling, should be removed from the queue
     nodeIndexQueue map[string]map[int]types.NamespacedName
+
+    podIndexMap map[types.NamespacedName]indexIdentifier
+
+    asyncCheckingMap map[types.NamespacedName]struct{}
+}
+
+type indexIdentifier struct {
+    nodeName string
+    index    int
 }
 
 func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocator, error) {
@@ -53,6 +61,10 @@ func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocat
         currentIndex:  0, // Will start from 1 on first assignment
         ctx:           ctx,
         initializedCh: make(chan struct{}),
+
+        nodeIndexQueue: make(map[string]map[int]types.NamespacedName, 128),
+
+        podIndexMap: make(map[types.NamespacedName]indexIdentifier, 128),
     }
 
     return allocator, nil
@@ -84,66 +96,156 @@ func (s *IndexAllocator) AssignIndex(podName string) (int, error) {
 }
 
 // ReconcileLockState maintains memory state for node level index assign and release queue
-func (s *IndexAllocator) ReconcileLockState(pod *v1.Pod) bool {
+func (s *IndexAllocator) ReconcileLockState(pod *v1.Pod) {
     if pod.Labels[constants.LabelComponent] != constants.ComponentWorker {
-        return false
+        return
     }
     // Check if it's TF indexed Pod by container resource limits
     // If isIndex But PodIndex not set, check phase, if pending, should assign index, next check
     if pod.Spec.NodeName == "" {
-        return false
+        return
     }
 
-    index := pod.Annotations[constants.PodIndexAnnotation]
-    if index == "" {
-        return false
-    }
-    indexInt, err := strconv.Atoi(index)
+    index, err := utils.ParsePodIndexResourceClaim(pod)
     if err != nil {
-        return false
+        log.FromContext(s.ctx).Error(err, "not TF indexed Pod, skip reconcile lock state", "pod", pod.Name)
+        return
+    }
+    _, indexAllocated := pod.Annotations[constants.PodIndexAnnotation]
+
+    // Only pending pods can occupy the node level index
+    if utils.IsPodPending(pod) {
+        s.storeMutex.Lock()
+        indexQueue := s.nodeIndexQueue[pod.Spec.NodeName]
+        if indexQueue == nil {
+            indexQueue = make(map[int]types.NamespacedName)
+            s.nodeIndexQueue[pod.Spec.NodeName] = indexQueue
+        }
+
+        // If just started and missing in memory, should complement the index queue and pod index map
+        if indexAllocated {
+            // occupy the index if missing (when scheduler restarted)
+            if _, exists := indexQueue[index]; !exists {
+                podMeta := types.NamespacedName{
+                    Namespace: pod.Namespace,
+                    Name:      pod.Name,
+                }
+                indexQueue[index] = podMeta
+                s.podIndexMap[podMeta] = indexIdentifier{
+                    nodeName: pod.Spec.NodeName,
+                    index:    index,
+                }
+            }
+            s.storeMutex.Unlock()
+            return
+        }
+
+        if podMeta, exists := indexQueue[index]; exists {
+            // If already occupied by other Pod, check if it's the same Pod
+            if podMeta.Namespace != pod.Namespace || podMeta.Name != pod.Name {
+                log.FromContext(s.ctx).Error(fmt.Errorf("pod index conflict"), "can not reconcile index lock, more than one pending pods occupy the same index", "pod", pod.Name, "index", index)
+                s.storeMutex.Unlock()
+                return
+            }
+        } else {
+            // new Pod occupy the index, add to index queue
+            indexQueue[index] = types.NamespacedName{
+                Namespace: pod.Namespace,
+                Name:      pod.Name,
+            }
+            s.podIndexMap[types.NamespacedName{
+                Namespace: pod.Namespace,
+                Name:      pod.Name,
+            }] = indexIdentifier{
+                nodeName: pod.Spec.NodeName,
+                index:    index,
+            }
+            s.storeMutex.Unlock()
+            // Brand new pending pod, ensure the async checking loop for assigning index annotation
+            s.AsyncCheckNodeIndexAvailableAndAssign(pod, index)
+        }
+    } else if utils.IsPodRunning(pod) {
+        s.RemoveNodeIndexQueueForPod(types.NamespacedName{
+            Namespace: pod.Namespace,
+            Name:      pod.Name,
+        })
     }
+}
 
+func (s *IndexAllocator) RemoveNodeIndexQueueForPod(namespacedName types.NamespacedName) {
     s.storeMutex.Lock()
     defer s.storeMutex.Unlock()
 
-    // Check Pod status
-    // TODO: call in Pod controller and gpu Allocator init stage
-
-    indexQueue := s.nodeIndexQueue[pod.Spec.NodeName]
-    if indexQueue == nil {
-        indexQueue = make(map[int]types.NamespacedName)
-        s.nodeIndexQueue[pod.Spec.NodeName] = indexQueue
+    indexIdentifier, exists := s.podIndexMap[namespacedName]
+    if !exists {
+        return
     }
-    indexQueue[indexInt] = types.NamespacedName{
-        Namespace: pod.Namespace,
-        Name:      pod.Name,
+    if indexQueue, exists := s.nodeIndexQueue[indexIdentifier.nodeName]; exists {
+        if val, exists := indexQueue[indexIdentifier.index]; exists {
+            if val.Namespace == namespacedName.Namespace && val.Name == namespacedName.Name {
+                delete(indexQueue, indexIdentifier.index)
+                log.FromContext(s.ctx).Info("Removed pod from node index queue after pod running/stopped/deleted", "pod", namespacedName, "index", indexIdentifier.index)
+            }
+            delete(s.podIndexMap, namespacedName)
+        }
     }
-    return true
 }
 
-func (s *IndexAllocator) CheckNodeIndexAvailableForPod(pod *v1.Pod, index int) bool {
+func (s *IndexAllocator) CheckNodeIndexAndTryOccupy(pod *v1.Pod, index int) bool {
     <-s.initializedCh
     nodeName := pod.Spec.NodeName
     if nodeName == "" {
         // should not happen, unscheduled pod
         return false
     }
     s.storeMutex.RLock()
-    defer s.storeMutex.RUnlock()
     indexQueue := s.nodeIndexQueue[nodeName]
     if len(indexQueue) == 0 {
+        s.storeMutex.RUnlock()
         return false
     }
     _, exists := indexQueue[index]
-    return !exists
+    s.storeMutex.RUnlock()
+    // Occupy index for node
+    if !exists {
+        s.storeMutex.Lock()
+        indexQueue[index] = types.NamespacedName{
+            Namespace: pod.Namespace,
+            Name:      pod.Name,
+        }
+        s.storeMutex.Unlock()
+        return true
+    }
+    return false
 }
 
 func (s *IndexAllocator) SetReady() {
     close(s.initializedCh)
 }
 
-func (s *IndexAllocator) CheckNodeIndexAvailableAndAssign(pod *v1.Pod, index int) {
+func (s *IndexAllocator) AsyncCheckNodeIndexAvailableAndAssign(pod *v1.Pod, index int) {
+    s.storeMutex.Lock()
+    defer s.storeMutex.Unlock()
+    podMeta := types.NamespacedName{
+        Namespace: pod.Namespace,
+        Name:      pod.Name,
+    }
+    if _, exists := s.asyncCheckingMap[podMeta]; exists {
+        // already started checking loop, skip
+        return
+    }
+    s.asyncCheckingMap[podMeta] = struct{}{}
+
     go func() {
+        defer func() {
+            s.storeMutex.Lock()
+            delete(s.asyncCheckingMap, types.NamespacedName{
+                Namespace: pod.Namespace,
+                Name:      pod.Name,
+            })
+            s.storeMutex.Unlock()
+        }()
+
         // Infinity backoff retry until index is available, and also reconcile started
         _ = retry.OnError(wait.Backoff{
             Duration: 3 * time.Second,
@@ -172,9 +274,10 @@ func (s *IndexAllocator) CheckNodeIndexAvailableAndAssign(pod *v1.Pod, index int
                     "pod", pod.Name, "node", pod.Spec.NodeName)
                 return nil
             }
+            // else do nothing, may caused by duplicated reconciling
         }
 
-        if !s.CheckNodeIndexAvailableForPod(pod, index) {
+        if !s.CheckNodeIndexAndTryOccupy(pod, index) {
             return fmt.Errorf("index is not available")
         }
         // Index available, patch annotation to transit Pod from Pending to DeviceAllocating in hypervisor
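This file is the heart of the fix. Instead of reading the index back out of a pod annotation, the allocator now tracks three in-memory maps: `nodeIndexQueue` (node → index → pod) records which pod occupies each node-level index slot, `podIndexMap` is the reverse lookup so `RemoveNodeIndexQueueForPod` can release a slot in O(1) without parsing annotations, and `asyncCheckingMap` deduplicates the background retry loops so at most one `AsyncCheckNodeIndexAvailableAndAssign` goroutine runs per pod. That last map is a single-flight guard; a standalone sketch of the pattern follows (note the sketch initializes its guard map in a constructor, which any holder of such a map must do before the first write):

```go
package example

import "sync"

// singleFlight ensures at most one background loop runs per key, mirroring
// how asyncCheckingMap deduplicates AsyncCheckNodeIndexAvailableAndAssign.
type singleFlight struct {
	mu      sync.Mutex
	running map[string]struct{}
}

func newSingleFlight() *singleFlight {
	// The guard map must be initialized before first use.
	return &singleFlight{running: make(map[string]struct{})}
}

// Start launches work in a goroutine unless one is already in flight for key.
func (s *singleFlight) Start(key string, work func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.running[key]; exists {
		return // a loop for this key is already running; skip
	}
	s.running[key] = struct{}{}
	go func() {
		defer func() {
			s.mu.Lock()
			delete(s.running, key)
			s.mu.Unlock()
		}()
		work()
	}()
}
```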

internal/metrics/connect.go

Lines changed: 1 addition & 1 deletion

@@ -153,7 +153,7 @@ func (t *TimeSeriesDB) SetTableTTL(ttl string) error {
 
 func (t *TimeSeriesDB) FindRecentNodeMetrics() ([]NodeResourceMetrics, error) {
     var monitors []NodeResourceMetrics
-    err := t.DB.Find(&monitors, map[string]interface{}{
+    err := t.DB.Find(&monitors, map[string]any{
         "ts": gorm.Expr("now() - interval 1 hour"),
     }).Error
     return monitors, err
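For reference, passing a map as inline conditions to GORM's `Find` is equivalent to chaining it through `Where`, and `gorm.Expr` injects raw SQL as the condition value rather than a bind parameter. A self-contained sketch under an assumed model definition:

```go
package example

import (
	"time"

	"gorm.io/gorm"
)

// Assumed minimal model; the repo's NodeResourceMetrics has more fields.
type NodeResourceMetrics struct {
	Ts time.Time `gorm:"column:ts"`
}

// findRecent is the chained-Where equivalent of the Find(&dest, conds) form
// in the diff above.
func findRecent(db *gorm.DB) ([]NodeResourceMetrics, error) {
	var monitors []NodeResourceMetrics
	err := db.Where(map[string]any{
		"ts": gorm.Expr("now() - interval 1 hour"),
	}).Find(&monitors).Error
	return monitors, err
}
```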
