From 1f13cf5691512cf130299e69c386877ea364a3a8 Mon Sep 17 00:00:00 2001 From: ChaoZheng109 Date: Sat, 14 Feb 2026 11:35:27 +0800 Subject: [PATCH] Fix: resolve performance profiling deadlock with dynamic task count updates Fixes spinlock deadlock in AICPU performance profiling where Device threads hang waiting for Host to read buffers. The root cause was Host exiting collection early due to seeing total_tasks=0 during parallel orchestration. Changes: - Use 0xFFFFFFFF as uninitialized marker for total_tasks instead of 0 - AICPU dynamically updates total_tasks from orchestrator's current_task_index - Host polls and refreshes expected_tasks during collection - Optimize to avoid duplicate atomic reads by reusing visible_tasks - Reduce log frequency to only first update and orchestration completion - Increase profiling timeout from 2s to 30s for large graphs - Add timeout detection in switch_perf_buffer spinlock This enables real-time visibility into orchestrator progress and prevents deadlock when orchestration and scheduling run in parallel. --- src/platform/include/common/platform_config.h | 4 +- src/platform/src/performance_collector.cpp | 35 +++++-- .../host_build_graph/aicpu/aicpu_executor.cpp | 45 ++++++++- .../aicpu/aicpu_executor.cpp | 92 +++++++++++++++---- 4 files changed, 147 insertions(+), 29 deletions(-) diff --git a/src/platform/include/common/platform_config.h b/src/platform/include/common/platform_config.h index 5992f563..7495e78e 100644 --- a/src/platform/include/common/platform_config.h +++ b/src/platform/include/common/platform_config.h @@ -76,7 +76,7 @@ constexpr int PLATFORM_MAX_CORES = * Performance buffer capacity for each double buffer * Number of PerfRecord entries per buffer (ping or pong) */ -constexpr int PLATFORM_PROF_BUFFER_SIZE = 20; +constexpr int PLATFORM_PROF_BUFFER_SIZE = 100; /** * Ready queue capacity for performance data collection @@ -94,7 +94,7 @@ constexpr uint64_t PLATFORM_PROF_SYS_CNT_FREQ = 50000000; // 50 MHz /** * Timeout duration for performance data collection (seconds) */ -constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 2; +constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 30; /** * Number of empty polling iterations before checking timeout diff --git a/src/platform/src/performance_collector.cpp b/src/platform/src/performance_collector.cpp index 53a9145e..3b6effbd 100644 --- a/src/platform/src/performance_collector.cpp +++ b/src/platform/src/performance_collector.cpp @@ -82,6 +82,7 @@ int PerformanceCollector::initialize(Runtime& runtime, header->queue_head = 0; header->queue_tail = 0; header->num_cores = num_aicore; + header->total_tasks = 0xFFFFFFFF; // Special value: uninitialized (AICPU will overwrite with actual count) LOG_DEBUG("Initialized PerfDataHeader:"); LOG_DEBUG(" num_cores: %d", header->num_cores); @@ -131,29 +132,31 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) // Poll for total_tasks if not provided if (expected_tasks <= 0) { - LOG_INFO("Waiting for AICPU to write total_tasks to PerfDataHeader..."); + LOG_INFO("Waiting for AICPU to write total_tasks in PerfDataHeader..."); idle_start = std::chrono::steady_clock::now(); while (true) { rmb(); - expected_tasks = static_cast(header->total_tasks); + uint32_t raw_total_tasks = header->total_tasks; - if (expected_tasks > 0) { - LOG_INFO("Task count read from PerfDataHeader: %d", expected_tasks); + // Check if AICPU has written a valid task count (> 0 and != 0xFFFFFFFF) + if (raw_total_tasks != 0xFFFFFFFF && raw_total_tasks > 0) { + expected_tasks = static_cast(raw_total_tasks); + LOG_INFO("AICPU reported task count: %d", expected_tasks); break; } auto elapsed = std::chrono::steady_clock::now() - idle_start.value(); if (elapsed >= timeout_duration) { - LOG_ERROR("Timeout waiting for total_tasks from AICPU after %ld seconds", + LOG_ERROR("Timeout waiting for AICPU task count after %ld seconds", std::chrono::duration_cast(elapsed).count()); - LOG_ERROR("AICPU may not have initialized performance profiling"); + LOG_ERROR("AICPU may not have started orchestration or graph is empty"); return; } } } - LOG_DEBUG("Expected tasks: %d", expected_tasks); + LOG_DEBUG("Initial expected tasks: %d", expected_tasks); uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE; int total_records_collected = 0; @@ -162,10 +165,23 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) collected_perf_records_.clear(); idle_start.reset(); int empty_poll_count = 0; + int last_logged_expected = -1; // -1 triggers first update // Poll the ready queue while (total_records_collected < expected_tasks) { rmb(); + + // Dynamically refresh expected_tasks from header (AICPU updates it continuously) + int current_expected = static_cast(header->total_tasks); + if (current_expected > expected_tasks) { + expected_tasks = current_expected; + // Only log first update or when orchestration completes (total_tasks stops changing) + if (last_logged_expected < 0) { + LOG_INFO("Updated expected_tasks to %d (orchestrator progress)", expected_tasks); + last_logged_expected = expected_tasks; + } + } + uint32_t head = header->queue_head; uint32_t tail = header->queue_tail; @@ -227,6 +243,11 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) buffers_processed++; } + // Log final task count if it changed from initial + if (last_logged_expected >= 0 && expected_tasks != last_logged_expected) { + LOG_INFO("Final expected_tasks: %d (orchestration complete)", expected_tasks); + } + LOG_INFO("Total buffers processed: %d", buffers_processed); LOG_INFO("Total records collected: %d", total_records_collected); diff --git a/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp b/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp index 4b378fa0..eb1b0801 100644 --- a/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp +++ b/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp @@ -877,7 +877,15 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it", thread_idx, core_id, alternate_buffer_id, static_cast(alternate_status)); - // Spin wait: continuously check alternate buffer status until Host sets it to IDLE + // Time-based timeout: use get_sys_cnt_aicpu for accurate timing + constexpr uint64_t TIMEOUT_SECONDS = 1; + constexpr uint64_t TIMEOUT_CYCLES = TIMEOUT_SECONDS * PLATFORM_PROF_SYS_CNT_FREQ; + constexpr uint64_t WARN_INTERVAL_CYCLES = PLATFORM_PROF_SYS_CNT_FREQ; // 1 second + + uint64_t start_time = get_sys_cnt_aicpu(); + uint64_t last_warn_time = start_time; + bool timeout = false; + while (true) { rmb(); // Read barrier: ensure reading latest status modified by Host alternate_status = *alternate_status_ptr; @@ -887,6 +895,41 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread thread_idx, core_id, alternate_buffer_id); break; } + + uint64_t current_time = get_sys_cnt_aicpu(); + uint64_t elapsed = current_time - start_time; + + // Check timeout + if (elapsed >= TIMEOUT_CYCLES) { + LOG_ERROR("Thread %d: Core %d buffer%u timeout after %lu seconds (status=%u)", + thread_idx, core_id, alternate_buffer_id, TIMEOUT_SECONDS, + static_cast(alternate_status)); + LOG_ERROR("Host may have stopped collecting performance data"); + LOG_ERROR("Forcing buffer%u to IDLE and discarding performance data to prevent deadlock", + alternate_buffer_id); + timeout = true; + break; + } + + // Periodic warning every second + if (current_time - last_warn_time >= WARN_INTERVAL_CYCLES) { + uint64_t elapsed_sec = elapsed / PLATFORM_PROF_SYS_CNT_FREQ; + LOG_WARN("Thread %d: Core %d buffer%u still busy after %lu seconds (status=%u)", + thread_idx, core_id, alternate_buffer_id, elapsed_sec, + static_cast(alternate_status)); + last_warn_time = current_time; + } + } + + // Handle timeout: force clear alternate buffer and continue + if (timeout) { + // Force reset alternate buffer to IDLE state + alternate_buf->count = 0; + *alternate_status_ptr = BufferStatus::IDLE; + wmb(); + + LOG_ERROR("Thread %d: Core %d forced buffer%u to IDLE, performance data lost", + thread_idx, core_id, alternate_buffer_id); } } diff --git a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index e8f5699c..a6f7ef19 100644 --- a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -109,7 +109,6 @@ struct AicpuExecutor { std::atomic pto2_init_done_{false}; std::atomic pto2_init_complete_{false}; // init block finished; others wait for this std::atomic next_scan_index_{0}; - std::atomic perf_init_done_{false}; std::atomic sm_header_ready_{false}; // Thread 3 sets after SM header init // Orchestrator ready queue pointers (set by Thread 3, read by scheduler threads) @@ -443,6 +442,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, const int MAX_IDLE_ITERATIONS = 50000000; const int WARN_INTERVAL = 1000000; bool profiling_enabled = runtime->enable_profiling; + int32_t last_reported_task_count = -1; // -1 triggers first update // Scheduler profiling counters uint64_t sched_scan_ns = 0, sched_orch_drain_ns = 0; @@ -471,22 +471,39 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, bool made_progress = false; - // Update perf header total_tasks after orchestrator sets the final count - if (profiling_enabled && orch_done && !perf_init_done_.load(std::memory_order_acquire)) { - bool expected = false; - if (perf_init_done_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base) { + // Continuously update perf header total_tasks as orchestrator makes progress + // Read current_task_index which is updated incrementally by orchestrator after each task submission + int32_t visible_tasks = 0; + if (profiling_enabled) { + void* perf_base = (void*)runtime->perf_data_base; + if (perf_base) { + // Get current visible task count from orchestrator's incremental progress + visible_tasks = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); + + // Use the maximum of total_tasks_ (final count when orch completes) and visible_tasks (incremental) + int32_t current_count = std::max(task_count, visible_tasks); + + if (current_count > 0 && current_count != last_reported_task_count) { PerfDataHeader* perf_hdr = get_perf_header(perf_base); - perf_hdr->total_tasks = static_cast(task_count); + perf_hdr->total_tasks = static_cast(current_count); wmb(); + + // Only log on first update or when orchestrator completes + if (last_reported_task_count < 0 || task_count > 0) { + DEV_INFO("Thread %d: Updated perf header total_tasks to %d (visible=%d, final=%d)", + thread_idx, current_count, visible_tasks, task_count); + } + + last_reported_task_count = current_count; } } } // Incremental scan: discover root tasks (fanin_count == 0) { - int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); + int32_t visible = (profiling_enabled && visible_tasks > 0) ? + visible_tasks : + __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); while (true) { int32_t idx = next_scan_index_.load(std::memory_order_acquire); if (idx >= visible) break; @@ -984,7 +1001,6 @@ void AicpuExecutor::deinit() { pto2_init_done_.store(false, std::memory_order_release); pto2_init_complete_.store(false, std::memory_order_release); next_scan_index_.store(0, std::memory_order_release); - perf_init_done_.store(false, std::memory_order_release); sm_header_ready_.store(false, std::memory_order_release); // Reset core discovery state @@ -1076,16 +1092,11 @@ void AicpuExecutor::init_performance_profiling(Runtime* runtime) { return; } - PerfDataHeader* header = get_perf_header(perf_base); DoubleBuffer* buffers = get_double_buffers(perf_base); - // Write total_tasks to shared memory header for Host access - // This is necessary because Runtime object is not copied back from Device to Host - int32_t task_count = total_tasks_.load(std::memory_order_acquire); - header->total_tasks = static_cast(task_count); - wmb(); // Ensure total_tasks is visible to Host - - LOG_INFO("Initializing performance profiling for %d cores, total_tasks=%d", runtime->worker_count, task_count); + // Do NOT write total_tasks here - wait until we have actual task count from orchestrator + // Writing 0 would cause Host to think it's an empty graph and exit collection + LOG_INFO("Initializing performance profiling for %d cores (total_tasks will be written when available)", runtime->worker_count); // Assign initial buffer (buffer1) to each AICore for (int i = 0; i < runtime->worker_count; i++) { @@ -1240,7 +1251,15 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it", thread_idx, core_id, alternate_buffer_id, static_cast(alternate_status)); - // Spin wait: continuously check alternate buffer status until Host sets it to IDLE + // Time-based timeout: use get_sys_cnt_aicpu for accurate timing + constexpr uint64_t TIMEOUT_SECONDS = 1; + constexpr uint64_t TIMEOUT_CYCLES = TIMEOUT_SECONDS * PLATFORM_PROF_SYS_CNT_FREQ; + constexpr uint64_t WARN_INTERVAL_CYCLES = PLATFORM_PROF_SYS_CNT_FREQ; // 1 second + + uint64_t start_time = get_sys_cnt_aicpu(); + uint64_t last_warn_time = start_time; + bool timeout = false; + while (true) { rmb(); // Read barrier: ensure reading latest status modified by Host alternate_status = *alternate_status_ptr; @@ -1250,6 +1269,41 @@ void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread thread_idx, core_id, alternate_buffer_id); break; } + + uint64_t current_time = get_sys_cnt_aicpu(); + uint64_t elapsed = current_time - start_time; + + // Check timeout + if (elapsed >= TIMEOUT_CYCLES) { + LOG_ERROR("Thread %d: Core %d buffer%u timeout after %lu seconds (status=%u)", + thread_idx, core_id, alternate_buffer_id, TIMEOUT_SECONDS, + static_cast(alternate_status)); + LOG_ERROR("Host may have stopped collecting performance data"); + LOG_ERROR("Forcing buffer%u to IDLE and discarding performance data to prevent deadlock", + alternate_buffer_id); + timeout = true; + break; + } + + // Periodic warning every second + if (current_time - last_warn_time >= WARN_INTERVAL_CYCLES) { + uint64_t elapsed_sec = elapsed / PLATFORM_PROF_SYS_CNT_FREQ; + LOG_WARN("Thread %d: Core %d buffer%u still busy after %lu seconds (status=%u)", + thread_idx, core_id, alternate_buffer_id, elapsed_sec, + static_cast(alternate_status)); + last_warn_time = current_time; + } + } + + // Handle timeout: force clear alternate buffer and continue + if (timeout) { + // Force reset alternate buffer to IDLE state + alternate_buf->count = 0; + *alternate_status_ptr = BufferStatus::IDLE; + wmb(); + + LOG_ERROR("Thread %d: Core %d forced buffer%u to IDLE, performance data lost", + thread_idx, core_id, alternate_buffer_id); } }