Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2201,7 +2201,7 @@ fn subnet_heap_delta_capacity(

/// Aborts the paused execution, if any, of the given canister.
///
/// If a paused execution was aborted, resets the canister's priority credit to
/// If a paused execution was aborted, resets the canister's executed rounds to
/// zero. Canisters must not be charged for aborted DTS executions.
fn abort_canister(
canister: &mut Arc<CanisterState>,
Expand All @@ -2214,7 +2214,7 @@ fn abort_canister(
// Reset `executed_slices` to zero.
subnet_schedule
.get_mut(canister.canister_id())
.executed_slices = 0;
.executed_rounds = 0;
}
}

Expand Down
37 changes: 19 additions & 18 deletions rs/execution_environment/src/scheduler/round_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ pub(super) struct CanisterRoundState {
accumulated_priority: AccumulatedPriority,
/// Copy of the canister's `SchedulerState::compute_allocation`.
compute_allocation: AccumulatedPriority,
/// Number of DTS slices executed so far in the current long execution.
executed_slices: i64,
/// Number of rounds during which the current long execution has executed at
/// least one slice.
executed_rounds: i64,
/// The round when the current long execution started. `None` means the canister
/// is not in a long execution.
long_execution_start_round: Option<ExecutionRound>,
Expand All @@ -70,7 +71,7 @@ impl CanisterRoundState {
canister_id: canister.canister_id(),
accumulated_priority: canister_priority.accumulated_priority + compute_allocation,
compute_allocation,
executed_slices: canister_priority.executed_slices,
executed_rounds: canister_priority.executed_rounds,
long_execution_start_round: canister_priority.long_execution_start_round,
}
}
Expand All @@ -96,16 +97,16 @@ impl Ord for CanisterRoundState {
.cmp(&self.accumulated_priority)
.then_with(|| self.canister_id.cmp(&other.canister_id)),

// Among long executions, sort by executed slices; AP descending; start round
// Among long executions, sort by executed rounds; AP descending; start round
// ascending; then break ties by canister ID.
//
// An aborted execution (executed slices == 0) is considered to have the same
// priority as a newly started long execution (executed slices == 1). This is to
// An aborted execution (executed rounds == 0) is considered to have the same
// priority as a newly started long execution (executed rounds == 1). This is to
// avoid starvation of aborted executions.
(Some(self_start_round), Some(other_start_round)) => other
.executed_slices
.executed_rounds
.max(1)
.cmp(&self.executed_slices.max(1))
.cmp(&self.executed_rounds.max(1))
.then_with(|| other.accumulated_priority.cmp(&self.accumulated_priority))
.then_with(|| self_start_round.cmp(&other_start_round))
.then_with(|| self.canister_id.cmp(&other.canister_id)),
Expand Down Expand Up @@ -444,7 +445,7 @@ impl RoundSchedule {
///
/// * Grants canisters their compute allocations; charges for full executions;
/// then calculates the subnet-wide free allocation and distributes it.
/// * Applies the priority credit where possible (no long execution).
/// * Charges for executed rounds where possible (no long execution).
/// * Observes round-level metrics.
pub fn finish_round(
&self,
Expand All @@ -457,41 +458,41 @@ impl RoundSchedule {
// Update fully executed canisters' priorities.
for canister_id in self.fully_executed_canisters.iter() {
let canister_priority = subnet_schedule.get_mut(*canister_id);
canister_priority.executed_slices += 1;
canister_priority.executed_rounds += 1;
canister_priority.last_full_execution_round = current_round;
}

// Grant all canisters their compute allocation; apply the priority credit
// Grant all canisters their compute allocation; charge for executed rounds
// where possible (no long execution); and calculate the subnet-wide free
// allocation (as the deviation from zero of all canisters' total accumulated
// priority, including priority credit).
// priority, including executed rounds).
let mut free_allocation = ZERO;
for canister in canister_states.values() {
// Add the canister to the subnet schedule, if not already there.
let canister_priority = subnet_schedule.get_mut(canister.canister_id());

canister_priority.accumulated_priority += from_ca(canister.compute_allocation());

// On message completion (or short execution), charge for the executed slices.
if canister_priority.executed_slices > 0
// On message completion (or short execution), charge for the executed rounds.
if canister_priority.executed_rounds > 0
&& (!canister.has_long_execution()
|| self
.canisters_with_completed_messages
.contains(&canister.canister_id()))
{
canister_priority.accumulated_priority -=
ONE_HUNDRED_PERCENT * canister_priority.executed_slices;
canister_priority.executed_slices = 0;
ONE_HUNDRED_PERCENT * canister_priority.executed_rounds;
canister_priority.executed_rounds = 0;
}

free_allocation -= canister_priority.accumulated_priority
- ONE_HUNDRED_PERCENT * canister_priority.executed_slices;
- ONE_HUNDRED_PERCENT * canister_priority.executed_rounds;
}

self.grant_heap_delta_and_install_code_credits(state, metrics);

// Only ever apply positive free allocation. If the sum of all canisters'
// accumulated priorities (including priority credit) is somehow positive
// accumulated priorities (including executed rounds) is somehow positive
// (although this should never happen), then there is simply no free allocation
// to distribute.
if free_allocation.get() < 0 {
Expand Down
2 changes: 1 addition & 1 deletion rs/execution_environment/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn state_sync_clears_paused_execution_registry() {
test.state_mut().put_canister_state(clean_canister);
let canister_priority = test.state_mut().canister_priority_mut(canister);
canister_priority.long_execution_start_round = None;
canister_priority.executed_slices = 0;
canister_priority.executed_rounds = 0;
assert!(!test.canister_state(canister).has_long_execution());

// Execute another round. The scheduler detects that no canister has a paused
Expand Down
22 changes: 11 additions & 11 deletions rs/execution_environment/src/scheduler/tests/dts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,16 @@ fn dts_long_execution_aborted_after_checkpoint() {

test.execute_round(ExecutionRoundType::OrdinaryRound);

// Canister has a paused execution and non-zero priority credit.
// Canister has a paused execution and non-zero executed rounds.
assert!(test.canister_state(canister).has_paused_execution());
assert_ne!(test.state().canister_priority(&canister).executed_slices, 0);
assert_ne!(test.state().canister_priority(&canister).executed_rounds, 0);

test.execute_round(ExecutionRoundType::CheckpointRound);

// After a checkpoint round, the canister has an aborted execution and zero
// priority credit.
// executed rounds.
assert!(test.canister_state(canister).has_aborted_execution());
assert_eq!(test.state().canister_priority(&canister).executed_slices, 0);
assert_eq!(test.state().canister_priority(&canister).executed_rounds, 0);

// Complete the long execution.
for _ in 0..3 {
Expand All @@ -222,10 +222,10 @@ fn dts_long_execution_aborted_after_checkpoint() {
ErrorCode::CanisterDidNotReply,
);

// After completion, there is no paused or aborted execution. And the priority
// credit is again zero.
// After completion, there is no paused or aborted execution. And executed
// rounds is again zero.
assert!(!test.canister_state(canister).has_long_execution());
assert_eq!(test.state().canister_priority(&canister).executed_slices, 0);
assert_eq!(test.state().canister_priority(&canister).executed_rounds, 0);

// 2 + 3 slices were executed.
assert_eq!(test.scheduler().metrics.round.slices.get_sample_sum(), 5.0);
Expand Down Expand Up @@ -304,12 +304,12 @@ fn respect_max_paused_executions(
.filter(|canister| {
let priority = subnet_schedule.get(&canister.canister_id());
if canister.has_paused_execution() {
// All paused executions have non-zero priority credit.
assert_ne!(priority.executed_slices, 0);
// All paused executions have non-zero executed rounds.
assert_ne!(priority.executed_rounds, 0);
true
} else {
// All aborted (or not started) executions have zero priority credit.
assert_eq!(priority.executed_slices, 0);
// All aborted (or not started) executions have zero executed rounds.
assert_eq!(priority.executed_rounds, 0);
false
}
})
Expand Down
47 changes: 31 additions & 16 deletions rs/execution_environment/src/scheduler/tests/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,13 +945,16 @@ fn inner_round_first_execution_is_not_a_full_execution() {
}
}
let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
let mut total_executed_rounds = 0;
for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() {
total_accumulated_priority += canister_priority.accumulated_priority.get();
total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices;
total_executed_rounds += canister_priority.executed_rounds;
}
// The accumulated priority invariant should be respected.
assert_eq!(total_accumulated_priority - total_priority_credit, 0);
assert_eq!(
total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds,
0
);
}

#[test]
Expand Down Expand Up @@ -1004,13 +1007,16 @@ fn inner_round_long_execution_is_a_full_execution() {
assert_eq!(priority.last_full_execution_round, test.last_round());
}
let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
let mut total_executed_rounds = 0;
for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() {
total_accumulated_priority += canister_priority.accumulated_priority.get();
total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices;
total_executed_rounds += canister_priority.executed_rounds;
}
// The accumulated priority invariant should be respected.
assert_eq!(total_accumulated_priority - total_priority_credit, 0);
assert_eq!(
total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds,
0
);
}

#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
Expand Down Expand Up @@ -1058,12 +1064,15 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores:
}
}
let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
let mut total_executed_rounds = 0;
for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() {
total_accumulated_priority += canister_priority.accumulated_priority.get();
total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices;
total_executed_rounds += canister_priority.executed_rounds;
}
prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
prop_assert_eq!(
total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds,
0
);

// Send one more message for first half of the canisters.
for (i, canister) in canister_ids.iter().enumerate() {
Expand Down Expand Up @@ -1103,12 +1112,15 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores:
}
}
let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
let mut total_executed_rounds = 0;
for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() {
total_accumulated_priority += canister_priority.accumulated_priority.get();
total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices;
total_executed_rounds += canister_priority.executed_rounds;
}
prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
prop_assert_eq!(
total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds,
0
);
}

#[test]
Expand Down Expand Up @@ -1159,19 +1171,22 @@ fn charge_idle_canisters_for_full_execution_round() {
}
}
let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
let mut total_executed_rounds = 0;
for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() {
// Assert there is no divergency in accumulated priorities.
let priority = canister_priority.accumulated_priority
- ONE_HUNDRED_PERCENT * canister_priority.executed_slices;
- ONE_HUNDRED_PERCENT * canister_priority.executed_rounds;
assert_le!(priority.get(), 100 * MULTIPLIER);
assert_ge!(priority.get(), -100 * MULTIPLIER);

total_accumulated_priority += canister_priority.accumulated_priority.get();
total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices;
total_executed_rounds += canister_priority.executed_rounds;
}
// The accumulated priority invariant should be respected.
assert_eq!(total_accumulated_priority - total_priority_credit, 0);
assert_eq!(
total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds,
0
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rs/protobuf/def/state/metadata/v1/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ message SystemMetadata {
message CanisterPriority {
types.v1.CanisterId canister_id = 1;
int64 accumulated_priority = 2;
int64 executed_slices = 3;
int64 executed_rounds = 3;
optional uint64 long_execution_start_round = 4;
uint64 last_full_execution_round = 5;
}
Expand Down
2 changes: 1 addition & 1 deletion rs/protobuf/src/gen/state/state.metadata.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ pub struct CanisterPriority {
#[prost(int64, tag = "2")]
pub accumulated_priority: i64,
#[prost(int64, tag = "3")]
pub executed_slices: i64,
pub executed_rounds: i64,
#[prost(uint64, optional, tag = "4")]
pub long_execution_start_round: ::core::option::Option<u64>,
#[prost(uint64, tag = "5")]
Expand Down
4 changes: 2 additions & 2 deletions rs/replicated_state/src/metadata_state/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl From<&SystemMetadata> for pb_metadata::SystemMetadata {
.map(|(canister_id, priority)| pb_metadata::CanisterPriority {
canister_id: Some(pb_types::CanisterId::from(*canister_id)),
accumulated_priority: priority.accumulated_priority.get(),
executed_slices: priority.executed_slices,
executed_rounds: priority.executed_rounds,
long_execution_start_round: priority
.long_execution_start_round
.map(|round| round.get()),
Expand Down Expand Up @@ -426,7 +426,7 @@ impl
canister_id,
CanisterPriority {
accumulated_priority: AccumulatedPriority::new(entry.accumulated_priority),
executed_slices: entry.executed_slices,
executed_rounds: entry.executed_rounds,
long_execution_start_round: entry
.long_execution_start_round
.map(ExecutionRound::new),
Expand Down
16 changes: 9 additions & 7 deletions rs/replicated_state/src/metadata_state/subnet_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ pub struct CanisterPriority {
/// in the vector d that corresponds to this canister.
pub accumulated_priority: AccumulatedPriority,

/// Number of DTS slices executed so far in the current long execution, if any.
/// (Also used transiently by `finish_round()` to charge for full executions.)
/// Number of rounds during which the current long execution, if any, has
/// executed at least one slice. (Also used transiently by `finish_round()` to
/// charge for full executions.)
///
/// During a long execution, this is incremented for each DTS slice executed.
/// In the meantime, the canister accumulates priority normally. It is only
/// charged for these slices when the long execution completes.
pub executed_slices: i64,
/// During a long execution, this is incremented for each round in which the
/// canister was executed. In the meantime, the canister accumulates priority
/// normally. It is only charged for these rounds when the long execution
/// completes.
pub executed_rounds: i64,

/// The round when the current long execution started. `None` means the canister
/// is not in a long execution.
Expand All @@ -36,7 +38,7 @@ impl CanisterPriority {
/// subnet schedule.
pub const DEFAULT: CanisterPriority = CanisterPriority {
accumulated_priority: AccumulatedPriority::new(0),
executed_slices: 0,
executed_rounds: 0,
long_execution_start_round: None,
last_full_execution_round: ExecutionRound::new(0),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn get() {
fn validate_eq() {
let some_priority = CanisterPriority {
accumulated_priority: AccumulatedPriority::new(1),
executed_slices: 2,
executed_rounds: 2,
long_execution_start_round: Some(ExecutionRound::new(3)),
last_full_execution_round: ExecutionRound::new(4),
};
Expand Down
4 changes: 2 additions & 2 deletions rs/replicated_state/src/metadata_state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ fn system_metadata_roundtrip_encoding() {
.subnet_schedule
.get_mut(CanisterId::from_u64(1)) = CanisterPriority {
accumulated_priority: 100.into(),
executed_slices: 2,
executed_rounds: 2,
long_execution_start_round: Some(3.into()),
last_full_execution_round: 4.into(),
};
Expand Down Expand Up @@ -435,7 +435,7 @@ fn subnet_schedule_backward_compatibility() {
let mut subnet_schedule = SubnetSchedule::default();
*subnet_schedule.get_mut(CanisterId::from_u64(1)) = CanisterPriority {
accumulated_priority: 100.into(),
executed_slices: 2,
executed_rounds: 2,
long_execution_start_round: Some(3.into()),
last_full_execution_round: 4.into(),
};
Expand Down
4 changes: 2 additions & 2 deletions rs/state_manager/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,8 @@ pub fn load_canister_state(
};
let priority = CanisterPriority {
accumulated_priority: canister_state_bits.accumulated_priority,
// We can only be loading an aborted execution, so zero executed slices.
executed_slices: 0,
// We can only be loading an aborted execution, so zero executed rounds.
executed_rounds: 0,
// Set a long execution start round where necessary.
long_execution_start_round: if canister_state.has_long_execution() {
Some(0.into())
Expand Down
Loading
Loading