Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/platform/a2a3/host/device_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "device_runner.h"

#include <dlfcn.h>
#include <thread>

// Include HAL constants from CANN (header only, library loaded dynamically)
#include "ascend_hal.h"
Expand Down Expand Up @@ -397,16 +398,23 @@ int DeviceRunner::run(Runtime& runtime,
return rc;
}

// Poll and collect performance data (must be before stream sync)
// Poll and collect performance data in a separate thread so it runs
// concurrently with stream synchronization. This prevents a deadlock
// where AICPU spins waiting for the host to consume a full perf buffer
// while the host has already stopped polling.
std::thread collector_thread;
if (runtime.enable_profiling) {
poll_and_collect_performance_data(runtime.worker_count, runtime.get_task_count());
collector_thread = std::thread([this, &runtime]() {
poll_and_collect_performance_data(runtime.worker_count, runtime.get_task_count());
});
}

std::cout << "\n=== rtStreamSynchronize stream_aicpu_===" << '\n';
// Synchronize streams
rc = rtStreamSynchronize(stream_aicpu_);
if (rc != 0) {
LOG_ERROR("rtStreamSynchronize (AICPU) failed: %d", rc);
if (collector_thread.joinable()) collector_thread.join();
kernel_args_.finalize_runtime_args();
return rc;
}
Expand All @@ -415,11 +423,17 @@ int DeviceRunner::run(Runtime& runtime,
rc = rtStreamSynchronize(stream_aicore_);
if (rc != 0) {
LOG_ERROR("rtStreamSynchronize (AICore) failed: %d", rc);
if (collector_thread.joinable()) collector_thread.join();
kernel_args_.finalize_runtime_args();
return rc;
}

// Print collected performance data (after stream sync)
// Wait for collector thread to finish
if (collector_thread.joinable()) {
collector_thread.join();
}

// Export collected performance data (after stream sync + collection done)
if (runtime.enable_profiling) {
export_swimlane_json();
}
Expand Down
2 changes: 1 addition & 1 deletion src/platform/include/common/platform_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ constexpr uint64_t PLATFORM_PROF_SYS_CNT_FREQ = 50000000; // 50 MHz
/**
* Timeout duration for performance data collection (seconds)
*/
constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 2;
constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 60;

/**
* Number of empty polling iterations before checking timeout
Expand Down
58 changes: 28 additions & 30 deletions src/platform/src/performance_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,47 +129,44 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks)
const auto timeout_duration = std::chrono::seconds(PLATFORM_PROF_TIMEOUT_SECONDS);
std::optional<std::chrono::steady_clock::time_point> idle_start;

// Poll for total_tasks if not provided
if (expected_tasks <= 0) {
LOG_INFO("Waiting for AICPU to write total_tasks to PerfDataHeader...");
idle_start = std::chrono::steady_clock::now();

while (true) {
rmb();
expected_tasks = static_cast<int>(header->total_tasks);

if (expected_tasks > 0) {
LOG_INFO("Task count read from PerfDataHeader: %d", expected_tasks);
break;
}

auto elapsed = std::chrono::steady_clock::now() - idle_start.value();
if (elapsed >= timeout_duration) {
LOG_ERROR("Timeout waiting for total_tasks from AICPU after %ld seconds",
std::chrono::duration_cast<std::chrono::seconds>(elapsed).count());
LOG_ERROR("AICPU may not have initialized performance profiling");
return;
}
}
}

LOG_DEBUG("Expected tasks: %d", expected_tasks);

uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE;
int total_records_collected = 0;
int buffers_processed = 0;

collected_perf_records_.clear();
idle_start.reset();
int empty_poll_count = 0;

// Poll the ready queue
while (total_records_collected < expected_tasks) {
// When expected_tasks is unknown (<=0), we start consuming buffers immediately
// and dynamically check header->total_tasks to learn the final count.
bool tasks_known = (expected_tasks > 0);
if (!tasks_known) {
LOG_INFO("Task count unknown, will poll total_tasks from PerfDataHeader while consuming buffers");
}

// Unified polling loop: consume ready buffers and check for total_tasks
while (true) {
// Dynamically discover total_tasks if not yet known
if (!tasks_known) {
rmb();
int discovered = static_cast<int>(header->total_tasks);
if (discovered > 0) {
expected_tasks = discovered;
tasks_known = true;
LOG_INFO("Task count read from PerfDataHeader: %d", expected_tasks);
}
}

// Termination: collected enough records
if (tasks_known && total_records_collected >= expected_tasks) {
break;
}

rmb();
uint32_t head = header->queue_head;
uint32_t tail = header->queue_tail;

if (head == tail) {
// Queue empty — track idle time for timeout
if (!idle_start.has_value()) {
idle_start = std::chrono::steady_clock::now();
}
Expand All @@ -189,6 +186,7 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks)
continue;
}

// Got data — reset idle tracking
idle_start.reset();
empty_poll_count = 0;

Expand Down Expand Up @@ -230,7 +228,7 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks)
LOG_INFO("Total buffers processed: %d", buffers_processed);
LOG_INFO("Total records collected: %d", total_records_collected);

if (total_records_collected < expected_tasks) {
if (tasks_known && total_records_collected < expected_tasks) {
LOG_WARN("Incomplete collection (%d / %d records)",
total_records_collected, expected_tasks);
}
Expand Down
Loading