diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 9dde4b85ea21..dbfc569dc999 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -2201,7 +2201,7 @@ fn subnet_heap_delta_capacity( /// Aborts the paused execution, if any, of the given canister. /// -/// If a paused execution was aborted, resets the canister's priority credit to +/// If a paused execution was aborted, resets the canister's executed rounds to /// zero. Canisters must not be charged for aborted DTS executions. fn abort_canister( canister: &mut Arc<CanisterState>, @@ -2214,7 +2214,7 @@ fn abort_canister( - // Reset `executed_slices` to zero. + // Reset `executed_rounds` to zero. subnet_schedule .get_mut(canister.canister_id()) - .executed_slices = 0; + .executed_rounds = 0; } } diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index 27f0da3d6c30..72e9b04cf501 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -46,8 +46,9 @@ pub(super) struct CanisterRoundState { accumulated_priority: AccumulatedPriority, /// Copy of the canister's `SchedulerState::compute_allocation`. compute_allocation: AccumulatedPriority, - /// Number of DTS slices executed so far in the current long execution. - executed_slices: i64, + /// Number of rounds during which the current long execution has executed at + /// least one slice. + executed_rounds: i64, /// The round when the current long execution started. `None` means the canister /// is not in a long execution.
long_execution_start_round: Option, @@ -70,7 +71,7 @@ impl CanisterRoundState { canister_id: canister.canister_id(), accumulated_priority: canister_priority.accumulated_priority + compute_allocation, compute_allocation, - executed_slices: canister_priority.executed_slices, + executed_rounds: canister_priority.executed_rounds, long_execution_start_round: canister_priority.long_execution_start_round, } } @@ -96,16 +97,16 @@ impl Ord for CanisterRoundState { .cmp(&self.accumulated_priority) .then_with(|| self.canister_id.cmp(&other.canister_id)), - // Among long executions, sort by executed slices; AP descending; start round + // Among long executions, sort by executed rounds; AP descending; start round // ascending; then break ties by canister ID. // - // An aborted execution (executed slices == 0) is considered to have the same - // priority as a newly started long execution (executed slices == 1). This is to + // An aborted execution (executed rounds == 0) is considered to have the same + // priority as a newly started long execution (executed rounds == 1). This is to // avoid starvation of aborted executions. (Some(self_start_round), Some(other_start_round)) => other - .executed_slices + .executed_rounds .max(1) - .cmp(&self.executed_slices.max(1)) + .cmp(&self.executed_rounds.max(1)) .then_with(|| other.accumulated_priority.cmp(&self.accumulated_priority)) .then_with(|| self_start_round.cmp(&other_start_round)) .then_with(|| self.canister_id.cmp(&other.canister_id)), @@ -444,7 +445,7 @@ impl RoundSchedule { /// /// * Grants canisters their compute allocations; charges for full executions; /// then calculates the subnet-wide free allocation and distributes it. - /// * Applies the priority credit where possible (no long execution). + /// * Charges for executed rounds where possible (no long execution). /// * Observes round-level metrics. pub fn finish_round( &self, @@ -457,14 +458,14 @@ impl RoundSchedule { // Update fully executed canisters' priorities. 
for canister_id in self.fully_executed_canisters.iter() { let canister_priority = subnet_schedule.get_mut(*canister_id); - canister_priority.executed_slices += 1; + canister_priority.executed_rounds += 1; canister_priority.last_full_execution_round = current_round; } - // Grant all canisters their compute allocation; apply the priority credit + // Grant all canisters their compute allocation; charge for executed rounds // where possible (no long execution); and calculate the subnet-wide free // allocation (as the deviation from zero of all canisters' total accumulated - // priority, including priority credit). + // priority, including executed rounds). let mut free_allocation = ZERO; for canister in canister_states.values() { // Add the canister to the subnet schedule, if not already there. @@ -472,26 +473,26 @@ impl RoundSchedule { canister_priority.accumulated_priority += from_ca(canister.compute_allocation()); - // On message completion (or short execution), charge for the executed slices. - if canister_priority.executed_slices > 0 + // On message completion (or short execution), charge for the executed rounds. + if canister_priority.executed_rounds > 0 && (!canister.has_long_execution() || self .canisters_with_completed_messages .contains(&canister.canister_id())) { canister_priority.accumulated_priority -= - ONE_HUNDRED_PERCENT * canister_priority.executed_slices; - canister_priority.executed_slices = 0; + ONE_HUNDRED_PERCENT * canister_priority.executed_rounds; + canister_priority.executed_rounds = 0; } free_allocation -= canister_priority.accumulated_priority - - ONE_HUNDRED_PERCENT * canister_priority.executed_slices; + - ONE_HUNDRED_PERCENT * canister_priority.executed_rounds; } self.grant_heap_delta_and_install_code_credits(state, metrics); // Only ever apply positive free allocation. 
If the sum of all canisters' - // accumulated priorities (including priority credit) is somehow positive + // accumulated priorities (including executed rounds) is somehow positive // (although this should never happen), then there is simply no free allocation // to distribute. if free_allocation.get() < 0 { diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index e6f523856e4d..a16fa07a84ec 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -69,7 +69,7 @@ fn state_sync_clears_paused_execution_registry() { test.state_mut().put_canister_state(clean_canister); let canister_priority = test.state_mut().canister_priority_mut(canister); canister_priority.long_execution_start_round = None; - canister_priority.executed_slices = 0; + canister_priority.executed_rounds = 0; assert!(!test.canister_state(canister).has_long_execution()); // Execute another round. The scheduler detects that no canister has a paused diff --git a/rs/execution_environment/src/scheduler/tests/dts.rs b/rs/execution_environment/src/scheduler/tests/dts.rs index 91d4b074208d..c8cd528c5c03 100644 --- a/rs/execution_environment/src/scheduler/tests/dts.rs +++ b/rs/execution_environment/src/scheduler/tests/dts.rs @@ -202,16 +202,16 @@ fn dts_long_execution_aborted_after_checkpoint() { test.execute_round(ExecutionRoundType::OrdinaryRound); - // Canister has a paused execution and non-zero priority credit. + // Canister has a paused execution and non-zero executed rounds. assert!(test.canister_state(canister).has_paused_execution()); - assert_ne!(test.state().canister_priority(&canister).executed_slices, 0); + assert_ne!(test.state().canister_priority(&canister).executed_rounds, 0); test.execute_round(ExecutionRoundType::CheckpointRound); // After a checkpoint round, the canister has an aborted execution and zero - // priority credit. + // executed rounds. 
assert!(test.canister_state(canister).has_aborted_execution()); - assert_eq!(test.state().canister_priority(&canister).executed_slices, 0); + assert_eq!(test.state().canister_priority(&canister).executed_rounds, 0); // Complete the long execution. for _ in 0..3 { @@ -222,10 +222,10 @@ fn dts_long_execution_aborted_after_checkpoint() { ErrorCode::CanisterDidNotReply, ); - // After completion, there is no paused or aborted execution. And the priority - // credit is again zero. + // After completion, there is no paused or aborted execution. And executed + // rounds is again zero. assert!(!test.canister_state(canister).has_long_execution()); - assert_eq!(test.state().canister_priority(&canister).executed_slices, 0); + assert_eq!(test.state().canister_priority(&canister).executed_rounds, 0); // 2 + 3 slices were executed. assert_eq!(test.scheduler().metrics.round.slices.get_sample_sum(), 5.0); @@ -304,12 +304,12 @@ fn respect_max_paused_executions( .filter(|canister| { let priority = subnet_schedule.get(&canister.canister_id()); if canister.has_paused_execution() { - // All paused executions have non-zero priority credit. - assert_ne!(priority.executed_slices, 0); + // All paused executions have non-zero executed rounds. + assert_ne!(priority.executed_rounds, 0); true } else { - // All aborted (or not started) executions have zero priority credit. - assert_eq!(priority.executed_slices, 0); + // All aborted (or not started) executions have zero executed rounds. 
+ assert_eq!(priority.executed_rounds, 0); false } }) diff --git a/rs/execution_environment/src/scheduler/tests/scheduling.rs b/rs/execution_environment/src/scheduler/tests/scheduling.rs index fb670427060f..d379da58b06a 100644 --- a/rs/execution_environment/src/scheduler/tests/scheduling.rs +++ b/rs/execution_environment/src/scheduler/tests/scheduling.rs @@ -945,13 +945,16 @@ fn inner_round_first_execution_is_not_a_full_execution() { } } let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; + let mut total_executed_rounds = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; + total_executed_rounds += canister_priority.executed_rounds; } // The accumulated priority invariant should be respected. - assert_eq!(total_accumulated_priority - total_priority_credit, 0); + assert_eq!( + total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds, + 0 + ); } #[test] @@ -1004,13 +1007,16 @@ fn inner_round_long_execution_is_a_full_execution() { assert_eq!(priority.last_full_execution_round, test.last_round()); } let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; + let mut total_executed_rounds = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; + total_executed_rounds += canister_priority.executed_rounds; } // The accumulated priority invariant should be respected. 
- assert_eq!(total_accumulated_priority - total_priority_credit, 0); + assert_eq!( + total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds, + 0 + ); } #[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })] @@ -1058,12 +1064,15 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: } } let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; + let mut total_executed_rounds = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; + total_executed_rounds += canister_priority.executed_rounds; } - prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0); + prop_assert_eq!( + total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds, + 0 + ); // Send one more message for first half of the canisters. 
for (i, canister) in canister_ids.iter().enumerate() { @@ -1103,12 +1112,15 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: } } let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; + let mut total_executed_rounds = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; + total_executed_rounds += canister_priority.executed_rounds; } - prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0); + prop_assert_eq!( + total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds, + 0 + ); } #[test] @@ -1159,19 +1171,22 @@ fn charge_idle_canisters_for_full_execution_round() { } } let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; + let mut total_executed_rounds = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { // Assert there is no divergency in accumulated priorities. let priority = canister_priority.accumulated_priority - - ONE_HUNDRED_PERCENT * canister_priority.executed_slices; + - ONE_HUNDRED_PERCENT * canister_priority.executed_rounds; assert_le!(priority.get(), 100 * MULTIPLIER); assert_ge!(priority.get(), -100 * MULTIPLIER); total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; + total_executed_rounds += canister_priority.executed_rounds; } // The accumulated priority invariant should be respected. 
- assert_eq!(total_accumulated_priority - total_priority_credit, 0); + assert_eq!( + total_accumulated_priority - ONE_HUNDRED_PERCENT.get() * total_executed_rounds, + 0 + ); } } diff --git a/rs/protobuf/def/state/metadata/v1/metadata.proto b/rs/protobuf/def/state/metadata/v1/metadata.proto index 467ec8723529..c224a3a59ffb 100644 --- a/rs/protobuf/def/state/metadata/v1/metadata.proto +++ b/rs/protobuf/def/state/metadata/v1/metadata.proto @@ -415,7 +415,7 @@ message SystemMetadata { message CanisterPriority { types.v1.CanisterId canister_id = 1; int64 accumulated_priority = 2; - int64 executed_slices = 3; + int64 executed_rounds = 3; optional uint64 long_execution_start_round = 4; uint64 last_full_execution_round = 5; } diff --git a/rs/protobuf/src/gen/state/state.metadata.v1.rs b/rs/protobuf/src/gen/state/state.metadata.v1.rs index e9fd823aaf7d..c4dfef370a71 100644 --- a/rs/protobuf/src/gen/state/state.metadata.v1.rs +++ b/rs/protobuf/src/gen/state/state.metadata.v1.rs @@ -599,7 +599,7 @@ pub struct CanisterPriority { #[prost(int64, tag = "2")] pub accumulated_priority: i64, #[prost(int64, tag = "3")] - pub executed_slices: i64, + pub executed_rounds: i64, #[prost(uint64, optional, tag = "4")] pub long_execution_start_round: ::core::option::Option, #[prost(uint64, tag = "5")] diff --git a/rs/replicated_state/src/metadata_state/proto.rs b/rs/replicated_state/src/metadata_state/proto.rs index 72e0b2a248d7..e9d668608cbf 100644 --- a/rs/replicated_state/src/metadata_state/proto.rs +++ b/rs/replicated_state/src/metadata_state/proto.rs @@ -376,7 +376,7 @@ impl From<&SystemMetadata> for pb_metadata::SystemMetadata { .map(|(canister_id, priority)| pb_metadata::CanisterPriority { canister_id: Some(pb_types::CanisterId::from(*canister_id)), accumulated_priority: priority.accumulated_priority.get(), - executed_slices: priority.executed_slices, + executed_rounds: priority.executed_rounds, long_execution_start_round: priority .long_execution_start_round .map(|round| 
round.get()), @@ -426,7 +426,7 @@ impl canister_id, CanisterPriority { accumulated_priority: AccumulatedPriority::new(entry.accumulated_priority), - executed_slices: entry.executed_slices, + executed_rounds: entry.executed_rounds, long_execution_start_round: entry .long_execution_start_round .map(ExecutionRound::new), diff --git a/rs/replicated_state/src/metadata_state/subnet_schedule.rs b/rs/replicated_state/src/metadata_state/subnet_schedule.rs index a0bb5cbcb622..bc38eab72412 100644 --- a/rs/replicated_state/src/metadata_state/subnet_schedule.rs +++ b/rs/replicated_state/src/metadata_state/subnet_schedule.rs @@ -13,13 +13,15 @@ pub struct CanisterPriority { /// in the vector d that corresponds to this canister. pub accumulated_priority: AccumulatedPriority, - /// Number of DTS slices executed so far in the current long execution, if any. - /// (Also used transiently by `finish_round()` to charge for full executions.) + /// Number of rounds during which the current long execution, if any, has + /// executed at least one slice. (Also used transiently by `finish_round()` to + /// charge for full executions.) /// - /// During a long execution, this is incremented for each DTS slice executed. - /// In the meantime, the canister accumulates priority normally. It is only - /// charged for these slices when the long execution completes. - pub executed_slices: i64, + /// During a long execution, this is incremented for each round in which the + /// canister was executed. In the meantime, the canister accumulates priority + /// normally. It is only charged for these rounds when the long execution + /// completes. + pub executed_rounds: i64, /// The round when the current long execution started. `None` means the canister /// is not in a long execution. @@ -36,7 +38,7 @@ impl CanisterPriority { /// subnet schedule. 
pub const DEFAULT: CanisterPriority = CanisterPriority { accumulated_priority: AccumulatedPriority::new(0), - executed_slices: 0, + executed_rounds: 0, long_execution_start_round: None, last_full_execution_round: ExecutionRound::new(0), }; diff --git a/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs b/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs index 2bd4ccda7c88..b399c7d69d47 100644 --- a/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs +++ b/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs @@ -36,7 +36,7 @@ fn get() { fn validate_eq() { let some_priority = CanisterPriority { accumulated_priority: AccumulatedPriority::new(1), - executed_slices: 2, + executed_rounds: 2, long_execution_start_round: Some(ExecutionRound::new(3)), last_full_execution_round: ExecutionRound::new(4), }; diff --git a/rs/replicated_state/src/metadata_state/tests.rs b/rs/replicated_state/src/metadata_state/tests.rs index 8efa21d2ed86..1c02fc8e824d 100644 --- a/rs/replicated_state/src/metadata_state/tests.rs +++ b/rs/replicated_state/src/metadata_state/tests.rs @@ -391,7 +391,7 @@ fn system_metadata_roundtrip_encoding() { .subnet_schedule .get_mut(CanisterId::from_u64(1)) = CanisterPriority { accumulated_priority: 100.into(), - executed_slices: 2, + executed_rounds: 2, long_execution_start_round: Some(3.into()), last_full_execution_round: 4.into(), }; @@ -435,7 +435,7 @@ fn subnet_schedule_backward_compatibility() { let mut subnet_schedule = SubnetSchedule::default(); *subnet_schedule.get_mut(CanisterId::from_u64(1)) = CanisterPriority { accumulated_priority: 100.into(), - executed_slices: 2, + executed_rounds: 2, long_execution_start_round: Some(3.into()), last_full_execution_round: 4.into(), }; diff --git a/rs/state_manager/src/checkpoint.rs b/rs/state_manager/src/checkpoint.rs index 8c7616cb7ff5..4f8886f309bb 100644 --- a/rs/state_manager/src/checkpoint.rs +++ b/rs/state_manager/src/checkpoint.rs @@ -889,8 +889,8 @@ pub fn 
load_canister_state( }; let priority = CanisterPriority { accumulated_priority: canister_state_bits.accumulated_priority, - // We can only be loading an aborted execution, so zero executed slices. - executed_slices: 0, + // We can only be loading an aborted execution, so zero executed rounds. + executed_rounds: 0, // Set a long execution start round where necessary. long_execution_start_round: if canister_state.has_long_execution() { Some(0.into()) diff --git a/rs/state_manager/src/tip.rs b/rs/state_manager/src/tip.rs index b734d0f9405b..33ef6976331a 100644 --- a/rs/state_manager/src/tip.rs +++ b/rs/state_manager/src/tip.rs @@ -1210,7 +1210,7 @@ fn serialize_canister_protos_to_checkpoint_readwrite( // Ignored after the first checkpoint load during an upgrade. last_full_execution_round: 0.into(), compute_allocation: canister_state.compute_allocation(), - // Any long execution must have been aborted, priority credit is always zero. + // Value is ignored when loading. priority_credit: 0.into(), // Value is ignored when loading. long_execution_mode: Default::default(), diff --git a/rs/types/types/src/lib.rs b/rs/types/types/src/lib.rs index b4662d7ced54..89cb73519033 100644 --- a/rs/types/types/src/lib.rs +++ b/rs/types/types/src/lib.rs @@ -30,38 +30,37 @@ // // Public Specification of IC describes `compute_allocation`. Each canister is // initiated with an `accumulated_priority` of 0. The scheduler uses these values -// while calculating the priority of a canister at each round. The canisters +// while calculating the priority of a canister at each round. Active canisters // are scheduled at each round in the following way: // // * For each canister, we compute the `round_priority` of that canister as the -// sum of its `accumulated_priority` and the multiplication of its -// `compute_allocation` with the `multiplier` (see the scheduler). -// * We distribute the free capacity equally to all the canisters. 
+// sum of its `accumulated_priority` and its `compute_allocation`. // * For new executions: // - We sort the canisters according to their round priorities in // descending order. // * For pending long executions: -// - Sort the canisters first according to their execution mode, -// and then round priorities. +// - Sort the canisters by rounds already executed, round priority, and start +// round of the long execution. // - Calculate how many scheduler cores we dedicate for long executions // in this round using compute allocations of these long executions. -// - The first `long_execution_cores` many canisters are given the top -// priority in this round and get into the prioritized long execution mode. -// - The rest of the long executions are given an opportunity to be executed -// by scheduling them at the very end. -// * The first `scheduler_cores` many canisters are given the top priority in -// this round. Therefore, they are expected to be executed as the first of -// their threads. -// * As the last step, we credit the first `scheduler_cores` canisters -// with the sum of compute allocations of all canisters times `multiplier` -// divided by the number of canisters that are given top priority in -// this round. This `priority_credit` will be subtracted from the -// `accumulated_priority` at the end of the execution or at the checkpoint. +// * Allocate round-robin the new executions to the new execution cores; and the +// long executions to the long execution cores. +// * The first canister on each core receives a "full execution round" by +// definition, whether or not it consumes all its inputs. +// * [Canisters are executed in parallel on cores, in order of priority.] +// * After execution, all canisters that consumed all their inputs or executed a +// long execution slice are also considered to have received a "full execution +// round". 
+// * "Fully executed" canisters that do not have a long execution in progress +// are charged 100 accumulated priority. "Fully executed" canisters with a +// long execution in progress have their "rounds executed" incremented; they +// are charged for all these rounds when they complete their long execution. +// * We grant active canisters their compute allocation. +// * We distribute the free capacity equally to all active canisters. // -// As a result, at each round, the sum of accumulated priorities minus -// the sum of priority credits remains 0. -// Similarly, the sum of all round priorities equals to the multiplication of -// the sum of all compute allocations with the multiplier. +// As a result, after each round, the sum of accumulated priorities is zero (or +// positive, if we granted more compute allocation than we charged for full +// executions). pub mod artifact; pub mod batch;