From 01df043637f147779b8846c006db1580d0da784a Mon Sep 17 00:00:00 2001 From: Ali Ibtehaj Date: Sun, 2 Feb 2025 17:47:08 +0000 Subject: [PATCH 1/8] is_general_kernel in command_graph_generator --- include/command_graph_generator.h | 3 + src/command_graph_generator.cc | 273 +++++++++++++++++----------- test/command_graph_general_tests.cc | 8 + 3 files changed, 181 insertions(+), 103 deletions(-) diff --git a/include/command_graph_generator.h b/include/command_graph_generator.h index d5afa3f37..8be887f37 100644 --- a/include/command_graph_generator.h +++ b/include/command_graph_generator.h @@ -93,6 +93,8 @@ class command_graph_generator { std::optional pending_reduction; std::string debug_name; + + const task* generator_task = nullptr; }; struct host_object_state { @@ -111,6 +113,7 @@ class command_graph_generator { }; public: + bool is_generator_kernel(const task& tsk) const; struct policy_set { error_policy uninitialized_read_error = error_policy::panic; error_policy overlapping_write_error = error_policy::panic; diff --git a/src/command_graph_generator.cc b/src/command_graph_generator.cc index c69618421..ecd0ea9a0 100644 --- a/src/command_graph_generator.cc +++ b/src/command_graph_generator.cc @@ -28,38 +28,38 @@ namespace celerity::detail { command_graph_generator::command_graph_generator( const size_t num_nodes, const node_id local_nid, command_graph& cdag, detail::command_recorder* const recorder, const policy_set& policy) : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(&cdag), m_recorder(recorder) { - if(m_num_nodes > max_num_nodes) { - throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes)); - } + if(m_num_nodes > max_num_nodes) { + throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes)); + } } void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) { - assert(m_epoch_for_new_commands != nullptr); - assert(!m_buffers.contains(bid)); - // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands). - // This is required when tasks access host-initialized or uninitialized buffers. - auto& buffer = m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, m_epoch_for_new_commands, node_bitset().set())).first->second; - if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = box(subrange({}, range)); } + assert(m_epoch_for_new_commands != nullptr); + assert(!m_buffers.contains(bid)); + // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands). + // This is required when tasks access host-initialized or uninitialized buffers. 
+ auto& buffer = m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, m_epoch_for_new_commands, node_bitset().set())).first->second; + if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = box(subrange({}, range)); } } void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) { - m_buffers.at(bid).debug_name = debug_name; + m_buffers.at(bid).debug_name = debug_name; } void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) { - assert(m_buffers.contains(bid)); - m_buffers.erase(bid); + assert(m_buffers.contains(bid)); + m_buffers.erase(bid); } void command_graph_generator::notify_host_object_created(const host_object_id hoid) { - assert(m_epoch_for_new_commands != nullptr); - assert(!m_host_objects.contains(hoid)); - m_host_objects.emplace(hoid, m_epoch_for_new_commands); + assert(m_epoch_for_new_commands != nullptr); + assert(!m_host_objects.contains(hoid)); + m_host_objects.emplace(hoid, m_epoch_for_new_commands); } void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) { - assert(m_host_objects.contains(hoid)); - m_host_objects.erase(hoid); + assert(m_host_objects.contains(hoid)); + m_host_objects.erase(hoid); } /// Returns whether an iterator range of commands is topologically sorted, i.e. sequential execution would satisfy all internal dependencies. @@ -73,47 +73,64 @@ bool is_topologically_sorted(Iterator begin, Iterator end) { return true; } -std::vector command_graph_generator::build_task(const task& tsk) { - const auto epoch_to_prune_before = m_epoch_for_new_commands; - batch current_batch; - - switch(tsk.get_type()) { - case task_type::epoch: generate_epoch_command(current_batch, tsk); break; - case task_type::horizon: generate_horizon_command(current_batch, tsk); break; - case task_type::device_compute: - case task_type::host_compute: - case task_type::master_node: - case task_type::collective: - case task_type::fence: generate_distributed_commands(current_batch, tsk); break; - default: throw std::runtime_error("Task type NYI"); - } - - // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node: - // - Per-node reduction intermediate results are stored with fixed access to a single backing buffer, - // so multiple chunks on the same node will race on this buffer access - // - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish - // more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative - // or non-commutative operations - if(!tsk.get_reductions().empty()) { - assert(std::count_if(current_batch.begin(), current_batch.end(), [](const command* cmd) { return utils::isa(cmd); }) <= 1); - } - - // If a new epoch was completed in the CDAG before the current task, we can erase all tracking information from earlier commands. - // After the epoch (or horizon) command has been executed, the scheduler will then delete all obsolete commands from the CDAG. 
- if(epoch_to_prune_before != nullptr) { - std::erase_if(m_command_buffer_reads, [=](const auto& cid_reads) { return cid_reads.first < epoch_to_prune_before->get_id(); }); - } - - // Check that all commands have been recorded - if(is_recording()) { - assert(std::all_of(current_batch.begin(), current_batch.end(), [this](const command* cmd) { - return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(), - [cmd](const std::unique_ptr& rec) { return rec->id == cmd->get_id(); }); - })); - } +bool command_graph_generator::is_generator_kernel(const task& tsk) const { + if (tsk.get_type() != task_type::device_compute) return false; + + // Check for split hints + if (tsk.get_hint() != nullptr || + tsk.get_hint() != nullptr) { + return false; + } + + const auto& bam = tsk.get_buffer_access_map(); + if (bam.get_num_accesses() != 1) return false; + + const auto [bid, mode] = bam.get_nth_access(0); + return mode == access_mode::discard_write + && bam.get_task_produced_region(bid) == box(subrange({}, tsk.get_global_size())); +} - assert(is_topologically_sorted(current_batch.begin(), current_batch.end())); - return current_batch; +std::vector command_graph_generator::build_task(const task& tsk) { + const auto epoch_to_prune_before = m_epoch_for_new_commands; + batch current_batch; + + switch(tsk.get_type()) { + case task_type::epoch: generate_epoch_command(current_batch, tsk); break; + case task_type::horizon: generate_horizon_command(current_batch, tsk); break; + case task_type::device_compute: + case task_type::host_compute: + case task_type::master_node: + case task_type::collective: + case task_type::fence: generate_distributed_commands(current_batch, tsk); break; + default: throw std::runtime_error("Task type NYI"); + } + + // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node: + // - Per-node reduction intermediate results are stored with fixed access to a single backing buffer, + // so multiple chunks on the same node will race on this buffer access + // - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish + // more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative + // or non-commutative operations + if(!tsk.get_reductions().empty()) { + assert(std::count_if(current_batch.begin(), current_batch.end(), [](const command* cmd) { return utils::isa(cmd); }) <= 1); + } + + // If a new epoch was completed in the CDAG before the current task, we can erase all tracking information from earlier commands. + // After the epoch (or horizon) command has been executed, the scheduler will then delete all obsolete commands from the CDAG. 
+ if(epoch_to_prune_before != nullptr) { + std::erase_if(m_command_buffer_reads, [=](const auto& cid_reads) { return cid_reads.first < epoch_to_prune_before->get_id(); }); + } + + // Check that all commands have been recorded + if(is_recording()) { + assert(std::all_of(current_batch.begin(), current_batch.end(), [this](const command* cmd) { + return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(), + [cmd](const std::unique_ptr& rec) { return rec->id == cmd->get_id(); }); + })); + } + + assert(is_topologically_sorted(current_batch.begin(), current_batch.end())); + return current_batch; } void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const { @@ -455,7 +472,33 @@ void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, } void command_graph_generator::generate_distributed_commands(batch& current_batch, const task& tsk) { - const auto chunks = split_task_and_assign_chunks(tsk); + if (is_generator_kernel(tsk)) { + const auto [bid, _] = tsk.get_buffer_access_map().get_nth_access(0); + auto& buffer = m_buffers.at(bid); + + // Execute the generator kernel for the full range on each node + for (node_id nid = 0; nid < m_num_nodes; ++nid) { + const auto full_chunk = chunk<3>{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; + auto* gen_cmd = create_command(current_batch, &tsk, + full_chunk, false, + [&](const auto& record_debug_info) { + record_debug_info(tsk, [this](const buffer_id bid) { + return m_buffers.at(bid).debug_name; + }); + }); + + // Update buffer state for each node + if (nid == m_local_nid) { + buffer.local_last_writer.update_region(box<3>(subrange({}, tsk.get_global_size())), gen_cmd); + buffer.initialized_region = box<3>(subrange({}, tsk.get_global_size())); + } + } + + buffer.generator_task = nullptr; // Clear the generator task after it's been executed + return; // Exit early for generator kernels + } + + const auto chunks = split_task_and_assign_chunks(tsk); const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks); // Check for and report overlapping writes between local chunks, and between local and remote chunks. @@ -475,7 +518,7 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch std::unordered_map> per_buffer_local_writes; // Create command for each local chunk and resolve local data dependencies. 
- for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) { + for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) { command* cmd = nullptr; if(tsk.get_type() == task_type::fence) { cmd = create_command(current_batch, &tsk, @@ -504,14 +547,38 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch } } - for(const auto& [bid, consumed, produced] : requirements) { - auto& buffer = m_buffers.at(bid); - - // Process consuming accesses first, so we don't add dependencies onto our own writes + for(const auto& [bid, consumed, produced] : requirements) { + auto& buffer = m_buffers.at(bid); + + // Process consuming accesses first, so we don't add dependencies onto our own writes + if(!consumed.empty()) { + for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) { + if(box.empty()) continue; + if (!wcs.is_fresh()) { + utils::report_error(m_policy.uninitialized_read_error, + "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", + cmd->get_id(), m_local_nid, + a_chunk.chnk, + print_task_debug_label(tsk), print_buffer_debug_label(bid), box); + } + add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow); + } + + // Store the read access for determining anti-dependencies later on + m_command_buffer_reads[cmd->get_id()].emplace(bid, consumed); + } + + // Process consuming accesses first, so we don't add dependencies onto our own writes if(!consumed.empty()) { for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) { if(box.empty()) continue; - assert(wcs.is_fresh() && "Unresolved remote data dependency"); + if (!wcs.is_fresh()) { + utils::report_error(m_policy.uninitialized_read_error, + "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", + cmd->get_id(), m_local_nid, + a_chunk.chnk, + print_task_debug_label(tsk), print_buffer_debug_label(bid), box); + } add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow); } @@ -646,54 +713,54 @@ void command_graph_generator::reduce_execution_front_to(command* const new_front } void command_graph_generator::generate_epoch_command(batch& current_batch, const task& tsk) { - assert(tsk.get_type() == task_type::epoch); - m_cdag->begin_epoch(tsk.get_id()); - auto* const epoch = create_command( - current_batch, &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); - set_epoch_for_new_commands(epoch); - m_current_horizon = no_command; - // Make the epoch depend on the previous execution front - reduce_execution_front_to(epoch); + assert(tsk.get_type() == task_type::epoch); + m_cdag->begin_epoch(tsk.get_id()); + auto* const epoch = create_command( + current_batch, &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); + set_epoch_for_new_commands(epoch); + m_current_horizon = no_command; + // Make the epoch depend on the previous execution front + reduce_execution_front_to(epoch); } void command_graph_generator::generate_horizon_command(batch& current_batch, const task& tsk) { - assert(tsk.get_type() == task_type::horizon); - m_cdag->begin_epoch(tsk.get_id()); - auto* const new_horizon = - create_command(current_batch, &tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); - - if(m_current_horizon != 
nullptr) { - // Apply the previous horizon - set_epoch_for_new_commands(m_current_horizon); - } - m_current_horizon = new_horizon; - - // Make the horizon depend on the previous execution front - reduce_execution_front_to(new_horizon); + assert(tsk.get_type() == task_type::horizon); + m_cdag->begin_epoch(tsk.get_id()); + auto* const new_horizon = + create_command(current_batch, &tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); + + if(m_current_horizon != nullptr) { + // Apply the previous horizon + set_epoch_for_new_commands(m_current_horizon); + } + m_current_horizon = new_horizon; + + // Make the horizon depend on the previous execution front + reduce_execution_front_to(new_horizon); } void command_graph_generator::generate_epoch_dependencies(command* cmd) { - // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs. - // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency - // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof. - - // As long the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only - // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true - // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes - // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the - // dependency chain from the executor perspective. - - if(const auto deps = cmd->get_dependencies(); - std::none_of(deps.begin(), deps.end(), [](const command::dependency d) { return d.kind == dependency_kind::true_dep; })) { - if(!utils::isa(cmd) || utils::as(cmd)->get_epoch_action() != epoch_action::init) { - assert(cmd != m_epoch_for_new_commands); - add_dependency(cmd, m_epoch_for_new_commands, dependency_kind::true_dep, dependency_origin::last_epoch); - } - } + // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs. + // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency + // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof. + + // As long the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only + // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true + // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes + // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the + // dependency chain from the executor perspective. 
+ + if(const auto deps = cmd->get_dependencies(); + std::none_of(deps.begin(), deps.end(), [](const command::dependency d) { return d.kind == dependency_kind::true_dep; })) { + if(!utils::isa(cmd) || utils::as(cmd)->get_epoch_action() != epoch_action::init) { + assert(cmd != m_epoch_for_new_commands); + add_dependency(cmd, m_epoch_for_new_commands, dependency_kind::true_dep, dependency_origin::last_epoch); + } + } } std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const { - return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); + return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); } } // namespace celerity::detail diff --git a/test/command_graph_general_tests.cc b/test/command_graph_general_tests.cc index b51ac7a77..7b724dab1 100644 --- a/test/command_graph_general_tests.cc +++ b/test/command_graph_general_tests.cc @@ -437,3 +437,11 @@ TEST_CASE("command_graph_generator throws in tests if it detects overlapping wri "range mapper for this write access or constrain the split via experimental::constrain_split to make the access non-overlapping."); } } + +TEST_CASE("results form generator kernels are never communicated between nodes", "[command_graph_generator][owner-computes]") { + cdag_test_context cctx(4); // 4 nodes, so we can get a true 2D work assignment for the timestep kernel + auto buf = cctx.create_buffer<2>({256, 256}); // a 256x256 buffer + cctx.device_compute(buf.get_range()).discard_write(buf, celerity::access::one_to_one()).name("init").submit(); + cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 0").submit(); + cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 1").submit(); +} From 77f41d7e8164eb8d49d5f2819157c3fa2e72cd4c Mon Sep 17 00:00:00 2001 From: Ali Ibtehaj Date: Sun, 2 Feb 2025 18:37:09 +0000 Subject: [PATCH 2/8] remove extra --- src/command_graph_generator.cc | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/src/command_graph_generator.cc b/src/command_graph_generator.cc index ecd0ea9a0..23a7d28b7 100644 --- a/src/command_graph_generator.cc +++ b/src/command_graph_generator.cc @@ -549,24 +549,6 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch for(const auto& [bid, consumed, produced] : requirements) { auto& buffer = m_buffers.at(bid); - - // Process consuming accesses first, so we don't add dependencies onto our own writes - if(!consumed.empty()) { - for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) { - if(box.empty()) continue; - if (!wcs.is_fresh()) { - utils::report_error(m_policy.uninitialized_read_error, - "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", - cmd->get_id(), m_local_nid, - a_chunk.chnk, - print_task_debug_label(tsk), print_buffer_debug_label(bid), box); - } - add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow); - } - - // Store the read access for determining anti-dependencies later on - m_command_buffer_reads[cmd->get_id()].emplace(bid, consumed); - } // Process consuming accesses first, so we don't add dependencies onto our own writes if(!consumed.empty()) { From 387abebdf90957a39994ca5f9f27d1bf2e0110b8 Mon Sep 17 00:00:00 2001 From: Ali Ibtehaj Date: Thu, 6 Feb 2025 10:00:51 +0000 Subject: [PATCH 3/8] Applied clang-format on 
command_graph_generator.cc(1) --- src/command_graph_generator.cc | 283 ++++++++++++++++----------------- 1 file changed, 136 insertions(+), 147 deletions(-) diff --git a/src/command_graph_generator.cc b/src/command_graph_generator.cc index 23a7d28b7..f8caa9541 100644 --- a/src/command_graph_generator.cc +++ b/src/command_graph_generator.cc @@ -28,38 +28,38 @@ namespace celerity::detail { command_graph_generator::command_graph_generator( const size_t num_nodes, const node_id local_nid, command_graph& cdag, detail::command_recorder* const recorder, const policy_set& policy) : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(&cdag), m_recorder(recorder) { - if(m_num_nodes > max_num_nodes) { - throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes)); - } + if(m_num_nodes > max_num_nodes) { + throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes)); + } } void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) { - assert(m_epoch_for_new_commands != nullptr); - assert(!m_buffers.contains(bid)); - // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands). - // This is required when tasks access host-initialized or uninitialized buffers. - auto& buffer = m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, m_epoch_for_new_commands, node_bitset().set())).first->second; - if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = box(subrange({}, range)); } + assert(m_epoch_for_new_commands != nullptr); + assert(!m_buffers.contains(bid)); + // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands). + // This is required when tasks access host-initialized or uninitialized buffers. + auto& buffer = m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, m_epoch_for_new_commands, node_bitset().set())).first->second; + if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = box(subrange({}, range)); } } void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) { - m_buffers.at(bid).debug_name = debug_name; + m_buffers.at(bid).debug_name = debug_name; } void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) { - assert(m_buffers.contains(bid)); - m_buffers.erase(bid); + assert(m_buffers.contains(bid)); + m_buffers.erase(bid); } void command_graph_generator::notify_host_object_created(const host_object_id hoid) { - assert(m_epoch_for_new_commands != nullptr); - assert(!m_host_objects.contains(hoid)); - m_host_objects.emplace(hoid, m_epoch_for_new_commands); + assert(m_epoch_for_new_commands != nullptr); + assert(!m_host_objects.contains(hoid)); + m_host_objects.emplace(hoid, m_epoch_for_new_commands); } void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) { - assert(m_host_objects.contains(hoid)); - m_host_objects.erase(hoid); + assert(m_host_objects.contains(hoid)); + m_host_objects.erase(hoid); } /// Returns whether an iterator range of commands is topologically sorted, i.e. sequential execution would satisfy all internal dependencies. 
@@ -74,63 +74,59 @@ bool is_topologically_sorted(Iterator begin, Iterator end) { } bool command_graph_generator::is_generator_kernel(const task& tsk) const { - if (tsk.get_type() != task_type::device_compute) return false; - - // Check for split hints - if (tsk.get_hint() != nullptr || - tsk.get_hint() != nullptr) { - return false; - } - - const auto& bam = tsk.get_buffer_access_map(); - if (bam.get_num_accesses() != 1) return false; - - const auto [bid, mode] = bam.get_nth_access(0); - return mode == access_mode::discard_write - && bam.get_task_produced_region(bid) == box(subrange({}, tsk.get_global_size())); + if(tsk.get_type() != task_type::device_compute) return false; + + // Check for split hints + if(tsk.get_hint() != nullptr || tsk.get_hint() != nullptr) { return false; } + + const auto& bam = tsk.get_buffer_access_map(); + if(bam.get_num_accesses() != 1) return false; + + const auto [bid, mode] = bam.get_nth_access(0); + return mode == access_mode::discard_write && bam.get_task_produced_region(bid) == box(subrange({}, tsk.get_global_size())); } std::vector command_graph_generator::build_task(const task& tsk) { - const auto epoch_to_prune_before = m_epoch_for_new_commands; - batch current_batch; - - switch(tsk.get_type()) { - case task_type::epoch: generate_epoch_command(current_batch, tsk); break; - case task_type::horizon: generate_horizon_command(current_batch, tsk); break; - case task_type::device_compute: - case task_type::host_compute: - case task_type::master_node: - case task_type::collective: - case task_type::fence: generate_distributed_commands(current_batch, tsk); break; - default: throw std::runtime_error("Task type NYI"); - } - - // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node: - // - Per-node reduction intermediate results are stored with fixed access to a single backing buffer, - // so multiple chunks on the same node will race on this buffer access - // - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish - // more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative - // or non-commutative operations - if(!tsk.get_reductions().empty()) { - assert(std::count_if(current_batch.begin(), current_batch.end(), [](const command* cmd) { return utils::isa(cmd); }) <= 1); - } - - // If a new epoch was completed in the CDAG before the current task, we can erase all tracking information from earlier commands. - // After the epoch (or horizon) command has been executed, the scheduler will then delete all obsolete commands from the CDAG. 
- if(epoch_to_prune_before != nullptr) { - std::erase_if(m_command_buffer_reads, [=](const auto& cid_reads) { return cid_reads.first < epoch_to_prune_before->get_id(); }); - } - - // Check that all commands have been recorded - if(is_recording()) { - assert(std::all_of(current_batch.begin(), current_batch.end(), [this](const command* cmd) { - return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(), - [cmd](const std::unique_ptr& rec) { return rec->id == cmd->get_id(); }); - })); - } - - assert(is_topologically_sorted(current_batch.begin(), current_batch.end())); - return current_batch; + const auto epoch_to_prune_before = m_epoch_for_new_commands; + batch current_batch; + + switch(tsk.get_type()) { + case task_type::epoch: generate_epoch_command(current_batch, tsk); break; + case task_type::horizon: generate_horizon_command(current_batch, tsk); break; + case task_type::device_compute: + case task_type::host_compute: + case task_type::master_node: + case task_type::collective: + case task_type::fence: generate_distributed_commands(current_batch, tsk); break; + default: throw std::runtime_error("Task type NYI"); + } + + // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node: + // - Per-node reduction intermediate results are stored with fixed access to a single backing buffer, + // so multiple chunks on the same node will race on this buffer access + // - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish + // more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative + // or non-commutative operations + if(!tsk.get_reductions().empty()) { + assert(std::count_if(current_batch.begin(), current_batch.end(), [](const command* cmd) { return utils::isa(cmd); }) <= 1); + } + + // If a new epoch was completed in the CDAG before the current task, we can erase all tracking information from earlier commands. + // After the epoch (or horizon) command has been executed, the scheduler will then delete all obsolete commands from the CDAG. 
+ if(epoch_to_prune_before != nullptr) { + std::erase_if(m_command_buffer_reads, [=](const auto& cid_reads) { return cid_reads.first < epoch_to_prune_before->get_id(); }); + } + + // Check that all commands have been recorded + if(is_recording()) { + assert(std::all_of(current_batch.begin(), current_batch.end(), [this](const command* cmd) { + return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(), + [cmd](const std::unique_ptr& rec) { return rec->id == cmd->get_id(); }); + })); + } + + assert(is_topologically_sorted(current_batch.begin(), current_batch.end())); + return current_batch; } void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const { @@ -472,33 +468,28 @@ void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, } void command_graph_generator::generate_distributed_commands(batch& current_batch, const task& tsk) { - if (is_generator_kernel(tsk)) { - const auto [bid, _] = tsk.get_buffer_access_map().get_nth_access(0); - auto& buffer = m_buffers.at(bid); - - // Execute the generator kernel for the full range on each node - for (node_id nid = 0; nid < m_num_nodes; ++nid) { - const auto full_chunk = chunk<3>{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; - auto* gen_cmd = create_command(current_batch, &tsk, - full_chunk, false, - [&](const auto& record_debug_info) { - record_debug_info(tsk, [this](const buffer_id bid) { - return m_buffers.at(bid).debug_name; - }); - }); - - // Update buffer state for each node - if (nid == m_local_nid) { - buffer.local_last_writer.update_region(box<3>(subrange({}, tsk.get_global_size())), gen_cmd); - buffer.initialized_region = box<3>(subrange({}, tsk.get_global_size())); - } - } - - buffer.generator_task = nullptr; // Clear the generator task after it's been executed - return; // Exit early for generator kernels - } - - const auto chunks = split_task_and_assign_chunks(tsk); + if(is_generator_kernel(tsk)) { + const auto [bid, _] = tsk.get_buffer_access_map().get_nth_access(0); + auto& buffer = m_buffers.at(bid); + + // Execute the generator kernel for the full range on each node + for(node_id nid = 0; nid < m_num_nodes; ++nid) { + const auto full_chunk = chunk<3>{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; + auto* gen_cmd = create_command(current_batch, &tsk, full_chunk, false, + [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); }); + + // Update buffer state for each node + if(nid == m_local_nid) { + buffer.local_last_writer.update_region(box<3>(subrange({}, tsk.get_global_size())), gen_cmd); + buffer.initialized_region = box<3>(subrange({}, tsk.get_global_size())); + } + } + + buffer.generator_task = nullptr; // Clear the generator task after it's been executed + return; // Exit early for generator kernels + } + + const auto chunks = split_task_and_assign_chunks(tsk); const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks); // Check for and report overlapping writes between local chunks, and between local and remote chunks. @@ -518,7 +509,7 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch std::unordered_map> per_buffer_local_writes; // Create command for each local chunk and resolve local data dependencies. 
- for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) { + for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) { command* cmd = nullptr; if(tsk.get_type() == task_type::fence) { cmd = create_command(current_batch, &tsk, @@ -547,19 +538,17 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch } } - for(const auto& [bid, consumed, produced] : requirements) { - auto& buffer = m_buffers.at(bid); - - // Process consuming accesses first, so we don't add dependencies onto our own writes + for(const auto& [bid, consumed, produced] : requirements) { + auto& buffer = m_buffers.at(bid); + + // Process consuming accesses first, so we don't add dependencies onto our own writes if(!consumed.empty()) { for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) { if(box.empty()) continue; - if (!wcs.is_fresh()) { + if(!wcs.is_fresh()) { utils::report_error(m_policy.uninitialized_read_error, - "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", - cmd->get_id(), m_local_nid, - a_chunk.chnk, - print_task_debug_label(tsk), print_buffer_debug_label(bid), box); + "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_id(), m_local_nid, + a_chunk.chnk, print_task_debug_label(tsk), print_buffer_debug_label(bid), box); } add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow); } @@ -695,54 +684,54 @@ void command_graph_generator::reduce_execution_front_to(command* const new_front } void command_graph_generator::generate_epoch_command(batch& current_batch, const task& tsk) { - assert(tsk.get_type() == task_type::epoch); - m_cdag->begin_epoch(tsk.get_id()); - auto* const epoch = create_command( - current_batch, &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); - set_epoch_for_new_commands(epoch); - m_current_horizon = no_command; - // Make the epoch depend on the previous execution front - reduce_execution_front_to(epoch); + assert(tsk.get_type() == task_type::epoch); + m_cdag->begin_epoch(tsk.get_id()); + auto* const epoch = create_command( + current_batch, &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); + set_epoch_for_new_commands(epoch); + m_current_horizon = no_command; + // Make the epoch depend on the previous execution front + reduce_execution_front_to(epoch); } void command_graph_generator::generate_horizon_command(batch& current_batch, const task& tsk) { - assert(tsk.get_type() == task_type::horizon); - m_cdag->begin_epoch(tsk.get_id()); - auto* const new_horizon = - create_command(current_batch, &tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); - - if(m_current_horizon != nullptr) { - // Apply the previous horizon - set_epoch_for_new_commands(m_current_horizon); - } - m_current_horizon = new_horizon; - - // Make the horizon depend on the previous execution front - reduce_execution_front_to(new_horizon); + assert(tsk.get_type() == task_type::horizon); + m_cdag->begin_epoch(tsk.get_id()); + auto* const new_horizon = + create_command(current_batch, &tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); }); + + if(m_current_horizon != nullptr) { + // Apply the previous horizon + 
set_epoch_for_new_commands(m_current_horizon); + } + m_current_horizon = new_horizon; + + // Make the horizon depend on the previous execution front + reduce_execution_front_to(new_horizon); } void command_graph_generator::generate_epoch_dependencies(command* cmd) { - // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs. - // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency - // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof. - - // As long the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only - // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true - // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes - // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the - // dependency chain from the executor perspective. - - if(const auto deps = cmd->get_dependencies(); - std::none_of(deps.begin(), deps.end(), [](const command::dependency d) { return d.kind == dependency_kind::true_dep; })) { - if(!utils::isa(cmd) || utils::as(cmd)->get_epoch_action() != epoch_action::init) { - assert(cmd != m_epoch_for_new_commands); - add_dependency(cmd, m_epoch_for_new_commands, dependency_kind::true_dep, dependency_origin::last_epoch); - } - } + // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs. + // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency + // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof. + + // As long the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only + // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true + // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes + // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the + // dependency chain from the executor perspective. 
+ + if(const auto deps = cmd->get_dependencies(); + std::none_of(deps.begin(), deps.end(), [](const command::dependency d) { return d.kind == dependency_kind::true_dep; })) { + if(!utils::isa(cmd) || utils::as(cmd)->get_epoch_action() != epoch_action::init) { + assert(cmd != m_epoch_for_new_commands); + add_dependency(cmd, m_epoch_for_new_commands, dependency_kind::true_dep, dependency_origin::last_epoch); + } + } } std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const { - return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); + return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); } } // namespace celerity::detail From 3a742d742124e6a70233b78f03426a6aa8e5fa0f Mon Sep 17 00:00:00 2001 From: Ali Ibtehaj Date: Thu, 6 Feb 2025 10:03:56 +0000 Subject: [PATCH 4/8] Applied clang-format on all --- include/command_graph_generator.h | 2 +- test/command_graph_general_tests.cc | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/command_graph_generator.h b/include/command_graph_generator.h index 8be887f37..f17d31092 100644 --- a/include/command_graph_generator.h +++ b/include/command_graph_generator.h @@ -113,7 +113,7 @@ class command_graph_generator { }; public: - bool is_generator_kernel(const task& tsk) const; + bool is_generator_kernel(const task& tsk) const; struct policy_set { error_policy uninitialized_read_error = error_policy::panic; error_policy overlapping_write_error = error_policy::panic; diff --git a/test/command_graph_general_tests.cc b/test/command_graph_general_tests.cc index 7b724dab1..3ad6769f1 100644 --- a/test/command_graph_general_tests.cc +++ b/test/command_graph_general_tests.cc @@ -439,9 +439,9 @@ TEST_CASE("command_graph_generator throws in tests if it detects overlapping wri } TEST_CASE("results form generator kernels are never communicated between nodes", "[command_graph_generator][owner-computes]") { - cdag_test_context cctx(4); // 4 nodes, so we can get a true 2D work assignment for the timestep kernel - auto buf = cctx.create_buffer<2>({256, 256}); // a 256x256 buffer - cctx.device_compute(buf.get_range()).discard_write(buf, celerity::access::one_to_one()).name("init").submit(); - cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 0").submit(); - cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 1").submit(); + cdag_test_context cctx(4); // 4 nodes, so we can get a true 2D work assignment for the timestep kernel + auto buf = cctx.create_buffer<2>({256, 256}); // a 256x256 buffer + cctx.device_compute(buf.get_range()).discard_write(buf, celerity::access::one_to_one()).name("init").submit(); + cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 0").submit(); + cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 1").submit(); } From b3396de1a053b6dcb2d77b411593950f4d306f53 Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Thu, 6 Feb 2025 12:10:45 +0100 Subject: [PATCH 5/8] Turn communication-avoidance scenario into proper test case --- test/command_graph_general_tests.cc | 57 +++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/test/command_graph_general_tests.cc 
b/test/command_graph_general_tests.cc index 3ad6769f1..06fcbee25 100644 --- a/test/command_graph_general_tests.cc +++ b/test/command_graph_general_tests.cc @@ -439,9 +439,58 @@ TEST_CASE("command_graph_generator throws in tests if it detects overlapping wri } TEST_CASE("results form generator kernels are never communicated between nodes", "[command_graph_generator][owner-computes]") { - cdag_test_context cctx(4); // 4 nodes, so we can get a true 2D work assignment for the timestep kernel + const size_t num_nodes = 4; + + cdag_test_context cctx(num_nodes); // 4 nodes, so we can get a true 2D work assignment for the timestep kernel auto buf = cctx.create_buffer<2>({256, 256}); // a 256x256 buffer - cctx.device_compute(buf.get_range()).discard_write(buf, celerity::access::one_to_one()).name("init").submit(); - cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 0").submit(); - cctx.device_compute(buf.get_range()).hint(experimental::hints::split_2d()).read_write(buf, celerity::access::one_to_one()).name("timestep 1").submit(); + + const auto tid_init = cctx.device_compute(buf.get_range()) // + .discard_write(buf, celerity::access::one_to_one()) + .name("init") + .submit(); + const auto tid_ts0 = cctx.device_compute(buf.get_range()) // + .hint(experimental::hints::split_2d()) + .read_write(buf, celerity::access::one_to_one()) + .name("timestep 0") + .submit(); + const auto tid_ts1 = cctx.device_compute(buf.get_range()) // + .hint(experimental::hints::split_2d()) + .read_write(buf, celerity::access::one_to_one()) + .name("timestep 1") + .submit(); + + CHECK(cctx.query().count_per_node() == 3); // one for each task above + CHECK(cctx.query().total_count() == 0); + CHECK(cctx.query().total_count() == 0); + + const auto inits = cctx.query(tid_init); + const auto ts0s = cctx.query(tid_ts0); + const auto ts1s = cctx.query(tid_ts1); + CHECK(inits.count_per_node() == 1); + CHECK(ts0s.count_per_node() == 1); + CHECK(ts1s.count_per_node() == 1); + + for(node_id nid = 0; nid < num_nodes; ++nid) { + const auto n_init = inits.on(nid); + REQUIRE(n_init->accesses.size() == 1); + + const auto generate = n_init->accesses.front(); + CHECK(generate.bid == buf.get_id()); + CHECK(generate.mode == access_mode::discard_write); + + const auto n_ts0 = ts0s.on(nid); + CHECK(n_ts0.predecessors().contains(n_init)); + REQUIRE(n_ts0->accesses.size() == 1); + + const auto consume = n_ts0->accesses.front(); + CHECK(consume.bid == buf.get_id()); + CHECK(consume.mode == access_mode::read_write); + + // generator kernel "init" has generated exactly the buffer subrange that is consumed by "timestep 0" + CHECK(consume.req == generate.req); + + const auto n_ts1 = ts1s.on(nid); + CHECK(n_ts1.predecessors().contains(n_ts0)); + CHECK_FALSE(n_ts1.predecessors().contains(n_init)); + } } From e80e1acca52d514959feaf89ac95854ce7262ed1 Mon Sep 17 00:00:00 2001 From: Ali Ibtehaj Date: Fri, 21 Feb 2025 17:16:24 +0000 Subject: [PATCH 6/8] virtual function in range mapper for ono_to_one, CDAG output fixed, Test case passed --- include/range_mapper.h | 15 ++++ include/task.h | 9 +++ src/command_graph_generator.cc | 133 +++++++++++++++++++++++---------- 3 files changed, 116 insertions(+), 41 deletions(-) diff --git a/include/range_mapper.h b/include/range_mapper.h index 1faa301cc..8b3959ff0 100644 --- a/include/range_mapper.h +++ b/include/range_mapper.h @@ -13,6 +13,12 @@ namespace celerity { + +// Forward-declaration so we can detect whether the functor is 
one_to_one +namespace access { + struct one_to_one; +} + namespace detail { template @@ -85,6 +91,8 @@ namespace detail { virtual region<3> map_3(const chunk<3>& chnk) const = 0; virtual ~range_mapper_base() = default; + + virtual bool is_one_to_one() const { return false; } }; template @@ -107,6 +115,13 @@ namespace detail { region<3> map_3(const chunk<2>& chnk) const override { return map<3>(chnk); } region<3> map_3(const chunk<3>& chnk) const override { return map<3>(chnk); } + // Override the s_one_to_one() to detect if the functor is specifically celerity::access::one_to_one: + bool is_one_to_one() const override { + // If the Functor is celerity::access::one_to_one, return true + if constexpr(std::is_same_v) { return true; } + return false; + } + private: Functor m_rmfn; range m_buffer_size; diff --git a/include/task.h b/include/task.h index ef84a6cbd..046560ddd 100644 --- a/include/task.h +++ b/include/task.h @@ -64,6 +64,15 @@ namespace detail { /// Returns a set of bounding boxes, one for each accessed region, that must be allocated contiguously. box_vector<3> compute_required_contiguous_boxes(const buffer_id bid, const box<3>& execution_range) const; + // Retrieves the range mapper associated with a specific buffer ID, or nullptr if not found. + const range_mapper_base* get_range_mapper(buffer_id search_bid) const { + for(const auto& ba : m_accesses) { + if(ba.bid == search_bid) { return ba.range_mapper.get(); } + } + return nullptr; // Not found + } + + private: std::vector m_accesses; std::unordered_set m_accessed_buffers; ///< Cached set of buffer ids found in m_accesses diff --git a/src/command_graph_generator.cc b/src/command_graph_generator.cc index f8caa9541..bbb99ca00 100644 --- a/src/command_graph_generator.cc +++ b/src/command_graph_generator.cc @@ -76,14 +76,26 @@ bool is_topologically_sorted(Iterator begin, Iterator end) { bool command_graph_generator::is_generator_kernel(const task& tsk) const { if(tsk.get_type() != task_type::device_compute) return false; - // Check for split hints + // Must not have a hint that modifies splitting: if(tsk.get_hint() != nullptr || tsk.get_hint() != nullptr) { return false; } + // Must have exactly one buffer access const auto& bam = tsk.get_buffer_access_map(); if(bam.get_num_accesses() != 1) return false; + // That single access must be discard_write const auto [bid, mode] = bam.get_nth_access(0); - return mode == access_mode::discard_write && bam.get_task_produced_region(bid) == box(subrange({}, tsk.get_global_size())); + if(mode != access_mode::discard_write) return false; + + // Must produce exactly the entire buffer + const auto full_box = box(subrange({}, tsk.get_global_size())); + if(bam.get_task_produced_region(bid) != full_box) return false; + + // Confirm the *range mapper* is truly one_to_one: + const auto rm = bam.get_range_mapper(bid); + if(rm == nullptr || !rm->is_one_to_one()) { return false; } + + return true; } std::vector command_graph_generator::build_task(const task& tsk) { @@ -469,24 +481,44 @@ void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, void command_graph_generator::generate_distributed_commands(batch& current_batch, const task& tsk) { if(is_generator_kernel(tsk)) { - const auto [bid, _] = tsk.get_buffer_access_map().get_nth_access(0); - auto& buffer = m_buffers.at(bid); - - // Execute the generator kernel for the full range on each node - for(node_id nid = 0; nid < m_num_nodes; ++nid) { - const auto full_chunk = chunk<3>{tsk.get_global_offset(), tsk.get_global_size(), 
tsk.get_global_size()}; - auto* gen_cmd = create_command(current_batch, &tsk, full_chunk, false, - [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); }); - - // Update buffer state for each node - if(nid == m_local_nid) { - buffer.local_last_writer.update_region(box<3>(subrange({}, tsk.get_global_size())), gen_cmd); - buffer.initialized_region = box<3>(subrange({}, tsk.get_global_size())); - } + // Identify the single discard_write buffer + const auto [gen_bid, gen_mode] = tsk.get_buffer_access_map().get_nth_access(0); + + // 2D-split the full iteration space + const chunk<3> full{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; + auto splitted = split_2d(full, tsk.get_granularity(), m_num_nodes); + + // Assign each chunk to a node in the same way as split_task_and_assign_chunks: + std::vector forced_chunks; + forced_chunks.reserve(splitted.size()); + const auto chunks_per_node = std::max(1, splitted.size() / m_num_nodes); + for(size_t i = 0; i < splitted.size(); ++i) { + const node_id nid = (i / chunks_per_node) % m_num_nodes; + forced_chunks.push_back({nid, splitted[i]}); } - buffer.generator_task = nullptr; // Clear the generator task after it's been executed - return; // Exit early for generator kernels + // For each assigned_chunk, create a command if it belongs to our node + auto& bstate = m_buffers.at(gen_bid); + for(const auto& a_chunk : forced_chunks) { + if(a_chunk.executed_on != m_local_nid) continue; // skip remote nodes + + // Create local execution command that covers a_chunk.chnk + auto* cmd = create_command(current_batch, &tsk, subrange<3>{a_chunk.chnk}, false, [&](auto&& record_debug_info) { + // record debug + record_debug_info(tsk, [this](buffer_id b) { return m_buffers.at(b).debug_name; }); + }); + + // Mark that region as freshly written + const auto& off = a_chunk.chnk.offset; + const auto& rng = a_chunk.chnk.range; + box<3> bx(off, off + rng); + region_builder<3> rb; + rb.add(bx); + auto region_covered = std::move(rb).into_region(); + bstate.local_last_writer.update_region(region_covered, cmd); + bstate.initialized_region = region_union(bstate.initialized_region, region_covered); + } + return; } const auto chunks = split_task_and_assign_chunks(tsk); @@ -539,46 +571,65 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch } for(const auto& [bid, consumed, produced] : requirements) { - auto& buffer = m_buffers.at(bid); + auto& bstate = m_buffers.at(bid); + + // PARTIAL GENERATION: if this buffer belongs to a generator kernel and we read data + if(bstate.generator_task != nullptr && !consumed.empty()) { + // Which region is still uninitialized? 
+ auto missing = region_difference(consumed, bstate.initialized_region); + if(!missing.empty()) { + auto missing_box = bounding_box(missing); + + // Create partial execution command for the “init” kernel + auto* gen_cmd = + create_command(current_batch, bstate.generator_task, subrange<3>{missing_box.get_min(), missing_box.get_range()}, + /* is_reduction_init */ false, [&](auto&& record_debug_info) { + // debug info optional + }); + + // Mark that region as locally fresh + bstate.local_last_writer.update_region(missing, gen_cmd); + bstate.initialized_region = region_union(bstate.initialized_region, missing); + + // If you never expect more partial subranges, optionally: + // bstate.generator_task = nullptr; + } + } - // Process consuming accesses first, so we don't add dependencies onto our own writes if(!consumed.empty()) { - for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) { - if(box.empty()) continue; - if(!wcs.is_fresh()) { - utils::report_error(m_policy.uninitialized_read_error, - "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_id(), m_local_nid, - a_chunk.chnk, print_task_debug_label(tsk), print_buffer_debug_label(bid), box); + for(const auto& [subbox, wcs] : bstate.local_last_writer.get_region_values(consumed)) { + if(!subbox.empty() && !wcs.is_fresh()) { + utils::report_error(m_policy.uninitialized_read_error, "Command C{} on N{} reads uninitialized region {} of buffer {}", cmd->get_id(), + m_local_nid, subbox, print_buffer_debug_label(bid)); } add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow); } - - // Store the read access for determining anti-dependencies later on - m_command_buffer_reads[cmd->get_id()].emplace(bid, consumed); + // record these reads + m_command_buffer_reads[cmd->get_id()][bid] = region_union(m_command_buffer_reads[cmd->get_id()][bid], consumed); } if(!produced.empty()) { - generate_anti_dependencies(tsk, bid, buffer.local_last_writer, produced, cmd); + generate_anti_dependencies(tsk, bid, bstate.local_last_writer, produced, cmd); // Update last writer - buffer.local_last_writer.update_region(produced, cmd); - buffer.replicated_regions.update_region(produced, node_bitset{}); + bstate.local_last_writer.update_region(produced, cmd); + bstate.replicated_regions.update_region(produced, node_bitset{}); // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction. 
- if(buffer.pending_reduction.has_value()) { - m_completed_reductions.push_back(buffer.pending_reduction->rid); - buffer.pending_reduction = std::nullopt; + if(bstate.pending_reduction.has_value()) { + m_completed_reductions.push_back(bstate.pending_reduction->rid); + bstate.pending_reduction.reset(); } - per_buffer_local_writes.emplace(bid, produced); + // Track local writes + per_buffer_local_writes[bid] = region_union(per_buffer_local_writes[bid], produced); } if(m_policy.uninitialized_read_error != error_policy::ignore) { - if(const auto uninitialized_reads = region_difference(consumed, buffer.initialized_region); !uninitialized_reads.empty()) { - utils::report_error(m_policy.uninitialized_read_error, - "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_id(), m_local_nid, - box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid), - uninitialized_reads); + auto uninit = region_difference(consumed, bstate.initialized_region); + if(!uninit.empty()) { + utils::report_error(m_policy.uninitialized_read_error, "Command C{} on N{} reads region {} of buffer {} that was never written", + cmd->get_id(), m_local_nid, uninit, print_buffer_debug_label(bid)); } } } From 66218be8fa9a8342be9378be2a4fdaa816e5e400 Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Fri, 21 Feb 2025 18:31:59 +0100 Subject: [PATCH 7/8] Test both 1D ad 2D consumer spilt --- test/command_graph_general_tests.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/command_graph_general_tests.cc b/test/command_graph_general_tests.cc index 06fcbee25..d51491119 100644 --- a/test/command_graph_general_tests.cc +++ b/test/command_graph_general_tests.cc @@ -439,8 +439,10 @@ TEST_CASE("command_graph_generator throws in tests if it detects overlapping wri } TEST_CASE("results form generator kernels are never communicated between nodes", "[command_graph_generator][owner-computes]") { - const size_t num_nodes = 4; + const bool split_2d = GENERATE(values({0, 1})); + CAPTURE(split_2d); + const size_t num_nodes = 4; cdag_test_context cctx(num_nodes); // 4 nodes, so we can get a true 2D work assignment for the timestep kernel auto buf = cctx.create_buffer<2>({256, 256}); // a 256x256 buffer @@ -449,12 +451,12 @@ TEST_CASE("results form generator kernels are never communicated between nodes", .name("init") .submit(); const auto tid_ts0 = cctx.device_compute(buf.get_range()) // - .hint(experimental::hints::split_2d()) + .hint_if(split_2d, experimental::hints::split_2d()) .read_write(buf, celerity::access::one_to_one()) .name("timestep 0") .submit(); const auto tid_ts1 = cctx.device_compute(buf.get_range()) // - .hint(experimental::hints::split_2d()) + .hint_if(split_2d, experimental::hints::split_2d()) .read_write(buf, celerity::access::one_to_one()) .name("timestep 1") .submit(); From 6206dc87cb653bdae73b9b5b57fe4f4d8d09862e Mon Sep 17 00:00:00 2001 From: Ali Ibtehaj Date: Tue, 25 Feb 2025 11:25:12 +0000 Subject: [PATCH 8/8] clean code, remove force 2D split --- src/command_graph_generator.cc | 117 +++++++++++---------------------- 1 file changed, 37 insertions(+), 80 deletions(-) diff --git a/src/command_graph_generator.cc b/src/command_graph_generator.cc index bbb99ca00..99257f33b 100644 --- a/src/command_graph_generator.cc +++ b/src/command_graph_generator.cc @@ -480,44 +480,26 @@ void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, } void 
command_graph_generator::generate_distributed_commands(batch& current_batch, const task& tsk) { + // If it's a generator kernel, we generate commands immediately and skip partial generation altogether. if(is_generator_kernel(tsk)) { - // Identify the single discard_write buffer - const auto [gen_bid, gen_mode] = tsk.get_buffer_access_map().get_nth_access(0); - - // 2D-split the full iteration space - const chunk<3> full{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; - auto splitted = split_2d(full, tsk.get_granularity(), m_num_nodes); - - // Assign each chunk to a node in the same way as split_task_and_assign_chunks: - std::vector forced_chunks; - forced_chunks.reserve(splitted.size()); - const auto chunks_per_node = std::max(1, splitted.size() / m_num_nodes); - for(size_t i = 0; i < splitted.size(); ++i) { - const node_id nid = (i / chunks_per_node) % m_num_nodes; - forced_chunks.push_back({nid, splitted[i]}); - } - - // For each assigned_chunk, create a command if it belongs to our node + // Identify which buffer is discard-written + const auto [gen_bid, _] = tsk.get_buffer_access_map().get_nth_access(0); auto& bstate = m_buffers.at(gen_bid); - for(const auto& a_chunk : forced_chunks) { - if(a_chunk.executed_on != m_local_nid) continue; // skip remote nodes - - // Create local execution command that covers a_chunk.chnk - auto* cmd = create_command(current_batch, &tsk, subrange<3>{a_chunk.chnk}, false, [&](auto&& record_debug_info) { - // record debug - record_debug_info(tsk, [this](buffer_id b) { return m_buffers.at(b).debug_name; }); - }); - - // Mark that region as freshly written - const auto& off = a_chunk.chnk.offset; - const auto& rng = a_chunk.chnk.range; - box<3> bx(off, off + rng); - region_builder<3> rb; - rb.add(bx); - auto region_covered = std::move(rb).into_region(); - bstate.local_last_writer.update_region(region_covered, cmd); - bstate.initialized_region = region_union(bstate.initialized_region, region_covered); + const auto chunks = split_task_and_assign_chunks(tsk); + + // Create a command for each chunk that belongs to our local node. + for(const auto& a_chunk : chunks) { + if(a_chunk.executed_on != m_local_nid) continue; + auto* cmd = create_command(current_batch, &tsk, subrange<3>{a_chunk.chnk}, false, + [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); }); + + // Mark that subrange as freshly written, so there’s no “uninitialized read” later. + box<3> write_box(a_chunk.chnk.offset, a_chunk.chnk.offset + a_chunk.chnk.range); + region<3> written_region{write_box}; + bstate.local_last_writer.update_region(written_region, cmd); + bstate.initialized_region = region_union(bstate.initialized_region, written_region); } + // Return here so we skip the normal device-logic below. return; } @@ -571,65 +553,40 @@ void command_graph_generator::generate_distributed_commands(batch& current_batch } for(const auto& [bid, consumed, produced] : requirements) { - auto& bstate = m_buffers.at(bid); - - // PARTIAL GENERATION: if this buffer belongs to a generator kernel and we read data - if(bstate.generator_task != nullptr && !consumed.empty()) { - // Which region is still uninitialized? 
- auto missing = region_difference(consumed, bstate.initialized_region); - if(!missing.empty()) { - auto missing_box = bounding_box(missing); - - // Create partial execution command for the “init” kernel - auto* gen_cmd = - create_command(current_batch, bstate.generator_task, subrange<3>{missing_box.get_min(), missing_box.get_range()}, - /* is_reduction_init */ false, [&](auto&& record_debug_info) { - // debug info optional - }); - - // Mark that region as locally fresh - bstate.local_last_writer.update_region(missing, gen_cmd); - bstate.initialized_region = region_union(bstate.initialized_region, missing); - - // If you never expect more partial subranges, optionally: - // bstate.generator_task = nullptr; - } - } + auto& buffer = m_buffers.at(bid); + // Process consuming accesses first, so we don't add dependencies onto our own writes if(!consumed.empty()) { - for(const auto& [subbox, wcs] : bstate.local_last_writer.get_region_values(consumed)) { - if(!subbox.empty() && !wcs.is_fresh()) { - utils::report_error(m_policy.uninitialized_read_error, "Command C{} on N{} reads uninitialized region {} of buffer {}", cmd->get_id(), - m_local_nid, subbox, print_buffer_debug_label(bid)); - } + for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) { + if(box.empty()) continue; + assert(wcs.is_fresh() && "Unresolved remote data dependency"); add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow); } - // record these reads - m_command_buffer_reads[cmd->get_id()][bid] = region_union(m_command_buffer_reads[cmd->get_id()][bid], consumed); + + // Store the read access for determining anti-dependencies later on + m_command_buffer_reads[cmd->get_id()].emplace(bid, consumed); } if(!produced.empty()) { - generate_anti_dependencies(tsk, bid, bstate.local_last_writer, produced, cmd); - - // Update last writer - bstate.local_last_writer.update_region(produced, cmd); - bstate.replicated_regions.update_region(produced, node_bitset{}); + generate_anti_dependencies(tsk, bid, buffer.local_last_writer, produced, cmd); + buffer.local_last_writer.update_region(produced, cmd); + buffer.replicated_regions.update_region(produced, node_bitset{}); // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction. 
- if(bstate.pending_reduction.has_value()) { - m_completed_reductions.push_back(bstate.pending_reduction->rid); - bstate.pending_reduction.reset(); + if(buffer.pending_reduction.has_value()) { + m_completed_reductions.push_back(buffer.pending_reduction->rid); + buffer.pending_reduction = std::nullopt; } - // Track local writes - per_buffer_local_writes[bid] = region_union(per_buffer_local_writes[bid], produced); + per_buffer_local_writes.emplace(bid, produced); } if(m_policy.uninitialized_read_error != error_policy::ignore) { - auto uninit = region_difference(consumed, bstate.initialized_region); - if(!uninit.empty()) { - utils::report_error(m_policy.uninitialized_read_error, "Command C{} on N{} reads region {} of buffer {} that was never written", - cmd->get_id(), m_local_nid, uninit, print_buffer_debug_label(bid)); + if(const auto uninitialized_reads = region_difference(consumed, buffer.initialized_region); !uninitialized_reads.empty()) { + utils::report_error(m_policy.uninitialized_read_error, + "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_id(), m_local_nid, + box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid), + uninitialized_reads); } } }
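
Note for readers following the owner-computes branch above: this excerpt contains only the call
site of is_generator_kernel(), not its body. The sketch below is illustrative only and is not the
implementation from these patches; it assumes one plausible criterion (a device kernel whose
single buffer access is a discard_write) and assumes the buffer_access_map exposes an access
count next to the get_nth_access() accessor already used in PATCH 8.

	bool command_graph_generator::is_generator_kernel(const task& tsk) const {
		// Sketch only: treat a task as a "generator" if it is a device kernel that
		// produces data through exactly one discard_write access and reads nothing.
		// The real predicate in this series may use a different criterion entirely.
		if(tsk.get_type() != task_type::device_compute) return false;
		const auto& bam = tsk.get_buffer_access_map();
		if(bam.get_num_accesses() != 1) return false;   // assumed accessor name
		const auto [bid, mode] = bam.get_nth_access(0); // same accessor as used above
		(void)bid;                                      // only the mode matters here
		return mode == access_mode::discard_write;      // assumed access_mode value
	}

With any predicate of this shape, generate_distributed_commands() takes the early-return branch
from PATCH 8: it creates execution commands only for the locally assigned chunks of the generator
task and marks those subranges as locally fresh and initialized, so consumers of those subranges
read local data instead of having it communicated from other nodes, and they do not hit the
uninitialized-read diagnostics restored at the end of the patch.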