502 changes: 502 additions & 0 deletions spike/ASYNC_API.md

Large diffs are not rendered by default.

84 changes: 84 additions & 0 deletions spike/src/include/nrt_wrapper.h
@@ -10,6 +10,89 @@
#include <cstdint>
#include <string>

extern "C" {

// Underlying tensor and model declarations copied from NRT.
// This is a temporary hack for implementing nonblocking/async operations,
// as I need some underlying info that NRT does not expose.
// These will not be needed once explicit async is ready and stable in NRT.

#define DX_CACHE_ALIGNED __attribute__((aligned(64)))

typedef enum nrt_tensor_mem_type {
NRT_TENSOR_MEM_TYPE_INVALID = 0,
NRT_TENSOR_MEM_TYPE_MALLOC,
NRT_TENSOR_MEM_TYPE_DMA,
NRT_TENSOR_MEM_TYPE_FAKE,
} nrt_tensor_mem_type_t;

// Host or device memory used by a tensor. The memory is ref-counted and
// can be shared among multiple tensors.
typedef struct nrt_tensor_storage {
uint32_t hbm_idx;
size_t allocated_size;
nrt_tensor_mem_type_t type;
union {
void *dmem;    // dmem associated with addr, for tensor type
               // NRT_TENSOR_MEM_TYPE_DMA
uint8_t *vmem; // malloc'ed memory for tensor type NRT_TENSOR_MEM_TYPE_MALLOC
};
volatile uint64_t ref_count DX_CACHE_ALIGNED;
bool mem_owned_by_tensor;

pthread_mutex_t tensor_op_cv_lock; // Lock for async exec. Used with
// `tensor_op_cv` to block the thread while
// there are still pending execs. If this
// is NULL we are not in async exec mode.
pthread_cond_t tensor_op_cv; // condition variable for blocking on pending
// tensor ops
volatile uint64_t pending_exec_count_read
DX_CACHE_ALIGNED; // count of pending execs that read this location
volatile uint64_t pending_exec_count_write
DX_CACHE_ALIGNED; // count of pending execs that write to this location
int32_t vtpb_idx; // same as vcore->vtpb_idx but -1 if no vcore for tensor
// (used for trace api)
} nrt_tensor_storage_t;

typedef struct nrt_tensor {
char *name; // optional name
nrt_tensor_storage_t *sto; // the actual memory represented by the tensor
// don't access directly, use helper functions to ensure correctness
// params below allow a tensor to represent a slice of the memory
// pointed by "sto"
size_t _offset; // offset within the storage
size_t _size; // tensor size
void *extra; // used to store any metadata needed by the runtime

volatile uint64_t ref_count
DX_CACHE_ALIGNED; // refcount for the tensor. Only when this is 0 can we
// free the tensor. It is incremented by
// `tensor_get_reference` and decremented by
// `tensor_free`; the tensor is automatically freed in
// `tensor_free` once ref_count reaches zero.
volatile uint64_t output_completion_count
DX_CACHE_ALIGNED; // used to track the completion count of an output
// tensor. 0 means not complete; 1 and above means the
// number of completions
} nrt_tensor_t;
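
// A minimal sketch of how the async-exec fields above are meant to interact:
// a waiter holds `tensor_op_cv_lock` and blocks on `tensor_op_cv` until both
// pending-exec counters drain to zero. The function name is hypothetical and
// purely illustrative; the real wait/signal logic lives inside NRT, not here.
inline void wait_for_pending_execs_sketch(nrt_tensor_storage_t *sto) {
  pthread_mutex_lock(&sto->tensor_op_cv_lock);
  while (sto->pending_exec_count_read != 0 ||
         sto->pending_exec_count_write != 0) {
    // releases the lock while sleeping; reacquires it before returning
    pthread_cond_wait(&sto->tensor_op_cv, &sto->tensor_op_cv_lock);
  }
  pthread_mutex_unlock(&sto->tensor_op_cv_lock);
}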

typedef struct H_NN {
uint32_t id;
} H_NN;

struct nrt_model {
uint32_t start_vnc; // VirtualNeuronCore start index
uint32_t vnc_count; // number of VirtualNeuronCore(s) requested
uint32_t instance_index; // index of the instance that will execute on the
// next call to nrt_execute
uint32_t instance_count; // number of loaded instances
uint32_t gid; // global id, for debug
char name[256];
H_NN h_nn[]; // kmgr model id (instance_count entries)
};
}

namespace spike {

// RAII wrapper for NRT runtime
@@ -26,6 +109,7 @@ class NrtRuntime {
NrtRuntime &operator=(NrtRuntime &&) = default;

static uint32_t get_visible_nc_count();
static uint32_t get_total_nc_count();

private:
bool initialized_;
265 changes: 264 additions & 1 deletion spike/src/include/spike.h
@@ -4,11 +4,26 @@
#include "model.h"
#include "nrt_wrapper.h"
#include "tensor.h"
#include "tensor_set.h"

#include <nanobind/nanobind.h>
#include <nanobind/ndarray.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <shared_mutex>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <variant>
#include <vector>

namespace nb = nanobind;

namespace spike {

// Tensor metadata structure
@@ -24,6 +39,183 @@ struct ModelTensorInfo {
std::unordered_map<std::string, TensorMetadata> outputs;
};

// NonBlock command structures
struct NonBlockCloseCmd {};

struct NonBlockTensorReadCmd {
uint64_t id;
std::shared_ptr<const NrtTensor> tensor;
size_t offset;
size_t size;
void *data;
std::variant<nb::bytes, nb::ndarray<>> data_obj;
};

struct NonBlockTensorWriteCmd {
uint64_t id;
std::shared_ptr<NrtTensor> tensor;
const void *data;
size_t size;
std::variant<nb::bytes, nb::ndarray<>> data_obj;
size_t offset;
};

struct NonBlockTensorWriteBatchedCmd {
uint64_t id;
uint64_t batch_id;
};

struct NonBlockTensorReadBatchedCmd {
uint64_t id;
uint64_t batch_id;
};

typedef std::variant<NonBlockTensorReadCmd, NonBlockTensorWriteCmd,
NonBlockTensorWriteBatchedCmd,
NonBlockTensorReadBatchedCmd, NonBlockCloseCmd>
NonBlockTensorCmd;

struct NonBlockExecCmd {
uint64_t id;
std::shared_ptr<NrtModel> model;
std::shared_ptr<const NrtTensorSet> input_set;
std::shared_ptr<NrtTensorSet> output_set;
std::optional<std::string> ntff_name;
bool save_trace;
};

typedef std::variant<NonBlockExecCmd, NonBlockCloseCmd>
NonBlockExecOrCloseCmd;

// Thread-safe queue template (blocking and non-blocking versions)
template <typename T, bool is_blocking> class LockedQueue {
public:
LockedQueue() {
if constexpr (!is_blocking) {
size_ = 0;
}
}

void push(const T &value) {
std::unique_lock lk(mtx_);
q_.push(value);
lk.unlock();
if constexpr (is_blocking) {
cv_.notify_one();
} else {
size_.fetch_add(1, std::memory_order_release);
}
}

void push(T &&value) {
std::unique_lock lk(mtx_);
q_.push(std::move(value));
lk.unlock();
if constexpr (is_blocking) {
cv_.notify_one();
} else {
size_.fetch_add(1, std::memory_order_release);
}
}

template <typename... Args> void emplace(Args &&...args) {
std::unique_lock lk(mtx_);
q_.emplace(std::forward<Args>(args)...);
lk.unlock();
if constexpr (is_blocking) {
cv_.notify_one();
} else {
size_.fetch_add(1, std::memory_order_release);
}
}

template <typename T_ = T> std::enable_if_t<is_blocking, T_> pop() {
std::unique_lock lk(mtx_);
cv_.wait(lk, [this]() { return !q_.empty(); });
T ret(std::move(q_.front()));
q_.pop();
lk.unlock();
return ret;
}

// Only safe with a single consumer: the size check and decrement below are
// not one atomic step.
template <typename T_ = T>
std::enable_if_t<!is_blocking, std::optional<T_>> try_pop() {
if (size_.load(std::memory_order_acquire) >= 1) {
size_.fetch_sub(1, std::memory_order_release);
std::lock_guard lk(mtx_);
T ret(std::move(q_.front()));
q_.pop();
return ret;
} else {
return std::nullopt;
}
}

private:
struct Empty {};

std::queue<T> q_;
std::mutex mtx_;
std::conditional_t<is_blocking, std::condition_variable, Empty> cv_;
std::conditional_t<is_blocking, Empty, std::atomic_int64_t> size_;
};
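
// A minimal usage sketch of the two queue flavors (function name
// hypothetical). The blocking variant suits worker threads that should sleep
// until a command arrives; the non-blocking variant suits a single consumer
// that must never block, e.g. a poll made while holding the GIL.
inline void locked_queue_sketch() {
  LockedQueue<int, /*is_blocking=*/true> cmds;
  LockedQueue<int, /*is_blocking=*/false> results;
  cmds.push(1);
  int cmd = cmds.pop(); // blocks until an item is available
  results.push(cmd);
  if (auto r = results.try_pop()) { // returns std::nullopt when empty
    (void)*r;
  }
}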

// NonBlock result structures (exposed to Python)
struct NonBlockTensorReadResult {
uint64_t id;
std::variant<nb::bytes, nb::ndarray<>> data;
std::optional<std::variant<SpikeError, NrtError>> err;
};

struct NonBlockTensorWriteResult {
uint64_t id;
std::optional<std::variant<SpikeError, NrtError>> err;
};

struct NonBlockExecResult {
uint64_t id;
std::optional<std::variant<SpikeError, NrtError>> err;
};

typedef std::variant<NonBlockTensorReadResult, NonBlockTensorWriteResult,
NonBlockExecResult>
NonBlockResult;

// NonBlock internal result structures (used in worker threads to avoid GIL)
// These hold shared_ptrs to keep resources alive until convert_internal_result()
// transfers nanobind objects to Result types. This two-phase pattern ensures
// nanobind/Python object destructors run in GIL context, not in worker threads.
struct NonBlockTensorReadInternalResult {
uint64_t id;
std::variant<nb::bytes, nb::ndarray<>> data_obj;
std::optional<std::variant<SpikeError, NrtError>> err;

std::shared_ptr<const NrtTensor> tensor;
};

struct NonBlockTensorWriteInternalResult {
uint64_t id;
std::optional<std::variant<SpikeError, NrtError>> err;

std::shared_ptr<NrtTensor> tensor;
std::variant<nb::bytes, nb::ndarray<>> data_obj;
};

struct NonBlockExecInternalResult {
uint64_t id;
std::optional<std::variant<SpikeError, NrtError>> err;

std::shared_ptr<const NrtModel> model;
std::shared_ptr<const NrtTensorSet> input_set;
std::shared_ptr<NrtTensorSet> output_set;
};

typedef std::variant<NonBlockTensorReadInternalResult,
NonBlockTensorWriteInternalResult,
NonBlockExecInternalResult>
NonBlockInternalResult;
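
// A minimal sketch of the two-phase handoff described above (function name
// and the `noti_queue` parameter are illustrative). The worker side touches
// only ids, error values, and shared_ptrs; the consumer side runs under the
// GIL, where any nanobind objects held by the internal result can be
// converted and safely destroyed.
inline void two_phase_handoff_sketch(
    LockedQueue<NonBlockInternalResult, false> &noti_queue) {
  // Worker thread (no GIL): enqueue an internal result without touching
  // Python objects.
  noti_queue.push(NonBlockExecInternalResult{
      /*id=*/0, /*err=*/std::nullopt,
      /*model=*/nullptr, /*input_set=*/nullptr, /*output_set=*/nullptr});
  // Python-facing thread (GIL held): drain and convert; held objects are
  // destroyed here, in GIL context.
  if (auto res = noti_queue.try_pop()) {
    // ... convert *res into a NonBlockResult for Python
  }
}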

// Main Spike class - Python interface
class Spike {
public:
@@ -72,17 +264,88 @@ class Spike {
std::optional<std::string> ntff_name = std::nullopt,
bool save_trace = false);

// Nonblocking operations
void init_nonblock();

uint64_t tensor_write_nonblock(std::shared_ptr<NrtTensor> tensor,
nb::bytes data_obj, size_t offset = 0);
uint64_t tensor_write_nonblock(std::shared_ptr<NrtTensor> tensor,
nb::ndarray<> data_obj,
size_t offset = 0);
uint64_t tensor_write_nonblock(std::shared_ptr<NrtTensor> tensor,
const void *data, size_t size,
size_t offset);

uint64_t tensor_read_nonblock(std::shared_ptr<const NrtTensor> tensor,
size_t offset = 0, size_t size = 0);
uint64_t tensor_read_nonblock(std::shared_ptr<const NrtTensor> tensor,
nb::ndarray<> dest, size_t offset = 0,
size_t size = 0);

uint64_t tensor_write_nonblock_batched_prepare(
std::vector<std::shared_ptr<NrtTensor>> tensors,
std::vector<nb::ndarray<>> data_objs,
std::optional<std::vector<size_t>> offsets);
uint64_t tensor_write_nonblock_batched_start(uint64_t batch_id);

uint64_t tensor_read_nonblock_batched_prepare(
std::vector<std::shared_ptr<const NrtTensor>> tensors,
std::vector<nb::ndarray<>> dests,
std::optional<std::vector<size_t>> offsets,
std::optional<std::vector<size_t>> sizes);
uint64_t tensor_read_nonblock_batched_start(uint64_t batch_id);

uint64_t
execute_nonblock(std::shared_ptr<NrtModel> model,
std::shared_ptr<const NrtTensorSet> input_set,
std::shared_ptr<NrtTensorSet> output_set,
std::optional<std::string> ntff_name = std::nullopt,
bool save_trace = false);

std::optional<NonBlockResult> try_poll();

NrtTensorSet create_tensor_set(
const std::unordered_map<std::string, std::shared_ptr<const NrtTensor>>
&tensor_map);

// Wrap existing NRT objects (for interop with external code)
NrtModel wrap_model(nrt_model_t *ptr);
NrtTensor wrap_tensor(nrt_tensor_t *ptr);
NrtTensorSet wrap_tensor_set(nrt_tensor_set_t *ptr);

// Model introspection
ModelTensorInfo get_tensor_info(NrtModel &model);

private:
int verbose_level_;
std::unique_ptr<NrtRuntime> runtime_;

// Nonblock support
uint64_t next_non_block_id_ = 0;
uint64_t next_batch_id_ = 0;

std::vector<std::thread> tensor_threads_;
std::vector<std::thread> exec_threads_;
std::vector<LockedQueue<NonBlockTensorCmd, true>> tensor_queues_;
std::vector<LockedQueue<NonBlockExecOrCloseCmd, true>> exec_queues_;
LockedQueue<NonBlockInternalResult, false> noti_queue_;

std::unordered_map<uint64_t, std::vector<NonBlockTensorWriteCmd>>
tensor_write_batched_cmds_;
std::shared_mutex tensor_write_batched_cmds_mtx_;

std::unordered_map<uint64_t, std::vector<NonBlockTensorReadCmd>>
tensor_read_batched_cmds_;
std::shared_mutex tensor_read_batched_cmds_mtx_;

void loop_execute(int core_id);
void loop_tensor(int core_id);

// Helper methods
NrtTensorSet create_tensor_sets(
NrtTensorSet create_tensor_set(
const std::unordered_map<std::string, NrtTensor &> &tensor_map);
std::string dtype_to_string(nrt_dtype_t dtype);
NonBlockResult convert_internal_result(NonBlockInternalResult &internal_res_);
};

} // namespace spike
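
// A minimal end-to-end sketch of the nonblocking API above (function name
// hypothetical; assumes an already-constructed Spike, a loaded model, and
// prepared tensor sets; error handling elided).
inline void nonblock_usage_sketch(
    spike::Spike &s, std::shared_ptr<spike::NrtModel> model,
    std::shared_ptr<const spike::NrtTensorSet> inputs,
    std::shared_ptr<spike::NrtTensorSet> outputs) {
  s.init_nonblock(); // start the worker threads (see loop_tensor/loop_execute)
  uint64_t id = s.execute_nonblock(model, inputs, outputs);
  for (;;) {
    auto res = s.try_poll(); // non-blocking; std::nullopt if nothing is ready
    if (!res)
      continue;
    if (auto *exec = std::get_if<spike::NonBlockExecResult>(&*res);
        exec && exec->id == id)
      break; // our execution completed
  }
}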