diff --git a/src/platform/a2a3/aicore/inner_kernel.h b/src/platform/a2a3/aicore/inner_kernel.h index 1a424e7e..cc21b453 100644 --- a/src/platform/a2a3/aicore/inner_kernel.h +++ b/src/platform/a2a3/aicore/inner_kernel.h @@ -65,4 +65,17 @@ __aicore__ inline uint32_t get_physical_core_id() { return static_cast(get_coreid()) & AICORE_COREID_MASK; } +// ============================================================================= +// System Counter +// ============================================================================= + +/** + * Get AICore system counter + * + * @return Hardware counter value (ticks) + */ +__aicore__ __attribute__((always_inline)) inline uint64_t get_sys_cnt_aicore() { + return get_sys_cnt(); +} + #endif // PLATFORM_A2A3_AICORE_INNER_KERNEL_H_ diff --git a/src/platform/a2a3/host/CMakeLists.txt b/src/platform/a2a3/host/CMakeLists.txt index 7dfabdb1..fd3cc1ad 100644 --- a/src/platform/a2a3/host/CMakeLists.txt +++ b/src/platform/a2a3/host/CMakeLists.txt @@ -27,7 +27,7 @@ list(APPEND HOST_RUNTIME_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/host_regs.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/host_log.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/unified_log_host.cpp" - "${CMAKE_CURRENT_SOURCE_DIR}/../../src/performance_collector.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/performance_collector.cpp" ) if(DEFINED CUSTOM_SOURCE_DIRS) foreach(SRC_DIR ${CUSTOM_SOURCE_DIRS}) diff --git a/src/platform/a2a3sim/aicore/inner_kernel.h b/src/platform/a2a3sim/aicore/inner_kernel.h index b63a1b58..3d31272e 100644 --- a/src/platform/a2a3sim/aicore/inner_kernel.h +++ b/src/platform/a2a3sim/aicore/inner_kernel.h @@ -36,21 +36,17 @@ // ============================================================================= /** - * Simulated system counter for performance profiling + * Get simulated AICore system counter * - * Returns monotonic counter value at 1850 MHz frequency. - * Uses std::chrono::high_resolution_clock and converts to counter ticks. 
- * - * @return Simulated counter value (ticks since epoch) + * @return Simulated counter value (ticks) */ -inline uint64_t get_sys_cnt() { +inline uint64_t get_sys_cnt_aicore() { auto now = std::chrono::high_resolution_clock::now(); uint64_t elapsed_ns = std::chrono::duration_cast( now.time_since_epoch() ).count(); - // Convert nanoseconds to counter ticks at PLATFORM_PROF_SYS_CNT_FREQ - // Split elapsed_ns into seconds and remainder to avoid overflow + // Convert nanoseconds to counter ticks constexpr uint64_t kNsPerSec = std::nano::den; uint64_t seconds = elapsed_ns / kNsPerSec; uint64_t remaining_ns = elapsed_ns % kNsPerSec; diff --git a/src/platform/a2a3sim/host/CMakeLists.txt b/src/platform/a2a3sim/host/CMakeLists.txt index dc14b48e..4f6517ce 100644 --- a/src/platform/a2a3sim/host/CMakeLists.txt +++ b/src/platform/a2a3sim/host/CMakeLists.txt @@ -31,7 +31,7 @@ list(APPEND HOST_RUNTIME_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/platform_compile_info.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/host_log.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/unified_log_host.cpp" - "${CMAKE_CURRENT_SOURCE_DIR}/../../src/performance_collector.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/performance_collector.cpp" ) if(DEFINED CUSTOM_SOURCE_DIRS) diff --git a/src/platform/include/aicore/performance_collector_aicore.h b/src/platform/include/aicore/performance_collector_aicore.h new file mode 100644 index 00000000..c9e7fb9f --- /dev/null +++ b/src/platform/include/aicore/performance_collector_aicore.h @@ -0,0 +1,70 @@ +/** + * @file performance_collector_aicore.h + * @brief AICore performance data collection interface + * + * Provides lightweight performance recording interface for AICore kernels. + * Uses dcci for efficient cache management instead of memory barriers. 
+ */ + +#ifndef PLATFORM_AICORE_PERFORMANCE_COLLECTOR_AICORE_H_ +#define PLATFORM_AICORE_PERFORMANCE_COLLECTOR_AICORE_H_ + +#include "common/perf_profiling.h" +#include "aicore/aicore.h" + +// Include platform-specific timestamp implementation +// Build system selects the correct inner_kernel.h based on platform: +// - src/platform/a2a3/aicore/inner_kernel.h (real hardware) +// - src/platform/a2a3sim/aicore/inner_kernel.h (simulation) +// Both provide unified get_sys_cnt_aicore() interface +#include "inner_kernel.h" + +// ============= Public Interface ============= + +/** + * Record task execution performance data + * + * Writes performance metrics to the provided buffer. Buffer management + * and status tracking are handled by AICPU. + * + * @param perf_buf Performance buffer pointer + * @param task_id Task ID + * @param func_id Function ID + * @param start_time Start timestamp + * @param end_time End timestamp + * @param kernel_ready_time Kernel ready timestamp + * @param core_id Core ID + * @param core_type Core type (AIC/AIV) + */ +__aicore__ __attribute__((always_inline)) +static inline void perf_aicore_record_task( + __gm__ PerfBuffer* perf_buf, + uint32_t task_id, + uint32_t func_id, + uint64_t start_time, + uint64_t end_time, + uint64_t kernel_ready_time, + int core_id, + CoreType core_type) { + + // Read current buffer count + dcci(&perf_buf->count, SINGLE_CACHE_LINE, 0); + uint32_t idx = perf_buf->count; + __gm__ PerfRecord* record = &perf_buf->records[idx]; + + // Write record data + record->start_time = start_time; + record->end_time = end_time; + record->kernel_ready_time = kernel_ready_time; + record->task_id = task_id; + record->func_id = func_id; + record->core_id = core_id; + record->core_type = core_type; + + perf_buf->count = idx + 1; + + // Flush cache to make data visible + dcci(perf_buf, ENTIRE_DATA_CACHE, CACHELINE_OUT); +} + +#endif // PLATFORM_AICORE_PERFORMANCE_COLLECTOR_AICORE_H_ diff --git 
a/src/platform/include/aicpu/performance_collector_aicpu.h b/src/platform/include/aicpu/performance_collector_aicpu.h new file mode 100644 index 00000000..7a831eba --- /dev/null +++ b/src/platform/include/aicpu/performance_collector_aicpu.h @@ -0,0 +1,81 @@ +/** + * @file performance_collector_aicpu.h + * @brief AICPU performance data collection interface + * + * Provides performance profiling management interface for AICPU side. + * Handles buffer initialization, switching, and flushing. + */ + +#ifndef PLATFORM_AICPU_PERFORMANCE_COLLECTOR_AICPU_H_ +#define PLATFORM_AICPU_PERFORMANCE_COLLECTOR_AICPU_H_ + +#include "common/perf_profiling.h" +#include "runtime.h" + +// Include platform-specific timestamp implementation +// Build system selects the correct inner_aicpu.h based on platform: +// Both provide unified get_sys_cnt_aicpu() interface +#include "device_time.h" + +// ============= Public Interface ============= + +/** + * Initialize performance profiling + * + * Sets up double buffers for each core and initializes tracking state. + * + * @param runtime Runtime instance pointer + */ +void perf_aicpu_init_profiling(Runtime* runtime); + +/** + * Record dispatch and finish timestamps + * + * Updates task record with AICPU-side timing information. + * + * @param record PerfRecord pointer to update + * @param dispatch_time Dispatch timestamp + * @param finish_time Finish timestamp + */ +void perf_aicpu_record_dispatch_and_finish_time(PerfRecord* record, + uint64_t dispatch_time, + uint64_t finish_time); + +/** + * Switch performance buffer when current buffer is full + * + * Checks buffer capacity and switches to alternate buffer if needed. + * + * @param runtime Runtime instance pointer + * @param core_id Core ID + * @param thread_idx Thread index + */ +void perf_aicpu_switch_buffer(Runtime* runtime, int core_id, int thread_idx); + +/** + * Flush remaining performance data + * + * Marks non-empty buffers as ready and enqueues them for host collection. 
+ * + * @param runtime Runtime instance pointer + * @param thread_idx Thread index + * @param cur_thread_cores Array of core IDs managed by this thread + * @param core_num Number of cores managed by this thread + */ +void perf_aicpu_flush_buffers(Runtime* runtime, + int thread_idx, + const int* cur_thread_cores, + int core_num); + +/** + * Update total task count in performance header + * + * Allows dynamic update of total_tasks as orchestrator makes progress. + * Used by tensormap_and_ringbuffer runtime where task count grows incrementally. + * + * @param runtime Runtime instance pointer + * @param total_tasks Current total task count + */ +void perf_aicpu_update_total_tasks(Runtime* runtime, uint32_t total_tasks); + +#endif // PLATFORM_AICPU_PERFORMANCE_COLLECTOR_AICPU_H_ diff --git a/src/platform/include/common/perf_profiling.h b/src/platform/include/common/perf_profiling.h index fb99199d..2c4ee962 100644 --- a/src/platform/include/common/perf_profiling.h +++ b/src/platform/include/common/perf_profiling.h @@ -155,22 +155,24 @@ struct ReadyQueueEntry { * Performance data fixed header * * Located at the start of shared memory, contains: - * 1. Ready queue (FIFO Circular Buffer) + * 1. Per-thread ready queues (FIFO Circular Buffers) * 2. 
Metadata (core count) * * Ready queue design: - * - Capacity: PLATFORM_PROF_READYQUEUE_SIZE (max 2 buffers ready per core) + * - Per-thread queues: Avoid lock contention between AICPU threads + * - Capacity per queue: PLATFORM_PROF_READYQUEUE_SIZE (full capacity for each thread) * - Implementation: Circular Buffer - * - Producer: AICPU (adds full buffers) - * - Consumer: Host (reads and clears buffers) + * - Producer: AICPU thread (adds full buffers to its own queue) + * - Consumer: Host (reads from all queues) * - Queue empty: head == tail * - Queue full: (tail + 1) % capacity == head */ struct PerfDataHeader { - // Ready queue (FIFO Circular Buffer) - ReadyQueueEntry queue[PLATFORM_PROF_READYQUEUE_SIZE]; // Queue array - volatile uint32_t queue_head; // Consumer read position (Host modifies) - volatile uint32_t queue_tail; // Producer write position (AICPU modifies) + // Per-thread ready queues (FIFO Circular Buffers) + // Each AICPU thread has its own queue to avoid lock contention + ReadyQueueEntry queues[PLATFORM_MAX_AICPU_THREADS][PLATFORM_PROF_READYQUEUE_SIZE]; + volatile uint32_t queue_heads[PLATFORM_MAX_AICPU_THREADS]; // Consumer read positions (Host modifies) + volatile uint32_t queue_tails[PLATFORM_MAX_AICPU_THREADS]; // Producer write positions (AICPU modifies) // Metadata (Host initializes, Device read-only) uint32_t num_cores; // Actual number of cores launched diff --git a/src/platform/include/common/platform_config.h b/src/platform/include/common/platform_config.h index cbffee5f..2909157a 100644 --- a/src/platform/include/common/platform_config.h +++ b/src/platform/include/common/platform_config.h @@ -76,7 +76,7 @@ constexpr int PLATFORM_MAX_CORES = * Performance buffer capacity for each double buffer * Number of PerfRecord entries per buffer (ping or pong) */ -constexpr int PLATFORM_PROF_BUFFER_SIZE = 20; +constexpr int PLATFORM_PROF_BUFFER_SIZE = 1000; /** * Ready queue capacity for performance data collection @@ -94,7 +94,7 @@ constexpr uint64_t 
PLATFORM_PROF_SYS_CNT_FREQ = 50000000; // 50 MHz /** * Timeout duration for performance data collection (seconds) */ -constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 2; +constexpr int PLATFORM_PROF_TIMEOUT_SECONDS = 30; /** * Number of empty polling iterations before checking timeout diff --git a/src/platform/src/aicpu/performance_collector_aicpu.cpp b/src/platform/src/aicpu/performance_collector_aicpu.cpp new file mode 100644 index 00000000..77f5ad0c --- /dev/null +++ b/src/platform/src/aicpu/performance_collector_aicpu.cpp @@ -0,0 +1,292 @@ +/** + * @file performance_collector_aicpu.cpp + * @brief AICPU performance data collection implementation + */ + +#include "aicpu/performance_collector_aicpu.h" +#include "common/memory_barrier.h" +#include "common/unified_log.h" +#include "common/platform_config.h" + +/** + * Enqueue ready buffer to per-thread queue + * + * @param header PerfDataHeader pointer + * @param thread_idx Thread index + * @param core_index Core index + * @param buffer_id Buffer ID (1 or 2) + * @return 0 on success, -1 if queue full + */ +static int enqueue_ready_buffer(PerfDataHeader* header, + int thread_idx, + uint32_t core_index, + uint32_t buffer_id) { + uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE; + uint32_t current_tail = header->queue_tails[thread_idx]; + uint32_t current_head = header->queue_heads[thread_idx]; + + // Check if queue is full + uint32_t next_tail = (current_tail + 1) % capacity; + if (next_tail == current_head) { + return -1; + } + + header->queues[thread_idx][current_tail].core_index = core_index; + header->queues[thread_idx][current_tail].buffer_id = buffer_id; + header->queue_tails[thread_idx] = next_tail; + + return 0; +} + +void perf_aicpu_init_profiling(Runtime* runtime) { + void* perf_base = (void*)runtime->perf_data_base; + if (perf_base == nullptr) { + LOG_ERROR("perf_data_base is NULL, cannot initialize profiling"); + return; + } + + PerfDataHeader* header = get_perf_header(perf_base); + DoubleBuffer* buffers = 
get_double_buffers(perf_base); + + int32_t task_count = runtime->get_task_count(); + header->total_tasks = static_cast(task_count); + + LOG_INFO("Initializing performance profiling for %d cores", runtime->worker_count); + + // Assign buffer1 to each core for initial writing + for (int i = 0; i < runtime->worker_count; i++) { + Handshake* h = &runtime->workers[i]; + DoubleBuffer* db = &buffers[i]; + + h->perf_records_addr = (uint64_t)&db->buffer1; + db->buffer1_status = BufferStatus::WRITING; + + LOG_DEBUG("Core %d: assigned buffer1 (addr=0x%lx)", i, h->perf_records_addr); + } + + wmb(); + + LOG_INFO("Performance profiling initialized for %d cores", runtime->worker_count); +} + +void perf_aicpu_record_dispatch_and_finish_time(PerfRecord* record, + uint64_t dispatch_time, + uint64_t finish_time) { + rmb(); + + record->dispatch_time = dispatch_time; + record->finish_time = finish_time; + + wmb(); +} + + +void perf_aicpu_switch_buffer(Runtime* runtime, int core_id, int thread_idx) { + void* perf_base = (void*)runtime->perf_data_base; + if (perf_base == nullptr) { + return; + } + + rmb(); + + Handshake* h = &runtime->workers[core_id]; + PerfDataHeader* header = get_perf_header(perf_base); + DoubleBuffer* db = get_core_double_buffer(perf_base, core_id); + + uint64_t current_addr = h->perf_records_addr; + uint64_t buffer1_addr = (uint64_t)&db->buffer1; + uint64_t buffer2_addr = (uint64_t)&db->buffer2; + + uint32_t full_buffer_id = 0; + PerfBuffer* full_buf = nullptr; + volatile BufferStatus* full_status_ptr = nullptr; + PerfBuffer* alternate_buf = nullptr; + volatile BufferStatus* alternate_status_ptr = nullptr; + uint32_t alternate_buffer_id = 0; + + if (current_addr == buffer1_addr) { + full_buffer_id = 1; + full_buf = &db->buffer1; + full_status_ptr = &db->buffer1_status; + alternate_buf = &db->buffer2; + alternate_status_ptr = &db->buffer2_status; + alternate_buffer_id = 2; + } else if (current_addr == buffer2_addr) { + full_buffer_id = 2; + full_buf = &db->buffer2; + 
full_status_ptr = &db->buffer2_status; + alternate_buf = &db->buffer1; + alternate_status_ptr = &db->buffer1_status; + alternate_buffer_id = 1; + } else { + LOG_ERROR("Thread %d: Core %d has invalid perf_records_addr=0x%lx", + thread_idx, core_id, current_addr); + return; + } + + LOG_INFO("Thread %d: Core %d buffer%u is full (count=%u)", + thread_idx, core_id, full_buffer_id, full_buf->count); + + // Complete performance records by filling fanout information + // Use Runtime's method - each runtime provides its own implementation + runtime->complete_perf_records(full_buf); + + BufferStatus alternate_status = *alternate_status_ptr; + + // Wait if alternate buffer is not ready + if (alternate_status != BufferStatus::IDLE) { + LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it", + thread_idx, core_id, alternate_buffer_id, static_cast(alternate_status)); + + constexpr uint64_t TIMEOUT_SECONDS = 2; + + uint64_t start_time = get_sys_cnt_aicpu(); + bool timeout = false; + + while (true) { + rmb(); + alternate_status = *alternate_status_ptr; + uint64_t current_time = get_sys_cnt_aicpu(); + uint64_t elapsed = current_time - start_time; + + if (alternate_status == BufferStatus::IDLE) { + LOG_INFO("Thread %d: Core %d buffer%u now idle, proceeding with switch", + thread_idx, core_id, alternate_buffer_id); + break; + } + + if (elapsed >= TIMEOUT_SECONDS * PLATFORM_PROF_SYS_CNT_FREQ) { + LOG_ERROR("Thread %d: Core %d buffer%u timeout after %lu seconds (status=%u)", + thread_idx, core_id, alternate_buffer_id, TIMEOUT_SECONDS, + static_cast(alternate_status)); + LOG_ERROR("Forcing buffer%u to IDLE and discarding performance data to prevent deadlock", + alternate_buffer_id); + timeout = true; + break; + } + } + + // Discard full buffer data on timeout to avoid deadlock + if (timeout) { + full_buf->count = 0; + *full_status_ptr = BufferStatus::WRITING; + + wmb(); + + LOG_ERROR("Thread %d: Core %d timeout - discarded buffer%u data, reusing it 
for writing", + thread_idx, core_id, full_buffer_id); + + return; + } + } + + *full_status_ptr = BufferStatus::READY; + *alternate_status_ptr = BufferStatus::WRITING; + + // Enqueue full buffer + int enqueue_result = enqueue_ready_buffer(header, thread_idx, core_id, full_buffer_id); + if (enqueue_result != 0) { + LOG_ERROR("Thread %d: Core %d failed to enqueue buffer%u (queue full), data lost!", + thread_idx, core_id, full_buffer_id); + // Revert status changes since we failed to enqueue + *full_status_ptr = BufferStatus::WRITING; + *alternate_status_ptr = BufferStatus::IDLE; + return; + } + + LOG_INFO("Thread %d: Core %d enqueued buffer%u", thread_idx, core_id, full_buffer_id); + + h->perf_records_addr = (uint64_t)alternate_buf; + + LOG_INFO("Thread %d: Core %d switched to buffer%u", + thread_idx, core_id, alternate_buffer_id); +} + +void perf_aicpu_flush_buffers(Runtime* runtime, + int thread_idx, + const int* cur_thread_cores, + int core_num) { + if (!runtime->enable_profiling) { + return; + } + + void* perf_base = (void*)runtime->perf_data_base; + if (perf_base == nullptr) { + return; + } + + rmb(); + + PerfDataHeader* header = get_perf_header(perf_base); + DoubleBuffer* buffers = get_double_buffers(perf_base); + + LOG_INFO("Thread %d: Flushing performance buffers for %d cores", thread_idx, core_num); + + int flushed_count = 0; + + for (int i = 0; i < core_num; i++) { + int core_id = cur_thread_cores[i]; + Handshake* h = &runtime->workers[core_id]; + DoubleBuffer* db = &buffers[core_id]; + + uint64_t current_addr = h->perf_records_addr; + if (current_addr == 0) { + continue; + } + + uint64_t buf1_addr = (uint64_t)&db->buffer1; + uint64_t buf2_addr = (uint64_t)&db->buffer2; + + PerfBuffer* current_buf = nullptr; + volatile BufferStatus* current_status = nullptr; + uint32_t buffer_id = 0; + + if (current_addr == buf1_addr) { + current_buf = &db->buffer1; + current_status = &db->buffer1_status; + buffer_id = 1; + } else if (current_addr == buf2_addr) { + 
current_buf = &db->buffer2; + current_status = &db->buffer2_status; + buffer_id = 2; + } else { + LOG_WARN("Thread %d: Core %d perf_records_addr=0x%lx doesn't match buffer1=0x%lx or buffer2=0x%lx", + thread_idx, core_id, current_addr, buf1_addr, buf2_addr); + continue; + } + + uint32_t count = current_buf->count; + + if (count > 0) { + runtime->complete_perf_records(current_buf); + + *current_status = BufferStatus::READY; + + int rc = enqueue_ready_buffer(header, thread_idx, core_id, buffer_id); + if (rc == 0) { + LOG_INFO("Thread %d: Core %d flushed buffer%d with %u records", + thread_idx, core_id, buffer_id, count); + flushed_count++; + } else { + LOG_ERROR("Thread %d: Core %d failed to enqueue buffer%d (queue full), data lost!", + thread_idx, core_id, buffer_id); + // Revert status since we failed to enqueue + *current_status = BufferStatus::WRITING; + } + } + } + + LOG_INFO("Thread %d: Performance buffer flush complete, %d buffers flushed", + thread_idx, flushed_count); +} + +void perf_aicpu_update_total_tasks(Runtime* runtime, uint32_t total_tasks) { + void* perf_base = (void*)runtime->perf_data_base; + if (perf_base == nullptr) { + return; + } + + PerfDataHeader* header = get_perf_header(perf_base); + header->total_tasks = total_tasks; + wmb(); +} diff --git a/src/platform/src/performance_collector.cpp b/src/platform/src/host/performance_collector.cpp similarity index 78% rename from src/platform/src/performance_collector.cpp rename to src/platform/src/host/performance_collector.cpp index 57d674cd..88af71c5 100644 --- a/src/platform/src/performance_collector.cpp +++ b/src/platform/src/host/performance_collector.cpp @@ -18,8 +18,6 @@ #include "common/unified_log.h" PerformanceCollector::~PerformanceCollector() { - // Destructor should not call finalize() because callbacks are not available - // User must explicitly call finalize() before destruction if (perf_shared_mem_host_ != nullptr) { LOG_WARN("PerformanceCollector destroyed without finalize()"); } @@ 
-78,17 +76,23 @@ int PerformanceCollector::initialize(Runtime& runtime, LOG_DEBUG("Simulation mode: host_ptr = dev_ptr = %p", perf_host_ptr); } - // Step 4: Initialize fixed header + // Step 4: Initialize header PerfDataHeader* header = get_perf_header(perf_host_ptr); - memset(header->queue, 0, sizeof(header->queue)); - header->queue_head = 0; - header->queue_tail = 0; + + for (int t = 0; t < PLATFORM_MAX_AICPU_THREADS; t++) { + memset(header->queues[t], 0, sizeof(header->queues[t])); + header->queue_heads[t] = 0; + header->queue_tails[t] = 0; + } + header->num_cores = num_aicore; + header->total_tasks = 0; LOG_DEBUG("Initialized PerfDataHeader:"); LOG_DEBUG(" num_cores: %d", header->num_cores); LOG_DEBUG(" buffer_capacity: %d", PLATFORM_PROF_BUFFER_SIZE); LOG_DEBUG(" queue capacity: %d", PLATFORM_PROF_READYQUEUE_SIZE); + LOG_DEBUG(" num threads: %d", PLATFORM_MAX_AICPU_THREADS); // Step 5: Initialize all DoubleBuffers DoubleBuffer* buffers = get_double_buffers(perf_host_ptr); @@ -103,14 +107,12 @@ int PerformanceCollector::initialize(Runtime& runtime, } LOG_DEBUG("Initialized %d DoubleBuffers (all status=0, idle)", num_aicore); - // Write memory barrier wmb(); // Step 6: Pass to Runtime runtime.perf_data_base = (uint64_t)perf_dev_ptr; LOG_DEBUG("Set runtime.perf_data_base = 0x%lx", runtime.perf_data_base); - // Save pointers perf_shared_mem_dev_ = perf_dev_ptr; perf_shared_mem_host_ = perf_host_ptr; @@ -131,61 +133,76 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) const auto timeout_duration = std::chrono::seconds(PLATFORM_PROF_TIMEOUT_SECONDS); std::optional idle_start; - // Poll for total_tasks if not provided if (expected_tasks <= 0) { - LOG_INFO("Waiting for AICPU to write total_tasks to PerfDataHeader..."); + LOG_INFO("Waiting for AICPU to write total_tasks in PerfDataHeader..."); idle_start = std::chrono::steady_clock::now(); while (true) { rmb(); - expected_tasks = static_cast(header->total_tasks); + uint32_t 
raw_total_tasks = header->total_tasks; - if (expected_tasks > 0) { - LOG_INFO("Task count read from PerfDataHeader: %d", expected_tasks); + if (raw_total_tasks > 0) { + expected_tasks = static_cast(raw_total_tasks); + LOG_INFO("AICPU reported task count: %d", expected_tasks); break; } auto elapsed = std::chrono::steady_clock::now() - idle_start.value(); if (elapsed >= timeout_duration) { - LOG_ERROR("Timeout waiting for total_tasks from AICPU after %ld seconds", + LOG_ERROR("Timeout waiting for AICPU task count after %ld seconds", std::chrono::duration_cast(elapsed).count()); - LOG_ERROR("AICPU may not have initialized performance profiling"); + LOG_ERROR("AICPU never reported a task count (last read: %u); profiling may not have been initialized", raw_total_tasks); return; } } } - LOG_DEBUG("Expected tasks: %d", expected_tasks); + LOG_DEBUG("Initial expected tasks: %d", expected_tasks); - uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE; int total_records_collected = 0; int buffers_processed = 0; collected_perf_records_.clear(); idle_start.reset(); int empty_poll_count = 0; + int last_logged_expected = -1; + + int current_thread = 0; - // Poll the ready queue while (total_records_collected < expected_tasks) { rmb(); - uint32_t head = header->queue_head; - uint32_t tail = header->queue_tail; - if (head == tail) { - if (!idle_start.has_value()) { - idle_start = std::chrono::steady_clock::now(); + int current_expected = static_cast(header->total_tasks); + if (current_expected > expected_tasks) { + expected_tasks = current_expected; + if (last_logged_expected < 0) { + LOG_INFO("Updated expected_tasks to %d (orchestrator progress)", expected_tasks); + last_logged_expected = expected_tasks; } + } + + uint32_t head = header->queue_heads[current_thread]; + uint32_t tail = header->queue_tails[current_thread]; + + if (head == tail) { + current_thread = (current_thread + 1) % PLATFORM_MAX_AICPU_THREADS; - empty_poll_count++; - if (empty_poll_count >= PLATFORM_PROF_EMPTY_POLLS_CHECK_NUM) { - empty_poll_count = 0; - auto elapsed
= std::chrono::steady_clock::now() - idle_start.value(); - if (elapsed >= timeout_duration) { - LOG_WARN("Performance data collection idle timeout after %ld seconds", - std::chrono::duration_cast(elapsed).count()); - LOG_WARN("Collected %d / %d records before timeout", - total_records_collected, expected_tasks); - break; + if (current_thread == 0) { + if (!idle_start.has_value()) { + idle_start = std::chrono::steady_clock::now(); + } + + empty_poll_count++; + if (empty_poll_count >= PLATFORM_PROF_EMPTY_POLLS_CHECK_NUM) { + empty_poll_count = 0; + auto elapsed = std::chrono::steady_clock::now() - idle_start.value(); + if (elapsed >= timeout_duration) { + LOG_ERROR("Performance data collection idle timeout after %ld seconds", + std::chrono::duration_cast(elapsed).count()); + LOG_ERROR("Collected %d / %d records before timeout", + total_records_collected, expected_tasks); + break; + } } } continue; @@ -194,7 +211,7 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) idle_start.reset(); empty_poll_count = 0; - ReadyQueueEntry entry = header->queue[head]; + ReadyQueueEntry entry = header->queues[current_thread][head]; uint32_t core_index = entry.core_index; uint32_t buffer_id = entry.buffer_id; @@ -203,7 +220,7 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) break; } - LOG_DEBUG("Processing: core=%u, buffer=%u", core_index, buffer_id); + LOG_DEBUG("Processing: thread=%d, core=%u, buffer=%u", current_thread, core_index, buffer_id); DoubleBuffer* db = &buffers[core_index]; PerfBuffer* buf = nullptr; @@ -221,12 +238,16 @@ void PerformanceCollector::poll_and_collect(int num_aicore, int expected_tasks) buf->count = 0; *status = BufferStatus::IDLE; - wmb(); - - header->queue_head = (head + 1) % capacity; + header->queue_heads[current_thread] = (head + 1) % PLATFORM_PROF_READYQUEUE_SIZE; wmb(); buffers_processed++; + + current_thread = (current_thread + 1) % PLATFORM_MAX_AICPU_THREADS; + } + + if 
(last_logged_expected >= 0 && expected_tasks != last_logged_expected) { + LOG_INFO("Final expected_tasks: %d (orchestration complete)", expected_tasks); } LOG_INFO("Total buffers processed: %d", buffers_processed); @@ -269,6 +290,11 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { if (record.kernel_ready_time < base_time_cycles) { base_time_cycles = record.kernel_ready_time; } + if (record.dispatch_time < base_time_cycles && record.dispatch_time > 0) { + LOG_WARN("Timestamp violation: dispatch_time (%lu) < base_time (%lu) for task %u, using dispatch_time as new base_time", + record.dispatch_time, base_time_cycles, record.task_id); + base_time_cycles = record.dispatch_time; + } // Step 5: Generate filename with timestamp (YYYYMMDD_HHMMSS) @@ -302,7 +328,6 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { double dispatch_us = (record.dispatch_time > 0) ? cycles_to_us(record.dispatch_time - base_time_cycles) : 0.0; double finish_us = (record.finish_time > 0) ? cycles_to_us(record.finish_time - base_time_cycles) : 0.0; - // Determine core type string const char* core_type_str = (record.core_type == CoreType::AIC) ?
"aic" : "aiv"; outfile << " {\n"; diff --git a/src/runtime/aicpu_build_graph/aicore/aicore_executor.cpp b/src/runtime/aicpu_build_graph/aicore/aicore_executor.cpp index f002616b..b1319238 100644 --- a/src/runtime/aicpu_build_graph/aicore/aicore_executor.cpp +++ b/src/runtime/aicpu_build_graph/aicore/aicore_executor.cpp @@ -44,6 +44,9 @@ __aicore__ __attribute__((always_inline)) static void execute_task(__gm__ Task* // All kernels have signature: void kernel(__gm__ int64_t* args) UnifiedKernelFunc kernel = (UnifiedKernelFunc)task->function_bin_addr; kernel(reinterpret_cast<__gm__ int64_t*>(task->args)); + + // Ensure all memory writes are visible to other cores + pipe_barrier(PIPE_ALL); } __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, int block_idx, CoreType core_type) { diff --git a/src/runtime/host_build_graph/aicore/aicore_executor.cpp b/src/runtime/host_build_graph/aicore/aicore_executor.cpp index 3a7cec4a..b84e55c0 100644 --- a/src/runtime/host_build_graph/aicore/aicore_executor.cpp +++ b/src/runtime/host_build_graph/aicore/aicore_executor.cpp @@ -1,66 +1,11 @@ #include "aicore/aicore.h" #include "runtime.h" #include "common/perf_profiling.h" +#include "aicore/performance_collector_aicore.h" #include "common/platform_config.h" // Platform configuration (C/C++ compatible) typedef void (*KernelFunc)(__gm__ int64_t*); -/** - * @brief Record task execution performance data - * - * This function records the performance metrics of a task execution to the profiling buffer. - * It writes the task timing data, metadata, and marks the buffer as full when needed. 
- * - * @param my_hank Pointer to the handshake structure containing perf buffer info - * @param task_ptr Pointer to the executed task - * @param start_time Task start timestamp - * @param end_time Task end timestamp - * @param block_idx AICore block index - * @param core_type Core type (AIC or AIV) - * @param kernel_ready_time Kernel ready timestamp (when AICore entered main loop) - */ -__aicore__ __attribute__((always_inline)) static void record_task_performance( - __gm__ Handshake* my_hank, - __gm__ Task* task_ptr, - uint64_t start_time, - uint64_t end_time, - int block_idx, - CoreType core_type, - uint64_t kernel_ready_time) { - - // dcci() for handshake visibility during profiling - dcci((__gm__ uint32_t*)&my_hank->perf_buffer_status, SINGLE_CACHE_LINE, CACHELINE_OUT); - - if (my_hank->perf_buffer_status != 0) { - return; - } - - __gm__ PerfBuffer* perf_buf = (__gm__ PerfBuffer*)my_hank->perf_records_addr; - uint32_t idx = perf_buf->count; - - if (idx < PLATFORM_PROF_BUFFER_SIZE) { - __gm__ PerfRecord* record = (__gm__ PerfRecord*)&perf_buf->records[idx]; - - record->start_time = start_time; - record->end_time = end_time; - record->kernel_ready_time = kernel_ready_time; - record->task_id = task_ptr->task_id; - record->func_id = task_ptr->func_id; - record->core_id = block_idx; - record->core_type = core_type; - - perf_buf->count = idx + 1; - dcci(record, ENTIRE_DATA_CACHE, CACHELINE_OUT); - if (perf_buf->count >= PLATFORM_PROF_BUFFER_SIZE) { - my_hank->perf_buffer_status = 1; - } - } else { - my_hank->perf_buffer_status = 1; - } - // dcci() for handshake visibility during profiling - dcci((__gm__ uint32_t*)&my_hank->perf_buffer_status, SINGLE_CACHE_LINE, CACHELINE_OUT); -} - __aicore__ __attribute__((always_inline)) static void execute_task(__gm__ Task* task) { if (task->function_bin_addr == 0) { return; @@ -91,7 +36,7 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in write_reg(RegId::COND, static_cast(AICoreStatus::IDLE)); 
bool profiling_enabled = runtime->enable_profiling; - uint64_t kernel_ready_time = get_sys_cnt(); + uint64_t kernel_ready_time = get_sys_cnt_aicore(); // Main loop: poll DATA_MAIN_BASE for task_id volatile uint32_t task_id = 0; @@ -107,14 +52,16 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in if (task_id != 0 && task_id != last_task_id) { write_reg(RegId::COND, static_cast(AICoreStatus::BUSY)); __gm__ Task* task_ptr = &(runtime->tasks[task_id - 1]); - uint64_t start_time = get_sys_cnt(); + uint64_t start_time = get_sys_cnt_aicore(); execute_task(task_ptr); if (profiling_enabled) { - uint64_t end_time = get_sys_cnt(); - record_task_performance(my_hank, task_ptr, start_time, end_time, - block_idx, core_type, kernel_ready_time); + uint64_t end_time = get_sys_cnt_aicore(); + __gm__ PerfBuffer* perf_buf = (__gm__ PerfBuffer*)my_hank->perf_records_addr; + perf_aicore_record_task(perf_buf, task_ptr->task_id, task_ptr->func_id, + start_time, end_time, kernel_ready_time, + block_idx, core_type); } last_task_id = task_id; diff --git a/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp b/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp index 48f2ec30..8d6cb82a 100644 --- a/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp +++ b/src/runtime/host_build_graph/aicpu/aicpu_executor.cpp @@ -9,6 +9,7 @@ #include "aicpu/platform_regs.h" #include "runtime.h" #include "aicpu/device_log.h" +#include "aicpu/performance_collector_aicpu.h" #include "aicpu/device_time.h" #include "aicpu/aicpu_regs.h" // Register-based communication @@ -73,8 +74,8 @@ struct AicpuExecutor { std::atomic finished_count_{0}; // ===== Performance profiling state ===== - std::mutex perf_ready_queue_mutex_; // Protects enqueue_ready_buffer operations uint64_t dispatch_timestamps_[RUNTIME_MAX_WORKER]; // Per-core AICPU dispatch timestamp + uint32_t core_dispatch_counts_[RUNTIME_MAX_WORKER]; // Per-core total dispatched task counter (for buffer management) // ===== 
Methods ===== int init(Runtime* runtime); @@ -86,13 +87,6 @@ struct AicpuExecutor { void deinit(); void diagnose_stuck_state(Runtime& runtime, int thread_idx, const int* cur_thread_cores, int core_num, Handshake* hank); - - // Performance profiling methods - void init_performance_profiling(Runtime* runtime); - void complete_perf_records(Runtime* runtime, PerfBuffer* perf_buf); - void switch_perf_buffer(Runtime* runtime, int core_id, int thread_idx); - int enqueue_ready_buffer(PerfDataHeader* header, uint32_t core_index, uint32_t buffer_id); - void flush_performance_buffers(Runtime* runtime, int thread_idx, const int* cur_thread_cores, int core_num); }; static AicpuExecutor g_aicpu_executor; @@ -159,9 +153,10 @@ int AicpuExecutor::init(Runtime* runtime) { ready_queue_aiv_head_ = 0; ready_queue_aiv_tail_ = 0; - // Reset per-core dispatch timestamps + // Reset per-core dispatch timestamps and counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; + core_dispatch_counts_[i] = 0; } int aic_count = 0; @@ -189,7 +184,7 @@ int AicpuExecutor::init(Runtime* runtime) { // Performance profiling initialization if (runtime->enable_profiling) { - init_performance_profiling(runtime); + perf_aicpu_init_profiling(runtime); } init_done_.store(true, std::memory_order_release); @@ -474,18 +469,13 @@ int AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const if (count > 0) { PerfRecord* record = &perf_buf->records[count - 1]; if (record->task_id == static_cast(completed_task_id)) { - record->dispatch_time = dispatch_timestamps_[core_id]; - record->finish_time = finish_ts; - wmb(); + perf_aicpu_record_dispatch_and_finish_time(record, + dispatch_timestamps_[core_id], + finish_ts); } } } - // Check and switch performance buffer if needed - if (profiling_enabled && h->perf_buffer_status == 1) { - switch_perf_buffer(&runtime, core_id, thread_idx); - } - Task* task = runtime.get_task(task_id); LOG_INFO("Thread %d: Core %d completed task %d", 
thread_idx, core_id, task_id); @@ -548,6 +538,16 @@ int AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const std::lock_guard lock(ready_queue_aic_mutex_); int count = ready_count_aic_.load(std::memory_order_relaxed); if (count > 0) { + // Check if buffer needs switching before dispatch + if (profiling_enabled) { + if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { + perf_aicpu_switch_buffer(&runtime, core_id, thread_idx); + core_dispatch_counts_[core_id] = 0; + } + core_dispatch_counts_[core_id]++; + dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); + } + // Dequeue from head position (circular) - FIFO order int task_id = ready_queue_aic_[ready_queue_aic_head_]; ready_queue_aic_head_ = (ready_queue_aic_head_ + 1) % RUNTIME_MAX_TASKS; @@ -565,10 +565,7 @@ int AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const // Write task_id+1 to register write_reg(reg_addr, RegId::DATA_MAIN_BASE, static_cast(task_id + 1)); - - if (runtime.enable_profiling) { - dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); - } + executing_task_ids_[core_id] = task_id; cur_thread_tasks_in_flight++; made_progress = true; @@ -579,6 +576,16 @@ int AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const std::lock_guard lock(ready_queue_aiv_mutex_); int count = ready_count_aiv_.load(std::memory_order_relaxed); if (count > 0) { + // Check if buffer needs switching before dispatch + if (profiling_enabled) { + if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { + perf_aicpu_switch_buffer(&runtime, core_id, thread_idx); + core_dispatch_counts_[core_id] = 0; + } + core_dispatch_counts_[core_id]++; + dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); + } + // Dequeue from head position (circular) - FIFO order int task_id = ready_queue_aiv_[ready_queue_aiv_head_]; ready_queue_aiv_head_ = (ready_queue_aiv_head_ + 1) % RUNTIME_MAX_TASKS; @@ -597,9 +604,6 @@ int 
AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const // Write task_id+1 to register write_reg(reg_addr, RegId::DATA_MAIN_BASE, static_cast(task_id + 1)); - if (runtime.enable_profiling) { - dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); - } executing_task_ids_[core_id] = task_id; cur_thread_tasks_in_flight++; made_progress = true; @@ -651,7 +655,7 @@ int AicpuExecutor::run(Runtime* runtime) { // Flush performance buffers for cores managed by this thread if (runtime->enable_profiling) { - flush_performance_buffers(runtime, thread_idx, cur_thread_cores, thread_cores_num_); + perf_aicpu_flush_buffers(runtime, thread_idx, cur_thread_cores, thread_cores_num_); } LOG_INFO("Thread %d: Completed", thread_idx); @@ -677,9 +681,10 @@ void AicpuExecutor::deinit() { ready_queue_aiv_head_ = 0; ready_queue_aiv_tail_ = 0; - // Reset per-core dispatch timestamps + // Reset per-core dispatch timestamps and counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; + core_dispatch_counts_[i] = 0; } completed_tasks_.store(0, std::memory_order_release); @@ -772,343 +777,6 @@ void AicpuExecutor::diagnose_stuck_state(Runtime& runtime, int thread_idx, LOG_ERROR("========== END DIAGNOSTIC =========="); } -// ============================================================================= -// Performance Profiling Methods -// ============================================================================= - -/** - * Initialize performance profiling for all cores - * - * Called once in init() after core discovery. - * Assigns buffer1 to each core and sets initial states. 
- */ -void AicpuExecutor::init_performance_profiling(Runtime* runtime) { - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base == nullptr) { - LOG_ERROR("perf_data_base is NULL, cannot initialize profiling"); - return; - } - - PerfDataHeader* header = get_perf_header(perf_base); - DoubleBuffer* buffers = get_double_buffers(perf_base); - - // Write total_tasks to shared memory header for Host access - // This is necessary because Runtime object is not copied back from Device to Host - int32_t task_count = total_tasks_.load(std::memory_order_acquire); - header->total_tasks = static_cast(task_count); - wmb(); // Ensure total_tasks is visible to Host - - LOG_INFO("Initializing performance profiling for %d cores", runtime->worker_count); - - // Assign initial buffer (buffer1) to each AICore - for (int i = 0; i < runtime->worker_count; i++) { - Handshake* h = &runtime->workers[i]; - DoubleBuffer* db = &buffers[i]; - - // Read memory barrier before checking buffer1 status - rmb(); - if (db->buffer1_status != BufferStatus::IDLE) { - LOG_WARN("Core %d: buffer1 not idle (status=%u)", i, static_cast(db->buffer1_status)); - } - - // Assign buffer1 to AICore - h->perf_records_addr = (uint64_t)&db->buffer1; - h->perf_buffer_status = 0; // 0 = can write - - // Write barrier: ensure writes visible to AICore before changing status - wmb(); - db->buffer1_status = BufferStatus::WRITING; - - LOG_INFO("Core %d: assigned buffer1 (addr=0x%lx)", i, h->perf_records_addr); - } - - LOG_INFO("Performance profiling initialized for %d cores", runtime->worker_count); -} - -/** - * Complete performance records by filling fanout information - * - * This function is called by AICPU to fill in fanout information that - * was not recorded by AICore. Duration is NOT calculated here - it will - * be calculated by Host when printing/processing the data. - * - * Called in two places: - * 1. switch_perf_buffer() - when switching buffers during normal operation - * 2. 
flush_performance_buffers() - when flushing buffers during shutdown - * - * @param runtime Runtime instance for querying Task fanout information - * @param perf_buf PerfBuffer to be completed with fanout data - */ -void AicpuExecutor::complete_perf_records(Runtime* runtime, PerfBuffer* perf_buf) { - uint32_t count = perf_buf->count; - - for (uint32_t i = 0; i < count; i++) { - PerfRecord* record = &perf_buf->records[i]; - uint32_t task_id = record->task_id; - - // Query Task by task_id (O(1) array indexing) - Task* task = runtime->get_task(task_id); - - if (task != nullptr) { - // Fill fanout information (duration will be calculated by Host) - record->fanout_count = task->fanout_count; - - // Copy fanout array - for (int32_t j = 0; j < task->fanout_count && j < RUNTIME_MAX_FANOUT; j++) { - record->fanout[j] = task->fanout[j]; - } - } else { - // Defensive: invalid task_id - record->fanout_count = 0; - } - } - - // Write memory barrier: ensure fanout data is visible to Host - wmb(); -} - -/** - * Switch performance buffer for a core - * - * Called when perf_buffer_status == 1 (buffer full). - * Determines which buffer is full by address comparison, - * then switches to the alternate buffer if available. 
- * - * @param runtime Runtime instance - * @param core_id AICore ID - * @param thread_idx AICPU thread ID (for logging) - */ -void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread_idx) { - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base == nullptr) { - return; - } - - Handshake* h = &runtime->workers[core_id]; - PerfDataHeader* header = get_perf_header(perf_base); - DoubleBuffer* db = get_core_double_buffer(perf_base, core_id); - - // Determine if current buffer is buffer1 or buffer2 by address comparison - uint64_t current_addr = h->perf_records_addr; - uint64_t buffer1_addr = (uint64_t)&db->buffer1; - uint64_t buffer2_addr = (uint64_t)&db->buffer2; - - uint32_t full_buffer_id = 0; - PerfBuffer* full_buf = nullptr; - volatile BufferStatus* full_status_ptr = nullptr; - PerfBuffer* alternate_buf = nullptr; - volatile BufferStatus* alternate_status_ptr = nullptr; - uint32_t alternate_buffer_id = 0; - - if (current_addr == buffer1_addr) { - // Current buffer is buffer1, it's full - full_buffer_id = 1; - full_buf = &db->buffer1; - full_status_ptr = &db->buffer1_status; - alternate_buf = &db->buffer2; - alternate_status_ptr = &db->buffer2_status; - alternate_buffer_id = 2; - } else if (current_addr == buffer2_addr) { - // Current buffer is buffer2, it's full - full_buffer_id = 2; - full_buf = &db->buffer2; - full_status_ptr = &db->buffer2_status; - alternate_buf = &db->buffer1; - alternate_status_ptr = &db->buffer1_status; - alternate_buffer_id = 1; - } else { - LOG_ERROR("Thread %d: Core %d has invalid perf_records_addr=0x%lx", - thread_idx, core_id, current_addr); - return; - } - - LOG_INFO("Thread %d: Core %d buffer%u is full (count=%u)", - thread_idx, core_id, full_buffer_id, full_buf->count); - - // Complete performance records by filling fanout information - // (called before checking alternate buffer status to make data ready earlier) - complete_perf_records(runtime, full_buf); - - // Read alternate buffer status (rmb 
needed, since status is modified by Host) - rmb(); - - BufferStatus alternate_status = *alternate_status_ptr; - - // If alternate buffer is not idle, spin wait for Host to finish reading - if (alternate_status != BufferStatus::IDLE) { - LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it", - thread_idx, core_id, alternate_buffer_id, static_cast(alternate_status)); - - // Spin wait: continuously check alternate buffer status until Host sets it to IDLE - while (true) { - rmb(); // Read barrier: ensure reading latest status modified by Host - alternate_status = *alternate_status_ptr; - - if (alternate_status == BufferStatus::IDLE) { - LOG_INFO("Thread %d: Core %d buffer%u now idle, proceeding with switch", - thread_idx, core_id, alternate_buffer_id); - break; - } - } - } - - // Alternate buffer is idle, can switch - - // Step 1: Enqueue full buffer to ready queue - int enqueue_result = enqueue_ready_buffer(header, core_id, full_buffer_id); - if (enqueue_result != 0) { - LOG_WARN("Thread %d: Core %d failed to enqueue buffer%u (queue full)", - thread_idx, core_id, full_buffer_id); - return; - } - - // Step 2: Change full buffer status to READY (visible to Host) - *full_status_ptr = BufferStatus::READY; - // Step 3: Change alternate buffer status to WRITING (visible to Host) - *alternate_status_ptr = BufferStatus::WRITING; - wmb(); // Write barrier: ensure status changes visible to Host - - LOG_INFO("Thread %d: Core %d enqueued buffer%u", thread_idx, core_id, full_buffer_id); - - // Step 4: Switch perf_records_addr to alternate buffer (visible to AICore) - h->perf_records_addr = (uint64_t)alternate_buf; - - // Step 5: Reset perf_buffer_status = 0 (notify AICore can continue writing) - h->perf_buffer_status = 0; - - LOG_INFO("Thread %d: Core %d switched to buffer%u (status=0)", - thread_idx, core_id, alternate_buffer_id); -} - -/** - * Enqueue a ready buffer to the queue - * - * Thread-safe: Uses mutex to protect queue operations 
since multiple - * AICPU threads may enqueue concurrently. - * - * @return 0=success, -1=queue full - */ -int AicpuExecutor::enqueue_ready_buffer(PerfDataHeader* header, uint32_t core_index, uint32_t buffer_id) { - std::lock_guard lock(perf_ready_queue_mutex_); - - uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE; - - // Read barrier: ensure reading latest tail value - rmb(); - - uint32_t current_tail = header->queue_tail; - uint32_t current_head = header->queue_head; - - // Check if queue is full - uint32_t next_tail = (current_tail + 1) % capacity; - if (next_tail == current_head) { - return -1; // Queue full - } - - // Enqueue entry - header->queue[current_tail].core_index = core_index; - header->queue[current_tail].buffer_id = buffer_id; - header->queue_tail = next_tail; - - // Write memory barrier: ensure data written before updating tail, visible to Host - wmb(); - - return 0; -} - -/** - * Flush performance buffers for cores managed by this thread - * - * Called after shutdown_aicore to ensure all buffers with data - * (even if not full) are enqueued for Host collection. 
- * - * For each core managed by this thread: - * - Check which buffer is currently assigned (via perf_records_addr) - * - If buffer has data (count > 0), mark it as READY and enqueue - * - * @param runtime Runtime instance - * @param thread_idx Current thread index - * @param cur_thread_cores Array of core IDs managed by this thread - * @param core_num Number of cores managed by this thread - */ -void AicpuExecutor::flush_performance_buffers(Runtime* runtime, int thread_idx, const int* cur_thread_cores, int core_num) { - if (!runtime->enable_profiling) { - return; - } - - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base == nullptr) { - return; - } - - PerfDataHeader* header = get_perf_header(perf_base); - DoubleBuffer* buffers = get_double_buffers(perf_base); - - LOG_INFO("Thread %d: Flushing performance buffers for %d cores", thread_idx, core_num); - - int flushed_count = 0; - - // Only process cores managed by this thread - for (int i = 0; i < core_num; i++) { - int core_id = cur_thread_cores[i]; - Handshake* h = &runtime->workers[core_id]; - DoubleBuffer* db = &buffers[core_id]; - - // Read current buffer address - uint64_t current_addr = h->perf_records_addr; - if (current_addr == 0) { - continue; // No buffer assigned - } - - // Determine which buffer is current - uint64_t buf1_addr = (uint64_t)&db->buffer1; - uint64_t buf2_addr = (uint64_t)&db->buffer2; - - PerfBuffer* current_buf = nullptr; - volatile BufferStatus* current_status = nullptr; - uint32_t buffer_id = 0; - - if (current_addr == buf1_addr) { - current_buf = &db->buffer1; - current_status = &db->buffer1_status; - buffer_id = 1; - } else if (current_addr == buf2_addr) { - current_buf = &db->buffer2; - current_status = &db->buffer2_status; - buffer_id = 2; - } else { - LOG_WARN("Thread %d: Core %d perf_records_addr=0x%lx doesn't match buffer1=0x%lx or buffer2=0x%lx", - thread_idx, core_id, current_addr, buf1_addr, buf2_addr); - continue; - } - - // Read buffer count with memory 
barrier - rmb(); - uint32_t count = current_buf->count; - - // If buffer has data, enqueue it - if (count > 0) { - // Complete performance records by filling fanout information before flush - complete_perf_records(runtime, current_buf); - - // Mark buffer as READY - *current_status = BufferStatus::READY; - wmb(); - - // Enqueue to ready queue - int rc = enqueue_ready_buffer(header, core_id, buffer_id); - if (rc == 0) { - LOG_INFO("Thread %d: Core %d flushed buffer%d with %u records", thread_idx, core_id, buffer_id, count); - flushed_count++; - } else { - LOG_WARN("Thread %d: Core %d failed to enqueue buffer%d (queue full)", thread_idx, core_id, buffer_id); - } - } - } - - LOG_INFO("Thread %d: Performance buffer flush complete, %d buffers flushed", thread_idx, flushed_count); -} - // ===== Public Entry Point ===== /** diff --git a/src/runtime/host_build_graph/runtime/runtime.cpp b/src/runtime/host_build_graph/runtime/runtime.cpp index 1324acbc..21c7c581 100644 --- a/src/runtime/host_build_graph/runtime/runtime.cpp +++ b/src/runtime/host_build_graph/runtime/runtime.cpp @@ -215,3 +215,28 @@ int Runtime::get_tensor_pair_count() const { void Runtime::clear_tensor_pairs() { tensor_pair_count = 0; } + +// ============================================================================= +// Performance Profiling +// ============================================================================= + +void Runtime::complete_perf_records(PerfBuffer* perf_buf) { + uint32_t count = perf_buf->count; + + for (uint32_t i = 0; i < count; i++) { + PerfRecord* record = &perf_buf->records[i]; + uint32_t task_id = record->task_id; + + // Query Task by task_id (O(1) array indexing) + Task* task = get_task(task_id); + if (task != nullptr) { + record->fanout_count = task->fanout_count; + + for (int32_t j = 0; j < task->fanout_count; j++) { + record->fanout[j] = task->fanout[j]; + } + } else { + record->fanout_count = 0; + } + } +} diff --git a/src/runtime/host_build_graph/runtime/runtime.h 
b/src/runtime/host_build_graph/runtime/runtime.h index f24e67c7..08b85a06 100644 --- a/src/runtime/host_build_graph/runtime/runtime.h +++ b/src/runtime/host_build_graph/runtime/runtime.h @@ -26,6 +26,7 @@ #include #include "common/core_type.h" +#include "common/perf_profiling.h" #include "common/platform_config.h" // Logging macros using unified logging interface @@ -314,6 +315,20 @@ class Runtime { */ void clear_tensor_pairs(); + // ========================================================================= + // Performance Profiling + // ========================================================================= + + /** + * Fill fanout information for performance records + * + * Extracts task dependency data from the task graph and populates + * fanout arrays in performance records. + * + * @param perf_buf Performance buffer containing records to complete + */ + void complete_perf_records(PerfBuffer* perf_buf); + // ========================================================================= // Device Orchestration (stub for API compatibility) // ========================================================================= diff --git a/src/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp b/src/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp index 91f0e393..5f03bb9f 100644 --- a/src/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp +++ b/src/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp @@ -2,7 +2,7 @@ #include "runtime.h" #include "pto2_dispatch_payload.h" #include "common/perf_profiling.h" -#include "common/memory_barrier.h" +#include "aicore/performance_collector_aicore.h" /** * Unified function pointer type for kernel dispatch @@ -12,73 +12,6 @@ */ typedef void (*UnifiedKernelFunc)(__gm__ int64_t*); -/** - * @brief Record task execution performance data - * - * This function records the performance metrics of a task execution to the profiling buffer. 
- * It writes the task timing data, metadata, and marks the buffer as full when needed. - * - * Note: Fanout information is filled by AICPU after task completion (not recorded here). - * - * @param my_hank Pointer to the handshake structure containing perf buffer info - * @param payload Pointer to the PTO2DispatchPayload structure - * @param start_time Task start timestamp - * @param end_time Task end timestamp - * @param block_idx AICore block index - * @param core_type Core type (AIC or AIV) - * @param kernel_ready_time Kernel ready timestamp (when AICore entered main loop) - */ -__aicore__ __attribute__((always_inline)) static void record_task_performance( - __gm__ Handshake* my_hank, - __gm__ PTO2DispatchPayload* payload, - uint64_t start_time, - uint64_t end_time, - int block_idx, - CoreType core_type, - uint64_t kernel_ready_time) { - - // Check if buffer is available for writing - if (my_hank->perf_buffer_status != 0) { - return; // Buffer full, skip recording - } - - // Get current performance buffer pointer - __gm__ PerfBuffer* perf_buf = (__gm__ PerfBuffer*)my_hank->perf_records_addr; - - // Get current count (no atomic operation needed - single writer) - rmb(); - uint32_t idx = perf_buf->count; - - // Check if buffer has space - if (idx < PLATFORM_PROF_BUFFER_SIZE) { - // Get pointer to the record slot - __gm__ PerfRecord* record = (__gm__ PerfRecord*)&perf_buf->records[idx]; - - // Write record data (only essential fields, fanout filled by AICPU) - record->start_time = start_time; - record->end_time = end_time; - record->kernel_ready_time = kernel_ready_time; - record->task_id = payload->task_id; // Use payload->task_id - record->func_id = payload->kernel_id; // Use payload->kernel_id - record->core_id = block_idx; - record->core_type = core_type; - - // Increment count after writing record - perf_buf->count = idx + 1; - - // Write memory barrier: ensure performance data is visible to Host - wmb(); - - // Check if buffer is full after this write - if 
(perf_buf->count >= PLATFORM_PROF_BUFFER_SIZE) { - my_hank->perf_buffer_status = 1; // Notify AICPU: buffer full - } - } else { - // Buffer is already full - my_hank->perf_buffer_status = 1; - } -} - /** * Execute task from PTO2DispatchPayload. * @@ -95,6 +28,9 @@ __aicore__ __attribute__((always_inline)) static void execute_task(__gm__ void* UnifiedKernelFunc kernel = (UnifiedKernelFunc)payload->function_bin_addr; kernel(reinterpret_cast<__gm__ int64_t*>(payload->args)); + + // Ensure all memory writes are visible to other cores + pipe_barrier(PIPE_ALL); } /** @@ -133,7 +69,7 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in // Used for: 1) Startup overhead analysis, 2) Cross-core time alignment uint64_t kernel_ready_time = 0; if (profiling_enabled) { - kernel_ready_time = get_sys_cnt(); + kernel_ready_time = get_sys_cnt_aicore(); } // Phase 3: Main execution loop - poll for tasks until quit signal @@ -153,7 +89,7 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in // Performance profiling: record start time uint64_t start_time = 0; if (profiling_enabled) { - start_time = get_sys_cnt(); + start_time = get_sys_cnt_aicore(); } // Execute the task @@ -161,9 +97,11 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in // Performance profiling: record task execution if (profiling_enabled) { - uint64_t end_time = get_sys_cnt(); - record_task_performance(my_hank, payload, start_time, end_time, - block_idx, core_type, kernel_ready_time); + uint64_t end_time = get_sys_cnt_aicore(); + __gm__ PerfBuffer* perf_buf = (__gm__ PerfBuffer*)my_hank->perf_records_addr; + perf_aicore_record_task(perf_buf, payload->task_id, payload->kernel_id, + start_time, end_time, kernel_ready_time, + block_idx, core_type); } // Mark task as complete (task_status: 0=idle, 1=busy) diff --git a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp 
b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 553ee9d3..760214d9 100644 --- a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -26,6 +26,7 @@ #include "pto_runtime2_types.h" // Performance profiling headers +#include "aicpu/performance_collector_aicpu.h" #include "common/perf_profiling.h" #include "common/memory_barrier.h" #include "common/unified_log.h" @@ -115,7 +116,6 @@ struct AicpuExecutor { std::atomic pto2_init_done_{false}; std::atomic pto2_init_complete_{false}; // init block finished; others wait for this std::atomic next_scan_index_{0}; - std::atomic perf_init_done_{false}; std::atomic sm_header_ready_{false}; // Thread 3 sets after SM header init // Orchestrator ready queue pointers (set by Thread 3, read by scheduler threads) @@ -129,8 +129,8 @@ struct AicpuExecutor { char orch_so_path_[256]{}; // Path to orchestration SO file for cleanup // ===== Performance profiling state ===== - std::mutex perf_ready_queue_mutex_; // Protects enqueue_ready_buffer operations uint64_t dispatch_timestamps_[RUNTIME_MAX_WORKER]; // Per-core AICPU dispatch timestamp + uint32_t core_dispatch_counts_[RUNTIME_MAX_WORKER]; // Per-core total dispatched task counter (for buffer management) // ===== Methods ===== int init(Runtime* runtime); @@ -142,23 +142,6 @@ struct AicpuExecutor { void deinit(); void diagnose_stuck_state(Runtime* runtime, int thread_idx, const int* cur_thread_cores, int core_num, Handshake* hank); - - // Performance profiling methods - void init_performance_profiling(Runtime* runtime); - void complete_perf_records(Runtime* runtime, PerfBuffer* perf_buf, - PTO2TaskDescriptor* task_descriptors, - PTO2DepListEntry* dep_list_pool, - int32_t window_mask); - void switch_perf_buffer(Runtime* runtime, int core_id, int thread_idx, - PTO2TaskDescriptor* task_descriptors, - PTO2DepListEntry* dep_list_pool, - int32_t window_mask); - int 
enqueue_ready_buffer(PerfDataHeader* header, uint32_t core_index, uint32_t buffer_id); - void flush_performance_buffers(Runtime* runtime, int thread_idx, - const int* cur_thread_cores, int core_num, - PTO2TaskDescriptor* task_descriptors, - PTO2DepListEntry* dep_list_pool, - int32_t window_mask); }; static AicpuExecutor g_aicpu_executor; @@ -324,9 +307,10 @@ int AicpuExecutor::init(Runtime* runtime) { ready_queue_aiv_head_ = 0; ready_queue_aiv_tail_ = 0; - // Reset per-core dispatch timestamps + // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; + core_dispatch_counts_[i] = 0; } DEV_INFO("Init: PTO2 mode, task count from shared memory"); @@ -431,7 +415,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, // Assign perf buffers to cores early so profiling captures all tasks // (total_tasks written to header later when orchestrator completes) if (runtime->enable_profiling) { - init_performance_profiling(runtime); + perf_aicpu_init_profiling(runtime); } DEV_INFO("Thread %d: one-time init done", thread_idx); @@ -449,6 +433,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, const int MAX_IDLE_ITERATIONS = 50000000; const int WARN_INTERVAL = 1000000; bool profiling_enabled = runtime->enable_profiling; + int32_t last_reported_task_count = 0; // Scheduler profiling counters #if PTO2_ORCH_PROFILING @@ -485,22 +470,20 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, bool made_progress = false; - // Update perf header total_tasks after orchestrator sets the final count - if (profiling_enabled && orch_done && !perf_init_done_.load(std::memory_order_acquire)) { - bool expected = false; - if (perf_init_done_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base) { - PerfDataHeader* perf_hdr = get_perf_header(perf_base); - 
perf_hdr->total_tasks = static_cast(task_count); - wmb(); - } - } - } - // Incremental scan: discover root tasks (fanin_count == 0) { int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); + + // Update perf header total_tasks if visible tasks have changed + if (profiling_enabled && visible > 0 && visible != last_reported_task_count) { + perf_aicpu_update_total_tasks(runtime, static_cast(visible)); + + DEV_INFO("Thread %d: Updated perf total_tasks to %d%s", + thread_idx, visible, orch_done ? " (final)" : ""); + + last_reported_task_count = visible; + } + while (true) { int32_t idx = next_scan_index_.load(std::memory_order_acquire); if (idx >= visible) break; @@ -584,19 +567,13 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, if (count > 0) { PerfRecord* record = &perf_buf->records[count - 1]; if (record->task_id == static_cast(payload->task_id)) { - record->dispatch_time = dispatch_timestamps_[core_id]; - record->finish_time = finish_ts; - wmb(); + perf_aicpu_record_dispatch_and_finish_time(record, + dispatch_timestamps_[core_id], + finish_ts); } } } - // Check and switch performance buffer if needed - if (profiling_enabled && h->perf_buffer_status == 1) { - switch_perf_buffer(runtime, core_id, thread_idx, - task_descriptors, dep_list_pool, window_mask); - } - int32_t task_id = payload->task_id; PTO2TaskDescriptor* pto2_task = &task_descriptors[task_id & window_mask]; @@ -670,13 +647,21 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; build_pto2_payload(payload, runtime, task, task_descriptors, dep_list_pool, window_size); h->task = reinterpret_cast(payload); - if (runtime->enable_profiling) { + // Performance profiling: check if buffer needs switching + if (profiling_enabled) { dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); + if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { + 
perf_aicpu_switch_buffer(runtime, core_id, thread_idx); + core_dispatch_counts_[core_id] = 0; + } + core_dispatch_counts_[core_id]++; } h->task_status = 1; cur_thread_tasks_in_flight++; made_progress = true; DEV_DEBUG("Thread %d: Dispatching PTO2 task %d to core %d", thread_idx, task_id, core_id); + + } } } @@ -741,8 +726,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, // Flush performance buffers for cores managed by this thread if (profiling_enabled) { - flush_performance_buffers(runtime, thread_idx, cur_thread_cores, core_num, - task_descriptors, dep_list_pool, window_mask); + perf_aicpu_flush_buffers(runtime, thread_idx, cur_thread_cores, core_num); } return cur_thread_completed; @@ -1004,9 +988,10 @@ void AicpuExecutor::deinit() { ready_queue_aiv_head_ = 0; ready_queue_aiv_tail_ = 0; - // Reset per-core dispatch timestamps + // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; + core_dispatch_counts_[i] = 0; } completed_tasks_.store(0, std::memory_order_release); @@ -1016,7 +1001,6 @@ void AicpuExecutor::deinit() { pto2_init_done_.store(false, std::memory_order_release); pto2_init_complete_.store(false, std::memory_order_release); next_scan_index_.store(0, std::memory_order_release); - perf_init_done_.store(false, std::memory_order_release); sm_header_ready_.store(false, std::memory_order_release); // Reset core discovery state @@ -1090,365 +1074,6 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int thread_idx, DEV_ALWAYS("========== END DIAGNOSTIC =========="); } -// ============================================================================= -// Performance Profiling Methods -// ============================================================================= - -/** - * Initialize performance profiling for all cores - * - * Called once in resolve_and_dispatch_pto2() after one-time init. 
- * Assigns buffer1 to each core and sets initial states. - * Also writes total_tasks to PerfDataHeader for Host access. - */ -void AicpuExecutor::init_performance_profiling(Runtime* runtime) { - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base == nullptr) { - LOG_ERROR("perf_data_base is NULL, cannot initialize profiling"); - return; - } - - PerfDataHeader* header = get_perf_header(perf_base); - DoubleBuffer* buffers = get_double_buffers(perf_base); - - // Write total_tasks to shared memory header for Host access - // This is necessary because Runtime object is not copied back from Device to Host - int32_t task_count = total_tasks_.load(std::memory_order_acquire); - header->total_tasks = static_cast(task_count); - wmb(); // Ensure total_tasks is visible to Host - - LOG_INFO("Initializing performance profiling for %d cores, total_tasks=%d", runtime->worker_count, task_count); - - // Assign initial buffer (buffer1) to each AICore - for (int i = 0; i < runtime->worker_count; i++) { - Handshake* h = &runtime->workers[i]; - DoubleBuffer* db = &buffers[i]; - - // Read memory barrier before checking buffer1 status - rmb(); - if (db->buffer1_status != BufferStatus::IDLE) { - LOG_WARN("Core %d: buffer1 not idle (status=%u)", i, static_cast(db->buffer1_status)); - } - - // Assign buffer1 to AICore - h->perf_records_addr = (uint64_t)&db->buffer1; - h->perf_buffer_status = 0; // 0 = can write - - // Write barrier: ensure writes visible to AICore before changing status - wmb(); - db->buffer1_status = BufferStatus::WRITING; - - LOG_INFO("Core %d: assigned buffer1 (addr=0x%lx)", i, h->perf_records_addr); - } - - LOG_INFO("Performance profiling initialized for %d cores", runtime->worker_count); -} - -/** - * Complete performance records by filling fanout information - * - * This function is called by AICPU to fill in fanout information that - * was not recorded by AICore. 
Duration is NOT calculated here - it will - * be calculated by Host when printing/processing the data. - * - * Key difference from host_build_graph: fanout is stored as a linked list - * in PTO2 shared memory, not as an array in Task structure. - * - * Called in two places: - * 1. switch_perf_buffer() - when switching buffers during normal operation - * 2. flush_performance_buffers() - when flushing buffers during shutdown - * - * @param runtime Runtime instance (unused but kept for API consistency) - * @param perf_buf PerfBuffer to be completed with fanout data - * @param task_descriptors Pointer to task descriptor array in shared memory - * @param dep_list_pool Pointer to dependency list pool in shared memory - * @param window_mask Mask for computing task slot (window_size - 1) - */ -void AicpuExecutor::complete_perf_records(Runtime* runtime, PerfBuffer* perf_buf, - PTO2TaskDescriptor* task_descriptors, - PTO2DepListEntry* dep_list_pool, - int32_t window_mask) { - (void)runtime; // Unused parameter - uint32_t count = perf_buf->count; - - for (uint32_t i = 0; i < count; i++) { - PerfRecord* record = &perf_buf->records[i]; - int32_t task_id = record->task_id; - - // Get TaskDescriptor from PTO2 shared memory - int32_t slot = task_id & window_mask; - PTO2TaskDescriptor* task = &task_descriptors[slot]; - - // Fill fanout information by traversing the linked list - record->fanout_count = 0; - int32_t fanout_offset = task->fanout_head; - - while (fanout_offset != 0 && record->fanout_count < RUNTIME_MAX_FANOUT) { - PTO2DepListEntry* entry = &dep_list_pool[fanout_offset]; - record->fanout[record->fanout_count++] = entry->task_id; - fanout_offset = entry->next_offset; - } - } - - // Write memory barrier: ensure fanout data is visible to Host - wmb(); -} - -/** - * Switch performance buffer for a core - * - * Called when perf_buffer_status == 1 (buffer full). - * Determines which buffer is full by address comparison, - * then switches to the alternate buffer if available. 
- * - * @param runtime Runtime instance - * @param core_id AICore ID - * @param thread_idx AICPU thread ID (for logging) - * @param task_descriptors Pointer to task descriptor array in shared memory - * @param dep_list_pool Pointer to dependency list pool in shared memory - * @param window_mask Mask for computing task slot (window_size - 1) - */ -void AicpuExecutor::switch_perf_buffer(Runtime* runtime, int core_id, int thread_idx, - PTO2TaskDescriptor* task_descriptors, - PTO2DepListEntry* dep_list_pool, - int32_t window_mask) { - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base == nullptr) { - return; - } - - Handshake* h = &runtime->workers[core_id]; - PerfDataHeader* header = get_perf_header(perf_base); - DoubleBuffer* db = get_core_double_buffer(perf_base, core_id); - - // Determine if current buffer is buffer1 or buffer2 by address comparison - uint64_t current_addr = h->perf_records_addr; - uint64_t buffer1_addr = (uint64_t)&db->buffer1; - uint64_t buffer2_addr = (uint64_t)&db->buffer2; - - uint32_t full_buffer_id = 0; - PerfBuffer* full_buf = nullptr; - volatile BufferStatus* full_status_ptr = nullptr; - PerfBuffer* alternate_buf = nullptr; - volatile BufferStatus* alternate_status_ptr = nullptr; - uint32_t alternate_buffer_id = 0; - - if (current_addr == buffer1_addr) { - // Current buffer is buffer1, it's full - full_buffer_id = 1; - full_buf = &db->buffer1; - full_status_ptr = &db->buffer1_status; - alternate_buf = &db->buffer2; - alternate_status_ptr = &db->buffer2_status; - alternate_buffer_id = 2; - } else if (current_addr == buffer2_addr) { - // Current buffer is buffer2, it's full - full_buffer_id = 2; - full_buf = &db->buffer2; - full_status_ptr = &db->buffer2_status; - alternate_buf = &db->buffer1; - alternate_status_ptr = &db->buffer1_status; - alternate_buffer_id = 1; - } else { - LOG_ERROR("Thread %d: Core %d has invalid perf_records_addr=0x%lx", - thread_idx, core_id, current_addr); - return; - } - - LOG_INFO("Thread %d: Core 
%d buffer%u is full (count=%u)", - thread_idx, core_id, full_buffer_id, full_buf->count); - - // Complete performance records by filling fanout information - // (called before checking alternate buffer status to make data ready earlier) - complete_perf_records(runtime, full_buf, task_descriptors, dep_list_pool, window_mask); - - // Read alternate buffer status (rmb needed, since status is modified by Host) - rmb(); - - BufferStatus alternate_status = *alternate_status_ptr; - - // If alternate buffer is not idle, spin wait for Host to finish reading - if (alternate_status != BufferStatus::IDLE) { - LOG_WARN("Thread %d: Core %d cannot switch, buffer%u status=%u, spinning until Host reads it", - thread_idx, core_id, alternate_buffer_id, static_cast(alternate_status)); - - // Spin wait: continuously check alternate buffer status until Host sets it to IDLE - while (true) { - rmb(); // Read barrier: ensure reading latest status modified by Host - alternate_status = *alternate_status_ptr; - - if (alternate_status == BufferStatus::IDLE) { - LOG_INFO("Thread %d: Core %d buffer%u now idle, proceeding with switch", - thread_idx, core_id, alternate_buffer_id); - break; - } - } - } - - // Alternate buffer is idle, can switch - - // Step 1: Enqueue full buffer to ready queue - int enqueue_result = enqueue_ready_buffer(header, core_id, full_buffer_id); - if (enqueue_result != 0) { - LOG_WARN("Thread %d: Core %d failed to enqueue buffer%u (queue full)", - thread_idx, core_id, full_buffer_id); - return; - } - - // Step 2: Change full buffer status to READY (visible to Host) - *full_status_ptr = BufferStatus::READY; - // Step 3: Change alternate buffer status to WRITING (visible to Host) - *alternate_status_ptr = BufferStatus::WRITING; - wmb(); // Write barrier: ensure status changes visible to Host - - LOG_INFO("Thread %d: Core %d enqueued buffer%u", thread_idx, core_id, full_buffer_id); - - // Step 4: Switch perf_records_addr to alternate buffer (visible to AICore) - 
h->perf_records_addr = (uint64_t)alternate_buf; - - // Step 5: Reset perf_buffer_status = 0 (notify AICore can continue writing) - h->perf_buffer_status = 0; - - LOG_INFO("Thread %d: Core %d switched to buffer%u (status=0)", - thread_idx, core_id, alternate_buffer_id); -} - -/** - * Enqueue a ready buffer to the queue - * - * Thread-safe: Uses mutex to protect queue operations since multiple - * AICPU threads may enqueue concurrently. - * - * @return 0=success, -1=queue full - */ -int AicpuExecutor::enqueue_ready_buffer(PerfDataHeader* header, uint32_t core_index, uint32_t buffer_id) { - std::lock_guard lock(perf_ready_queue_mutex_); - - uint32_t capacity = PLATFORM_PROF_READYQUEUE_SIZE; - - // Read barrier: ensure reading latest tail value - rmb(); - - uint32_t current_tail = header->queue_tail; - uint32_t current_head = header->queue_head; - - // Check if queue is full - uint32_t next_tail = (current_tail + 1) % capacity; - if (next_tail == current_head) { - return -1; // Queue full - } - - // Enqueue entry - header->queue[current_tail].core_index = core_index; - header->queue[current_tail].buffer_id = buffer_id; - header->queue_tail = next_tail; - - // Write memory barrier: ensure data written before updating tail, visible to Host - wmb(); - - return 0; -} - -/** - * Flush performance buffers for cores managed by this thread - * - * Called after shutdown_aicore to ensure all buffers with data - * (even if not full) are enqueued for Host collection. 
- * - * For each core managed by this thread: - * - Check which buffer is currently assigned (via perf_records_addr) - * - If buffer has data (count > 0), mark it as READY and enqueue - * - * @param runtime Runtime instance - * @param thread_idx Current thread index - * @param cur_thread_cores Array of core IDs managed by this thread - * @param core_num Number of cores managed by this thread - * @param task_descriptors Pointer to task descriptor array in shared memory - * @param dep_list_pool Pointer to dependency list pool in shared memory - * @param window_mask Mask for computing task slot (window_size - 1) - */ -void AicpuExecutor::flush_performance_buffers(Runtime* runtime, int thread_idx, - const int* cur_thread_cores, int core_num, - PTO2TaskDescriptor* task_descriptors, - PTO2DepListEntry* dep_list_pool, - int32_t window_mask) { - if (!runtime->enable_profiling) { - return; - } - - void* perf_base = (void*)runtime->perf_data_base; - if (perf_base == nullptr) { - return; - } - - PerfDataHeader* header = get_perf_header(perf_base); - DoubleBuffer* buffers = get_double_buffers(perf_base); - - LOG_INFO("Thread %d: Flushing performance buffers for %d cores", thread_idx, core_num); - - int flushed_count = 0; - - // Only process cores managed by this thread - for (int i = 0; i < core_num; i++) { - int core_id = cur_thread_cores[i]; - Handshake* h = &runtime->workers[core_id]; - DoubleBuffer* db = &buffers[core_id]; - - // Read current buffer address - uint64_t current_addr = h->perf_records_addr; - if (current_addr == 0) { - continue; // No buffer assigned - } - - // Determine which buffer is current - uint64_t buf1_addr = (uint64_t)&db->buffer1; - uint64_t buf2_addr = (uint64_t)&db->buffer2; - - PerfBuffer* current_buf = nullptr; - volatile BufferStatus* current_status = nullptr; - uint32_t buffer_id = 0; - - if (current_addr == buf1_addr) { - current_buf = &db->buffer1; - current_status = &db->buffer1_status; - buffer_id = 1; - } else if (current_addr == 
buf2_addr) { - current_buf = &db->buffer2; - current_status = &db->buffer2_status; - buffer_id = 2; - } else { - LOG_WARN("Thread %d: Core %d perf_records_addr=0x%lx doesn't match buffer1=0x%lx or buffer2=0x%lx", - thread_idx, core_id, current_addr, buf1_addr, buf2_addr); - continue; - } - - // Read buffer count with memory barrier - rmb(); - uint32_t count = current_buf->count; - - // If buffer has data, enqueue it - if (count > 0) { - // Complete performance records by filling fanout information before flush - complete_perf_records(runtime, current_buf, task_descriptors, dep_list_pool, window_mask); - - // Mark buffer as READY - *current_status = BufferStatus::READY; - wmb(); - - // Enqueue to ready queue - int rc = enqueue_ready_buffer(header, core_id, buffer_id); - if (rc == 0) { - LOG_INFO("Thread %d: Core %d flushed buffer%d with %u records", thread_idx, core_id, buffer_id, count); - flushed_count++; - } else { - LOG_WARN("Thread %d: Core %d failed to enqueue buffer%d (queue full)", thread_idx, core_id, buffer_id); - } - } - } - - LOG_INFO("Thread %d: Performance buffer flush complete, %d buffers flushed", thread_idx, flushed_count); -} - // ===== Public Entry Point ===== /** diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp index f1d8be3d..a3b7c5bf 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp @@ -6,6 +6,7 @@ */ #include "runtime.h" +#include "pto_shared_memory.h" // ============================================================================= // Constructor @@ -127,3 +128,45 @@ void Runtime::set_function_bin_addr(int func_id, uint64_t addr) { func_id_to_addr_[func_id] = addr; } } + +// ============================================================================= +// Performance Profiling +// ============================================================================= + +void 
Runtime::complete_perf_records(PerfBuffer* perf_buf) {
+    // Get PTO2 shared memory context
+    void* sm_base = get_pto2_gm_sm_ptr();
+    if (sm_base == nullptr) {
+        // No PTO2 context, cannot complete records
+        return;
+    }
+
+    // Get PTO2 data structures
+    PTO2SharedMemoryHeader* header = static_cast<PTO2SharedMemoryHeader*>(sm_base);
+    PTO2TaskDescriptor* task_descriptors = reinterpret_cast<PTO2TaskDescriptor*>(
+        static_cast<uint8_t*>(sm_base) + header->task_descriptors_offset);
+    PTO2DepListEntry* dep_list_pool = reinterpret_cast<PTO2DepListEntry*>(
+        static_cast<uint8_t*>(sm_base) + header->dep_list_pool_offset);
+    int32_t window_mask = header->task_window_size - 1;
+
+    uint32_t count = perf_buf->count;
+
+    for (uint32_t i = 0; i < count; i++) {
+        PerfRecord* record = &perf_buf->records[i];
+        int32_t task_id = record->task_id;
+
+        // Get TaskDescriptor from PTO2 shared memory
+        int32_t slot = task_id & window_mask;
+        PTO2TaskDescriptor* task = &task_descriptors[slot];
+
+        // Fill fanout information by traversing the linked list
+        record->fanout_count = 0;
+        int32_t fanout_offset = task->fanout_head;
+
+        while (fanout_offset != 0 && record->fanout_count < RUNTIME_MAX_FANOUT) {
+            PTO2DepListEntry* entry = &dep_list_pool[fanout_offset];
+            record->fanout[record->fanout_count++] = entry->task_id;
+            fanout_offset = entry->next_offset;
+        }
+    }
+}
diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/runtime.h b/src/runtime/tensormap_and_ringbuffer/runtime/runtime.h
index efb77b92..7c1d0a67 100644
--- a/src/runtime/tensormap_and_ringbuffer/runtime/runtime.h
+++ b/src/runtime/tensormap_and_ringbuffer/runtime/runtime.h
@@ -22,6 +22,7 @@
 #include <cstring> // for memset
 #include "common/core_type.h"
+#include "common/perf_profiling.h"
 #include "pto2_dispatch_payload.h"
 // =============================================================================
@@ -193,6 +194,20 @@ class Runtime {
  */
  void clear_tensor_pairs();
+
+    // =========================================================================
+    // Performance Profiling
+    // 
========================================================================= + + /** + * Fill fanout information for performance records + * + * Extracts task dependency data from the task graph and populates + * fanout arrays in performance records. + * + * @param perf_buf Performance buffer containing records to complete + */ + void complete_perf_records(PerfBuffer* perf_buf); + // ========================================================================= // Device orchestration (for AICPU thread 3) // ========================================================================= diff --git a/tools/swimlane_converter.py b/tools/swimlane_converter.py index 29a85376..906321d8 100644 --- a/tools/swimlane_converter.py +++ b/tools/swimlane_converter.py @@ -111,26 +111,67 @@ def print_task_statistics(tasks, func_id_to_name=None): """ from collections import defaultdict - # Group tasks by func_id - func_stats = defaultdict(list) + # Group tasks by func_id with extended metrics + func_stats = defaultdict(lambda: { + 'durations': [], + 'head_overheads': [], + 'tail_overheads': [], + 'schedule_times': [], + 'total_exec_time': 0.0, + 'total_schedule_time': 0.0 + }) + + # Track global min dispatch and max finish times + min_dispatch_time = float('inf') + max_finish_time = float('-inf') + for task in tasks: func_id = task['func_id'] duration = task['duration_us'] - func_stats[func_id].append(duration) + func_stats[func_id]['durations'].append(duration) + + # Calculate new metrics if dispatch_time_us and finish_time_us are available + if 'dispatch_time_us' in task and 'finish_time_us' in task: + dispatch_time = task['dispatch_time_us'] + finish_time = task['finish_time_us'] + start_time = task['start_time_us'] + end_time = task['end_time_us'] + + # Head overhead: start_time_us - dispatch_time_us + head_overhead = start_time - dispatch_time + func_stats[func_id]['head_overheads'].append(head_overhead) + + # Tail overhead: finish_time_us - end_time_us + tail_overhead = finish_time - 
end_time + func_stats[func_id]['tail_overheads'].append(tail_overhead) + + # Schedule time: finish_time_us - dispatch_time_us + schedule_time = finish_time - dispatch_time + func_stats[func_id]['schedule_times'].append(schedule_time) + + # Accumulate execution time and schedule time for ratio calculation + func_stats[func_id]['total_exec_time'] += duration + func_stats[func_id]['total_schedule_time'] += schedule_time + + # Track global times + min_dispatch_time = min(min_dispatch_time, dispatch_time) + max_finish_time = max(max_finish_time, finish_time) # Print statistics - print("\n" + "=" * 104) + print("\n" + "=" * 160) print("Task Statistics by Function") - print("=" * 104) - print(f"{'Func ID':<8} {'Func Name':<20} {'Count':>8} {'Total (us)':>14} {'Avg (us)':>12} {'Min (us)':>12} {'Max (us)':>12}") - print("-" * 104) + print("=" * 160) + print(f"{'Func_ID':<8} {'Func_Name':<12} {'Count':^6} {'Total_Exec/Sched(us)':^25} {'Avg_Exec/Sched(us)':^23} " + f"{'Min_Exec/Sched(us)':^23} {'Max_Exec/Sched(us)':^23} {'Avg_Head/Tail_OH(us)':^23} {'Exec_%':^8}") + print("-" * 160) # Sort by func_id for consistent output total_count = 0 total_duration = 0.0 for func_id in sorted(func_stats.keys()): - durations = func_stats[func_id] + stats = func_stats[func_id] + durations = stats['durations'] count = len(durations) sum_duration = sum(durations) avg_duration = sum_duration / count @@ -147,12 +188,42 @@ def print_task_statistics(tasks, func_id_to_name=None): else: func_name = f"Func_{func_id}" - print(f"{func_id:<8} {func_name:<20} {count:>8} {sum_duration:>14.2f} {avg_duration:>12.2f} {min_duration:>12.2f} {max_duration:>12.2f}") + # Calculate averages for new metrics + avg_head_overhead = sum(stats['head_overheads']) / len(stats['head_overheads']) if stats['head_overheads'] else 0 + avg_tail_overhead = sum(stats['tail_overheads']) / len(stats['tail_overheads']) if stats['tail_overheads'] else 0 + avg_schedule_time = stats['total_schedule_time'] / count if count > 0 else 0 + 
min_schedule_time = min(stats['schedule_times']) if stats['schedule_times'] else 0 + max_schedule_time = max(stats['schedule_times']) if stats['schedule_times'] else 0 + total_schedule_time = stats['total_schedule_time'] + + # Calculate execution ratio: total_exec_time / total_schedule_time + exec_ratio = (stats['total_exec_time'] / stats['total_schedule_time'] * 100) if stats['total_schedule_time'] > 0 else 0 + + # Format combined exec/sched values + total_combined = f"{sum_duration:.2f}/{total_schedule_time:.2f}" + avg_combined = f"{avg_duration:.2f}/{avg_schedule_time:.2f}" + min_combined = f"{min_duration:.2f}/{min_schedule_time:.2f}" + max_combined = f"{max_duration:.2f}/{max_schedule_time:.2f}" + overhead_combined = f"{avg_head_overhead:.2f}/{avg_tail_overhead:.2f}" + + print(f"{func_id:<8} {func_name:<12} {count:^6} {total_combined:^25} {avg_combined:^23} " + f"{min_combined:^23} {max_combined:^23} {overhead_combined:^23} {exec_ratio:^7.2f}%") # Print total row - print("-" * 104) - print(f"{'TOTAL':<29} {total_count:>8} {total_duration:>14.2f}") - print("=" * 104) + print("-" * 160) + + # Calculate total schedule time (sum of all schedule times) + total_schedule_sum = sum(stats['total_schedule_time'] for stats in func_stats.values()) + total_combined = f"{total_duration:.2f}/{total_schedule_sum:.2f}" + print(f"{'TOTAL':<21} {total_count:^6} {total_combined:^25}") + + # Print total test execution time + if min_dispatch_time != float('inf') and max_finish_time != float('-inf'): + total_test_time = max_finish_time - min_dispatch_time + print(f"\nTotal Test Time: {total_test_time:.2f} us (from earliest dispatch to latest finish)") + + print("=" * 160) +