Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/platform/include/common/platform_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ constexpr int PLATFORM_MAX_CORES =
* Performance buffer capacity for each double buffer
* Number of PerfRecord entries per buffer (ping or pong)
*/
constexpr int PLATFORM_PROF_BUFFER_SIZE = 20;
constexpr int PLATFORM_PROF_BUFFER_SIZE = 100;

/**
* Ready queue capacity for performance data collection
Expand All @@ -94,7 +94,7 @@ constexpr uint64_t PLATFORM_PROF_SYS_CNT_FREQ = 50000000; // 50 MHz
/**
* Timeout duration for performance data collection (seconds)
*/
constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 2;
constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 30;

/**
* Number of empty polling iterations before checking timeout
Expand Down
35 changes: 28 additions & 7 deletions src/platform/src/performance_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ int PerformanceCollector::initialize(Runtime& runtime,
header->queue_head = 0;
header->queue_tail = 0;
header->num_cores = num_aicore;
header->total_tasks = 0xFFFFFFFF; // Special value: uninitialized (AICPU will overwrite with actual count)

LOG_DEBUG("Initialized PerfDataHeader:");
LOG_DEBUG(" num_cores: %d", header->num_cores);
Expand Down Expand Up @@ -131,29 +132,31 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks)

// Poll for total_tasks if not provided
if (expected_tasks <= 0) {
LOG_INFO("Waiting for AICPU to write total_tasks to PerfDataHeader...");
LOG_INFO("Waiting for AICPU to write total_tasks in PerfDataHeader...");
idle_start = std::chrono::steady_clock::now();

while (true) {
rmb();
expected_tasks = static_cast<int>(header->total_tasks);
uint32_t raw_total_tasks = header->total_tasks;

if (expected_tasks > 0) {
LOG_INFO("Task count read from PerfDataHeader: %d", expected_tasks);
// Check if AICPU has written a valid task count (> 0 and != 0xFFFFFFFF)
if (raw_total_tasks != 0xFFFFFFFF && raw_total_tasks > 0) {
expected_tasks = static_cast<int>(raw_total_tasks);
LOG_INFO("AICPU reported task count: %d", expected_tasks);
break;
}

auto elapsed = std::chrono::steady_clock::now() - idle_start.value();
if (elapsed >= timeout_duration) {
LOG_ERROR("Timeout waiting for total_tasks from AICPU after %ld seconds",
LOG_ERROR("Timeout waiting for AICPU task count after %ld seconds",
std::chrono::duration_cast<std::chrono::seconds>(elapsed).count());
LOG_ERROR("AICPU may not have initialized performance profiling");
LOG_ERROR("AICPU may not have started orchestration or graph is empty");
return;
}
}
}

LOG_DEBUG("Expected tasks: %d", expected_tasks);
LOG_DEBUG("Initial expected tasks: %d", expected_tasks);

uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE;
int total_records_collected = 0;
Expand All @@ -162,10 +165,23 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks)
collected_perf_records_.clear();
idle_start.reset();
int empty_poll_count = 0;
int last_logged_expected = -1; // -1 triggers first update

// Poll the ready queue
while (total_records_collected < expected_tasks) {
rmb();

// Dynamically refresh expected_tasks from header (AICPU updates it continuously)
int current_expected = static_cast<int>(header->total_tasks);
if (current_expected > expected_tasks) {
expected_tasks = current_expected;
// Only log first update or when orchestration completes (total_tasks stops changing)
if (last_logged_expected < 0) {
LOG_INFO("Updated expected_tasks to %d (orchestrator progress)", expected_tasks);
last_logged_expected = expected_tasks;
}
}

uint32_t head = header->queue_head;
uint32_t tail = header->queue_tail;

Expand Down Expand Up @@ -227,6 +243,11 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks)
buffers_processed++;
}

// Log final task count if it changed from initial
if (last_logged_expected >= 0 && expected_tasks != last_logged_expected) {
LOG_INFO("Final expected_tasks: %d (orchestration complete)", expected_tasks);
}

LOG_INFO("Total buffers processed: %d", buffers_processed);
LOG_INFO("Total records collected: %d", total_records_collected);

Expand Down
45 changes: 44 additions & 1 deletion src/runtime/host_build_graph/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,15 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread
LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it",
thread_idx, core_id, alternate_buffer_id, static_cast<uint32_t>(alternate_status));

// Spin wait: continuously check alternate buffer status until Host sets it to IDLE
// Time-based timeout: use get_sys_cnt_aicpu for accurate timing
constexpr uint64_t TIMEOUT_SECONDS = 1;
constexpr uint64_t TIMEOUT_CYCLES = TIMEOUT_SECONDS * PLATFORM_PROF_SYS_CNT_FREQ;
constexpr uint64_t WARN_INTERVAL_CYCLES = PLATFORM_PROF_SYS_CNT_FREQ; // 1 second

uint64_t start_time = get_sys_cnt_aicpu();
uint64_t last_warn_time = start_time;
bool timeout = false;

while (true) {
rmb(); // Read barrier: ensure reading latest status modified by Host
alternate_status = *alternate_status_ptr;
Expand All @@ -887,6 +895,41 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread
thread_idx, core_id, alternate_buffer_id);
break;
}

uint64_t current_time = get_sys_cnt_aicpu();
uint64_t elapsed = current_time - start_time;

// Check timeout
if (elapsed >= TIMEOUT_CYCLES) {
LOG_ERROR("Thread %d: Core %d buffer%u timeout after %lu seconds (status=%u)",
thread_idx, core_id, alternate_buffer_id, TIMEOUT_SECONDS,
static_cast<uint32_t>(alternate_status));
LOG_ERROR("Host may have stopped collecting performance data");
LOG_ERROR("Forcing buffer%u to IDLE and discarding performance data to prevent deadlock",
alternate_buffer_id);
timeout = true;
break;
}

// Periodic warning every second
if (current_time - last_warn_time >= WARN_INTERVAL_CYCLES) {
uint64_t elapsed_sec = elapsed / PLATFORM_PROF_SYS_CNT_FREQ;
LOG_WARN("Thread %d: Core %d buffer%u still busy after %lu seconds (status=%u)",
thread_idx, core_id, alternate_buffer_id, elapsed_sec,
static_cast<uint32_t>(alternate_status));
last_warn_time = current_time;
}
}

// Handle timeout: force clear alternate buffer and continue
if (timeout) {
// Force reset alternate buffer to IDLE state
alternate_buf->count = 0;
*alternate_status_ptr = BufferStatus::IDLE;
wmb();

LOG_ERROR("Thread %d: Core %d forced buffer%u to IDLE, performance data lost",
thread_idx, core_id, alternate_buffer_id);
}
}

Expand Down
92 changes: 73 additions & 19 deletions src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ struct AicpuExecutor {
std::atomic<bool> pto2_init_done_{false};
std::atomic<bool> pto2_init_complete_{false}; // init block finished; others wait for this
std::atomic<int> next_scan_index_{0};
std::atomic<bool> perf_init_done_{false};
std::atomic<bool> sm_header_ready_{false}; // Thread 3 sets after SM header init

// Orchestrator ready queue pointers (set by Thread 3, read by scheduler threads)
Expand Down Expand Up @@ -443,6 +442,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
const int MAX_IDLE_ITERATIONS = 50000000;
const int WARN_INTERVAL = 1000000;
bool profiling_enabled = runtime->enable_profiling;
int32_t last_reported_task_count = -1; // -1 triggers first update

// Scheduler profiling counters
uint64_t sched_scan_ns = 0, sched_orch_drain_ns = 0;
Expand Down Expand Up @@ -471,22 +471,39 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,

bool made_progress = false;

// Update perf header total_tasks after orchestrator sets the final count
if (profiling_enabled && orch_done && !perf_init_done_.load(std::memory_order_acquire)) {
bool expected = false;
if (perf_init_done_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
void* perf_base = (void*)runtime->perf_data_base;
if (perf_base) {
// Continuously update perf header total_tasks as orchestrator makes progress
// Read current_task_index which is updated incrementally by orchestrator after each task submission
int32_t visible_tasks = 0;
if (profiling_enabled) {
void* perf_base = (void*)runtime->perf_data_base;
if (perf_base) {
// Get current visible task count from orchestrator's incremental progress
visible_tasks = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE);

// Use the maximum of total_tasks_ (final count when orch completes) and visible_tasks (incremental)
int32_t current_count = std::max(task_count, visible_tasks);

if (current_count > 0 && current_count != last_reported_task_count) {
PerfDataHeader* perf_hdr = get_perf_header(perf_base);
perf_hdr->total_tasks = static_cast<uint32_t>(task_count);
perf_hdr->total_tasks = static_cast<uint32_t>(current_count);
wmb();

// Only log on first update or when orchestrator completes
if (last_reported_task_count < 0 || task_count > 0) {
DEV_INFO("Thread %d: Updated perf header total_tasks to %d (visible=%d, final=%d)",
thread_idx, current_count, visible_tasks, task_count);
}

last_reported_task_count = current_count;
}
}
}

// Incremental scan: discover root tasks (fanin_count == 0)
{
int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE);
int32_t visible = (profiling_enabled && visible_tasks > 0) ?
visible_tasks :
__atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE);
while (true) {
int32_t idx = next_scan_index_.load(std::memory_order_acquire);
if (idx >= visible) break;
Expand Down Expand Up @@ -984,7 +1001,6 @@ void AicpuExecutor::deinit() {
pto2_init_done_.store(false, std::memory_order_release);
pto2_init_complete_.store(false, std::memory_order_release);
next_scan_index_.store(0, std::memory_order_release);
perf_init_done_.store(false, std::memory_order_release);
sm_header_ready_.store(false, std::memory_order_release);

// Reset core discovery state
Expand Down Expand Up @@ -1076,16 +1092,11 @@ void AicpuExecutor::init_performance_profiling(Runtime* runtime) {
return;
}

PerfDataHeader* header = get_perf_header(perf_base);
DoubleBuffer* buffers = get_double_buffers(perf_base);

// Write total_tasks to shared memory header for Host access
// This is necessary because Runtime object is not copied back from Device to Host
int32_t task_count = total_tasks_.load(std::memory_order_acquire);
header->total_tasks = static_cast<uint32_t>(task_count);
wmb(); // Ensure total_tasks is visible to Host

LOG_INFO("Initializing performance profiling for %d cores, total_tasks=%d", runtime->worker_count, task_count);
// Do NOT write total_tasks here - wait until we have actual task count from orchestrator
// Writing 0 would cause Host to think it's an empty graph and exit collection
LOG_INFO("Initializing performance profiling for %d cores (total_tasks will be written when available)", runtime->worker_count);

// Assign initial buffer (buffer1) to each AICore
for (int i = 0; i < runtime->worker_count; i++) {
Expand Down Expand Up @@ -1240,7 +1251,15 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread
LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it",
thread_idx, core_id, alternate_buffer_id, static_cast<uint32_t>(alternate_status));

// Spin wait: continuously check alternate buffer status until Host sets it to IDLE
// Time-based timeout: use get_sys_cnt_aicpu for accurate timing
constexpr uint64_t TIMEOUT_SECONDS = 1;
constexpr uint64_t TIMEOUT_CYCLES = TIMEOUT_SECONDS * PLATFORM_PROF_SYS_CNT_FREQ;
constexpr uint64_t WARN_INTERVAL_CYCLES = PLATFORM_PROF_SYS_CNT_FREQ; // 1 second

uint64_t start_time = get_sys_cnt_aicpu();
uint64_t last_warn_time = start_time;
bool timeout = false;

while (true) {
rmb(); // Read barrier: ensure reading latest status modified by Host
alternate_status = *alternate_status_ptr;
Expand All @@ -1250,6 +1269,41 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread
thread_idx, core_id, alternate_buffer_id);
break;
}

uint64_t current_time = get_sys_cnt_aicpu();
uint64_t elapsed = current_time - start_time;

// Check timeout
if (elapsed >= TIMEOUT_CYCLES) {
LOG_ERROR("Thread %d: Core %d buffer%u timeout after %lu seconds (status=%u)",
thread_idx, core_id, alternate_buffer_id, TIMEOUT_SECONDS,
static_cast<uint32_t>(alternate_status));
LOG_ERROR("Host may have stopped collecting performance data");
LOG_ERROR("Forcing buffer%u to IDLE and discarding performance data to prevent deadlock",
alternate_buffer_id);
timeout = true;
break;
}

// Periodic warning every second
if (current_time - last_warn_time >= WARN_INTERVAL_CYCLES) {
uint64_t elapsed_sec = elapsed / PLATFORM_PROF_SYS_CNT_FREQ;
LOG_WARN("Thread %d: Core %d buffer%u still busy after %lu seconds (status=%u)",
thread_idx, core_id, alternate_buffer_id, elapsed_sec,
static_cast<uint32_t>(alternate_status));
last_warn_time = current_time;
}
}

// Handle timeout: force clear alternate buffer and continue
if (timeout) {
// Force reset alternate buffer to IDLE state
alternate_buf->count = 0;
*alternate_status_ptr = BufferStatus::IDLE;
wmb();

LOG_ERROR("Thread %d: Core %d forced buffer%u to IDLE, performance data lost",
thread_idx, core_id, alternate_buffer_id);
}
}

Expand Down
Loading