Skip to content

Commit e9af674

Browse files
committed
feat(sched): 实现初步的cpu多核心上的负载均衡
Signed-off-by: sparkzky <sparkhhhhhhhhhh@outlook.com>
1 parent ecf7749 commit e9af674

File tree

7 files changed

+839
-20
lines changed

7 files changed

+839
-20
lines changed

kernel/src/process/mod.rs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use system_error::SystemError;
2121

2222
use crate::{
2323
arch::{
24-
cpu::current_cpu_id,
2524
ipc::signal::{AtomicSignal, SigSet, Signal},
2625
process::ArchPCBInfo,
2726
CurrentIrqArch, SigStackArch,
@@ -243,15 +242,24 @@ impl ProcessManager {
243242
// avoid deadlock
244243
drop(writer);
245244

246-
let rq =
247-
cpu_rq(pcb.sched_info().on_cpu().unwrap_or(current_cpu_id()).data() as usize);
245+
let prev_cpu = pcb.sched_info().on_cpu().unwrap_or(smp_get_processor_id());
246+
247+
// 使用负载均衡器选择目标CPU
248+
let target_cpu =
249+
crate::sched::load_balance::LoadBalancer::select_task_rq(pcb, prev_cpu, 0);
250+
251+
let rq = cpu_rq(target_cpu.data() as usize);
248252

249253
let (rq, _guard) = rq.self_lock();
250254
rq.update_rq_clock();
251-
rq.activate_task(
252-
pcb,
253-
EnqueueFlag::ENQUEUE_WAKEUP | EnqueueFlag::ENQUEUE_NOCLOCK,
254-
);
255+
256+
// 如果目标CPU与原CPU不同,添加迁移标志
257+
let mut flags = EnqueueFlag::ENQUEUE_WAKEUP | EnqueueFlag::ENQUEUE_NOCLOCK;
258+
if target_cpu != prev_cpu {
259+
flags |= EnqueueFlag::ENQUEUE_MIGRATED;
260+
}
261+
262+
rq.activate_task(pcb, flags);
255263

256264
rq.check_preempt_currnet(pcb, WakeupFlags::empty());
257265

@@ -281,19 +289,24 @@ impl ProcessManager {
281289
// avoid deadlock
282290
drop(writer);
283291

284-
let rq = cpu_rq(
285-
pcb.sched_info()
286-
.on_cpu()
287-
.unwrap_or(smp_get_processor_id())
288-
.data() as usize,
289-
);
292+
let prev_cpu = pcb.sched_info().on_cpu().unwrap_or(smp_get_processor_id());
293+
294+
// 使用负载均衡器选择目标CPU
295+
let target_cpu =
296+
crate::sched::load_balance::LoadBalancer::select_task_rq(pcb, prev_cpu, 0);
297+
298+
let rq = cpu_rq(target_cpu.data() as usize);
290299

291300
let (rq, _guard) = rq.self_lock();
292301
rq.update_rq_clock();
293-
rq.activate_task(
294-
pcb,
295-
EnqueueFlag::ENQUEUE_WAKEUP | EnqueueFlag::ENQUEUE_NOCLOCK,
296-
);
302+
303+
// 如果目标CPU与原CPU不同,添加迁移标志
304+
let mut flags = EnqueueFlag::ENQUEUE_WAKEUP | EnqueueFlag::ENQUEUE_NOCLOCK;
305+
if target_cpu != prev_cpu {
306+
flags |= EnqueueFlag::ENQUEUE_MIGRATED;
307+
}
308+
309+
rq.activate_task(pcb, flags);
297310

298311
rq.check_preempt_currnet(pcb, WakeupFlags::empty());
299312

kernel/src/sched/load_balance.rs

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
//! 多核负载均衡模块
2+
//!
3+
//! 该模块实现了CPU之间的负载均衡,包括:
4+
//! - 选择唤醒任务时的目标CPU
5+
//! - 周期性负载均衡检查
6+
//! - 任务迁移
7+
8+
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9+
10+
use alloc::sync::Arc;
11+
12+
use crate::{
13+
libs::cpumask::CpuMask,
14+
process::ProcessControlBlock,
15+
smp::{
16+
core::smp_get_processor_id,
17+
cpu::{smp_cpu_manager, ProcessorId},
18+
},
19+
time::timer::clock,
20+
};
21+
22+
use super::{cpu_rq, CpuRunQueue, DequeueFlag, EnqueueFlag, SchedPolicy};
23+
24+
/// ## 负载均衡间隔(单位:jiffies),执行一次负载均衡检查
25+
const LOAD_BALANCE_INTERVAL: u64 = 100;
26+
27+
/// ## 负载不均衡阈值
28+
/// 当两个CPU的负载差距超过这个比例时,触发负载均衡
29+
const LOAD_IMBALANCE_THRESHOLD: u64 = 25;
30+
31+
/// ## 上次负载均衡时间(全局)
32+
static LAST_BALANCE_TIME: AtomicU64 = AtomicU64::new(0);
33+
34+
/// ## 负载均衡是否已启用
35+
/// 在SMP初始化完成后才启用负载均衡
36+
static LOAD_BALANCE_ENABLED: AtomicBool = AtomicBool::new(false);
37+
38+
/// ## 启用负载均衡
39+
/// 应该在SMP初始化完成后调用
40+
pub fn enable_load_balance() {
41+
LOAD_BALANCE_ENABLED.store(true, Ordering::SeqCst);
42+
}
43+
44+
/// ## 检查负载均衡是否已启用
45+
#[inline]
46+
fn is_load_balance_enabled() -> bool {
47+
LOAD_BALANCE_ENABLED.load(Ordering::Relaxed)
48+
}
49+
50+
/// ## 负载均衡器
51+
pub struct LoadBalancer;
52+
53+
impl LoadBalancer {
54+
/// 选择任务唤醒时的目标CPU
55+
///
56+
/// 这个函数在任务被唤醒时调用,用于选择最适合运行该任务的CPU。
57+
/// 选择策略:
58+
/// 1. 如果负载均衡未启用,保持在原CPU(不改变行为)
59+
/// 2. 如果当前CPU负载较低,选择当前CPU(缓存亲和性)
60+
/// 3. 如果原CPU负载较低,选择原CPU
61+
/// 4. 否则选择负载最低的CPU
62+
pub fn select_task_rq(
63+
pcb: &Arc<ProcessControlBlock>,
64+
prev_cpu: ProcessorId,
65+
_wake_flags: u8,
66+
) -> ProcessorId {
67+
// 如果负载均衡未启用,保持在原CPU(与原有行为一致)
68+
if !is_load_balance_enabled() {
69+
return prev_cpu;
70+
}
71+
72+
let current_cpu = smp_get_processor_id();
73+
let cpu_manager = smp_cpu_manager();
74+
75+
let present_cpus = cpu_manager.present_cpus();
76+
77+
if cpu_manager.present_cpus_count() <= 1 {
78+
return current_cpu;
79+
}
80+
81+
// 如果是IDLE策略,尝试找一个空闲CPU
82+
if pcb.sched_info().policy() == SchedPolicy::IDLE {
83+
return Self::find_idlest_cpu_lockless(present_cpus, current_cpu);
84+
}
85+
86+
let current_rq = cpu_rq(current_cpu.data() as usize);
87+
let current_load = Self::get_rq_load_lockless(&current_rq);
88+
89+
// 如果有原CPU信息且在present_cpus中
90+
if prev_cpu != ProcessorId::INVALID
91+
&& prev_cpu != current_cpu
92+
&& present_cpus.get(prev_cpu).unwrap_or(false)
93+
{
94+
let prev_rq = cpu_rq(prev_cpu.data() as usize);
95+
let prev_load = Self::get_rq_load_lockless(&prev_rq);
96+
97+
// 如果当前CPU负载低于原CPU,选择当前CPU
98+
if current_load < prev_load {
99+
return current_cpu;
100+
}
101+
102+
// 如果原CPU负载不高,保持缓存亲和性
103+
if prev_load <= 2 {
104+
return prev_cpu;
105+
}
106+
}
107+
108+
// 如果当前CPU负载低,直接使用当前cpu即可
109+
if current_load <= 1 {
110+
return current_cpu;
111+
}
112+
113+
Self::find_idlest_cpu_lockless(present_cpus, current_cpu)
114+
}
115+
116+
/// ## 找到负载最低的CPU(不加锁)
117+
fn find_idlest_cpu_lockless(possible_cpus: &CpuMask, fallback: ProcessorId) -> ProcessorId {
118+
let mut min_load = u64::MAX;
119+
let mut idlest_cpu = fallback;
120+
121+
for cpu in possible_cpus.iter_cpu() {
122+
let rq = cpu_rq(cpu.data() as usize);
123+
let load = Self::get_rq_load_lockless(&rq);
124+
125+
if load < min_load {
126+
min_load = load;
127+
idlest_cpu = cpu;
128+
129+
// 如果找到完全空闲的CPU,直接返回
130+
if load == 0 {
131+
break;
132+
}
133+
}
134+
}
135+
136+
idlest_cpu
137+
}
138+
139+
/// ## 获取运行队列的负载(不加锁)
140+
#[inline]
141+
fn get_rq_load_lockless(rq: &Arc<CpuRunQueue>) -> u64 {
142+
// 使用 nr_running_lockless 方法,不需要锁定
143+
// 因为这只是用于负载均衡决策的估算值
144+
rq.nr_running_lockless() as u64
145+
}
146+
147+
/// ## 检查是否需要进行负载均衡
148+
pub fn should_balance() -> bool {
149+
// 如果负载均衡未启用,直接返回false
150+
if !is_load_balance_enabled() {
151+
return false;
152+
}
153+
154+
let now = clock();
155+
let last = LAST_BALANCE_TIME.load(Ordering::Relaxed);
156+
157+
if now.saturating_sub(last) >= LOAD_BALANCE_INTERVAL {
158+
// 尝试更新时间戳,避免多个CPU同时进行负载均衡
159+
LAST_BALANCE_TIME
160+
.compare_exchange(last, now, Ordering::SeqCst, Ordering::Relaxed)
161+
.is_ok()
162+
} else {
163+
false
164+
}
165+
}
166+
167+
/// ## 执行负载均衡
168+
///
169+
/// 这个函数由scheduler_tick调用,检查并执行CPU之间的负载均衡
170+
pub fn run_load_balance() {
171+
// 如果负载均衡未启用,直接返回
172+
if !is_load_balance_enabled() {
173+
return;
174+
}
175+
176+
let cpu_manager = smp_cpu_manager();
177+
178+
if cpu_manager.present_cpus_count() <= 1 {
179+
return;
180+
}
181+
182+
let current_cpu = smp_get_processor_id();
183+
let current_rq = cpu_rq(current_cpu.data() as usize);
184+
185+
// 获取当前CPU的负载(不加锁)
186+
let current_load = Self::get_rq_load_lockless(&current_rq);
187+
188+
// 如果当前CPU负载很高,不主动拉取任务
189+
if current_load > 2 {
190+
return;
191+
}
192+
193+
let (busiest_cpu, busiest_load) =
194+
Self::find_busiest_cpu_lockless(cpu_manager.present_cpus());
195+
196+
// 如果没有负载不均衡,返回
197+
if busiest_cpu == current_cpu || busiest_load <= current_load + 1 {
198+
return;
199+
}
200+
201+
// 计算负载差距
202+
let load_diff = busiest_load.saturating_sub(current_load);
203+
let avg_load = (busiest_load + current_load) / 2;
204+
205+
if avg_load == 0 {
206+
return;
207+
}
208+
209+
// 检查负载不均衡是否超过阈值
210+
let imbalance_pct = (load_diff * 100) / avg_load;
211+
if imbalance_pct < LOAD_IMBALANCE_THRESHOLD {
212+
return;
213+
}
214+
215+
// 尝试从最忙的CPU迁移任务
216+
Self::migrate_tasks(busiest_cpu, current_cpu, load_diff / 2);
217+
}
218+
219+
/// ## 找到负载最高的CPU(不加锁)
220+
fn find_busiest_cpu_lockless(possible_cpus: &CpuMask) -> (ProcessorId, u64) {
221+
let mut max_load = 0u64;
222+
let mut busiest_cpu = smp_get_processor_id();
223+
224+
for cpu in possible_cpus.iter_cpu() {
225+
let rq = cpu_rq(cpu.data() as usize);
226+
let load = Self::get_rq_load_lockless(&rq);
227+
228+
if load > max_load {
229+
max_load = load;
230+
busiest_cpu = cpu;
231+
}
232+
}
233+
234+
(busiest_cpu, max_load)
235+
}
236+
237+
/// ## 从源CPU迁移任务到目标CPU
238+
///
239+
/// 注意:当前版本暂时禁用任务迁移功能,因为需要更复杂的 CFS 队列引用更新逻辑。
240+
/// 目前只启用唤醒时的 CPU 选择功能。
241+
#[allow(dead_code)]
242+
fn migrate_tasks(_src_cpu: ProcessorId, _dst_cpu: ProcessorId, _nr_migrate: u64) {
243+
// TODO: 实现安全的任务迁移
244+
// 当前暂时禁用,因为直接修改 CFS 引用会破坏调度器状态
245+
}
246+
247+
/// ## 执行单个任务的迁移
248+
///
249+
/// 注意:当前未使用,因为任务迁移功能暂时禁用
250+
#[allow(dead_code)]
251+
fn do_migrate_task(
252+
pcb: &Arc<ProcessControlBlock>,
253+
src_rq: &mut CpuRunQueue,
254+
dst_rq: &mut CpuRunQueue,
255+
dst_cpu: ProcessorId,
256+
) {
257+
// 从源队列出队
258+
src_rq.dequeue_task(pcb.clone(), DequeueFlag::DEQUEUE_NOCLOCK);
259+
260+
// 更新任务的CPU信息
261+
pcb.sched_info().set_on_cpu(Some(dst_cpu));
262+
263+
// 注意:不要直接修改 CFS 引用,让 enqueue_task 处理
264+
// 因为直接修改会破坏调度器的内部状态
265+
266+
// 加入目标队列
267+
dst_rq.enqueue_task(
268+
pcb.clone(),
269+
EnqueueFlag::ENQUEUE_WAKEUP | EnqueueFlag::ENQUEUE_MIGRATED,
270+
);
271+
}
272+
}

kernel/src/sched/mod.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod completion;
33
pub mod cputime;
44
pub mod fair;
55
pub mod idle;
6+
pub mod load_balance;
67
pub mod pelt;
78
pub mod prio;
89
pub mod syscall;
@@ -397,6 +398,12 @@ impl CpuRunQueue {
397398
guard
398399
}
399400

401+
/// 获取运行队列的任务数(不加锁),用于负载均衡决策
402+
#[inline]
403+
pub fn nr_running_lockless(&self) -> usize {
404+
self.nr_running
405+
}
406+
400407
pub fn enqueue_task(&mut self, pcb: Arc<ProcessControlBlock>, flags: EnqueueFlag) {
401408
if !flags.contains(EnqueueFlag::ENQUEUE_NOCLOCK) {
402409
self.update_rq_clock();
@@ -455,7 +462,10 @@ impl CpuRunQueue {
455462
}
456463

457464
if flags.contains(EnqueueFlag::ENQUEUE_MIGRATED) {
458-
todo!()
465+
// 任务被迁移到新的CPU,需要更新其CFS队列引用
466+
// 在入队之前更新,确保 h_nr_running 计数器在正确的队列上增加
467+
let se = pcb.sched_info().sched_entity();
468+
se.force_mut().set_cfs(Arc::downgrade(&self.cfs));
459469
}
460470

461471
self.enqueue_task(pcb.clone(), flags);
@@ -814,7 +824,11 @@ pub fn scheduler_tick() {
814824
rq.calculate_global_load_tick();
815825

816826
drop(guard);
817-
// TODO:处理负载均衡
827+
828+
//todo 检查并执行负载均衡(只检测不均衡,migrate_tasks 内部是空的)
829+
if load_balance::LoadBalancer::should_balance() {
830+
load_balance::LoadBalancer::run_load_balance();
831+
}
818832
}
819833

820834
/// ## 执行调度

0 commit comments

Comments
 (0)